https://github.com/dtm-labs/dtm和seata类似是一个分布式事务管理器,不过是golang实现的,它有丰富的例子可以供我们学习https://github.com/dtm-labs/dtm-examples。常见的事务模式,支持对比如下:

1,TCC事务:dtm和Seata都支持了TCC事务。

2,XA事务:dtm和Seata都支持XA事务。dtm采用的是回调函数形式的接口,而Seata采用的是Java特有的注解形式接口,本质都是回调。

3,AT事务:AT事务是Seata独有的事务模式(类似XA,性能更高,但有脏回滚)

4,SAGA事务:Seata的Saga实现采用了状态机,优点是可以做到灵活配置,缺点是上手难度非常高。dtm支持并发的Saga。

5,二阶段消息:dtm支持了二阶段消息事务模式,该模式受到RocketMQ的事务消息启发。提供了比本地消息表和事务消息更简单的架构,更易用的接口:PrepareAndSubmit,适用于无需回滚的数据一致性场景

        总的来说,对于golang用户学习分布式事务是一个非常不错的选择。在学习本篇之前,建议先学习下mysql的XAgolang源码分析:golang使用mysql XA事务,然后会发现大家的最终方案都是相似的。也可以对比seata的golang客户端来学习golang源码分析:seata-go (1)at模式,golang源码分析:seata-go (2)tcc模式。

      dtm在传输协议上支持三种,grpc,http和http-json,它的通信链路大概可以分为三条:

1,对外提供服务的链路。

2,ap(应用程序)调用dtm(事务管理器)上报数据的链路

3,dtm回调应用程序的链路。

        下面,我们基于dtm-examples的qs例子源码对dtm进行简单介绍:

1,对外提供的服务接口是基于gin http服务实现的,它的端口是8081

go busi.RunHTTP(app)  
const (
// BusiAPI busi api prefix
BusiAPI = "/api/busi"
// BusiPort busi server port
BusiPort = 8081
// BusiGrpcPort busi server port
BusiGrpcPort = 58081
)

提供了转入,转出等多个接口。

func BaseAddRoute(app *gin.Engine) {
app.POST(BusiAPI+"/workflow/resume"
  app.POST(BusiAPI+"/TransIn",

2,上报数据链路,qs例子,实现了saga模式

saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).

首先通过uuid产生了全局事务id,然后组装回调需要的接口和参数,然后上报给dtm服务器,地址是http://localhost:36789/api/dtmsvr/submit ,也可以是grpc模式

 const dtmServer = "http://localhost:36789/api/dtmsvr"    

对应实现在github.com/dtm-labs/client@v1.15.1/dtmcli/trans_saga.go

func NewSaga(server string, gid string) *Saga {
&Saga{TransBase: *dtmimp.NewTransBase(gid, "saga", server, ""), orders: map[int][]int{}}

github.com/dtm-labs/client@v1.15.1/dtmcli/dtmimp/trans_base.go

func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase {
    return &TransBase{
Gid: gid,
TransType: transType,
BranchIDGen: BranchIDGen{BranchID: branchID},
Dtm: dtm,
TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders},
Context: context.Background(),
}

然后调用Add方法添加需要上报的信息

func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {
  s.Steps = append(s.Steps, map[string]string{"action": action, "compensate": compensate})
s.Payloads = append(s.Payloads, dtmimp.MustMarshalString(postData))

最后通过submit提交给dtm

func (s *Saga) Submit() error {
      s.BuildCustomOptions()
return dtmimp.TransCallDtm(&s.TransBase, "submit")
// TransCallDtm is the short call for TransCallDtmExt
func TransCallDtm(tb *TransBase, operation string) error {
_, err := TransCallDtmExt(tb, tb, operation)
func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
   resp, err := RestyClient.R().
SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))

3,dtm回调业务的接口也是基于gin的http服务实现的,端口是8082

// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8082
var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI)

        在启动1中的gin http服务后,会调用dtm的上报接口,将回调信息上报给dtm,其中就包括dtm 回调用的接口,详细信息如下:

[
{
"action": "http://localhost:8082/api/busi_start/TransOut",
"compensate": "http://localhost:8082/api/busi_start/TransOutCompensate"
},
{
"action": "http://localhost:8082/api/busi_start/TransIn",
"compensate": "http://localhost:8082/api/busi_start/TransInCompensate"
}
],
"payloads": ["{mount\:30}","{mount\:30}"
]

对于qs实例来说,就包括两个分支事务的正向操作接口和对应的补偿接口,具体实现在qsAddRoute方法里面:

func qsAddRoute(app *gin.Engine) {
app.POST(qsBusiAPI+"/TransIn",

4,在dtm的server端提供了相应的submit接口,接收客户端也就是应用程序提交的submit请求,对应路由注册在dtmsvr/api_http.go

func addRoute(engine *gin.Engine) {
engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid))
engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare))
engine.POST("/api/dtmsvr/submit", dtmutil.WrapHandler2(submit))
    engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort))
    engine.POST("/api/dtmsvr/registerBranch", dtmutil.WrapHandler2(registerBranch))

        实现位于dtmsvr/api.go

func svcSubmit(t *TransGlobal) interface{} 
func submit(c *gin.Context) interface{} {
return svcSubmit(TransFromContext(c))

下面我们启动官方的例子跑一下:

首先启动dtm 服务端

cd  dtm 
go run main.go

然后运行例子中的qs例子

cd dtm-examples 
go run main.go qs

         可以看到下面的日志

{"level":"info","ts":"2022-12-18T18:29:24.433+0800","caller":"dtmutil/db.go:103","msg":"connecting 'mysql' 'en.dtm.pub' 'dtm' '3306' ''"}
{"level":"debug","ts":"2022-12-18T18:29:24.713+0800","caller":"dtmutil/db.go:79","msg":"installing db plugin: tracePlugin"}
{"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"dtmutil/db.go:68","msg":"used: 34 ms affected: -1 sql is: xa recover"}
{"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_grpc.go:57","msg":"dtm client inited"}
{"level":"info","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_http.go:54","msg":"examples starting"}
{"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_http.go:69","msg":"initing BarrierSetup"}
{"level":"debug","ts":"2022-12-18T18:29:24.949+0800","caller":"busi/base_http.go:77","msg":"Starting busi at: 8081"}
2022/12/18 18:29:25 quick start examples listening at 8082
{"level":"debug","ts":"2022-12-18T18:29:25.252+0800","caller":"dtmimp/vars.go:46","msg":"requesting: POST http://localhost:36789/api/dtmsvr/submit {\"gid\":\"QHEbkW3zWVwFEpWuAdRwQK\",\"trans_type\":\"saga\",\"concurrent\":false,\"steps\":[{\"action\":\"http://localhost:8082/api/busi_start/TransOut\",\"compensate\":\"http://localhost:8082/api/busi_start/TransOutCompensate\"},{\"action\":\"http://localhost:8082/api/busi_start/TransIn\",\"compensate\":\"http://localhost:8082/api/busi_start/TransInCompensate\"}],\"payloads\":[\"{\\\"amount\\\":30}\",\"{\\\"amount\\\":30}\"],\"protocol\":\"\"} resolved: http://localhost:36789/api/dtmsvr/submit"}
{"level":"debug","ts":"2022-12-18T18:29:25.294+0800","caller":"dtmimp/vars.go:54","msg":"requested: 200 POST http://localhost:36789/api/dtmsvr/submit {\"dtm_result\":\"SUCCESS\"}"}
2022/12/18 18:29:25 TransOut
2022/12/18 18:29:25 TransIn
2022/12/18 18:29:25 TransInCompensate
2022/12/18 18:29:25 TransOutCompensate

当然,我们也可以请求下,我们本地服务提供的http服务

% curl -iv -X POST http://127.0.0.1:8081/api/busi/TransIn -d '{"amount":30}'
Note: Unnecessary use of -X or --request, POST is already inferred.
* Trying 127.0.0.1:8081...
* Connected to 127.0.0.1 (127.0.0.1) port 8081 (#0)
> POST api/busi/TransIn HTTP/1.1
> Host: 127.0.0.1:8081
> User-Agent: curl/7.79.1
> Accept: */*
> Content-Length: 13
> Content-Type: application/x-www-form-urlencoded
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Content-Type: application/json; charset=utf-8
Content-Type: application/json; charset=utf-8
< Date: Sun, 18 Dec 2022 10:38:13 GMT
Date: Sun, 18 Dec 2022 10:38:13 GMT
< Content-Length: 4
Content-Length: 4


<
* Connection #0 to host 127.0.0.1 left intact
nul