https://github.com/dtm-labs/dtm和seata类似是一个分布式事务管理器,不过是golang实现的,它有丰富的例子可以供我们学习https://github.com/dtm-labs/dtm-examples。常见的事务模式,支持对比如下:
1,TCC事务:dtm和Seata都支持了TCC事务。
2,XA事务:dtm和Seata都支持XA事务。dtm采用的是回调函数形式的接口,而Seata采用的是Java特有的注解形式接口,本质都是回调。
3,AT事务:AT事务是Seata独有的事务模式(类似XA,性能更高,但有脏回滚)
4,SAGA事务:Seata的Saga实现采用了状态机,优点是可以做到灵活配置,缺点是上手难度非常高。dtm支持并发的Saga。
5,二阶段消息:dtm支持了二阶段消息事务模式,该模式受到RocketMQ的事务消息启发。提供了比本地消息表和事务消息更简单的架构,更易用的接口:PrepareAndSubmit,适用于无需回滚的数据一致性场景
总的来说,对于golang用户学习分布式事务是一个非常不错的选择。在学习本篇之前,建议先学习下mysql的XAgolang源码分析:golang使用mysql XA事务,然后会发现大家的最终方案都是相似的。也可以对比seata的golang客户端来学习golang源码分析:seata-go (1)at模式,golang源码分析:seata-go (2)tcc模式。

dtm在传输协议上支持三种,grpc,http和http-json,它的通信链路大概可以分为三条:
1,对外提供服务的链路。
2,ap(应用程序)调用dtm(事务管理器)上报数据的链路
3,dtm回调应用程序的链路。
下面,我们基于dtm-examples的qs例子源码对dtm进行简单介绍:
1,对外提供的服务接口是基于gin http服务实现的,它的端口是8081
go busi.RunHTTP(app)
const (// BusiAPI busi api prefixBusiAPI = "/api/busi"// BusiPort busi server portBusiPort = 8081// BusiGrpcPort busi server portBusiGrpcPort = 58081)
提供了转入,转出等多个接口。
func BaseAddRoute(app *gin.Engine) {app.POST(BusiAPI+"/workflow/resume"app.POST(BusiAPI+"/TransIn",
2,上报数据链路,qs例子,实现了saga模式
saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).
首先通过uuid产生了全局事务id,然后组装回调需要的接口和参数,然后上报给dtm服务器,地址是http://localhost:36789/api/dtmsvr/submit ,也可以是grpc模式
const dtmServer = "http://localhost:36789/api/dtmsvr"
对应实现在github.com/dtm-labs/client@v1.15.1/dtmcli/trans_saga.go
func NewSaga(server string, gid string) *Saga {&Saga{TransBase: *dtmimp.NewTransBase(gid, "saga", server, ""), orders: map[int][]int{}}
github.com/dtm-labs/client@v1.15.1/dtmcli/dtmimp/trans_base.go
func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase {return &TransBase{Gid: gid,TransType: transType,BranchIDGen: BranchIDGen{BranchID: branchID},Dtm: dtm,TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders},Context: context.Background(),}
然后调用Add方法添加需要上报的信息
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {s.Steps = append(s.Steps, map[string]string{"action": action, "compensate": compensate})s.Payloads = append(s.Payloads, dtmimp.MustMarshalString(postData))
最后通过submit提交给dtm
func (s *Saga) Submit() error {s.BuildCustomOptions()return dtmimp.TransCallDtm(&s.TransBase, "submit")
// TransCallDtm is the short call for TransCallDtmExtfunc TransCallDtm(tb *TransBase, operation string) error {_, err := TransCallDtmExt(tb, tb, operation)
func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {resp, err := RestyClient.R().SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
3,dtm回调业务的接口也是基于gin的http服务实现的,端口是8082
// busi addressconst qsBusiAPI = "/api/busi_start"const qsBusiPort = 8082var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI)
在启动1中的gin http服务后,会调用dtm的上报接口,将回调信息上报给dtm,其中就包括dtm 回调用的接口,详细信息如下:
[{"action": "http://localhost:8082/api/busi_start/TransOut","compensate": "http://localhost:8082/api/busi_start/TransOutCompensate"},{"action": "http://localhost:8082/api/busi_start/TransIn","compensate": "http://localhost:8082/api/busi_start/TransInCompensate"}],"payloads": ["{mount\:30}","{mount\:30}"]
对于qs实例来说,就包括两个分支事务的正向操作接口和对应的补偿接口,具体实现在qsAddRoute方法里面:
func qsAddRoute(app *gin.Engine) {app.POST(qsBusiAPI+"/TransIn",
4,在dtm的server端提供了相应的submit接口,接收客户端也就是应用程序提交的submit请求,对应路由注册在dtmsvr/api_http.go
func addRoute(engine *gin.Engine) {engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid))engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare))engine.POST("/api/dtmsvr/submit", dtmutil.WrapHandler2(submit))engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort))engine.POST("/api/dtmsvr/registerBranch", dtmutil.WrapHandler2(registerBranch))
实现位于dtmsvr/api.go
func svcSubmit(t *TransGlobal) interface{}
func submit(c *gin.Context) interface{} {return svcSubmit(TransFromContext(c))
下面我们启动官方的例子跑一下:
首先启动dtm 服务端
cd dtmgo run main.go
然后运行例子中的qs例子
cd dtm-examplesgo run main.go qs
可以看到下面的日志
{"level":"info","ts":"2022-12-18T18:29:24.433+0800","caller":"dtmutil/db.go:103","msg":"connecting 'mysql' 'en.dtm.pub' 'dtm' '3306' ''"}{"level":"debug","ts":"2022-12-18T18:29:24.713+0800","caller":"dtmutil/db.go:79","msg":"installing db plugin: tracePlugin"}{"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"dtmutil/db.go:68","msg":"used: 34 ms affected: -1 sql is: xa recover"}{"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_grpc.go:57","msg":"dtm client inited"}{"level":"info","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_http.go:54","msg":"examples starting"}{"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_http.go:69","msg":"initing BarrierSetup"}{"level":"debug","ts":"2022-12-18T18:29:24.949+0800","caller":"busi/base_http.go:77","msg":"Starting busi at: 8081"}2022/12/18 18:29:25 quick start examples listening at 8082{"level":"debug","ts":"2022-12-18T18:29:25.252+0800","caller":"dtmimp/vars.go:46","msg":"requesting: POST http://localhost:36789/api/dtmsvr/submit {\"gid\":\"QHEbkW3zWVwFEpWuAdRwQK\",\"trans_type\":\"saga\",\"concurrent\":false,\"steps\":[{\"action\":\"http://localhost:8082/api/busi_start/TransOut\",\"compensate\":\"http://localhost:8082/api/busi_start/TransOutCompensate\"},{\"action\":\"http://localhost:8082/api/busi_start/TransIn\",\"compensate\":\"http://localhost:8082/api/busi_start/TransInCompensate\"}],\"payloads\":[\"{\\\"amount\\\":30}\",\"{\\\"amount\\\":30}\"],\"protocol\":\"\"} resolved: http://localhost:36789/api/dtmsvr/submit"}{"level":"debug","ts":"2022-12-18T18:29:25.294+0800","caller":"dtmimp/vars.go:54","msg":"requested: 200 POST http://localhost:36789/api/dtmsvr/submit {\"dtm_result\":\"SUCCESS\"}"}2022/12/18 18:29:25 TransOut2022/12/18 18:29:25 TransIn2022/12/18 18:29:25 TransInCompensate2022/12/18 18:29:25 TransOutCompensate
当然,我们也可以请求下,我们本地服务提供的http服务
% curl -iv -X POST http://127.0.0.1:8081/api/busi/TransIn -d '{"amount":30}'Note: Unnecessary use of -X or --request, POST is already inferred.* Trying 127.0.0.1:8081...* Connected to 127.0.0.1 (127.0.0.1) port 8081 (#0)> POST api/busi/TransIn HTTP/1.1> Host: 127.0.0.1:8081> User-Agent: curl/7.79.1> Accept: */*> Content-Length: 13> Content-Type: application/x-www-form-urlencoded>* Mark bundle as not supporting multiuse< HTTP/1.1 200 OKHTTP/1.1 200 OK< Content-Type: application/json; charset=utf-8Content-Type: application/json; charset=utf-8< Date: Sun, 18 Dec 2022 10:38:13 GMTDate: Sun, 18 Dec 2022 10:38:13 GMT< Content-Length: 4Content-Length: 4<* Connection #0 to host 127.0.0.1 left intactnul

