目录
-
1、基础理论
- 1.1 事务
- 1.2 分布式事务
-
2、分布式事务的解决方案
- 2.1 两阶段提交/xa
- 2.2 saga
- 2.3 tcc
- 2.4 本地消息表
- 2.5 事务消息
- 2.6 最大努力通知
- 2.7 at事务模式
-
3、异常处理
- 3.1 异常情况
- 3.2 子事务屏障
- 3.3 子事务屏障原理
- 3.4 子事务屏障小结
-
4、分布式事务实践
- 4.1 一个saga事务
- 4.2 处理网络异常
- 4.3 处理回滚
- 5、总结
前言:
随着业务的快速发展、业务复杂度越来越高,几乎每个公司的系统都会从单体走向分布式,特别是转向微服务架构。随之而来就必然遇到分布式事务这个难题。
这篇文章首先介绍了相关的基础理论,然后总结了最经典的事务方案,最后给出了子事务乱序执行(幂等、空补偿、悬挂问题)的解决方案,分享给大家。
1、基础理论
在讲解具体方案之前,我们先了解一下分布式事务所涉及到的基础理论知识。
我们拿转账作为例子,a需要转100元给b,那么需要给a的余额-100元,给b的余额+100元,整个转账要保证,a-100和b+100同时成功,或者同时失败。看看在各种场景下,是如何解决这个问题的。
1.1 事务
把多条语句作为一个整体进行操作的功能,被称为数据库事务。数据库事务可以确保该事务范围内的所有操作都可以全部成功或者全部失败。
原子性一致性隔离性持久性
atomicityconsistencyisolationdurability
假如我们的业务系统不复杂,可以在一个数据库、一个服务内对数据进行修改,完成转账,那么,我们可以利用数据库事务,保证转账业务的正确完成。
1.2 分布式事务
bacid
a-100b+100
分布式事务在分布式环境下,为了满足可用性、性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 base 理论(base相关理论,涉及内容非常多,感兴趣的同学,可以参考base理论):
basic availabilitysoft stateeventual consistency
同样的,分布式事务也部分遵循 acid 规范:
- 原子性:严格遵循
- 一致性:事务完成后的一致性严格遵循;事务中的一致性可适当放宽
- 隔离性:并行事务间不可影响;事务中间结果可见性允许安全放宽
- 持久性:严格遵循
2、分布式事务的解决方案
acid
2.1 两阶段提交/xa
x/openmysql
xa一共分为两阶段:
preparereadycommit/rollbackcommit
mysqloraclesqlserverpostgre
xa 事务由一个或多个资源管理器(rm)、一个事务管理器(tm)和一个应用程序(applicationprogram)组成。
这里的rm、tm、ap三个角色是经典的角色划分,会贯穿后续saga、tcc等事务模式。
把上面的转账作为例子,一个成功完成的xa事务时序图如下:
prepareprepare
xa事务的特点是:
- 简单易理解,开发较容易
- 对资源进行了长时间的锁定,并发度低
xagophppythonjavac#nodedtm
2.2 saga
sagas
把上面的转账作为例子,一个成功完成的saga事务时序图如下:
sagacancelcancelcancel
saga事务的特点:
- 并发度高,不用像xa事务那样长期锁定资源
- 需要定义正常操作以及补偿操作,开发量比xa大
- 一致性较弱,对于转账,可能发生a用户已扣款,最后转账又失败的情况
saga
saga
如果读者想要进一步研究saga,可参考dtm,里面包括了saga成功、失败回滚的例子,还包括各类网络异常的处理。
2.3 tcc
tcc
tcc分为3个阶段:
try confirm confirm confirm cancel cancel confirm
confirmcancel
一个成功完成的tcc事务时序图如下:
tccconfirm/cancelconfirm/cancel
tcc特点如下:
try/confirm/cancelsagatcc
tccdtm
2.4 本地消息表
ebay dan pritchett acm
大致流程如下:
写本地消息和业务操作放在一个事务里,保证了业务和发消息的原子性,要么他们全都成功,要么全都失败。
容错机制:
- 扣减余额事务 失败时,事务直接回滚,无后续步骤
- 轮序生产消息失败, 增加余额事务失败都会进行重试
本地消息表的特点:
- 长事务仅需要分拆成多个任务,使用简单
- 生产者需要额外的创建消息表
- 每个本地消息表都需要进行轮询
- 消费者的逻辑如果无法通过重试成功,那么还需要更多的机制,来回滚操作
适用于可异步执行的业务,且后续操作无需回滚的业务
2.5 事务消息
rocketmq 4.3rocketmq
事务消息发送及提交:
halfcommitrollbackcommit
正常发送的流程图如下:
补偿流程:
commit/rollbackpendingcommitrollback
事务消息特点如下:
- 长事务仅需要分拆成多个任务,并提供一个反查接口,使用简单
- 消费者的逻辑如果无法通过重试成功,那么还需要更多的机制,来回滚操作
适用于可异步执行的业务,且后续操作无需回滚的业务
2.6 最大努力通知
发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。具体包括:
有一定的消息重复通知机制。因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
消息校对机制。如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。
前面介绍的的本地消息表和事务消息都属于可靠消息,与这里介绍的最大努力通知有什么不同?
可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。
最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
解决方案上,最大努力通知需要:
ack1min5min10min30min1h2h5h10h
最大努力通知适用于业务通知类型,例如微信交易的结果,就是通过最大努力通知方式通知各个商户,既有回调通知,也有交易查询接口
2.7 at事务模式
这是阿里开源项目seata中的一种事务模式,在蚂蚁金服也被称为fmt。优点是该事务模式使用方式,类似xa模式,业务无需编写各类补偿操作,回滚由框架自动完成,缺点也类似xa,存在较长时间的锁,不满足高并发的场景。从性能的角度看,at模式会比xa更高一些,但也带来了脏回滚这样的新问题。有兴趣的同学可以参考seata-at
3、异常处理
在分布式事务的各个环节都有可能出现网络以及业务故障等问题,这些问题需要分布式事务的业务方做到防空回滚,幂等,防悬挂三个特性。
3.1 异常情况
下面以tcc事务说明这些异常情况:
空回滚:
在没有调用 tcc 资源 try 方法的情况下,调用了二阶段的 cancel 方法,cancel 方法需要识别出这是一个空回滚,然后直接返回成功。
出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行try阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的cancel方法,从而形成空回滚。
幂等:
由于任何一个请求都可能出现网络异常,出现重复请求,所以所有的分布式事务分支,都需要保证幂等性
悬挂:
cancel
出现原因是在 rpc 调用分支事务try时,先注册分支事务,再执行rpc调用,如果此时 rpc 调用的网络发生拥堵,rpc 超时以后,tm就会通知rm回滚该分布式事务,可能回滚完成后,try 的 rpc 请求才到达参与者真正执行。
下面看一个网络异常的时序图,更好的理解上述几种问题
cancelcancelcancel
面对上述复杂的网络异常情况,目前看到各家建议的方案都是业务方通过唯一键,去查询相关联的操作是否已完成,如果已完成则直接返回成功。相关的判断逻辑较复杂,易出错,业务负担重。
3.2 子事务屏障
在项目https://github.com/yedf/dtm中,出现了一种子事务屏障技术,使用该技术,能够达到这个效果,看示意图:
所有这些请求,到了子事务屏障后:不正常的请求,会被过滤;正常请求,通过屏障。开发者使用子事务屏障之后,前面所说的各种异常全部被妥善处理,业务开发人员只需要关注实际的业务逻辑,负担大大降低。
子事务屏障提供了方法throughbarriercall,方法的原型为:
func throughbarriercall(db *sql.db, transinfo *transinfo, busicall busifunc)
throughbarriercallbusicall
子事务屏障会管理tcc、saga、事务消息等,也可以扩展到其他领域
3.3 子事务屏障原理
sub_trans_barriertry|confirm|cancel
insert ignoregid-branchid-tryinsert ignoregid-branchid-confirminsert ignoregid-branchid-trygid-branchid-cancel
在此机制下,解决了网络异常相关的问题
cancelgid-branchid-trygid-branchid-try
对于saga、事务消息等,也是类似的机制。
3.4 子事务屏障小结
子事务屏障技术,为https://github.com/yedf/dtm首创,它的意义在于设计简单易实现的算法,提供了简单易用的接口,在首创,它的意义在于设计简单易实现的算法,提供了简单易用的接口,在这两项的帮助下,开发人员彻底的从网络异常的处理中解放出来。
该技术目前需要搭配yedf/dtm事务管理器,目前sdk已经提供给go、python语言的开发者。其他语言的sdk正在规划中。对于其他的分布式事务框架,只要提供了合适的分布式事务信息,能够按照上述原理,快速实现该技术。
4、分布式事务实践
我们以前面介绍的saga事务为例,以dtm作为事务框架,来完成一个具体的分布式事务。本例子采用go语言,如果您对此不感兴趣,可以直接跳到文章最后的小结。
4.1 一个saga事务
我们先编写核心业务代码,调整用户的账户余额
?func qsadjustbalance(uid int, amount int) (interface{}, error) {_, err := dtmcli.sdbexec(sdbget(), "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)returndtmcli.resultsuccess, err}
下面我们来编写具体的正向操作/补偿操作的处理函数
?app.post(qsbusiapi+"/transin", common.wraphandler(func(c *gin.context) (interface{}, error) {returnqsadjustbalance(2, 30)}))app.post(qsbusiapi+"/transincompensate", common.wraphandler(func(c *gin.context) (interface{}, error) {returnqsadjustbalance(2, -30)}))app.post(qsbusiapi+"/transout", common.wraphandler(func(c *gin.context) (interface{}, error) {returnqsadjustbalance(1, -30)}))app.post(qsbusiapi+"/transoutcompensate", common.wraphandler(func(c *gin.context) (interface{}, error) {returnqsadjustbalance(1, 30)}))
到此各个子事务的处理函数已经ok了,然后是开启saga事务,进行分支调用
?req := &gin.h{"amount": 30} // 微服务的载荷// dtmserver为dtm服务的地址saga := dtmcli.newsaga(dtmserver, dtmcli.mustgengid(dtmserver)).// 添加一个transout的子事务,正向操作为url: qsbusi+"/transout", 逆向操作为url: qsbusi+"/transoutcompensate"add(qsbusi+"/transout", qsbusi+"/transoutcompensate", req).// 添加一个transin的子事务,正向操作为url: qsbusi+"/transout", 逆向操作为url: qsbusi+"/transincompensate"add(qsbusi+"/transin", qsbusi+"/transincompensate", req)// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务err := saga.submit()
至此,一个完整的saga分布式事务编写完成。
如果您想要完整运行一个成功的示例,那么按照yedf/dtm项目的说明搭建好环境之后,通过下面命令运行saga的例子即可:
go run app/main.go quick_start
4.2 处理网络异常
假设提交给dtm的事务中,调用转入操作时,出现短暂的故障怎么办?按照saga事务的协议,dtm会重试未完成的操作,这时我们要如何处理?故障有可能是转入操作完成后出网络故障,也有可能是转入操作完成中出现机器宕机。如何处理才能够保障账户余额的调整是正确无问题的?
我们使用了子事务屏障功能,保证多次重试,只会有一次成功提交。
我们把处理函数调整为:
?func sagabarrieradjustbalance(sdb *sql.tx, uid int, amount int) (interface{}, error) {_, err := dtmcli.stxexec(sdb, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)returndtmcli.resultsuccess, err}func sagabarriertransin(c *gin.context) (interface{}, error) {returndtmcli.throughbarriercall(sdbget(), mustgettrans(c), func(sdb *sql.tx) (interface{}, error) {returnsagabarrieradjustbalance(sdb, 1, reqfrom(c).amount)})}func sagabarriertransincompensate(c *gin.context) (interface{}, error) {returndtmcli.throughbarriercall(sdbget(), mustgettrans(c), func(sdb *sql.tx) (interface{}, error) {returnsagabarrieradjustbalance(sdb, 1, -reqfrom(c).amount)})}
dtmcli.troughbarriercall
您可以尝试多次调用这个transin服务,仅有一次余额调整。您可以运行以下命令,运行新的处理方式:
go run app/main.go saga_barrier
4.3 处理回滚
假如银行将金额准备转入用户2时,发现用户2的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败
?func sagabarriertransin(c *gin.context) (interface{}, error) {returndtmcli.resultfailure, nil}
我们给出事务失败交互的时序图
transintransin
transintransintransin
您可以将返回错误的transin改成:
?func sagabarriertransin(c *gin.context) (interface{}, error) {dtmcli.throughbarriercall(sdbget(), mustgettrans(c), func(sdb *sql.tx) (interface{}, error) {returnsagabarrieradjustbalance(sdb, 1, 30)})returndtmcli.resultfailure, nil}
最后的结果余额依旧没有问题
5、总结
本文介绍了分布式事务的一些基础理论,并对常用的分布式事务方案进行了讲解;在文章的后半部分还给出了事务异常的原因、分类以及优雅的解决方案;最后以一个可运行的分布式事务例子,将前面介绍的内容以简短的程序进行演示。
原文链接:https://segmentfault.com/a/1190000040321750