Mysql、Redis、Mongo都是十分风行的存储,并且各自有本人的劣势。在理论的利用中,经常会同时应用多种存储,也会遇见在多种存储中保证数据一致性的需要,例如保障数据库中的库存和Redis中的库存统一等。
本文基于分布式事务框架https://github.com/dtm-labs/dtm给出了一个跨Mysql、Redis、Mongo多种存储引擎的一个可运行的分布式事务实例,心愿可能帮忙大家解决这方面的问题。
这种灵便的组合多个存储引擎造成一个分布式事务的能力,也是dtm独创做到的,目前未看到其余的分布式事务框架有这样的能力。
问题场景
咱们先来看问题场景,假设当初用户加入一次流动,将本人的余额,充值话费,同时流动会赠送商城积分。其中余额存储在Mysql,话费保留在Redis,商城积分保留在Mongo,并且因为流动限时,因而可能呈现加入流动失败的状况,所以须要反对回滚。
对于上述问题场景,能够应用DTM的Saga事务,上面咱们就来具体解说计划。
筹备数据
首先是筹备数据,为了不便用户疾速上手相干的例子,咱们曾经把相干的数据筹备好了,地址在en.dtm.pub,外面包含Mysql、Redis、Mongo,具体的连贯用户名明码能够在https://github.com/dtm-labs/d…找到。
busi.*barrier.*
编写业务代码
咱们先看最相熟的Mysql的业务代码
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int) error {
_, err := dtmimp.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
return err
}
这段代码次要是进行数据库中用户余额的调整
SagaAdjustBalance
对于Redis和Mongo,业务代码的解决也是相似的,只须要对相应的余额进行增减即可
如何做幂等
对于Saga事务模式来说,当咱们的子事务服务呈现长期故障,呈现故障就会进行重试,这个故障可能呈现在子事务提交前,也可能呈现在子事务提交之后,因而子事务服务就须要做到幂等。
DTM 提供了辅助表和辅助的函数,用于帮忙用户疾速实现幂等。对于Mysql,他会在业务数据库中创立辅助表barrier,当用户开启事务调整余额时,会先在barrier表中写入gid,如果这是一个反复申请,那么写入gid时,会发现反复而失败,此时跳过用户业务上的余额调整,保障幂等。辅助函数的应用代码如下:
app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
})
}))
Mongo解决幂等的原理与Mysql相近,不再赘述
Redis解决幂等的原理与Mysql不同,次要是因为事务的原理不同。Redis的事务次要是通过lua的原子执行来保障的。DTM的辅助函数会通过lua脚本来调整余额,调整余额前,会在redis中查问gid,如果存在,则跳过业务上的余额调整;如果不存在,则执行业务上的余额调整。辅助函数的应用代码如下:
app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400)
}))
如何做弥补
对于Saga来说,咱们还须要解决弥补操作,但弥补操作并不是简略的反向调整,也有很多坑须要留神,否则很容易弥补出错。
一方面,弥补须要思考幂等,因为在弥补过程中,也同样须要思考故障重试的状况,与前一大节中的幂等解决一样。另一方面,弥补还须要思考空弥补,因为正向分支返回失败,这个失败可能是在正向的数据曾经调整实现提交之后的失败,也可能是还没有提交就返回了失败。对于数据已提交的失败,咱们须要执行反向操作,对于数据未提交的失败,咱们须要跳过反向操作,即解决空弥补。
DTM 提供的辅助表与辅助函数中,一方面会依据正向操作插入的gid判断是否为空弥补,另一方面还会再插入gid+’compensate’,判断弥补是否为反复操作。如果是失常弥补操作,那么会执行业务上的弥补,如果是空弥补或者反复弥补,则会跳过弥补业务上的弥补。
Mysql的代码如下:
app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")
})
}))
Redis的代码如下:
app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400)
}))
弥补代码与后面的正向操作代码简直一样,仅仅是把金额乘以-1。DTM 的辅助函数会在一个函数外部同时蕴含了幂等与弥补的相干逻辑
其余异样
编写子事务以及子事务的弥补时,其实还有一种异常情况是悬挂,可能呈现在全局事务超时回滚,或者重试达到上线后回滚,失常状况是先正向操作再弥补,然而极其状况可能呈现先弥补再正向操作,因而正向操作还须要判断弥补是否已执行,已执行的状况下,也须要跳过业务操作。
MustBarrierFromGin(c).Call
发动分布式事务
后面编写完了各个子事务服务,上面这部分代码发动一个Saga全局事务:
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmcli.MustGenGid(dtmutil.DefaultHTTPServer)).
Add(busi.Busi+"/SagaBTransOut", busi.Busi+"/SagaBTransOutCom", &busi.TransReq{Amount: 50}).
Add(busi.Busi+"/SagaMongoTransIn", busi.Busi+"/SagaMongoTransInCom", &busi.TransReq{Amount: 30}).
Add(busi.Busi+"/SagaRedisTransIn", busi.Busi+"/SagaRedisTransOutIn", &busi.TransReq{Amount: 20})
err := saga.Submit()
在这部分代码中,创立了一个Saga全局事务,该Saga事务包含3个子事务:
- 从Mysql直达出50
- 向Mongo中转入30
- 向Redis中转入20
在整个事务过程中,如果所有的子事务都顺利完成,那么全局事务胜利;如果有一个子事务返回了业务上的失败,那么全局事务回滚。
运行
如果您想要残缺运行一个下面的示例,步骤如下:
- 运行dtm
git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
- 运行胜利的例子
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb
- 运行失败的例子
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb_rollback
您能够对例子进行批改,模仿各种长期的故障,空弥补的状况,以及其余各种异样,当整个全局事务最终实现时,数据是统一的。
小结
本文给出了一个跨Mysql、Redis、Mongo的分布式事务例子,具体解说了其中须要解决的问题,以及解决方案。
本文的原理适宜于所有反对ACID事务的存储引擎,您能够将它疾速扩大,用于其余引擎,例如TiKV等。
欢送拜访https://github.com/dtm-labs/dtm 并star反对咱们