分布式事务 golang saga 模式实现

分布式事务介绍

在分布式系统环境下由不同的节点之间通过网络远程协作完成的事务称之为分布式事务。也就是事务的参与者、管理者、资源及资源管理者分别位于分布式系统的不同节点之上。分布式事务的作用就是用于保证不同节点中的数据一致性。

例如典型的创建订单业务逻辑由以下几个部分构成:

  • 仓库服务:扣减特定商品库存数量
  • 订单服务:生成订单

分布式事务的作用就是保证数据在订单服务和仓库服务这个整体上的一致性。不能说订单生成了,消耗了10件商品,而仓库服务中的库存说了没有一致性的改变。

分布式事务相关协议:

  • 2PC,Two Phases Commit,二阶段提交
  • 3PC,Three Phases Commit,三阶段提交
  • 最终一致性

典型的分布式事务模式,依据一致性由强到弱排序如下:

  1. XA,eXtended Architecture, 数据库层面的分布式事务规范,目前主流数据库基本都支持 XA 事务,已知方案中的最强的一致性方案
  2. TCC,Try、Confirm、Cancel,应用(服务)层参与的 2PC 方案
  3. 事务消息,利用消息队列异步确保事务的一致性
  4. Saga,将长事务拆分为多个本地短事务协调执行,若某个短事务失败,则反顺序调用补偿(undo)操作
  5. 最大努力通知,发起通知方通过一定的机制最大努力将事务结果通知到接收方,来保证一致性

还有几个事实模式,特定产品中实现的模式:

  • Seata 实现 AT 模式
  • DTM 推出的二阶段消息和 Workflow 模式

常见的实现分布式事务的产品(解决方案):

  • Seata,支持 TCC、Saga 和 AT(Seata(阿里团队)实现)模式,提供 Seata-go 支持 Go 技术栈
  • DTM,支持 XA,Saga、TCC、和 DTM 推出的 二阶段消息和 Workflow 模式,基于 Go 开发

分布式事务协议

2PC

二阶段提交协议(Two-phase Commit,即 2PC)是常用的分布式事务协议,即将事务的提交过程分为准备阶段和提交阶段两个阶段来进行处理。通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。

2PC 中的两种角色:

  • 事务协调者(事务管理器),Coordinator:事务的发起者
  • 事务参与者(资源管理器), Resource Manager:事务的执行者

2PC 中的两个阶段:

  • 准备阶段,Prepare
    • 协调者向各个参与者发出准备执行事务指令
    • 参与者执行但不提交事务,数据库中的体现就是将操作记录在 redo/undo 日志中
    • 参与者将执行结果反馈给协调者,成功 yes,失败 no
  • 提交阶段,Commit
    • 若协调者收到全部的参与者的 yes 反馈,则向参与者发送 commit 指令
    • 若协调者未收到全部参与者的 yes 反馈(有的反馈no,有的超时未反馈),则向参与者发送 rollback 指令
    • 参与者根据收到的 commit 或 rollback 指令,完成事务的提交或者回滚
    • 协调者收到全部参与者的 commit 或 rollback 的 ack,完成事务

prepare 阶段全部反馈为 yes 情况,如图所示:

prepare 阶段未全部反馈为 yes 情况,如图所示:

2PC 的中心思想就是将事务拆解为两步,先确认再提交,是当前实现分布式事务的核心思想。在数据库层面实现就是 XA 模式,在业务层面实现就是 Saga、TCC 模式,Seata 的 At 也是二阶段的思路。

3PC

三阶段提交协议(Three-phase Commit,即 3PC)是基于 2PC 的改造版本,主要的改动是将 2PC 中的 Prepare 阶段分为了 canCommit 和 PreCommit 两个阶段,那三个阶段就是:

  • CanCommit,提交检查阶段,询问参与者是否具备条件执行事务
    • 协调者向全部参与者询问是否具备条件执行事务
    • 参与者根据自身状态,反馈结果,具备 yes,不具备 no。通常是资源的检测。
  • PreCommit,预提交阶段,与 2PC 中的 Prepare 阶段类型,执行事务但不提交
    • 若协调者收到全部的参与者的 yes 反馈,则向参与者发送 preCommit 指令
    • 若协调者未收到全部参与者的 yes 反馈(有的反馈no,有的超时未反馈),则向参与者发送 abort 中断指令
    • 参与者根据收到的 preCommit 或 abort 指令,选择执行但不提交事务,或中断
    • 参与者将执行结果反馈给协调者,成功的 ack,或失败 no
    • 若执行的是 abort 指令,那么不再继续,事务终止
  • DoCommit,提交阶段
    • 若协调者收到全部的参与者关于 preCommit 指令的 yes 反馈,则向参与者发送 doCommit 指令
    • 若协调者未收到全部参与者关于 preCommit 指令的 yes 反馈的 yes 反馈(有的反馈no,有的超时未反馈),则向参与者发送 abort 指令
    • 参与者根据收到的 commit 或 abort 指令,完成事务的提交或者中断
    • 协调者收到全部参与者的 commit 或 abort 的 ack,完成事务

canCommit、preCommit 阶段全部反馈 yes 的情况,如图:

canCommit 未全部反馈 yes 的情况,如图:

canCommit 全部反馈 yes,但 preCommit 未全部 ACK 的情况,如图:

3PC 增加了 canCommit 阶段,也就是在进入到事务执行阶段前,可以完成一些必要的检查,而不会像 2PC 那样直接进入事务执行而锁定资源。这样在一定成都上减少了资源的阻塞范围。但步骤多了,实现必然复杂了。

分布式事务模式

XA

XA,eXtended Architecture,是由 X/Open 组织提出的分布式事务的规范,XA 规范主要定义了事务管理器(TM)和资源管理器(RM)之间的接口。目前主流的数据库基本都支持XA事务,包括 mysql、oracle、sqlserver、postgre。

本地的数据库如 MySQL 在 XA 中扮演的是 RM 角色。而 XA 工具扮演的是 TM 的角色,例如 Seata 和 Dtm。通过 TM 与数据库RM的交互,完成分布式事务的调度控制。

XA 是 2PC 在数据库层面实现的一种规范,分为两阶段:

  • 第一阶段(prepare):所有的参与者 RM 准备执行事务并锁住需要的资源。参与者 ready 时,向 TM 报告已准备就绪
  • 第二阶段 (commit/rollback):当事务管理者(TM)确认所有参与者(RM)都 ready 后,向所有参与者发送 commit 命令或 rollback 命令

XA 模式在数据库层面实现,因此一致性非常严格,但同时并发性能较差。因此适合做那种要求强一致性的业务逻辑,例如转账、金融等。

MySQL 提供如下的语句实现 XA 事务:

# Begin
XA {START|BEGIN} xid [JOIN|RESUME]
# End
XA END xid [SUSPEND [FOR MIGRATE]]
# Prepare
XA PREPARE xid
# Commit
XA COMMIT xid [ONE PHASE]
# rollback
XA ROLLBACK xid
# recover
XA RECOVER [CONVERT XID]

MySQL 通过关联特定的 xid 来控制本地事务(子事务),TM 负责控制全局事务。

以确认订单为例:

  • 确认订单业务逻辑
    • 通知 TM 开启全局事务
    • 注册订单服务创建订单本地事务接口
    • 注册库存服务扣减库存本地事务接口
  • 订单服务
    • 实现创建订单本地事务接口
  • 库存服务
    • 实现扣减库存本地事务接口

全部 prepare 成功,如图所示:

Saga

Saga 最初出现在1987年 Hector Garcaa-Molrna & Kenneth Salem 发表的论文《SAGAS》里。其核心思想是将长事务拆分为多个短事务,由 Saga 事务协调器协调,如果每个短事务都成功提交完成,那么全局事务就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

Saga 是业务逻辑层面实现的分布式事务管理。意味着任何一个事务内的操作,都要定义正向和反向(补偿)两个操作。正向就是常规业务逻辑,反向(补偿)就是取消正向业务逻辑带来的影响。总结就是,先使用资源,不行再退回去。

以确认订单为例:

  • 订单服务
    • 正向操作:创建订单和订单产品,同时订单为已确认状态。
    • 反向(补偿)操作:更新订单状态为未确认
  • 库存服务
    • 正向操作:扣减库存
    • 反向补偿操作:将扣减的库存退回去

实操时,我们需要在订单和库存服务中,分别实现正向和反向两个接口。而 Saga 事务管理器负责完成对接口的调用。

不需要补偿操作的情况,如图:

需要补偿操作的情况,如图:

真正实现时,确认订单为主业务逻辑,负责与 Saga 事务协调器(Seata, Dtm) 通讯,将仓库和订单服务的正向和反向操作接口注册到 Saga 事务协调器中。Saga 事务协调器负责根据正向操作的结果觉得是否调用补偿操作。示例请参考 《DTM 的 Saga 示例》

TCC

TCC 是 Try、Confirm、Cancel 三个词语的缩写,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。

TCC 是 2PC 在业务逻辑层面的实现,也就意味着脱离数据库,完成准备和提交阶段。在 TCC 中,采用的是先冻结资源,若全部节点冻结成功,则提交占有资源,若存在失败阶段,则释放冻结的资源。总结就是:先冻结,成功则占有否则释放。

TCC 分为3个操作,2个阶段

  • Try 操作,1阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性),冻结资源
  • Confirm 操作,2阶段:如果所有分支的 Try 都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
  • Cancel 操作,2阶段:如果所有分支的Try有一个失败了,则走到 Cancel 阶段。Cancel释放 Try 阶段预留的业务资源。

以确认订单为例:

  • Try 操作,1阶段
    • 订单服务会生成确认中的订单
    • 库存服务会冻结需要的库存
  • Confirm 操作,2阶段,库存服务都成功,则确认事务
    • 订单状态更新为已确认
    • 执行扣减库存
  • Cancel 操作,2阶段,若订单、库存服务存在失败的情况,则取消事务
    • 释放冻结的库存
    • 将订单状态更新为未确认状态

我们需要在订单服务和库存服务中,分别实现 Try、Confirm和Cancel 接口。而 分布式事务管理器(Seata、Dtm)服务完成调度,根据结果选则 Confirm 还是 Cancel。

执行 Confirm 情况,如图所示:

执行 Cancel 情况,如图所示:

真正实现时,确认订单为主业务逻辑,负责与TCC事务协调器(Seata, Dtm)通讯,将仓库和订单服务的 Confirm 和 Cancel 接口注册到 TCC 事务协调器中。TCC 事务协调器负责根据 Try 的结果调用 Confirm 或 Cancel。

TCC vs Saga 思想:

  • Saga,先使用,不行再退回
  • TCC,先锁定(冻结),再选择占用或释放
事务消息

消息事务的原理是将两个事务通过消息中间件进行异步解耦。RocketMQ 实现的该方案。若服务 A、B要实现分布式事务,其思路是:

  • A 服务发布事务成功的 half 消息到 MQ。half 消息是消费者(服务 B)不可见的消息。
  • A 服务执行本地事务
    • 事务提交,则通知 MQ 将 half 消息对消费者可见
    • 事务回滚,则通知 MQ 将 half 消息删除
  • B 服务只有在 A 服务事务提交成功后,才会消费到消息,消费到消息后,B 服务完成自己的事务部分
    • 若消费失败,服务 B 会不断重试,直到消费成功

RocketMQ 还要求 A 服务提供一个接口,用于查询关于这条 half 消息对应的事务是否成功,来控制 half 消息的状态。

该方案主要适用于不需要业务回滚的场景,如某些附加操作。例如注册成功后,领取礼品,发送邮件等。

最大努力通知

最大努力通知的方案适用于一些最终一致性要求较低的业务。

执行流程:

  • 服务 A 本地事务执行完之后,发送个消息到 MQ;
  • 最大努力通知调度器,消费该消息,同时调用服务 B 的接口完成事务
  • 要是服务 B 执行失败了,那么最大努力通知调度器就定时尝试重新调用服务 B的事务几口, 反复 N 次,最后还是不行就放弃。

该方案主要适用于不需要业务回滚、一致性较低的场景,如某些附加操作。例如注册成功后,领取礼品,发送邮件等。

分布式事务异常

分布式系统最大的问题就是 NPC ,是 Network Delay, Process Pause, Clock Drift 的首字母缩写。指的是:

  • Network Delay,网络延迟
  • Process Pause,进程暂停。当基于某些需要,例如内存垃圾回收、CPU 排队、服务迁移等,某服务会暂时暂停。
  • Clock Drift,时钟漂移。分布式系统涉及大量的服务器,而不同服务器通常使用 NTP (Network Time Protocol)协议将本地设备的时间与时间服务器对齐对齐后,通常会导致本地时间跳跃。

由于分布式事务系统由于存在 NPC 问题,意味着分布式事务需要考虑:

  • 空补偿,也叫空回滚,补偿操作在主动操作未执行前执行,系统设计应该允许空补偿。因此本地事务的补偿操作需要判定出来主动操作是否执行。补偿操作,可以理解成逆向操作或 rollback 操作,主动操作可以理解为正向操作或 prepare 操作,例如:
    • TCC 中,就是 cancel 操作的执行需要判定 try 是否执行
    • Sage 中,就是逆向补偿操作需要判定正向操作是否执行
  • 悬挂,正向操作在执行时,补偿操作已经完成,因此本地事务的正向操作需要判定出补偿操作是否执行完成,例如:
    • TCC 中,就是 Try 操作执行时需要判定 cancel 操作是否执行完毕
    • Saga 中,就是正向操作执行时需要判定逆向补偿操作是否执行

以 Saga 模式的网络异常时序图为例:

[站外图片上传中...(image-5ca2a5-1672155282813)]

仓储服务存在 NPC 问题,导致操作1 扣减库存 操作迟迟到达不了仓储服务,同时假设 2 操作的成功结果由于 NPC 问题到达不了调度器,这就导致:

  • 空补偿:2 操作,因为超时而导致分布式事务失败,需要补偿。因此向仓储服务发出补偿操作请求。此时正向操作还未抵达仓储服务,因此是个空补偿操作。
  • 悬挂:1 操作,当1 扣减库存这个正向操作到达仓储服务时,由于已经执行过补偿操作,因此 1 操作出现悬挂。

不论空补偿还是悬挂,都需要在业务逻辑层面做出判定。通常的做法是通过分布式事务事件日志的方案来标识操作状态,进而决定是否需要处理空补偿和防止悬挂。

上图中,2 操作超时而执行的补偿,若在仓库服务执行成功,但反馈的结果由于 NPC 问题不能到达事务调度器,那么事务调度器还有可能再次发送 3 超时而执行补偿操作。这就意味着仓库服务的补偿操作会被多次重复调用。我们必须保证分布式事务的全部操作分支保证幂等性。也就是重复调用操作分支,但不会产生叠加的影响。

在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

DTM 示例

DTM 是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。

DTM 安装
Docker
$ sudo docker run \
    --net host \
    --rm -it \
    --name dtmDev \
    -p 36789:36789 \
    -p 36790:36790 \
    yedf/dtm:latest

Unable to find image 'yedf/dtm:latest' locally
latest: Pulling from yedf/dtm
530afca65e2e: Already exists
cfc3c688efa0: Pull complete
4f4fb700ef54: Pull complete
Digest: sha256:1b7f6da9d959ba62ea4aa995f92696158ff82f8472064c081c262a58a73be9b4

二进制安装包

地址:

下载对应平台的版本,运行即可。

源码安装

源码为 Go,编译需要 Go 1.16+

$ git clone https://github.com/dtm-labs/dtm && cd dtm
$ go build
$ ./dtm
DTM 的 Saga 示例

完整代码位于:microService/dtm_saga

业务逻辑说明,共有三个服务:

  • bff/ensure_order.go,确认订单的聚合服务。负责开启 saga 事务
  • order/serviceOrder.go,订单服务,创建订单,当订单库存检测失败时,将订单状态更新为未确认
  • storage/serviceStorage.go,仓库服务,扣减库存,当库存不足时,错误。补偿接口完成库存回退

编码实现:

一:准备订单服务和仓库服务相关表:

drop table if exists saga.orders;
create table if not exists saga.orders (
    id int unsigned primary key auto_increment,
    tx_id varchar(255) unique,
    status enum('confirming', 'confirmed', 'not confirmed'),
    total int
);
drop table if exists saga.order_products;
create table if not exists saga.order_products (
    order_id int unsigned,
    product_id int unsigned,
    quantity int,
    price int,
    primary key (order_id, product_id)
);
drop table if exists saga.storages;
create table if not exists saga.storages (
    id int unsigned primary key auto_increment,
    inventory int
);
insert into saga.storages values 
(3, 108),
(8, 10);

二:在各自服务的 MySQL 中执行以上 SQL,准备好基础数据:

# order
$ sudo docker run \
    --rm \
    --net host \
    --name mysqlSagaOrder \
    -e MYSQL_TCP_PORT=3306 \
    -e MYSQL_ROOT_PASSWORD=mashibing \
    -e MYSQL_DATABASE=saga \
    -d mysql:latest
# login mysql && create table
$ sudo docker exec -it mysqlSagaOrder mysql -hlocalhost -pmashibing

# storage
$ sudo docker run \
    --rm \
    --net host \
    --name mysqlSagaStorage \
    -e MYSQL_TCP_PORT=3307 \
    -e MYSQL_ROOT_PASSWORD=mashibing \
    -e MYSQL_DATABASE=saga \
    -d mysql:latest
# login mysql && create table && insert some rows.
$ sudo docker exec -it mysqlSagaStorage mysql -hlocalhost -pmashibing

三:编写业务服务

聚合服务 bff/ensure_order.go

package main

import (
"github.com/dtm-labs/client/dtmcli"
"log"
)

// 业务请求数据对象
type Req struct {
Quantity int    `json:"quantity"`
Id       int    `json:"id"`
TxId     string `json:"tx_id"`
}

// 模拟创建订单的聚合业务逻辑
func main() {
// dtm 服务器地址
const dtmServer = "http://192.168.177.131:36789/api/dtmsvr"
// 关联的两个服务的地址
const orderServer = "http://192.168.177.1:8081"
const storageServer = "http://192.168.177.1:8082"

// dtm 生成 事务id
gid := dtmcli.MustGenGid(dtmServer)

// 伪造请求数据
req := Req{20, 3, gid}

// 启动 Saga 事务
saga := dtmcli.NewSaga(dtmServer, gid).
Add(orderServer+"/order-create", orderServer+"/order-create-compensate", req).
Add(storageServer+"/deduct", storageServer+"/deduct-compensate", req)

// 事务提交
err := saga.Submit()
log.Fatalln(err)
}

订单服务:

order/serviceOrder.go

package main

import (
"database/sql"
"github.com/gin-gonic/gin"
_ "github.com/go-sql-driver/mysql"
"log"
"net/http"
)

var DB *sql.DB

func init() {
db, err := sql.Open("mysql", "root:mashibing@tcp(192.168.177.131:3306)/saga?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil {
log.Fatalln(err)
}
DB = db
}

type Req struct {
Quantity int    `json:"quantity"`
Id       int    `json:"id"`
TxId     string `json:"tx_id"`
}

func main() {
r := gin.Default()

r.POST("/order-create", func(c *gin.Context) {
req := &Req{}
if err := c.BindJSON(req); err != nil {
log.Fatalln(err)
}
// 创建订单和订单商品记录

query := "insert into orders values (null, ?, 'confirmed', 1024)"
result, err := DB.Exec(query, req.TxId)
if err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "order create failed"})
return
}
orderId, err := result.LastInsertId()
if err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "order create failed"})
return
}
query = "insert into order_products values (?, ?, ?, 99)"
if _, err := DB.Exec(query, orderId, req.Id, req.Quantity); err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "order products create failed"})
return
}

c.JSON(http.StatusOK, gin.H{"dtm_result": "SUCCESS"})

})
r.POST("/order-create-compensate", func(c *gin.Context) {
// 确认,生成订单信息,插入 orders 和 order_products 表记录,本例中,仅使用 order_products 表
req := &Req{}
if err := c.BindJSON(req); err != nil {
log.Fatalln(err)
}
// 将订单状态更新为未确认
query := "update orders set status='not confirmed' where tx_id=?"
if _, err := DB.Exec(query, req.TxId); err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "compensate order status failed"})
return
}

c.JSON(http.StatusOK, gin.H{"dtm_result": "SUCCESS"})
})

r.Run(":8081")
}

库存服务:

storage/serviceStorage.go

package main

import (
"database/sql"
"github.com/gin-gonic/gin"
_ "github.com/go-sql-driver/mysql"
"log"
"net/http"
)

var DB *sql.DB

func init() {
db, err := sql.Open("mysql", "root:mashibing@tcp(192.168.177.131:3307)/saga?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil {
log.Fatalln(err)
}
DB = db
}

type Req struct {
Quantity int    `json:"quantity"`
Id       int    `json:"id"`
TxId     string `json:"tx_id"`
}

func main() {
r := gin.Default()

r.POST("/deduct", func(c *gin.Context) {
// 解析请求数据
req := &Req{}
if err := c.BindJSON(req); err != nil {
log.Fatalln(err)
}
// 执行 扣减库存
query := "update storages set inventory = inventory-? where id = ?"
if _, err := DB.Exec(query, req.Quantity, req.Id); err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "deduct inventory failed"})
return
}
// 判定库存是否为负数,负数失败
inventory := 0
query = "select inventory from storages where id=?"
if err := DB.QueryRow(query, req.Id).Scan(&inventory); err != nil {
log.Fatalln(err)
}
if inventory < 0 {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "inventory is not enough"})
return
}

c.JSON(http.StatusOK, gin.H{"dtm_result": "SUCCESS"})

})
r.POST("/deduct-compensate", func(c *gin.Context) {
req := &Req{}
if err := c.BindJSON(req); err != nil {
log.Fatalln(err)
}

// 执行 扣减库存的补偿操作
query := "update storages set inventory = inventory+? where id = ?"
if _, err := DB.Exec(query, req.Quantity, req.Id); err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "compensate inventory failed"})
return
}

c.JSON(http.StatusOK, gin.H{"dtm_result": "SUCCESS"})
})

r.Run(":8082")
}

启动 DTM,完成测试。通过数据库中的数据变化,体现分布式事务的实现。