我们继续上一篇golang源码分析:dtm分布式事务(3)分析api服务的源码,位置位于dtmsvr/svr.go:
func StartSvr() *gin.Engine {dtmcli.GetRestyClient().SetTimeoutapp := dtmutil.GetGinApp()app = httpMetrics(app)addRoute(app)addJrpcRouter(app)go func() {err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))dtmgpb.RegisterDtmServer(s, &dtmServer{})go func() {err := s.Serve(lis)for i := 0; i < int(conf.UpdateBranchAsyncGoroutineNum); i++ {go updateBranchAsync()updateTopicsMap()go CronUpdateTopicsMap()err = dtmdriver.GetDriver().RegisterService(conf.MicroService.Target, conf.MicroService.EndPoint)
首先启动一个gin服务器,然后注册一个grpcserver :dtmServer,然后通过一个协程启动grpc服务,然后启动协程,每200ms一次将分支信息同步到存储。然后检查下kv存储里"topics"的值的版本,存储到topicsMap,紧接着启动一个协程任务在后台执行上面的kv存储到内存的更新。最后把我们的server注册到服务发现,一般是通过环境变量控制的
# Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url# EndPoint: 'localhost:36790'
总结下就5件事
1,启动http服务
2,启动grpc服务
3,将分支的更新同步到存储
4,将kc里面存储的topics数据同步到内存。
5,将服务注册到服务发现注册中心。
其中,启动http服务包括三部分
A,为监控注册路由
B,为http服务注册路由
C,为json-rpc注册路由
1, 其中http路由就是简单的gin路由注册,位于dtmsvr/api_http.go
func addRoute(engine *gin.Engine) {engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid))engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare))engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort))
提供了常见申请全局事务ID,prepare,abort,commit等事务执行动作。
func prepare(c *gin.Context) interface{} {return svcPrepare(TransFromContext(c))}
其中prepare需要获取全局事务的分支事务
dbt := GetTransGlobal(t.Gid)
trans := GetStore().FindTransGlobalStore(gid)
默认这些信息存储在boltdb里面dtmsvr/storage/boltdb/boltdb.go,可以看下获取全局存储的过程
func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {trans = tGetGlobal(t, gid)bs := t.Bucket(bucketGlobal).Get([]byte(gid))
全局存储的定义如下:
type TransGlobalStore struct {dtmutil.ModelBaseGid string `json:"gid,omitempty"`TransType string `json:"trans_type,omitempty"`Steps []map[string]string `json:"steps,omitempty" gorm:"-"`Payloads []string `json:"payloads,omitempty" gorm:"-"`BinPayloads [][]byte `json:"-" gorm:"-"`Status string `json:"status,omitempty"`QueryPrepared string `json:"query_prepared,omitempty"`Protocol string `json:"protocol,omitempty"`FinishTime *time.Time `json:"finish_time,omitempty"`RollbackTime *time.Time `json:"rollback_time,omitempty"`Result string `json:"result,omitempty"`RollbackReason string `json:"rollback_reason,omitempty"`Options string `json:"options,omitempty"`CustomData string `json:"custom_data,omitempty"`NextCronInterval int64 `json:"next_cron_interval,omitempty"`NextCronTime *time.Time `json:"next_cron_time,omitempty"`Owner string `json:"owner,omitempty"`Ext TransGlobalExt `json:"-" gorm:"-"`ExtData string `json:"ext_data,omitempty"` // storage of ext. a db field to store many values. like Optionsdtmcli.TransOptions}
对于json_rpc也类似,就是通过gin的http路由,http协议的内容传输的是json数据,实现位于dtmsvr/api_json_rpc.go
func addJrpcRouter(engine *gin.Engine) {engine.POST("/api/json-rpc", func(c *gin.Context) {return handlers[req.Method](req.Params)
真实的路由是定义在一个map里面的
handlers := map[string]jrpcFunc{"newGid": jrpcNewGid,"prepare": jrpcPrepare,"submit": jrpcSubmit,"abort": jrpcAbort,"registerBranch": jrpcRegisterBranch,}
比如其中的获取全局事务id最终实现,和http协议是一样的
func jrpcNewGid(interface{}) interface{} {return map[string]interface{}{"gid": GenGid()}}
2, 然后我们看下grpc的实现dtmsvr/api_grpc.go
func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {r := svcAbort(TransFromDtmRequest(ctx, in))
中断事务的执行过程是通过全局事务id获取事务的元数据,修改全局事务的状态为中断,然后通过事务id获取所有的分支事务 ,最后处理分支事务:
func svcAbort(t *TransGlobal) interface{} {dbt := GetTransGlobal(t.Gid)dbt.changeStatus(dtmcli.StatusAborting, withRollbackReason(t.RollbackReason))branches := GetStore().FindBranches(t.Gid)return dbt.Process(branches)
处理过程位于dtmsvr/trans_process.go
func (t *TransGlobal) process(branches []TransBranch) error {rerr = t.getProcessor().ProcessOnce(branches)
每一种分布式事务模型的处理逻辑都不一样,以saga为例dtmsvr/trans_type_saga.go
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
它修改了boltdb里面的状态,dtmsvr/storage/boltdb/boltdb.go
func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {if finished {tDelIndex(t, g.NextCronTime.Unix(), g.Gid)tPutGlobal(t, global)err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs)
如果状态是完成状态,会删除相应的记录。
3,然后,我们看下第二部分,同步分支状态到存储,dtmsvr/svr.go
k := updateBranch.gid + updateBranch.branchID + "-" + updateBranch.oprowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"})for { // flush branches every 200msflushBranchs()}
对于存储过的状态不用再次操作,所以本地用了一个k来去重,它是通过全局事务id,分支id以及分支的处理动作名字来做的唯一键。以存储介质为mysql,动作为创建分支的操作来说,它的代码实现位于
dtmsvr/storage/sql/sql.go
db := dbGet().Clauses(clause.OnConflict{OnConstraint: "gid_branch_uniq",DoUpdates: clause.AssignmentColumns(updates),}).Create(branches)
4,第三部分加载topics到内存的操作流程如下: dtmsvr/cron.go
cronUpdateTopicsMapOnce()
dtmsvr/topics.go
func updateTopicsMap() {kvs := GetStore().FindKV(topicsCat, "")topicsMap[kv.K] = newTopic
func topic2urls(topic string) []string {for k, subscriber := range topicsMap[topic].Subscribers {urls[k] = subscriber.URL
事务模型是一致性消息的时候dtmsvr/trans_type_msg.go
func (t *transMsgProcessor) GenBranches() []TransBranch {for i, step := range t.Steps {urls := dtmimp.If(mayTopic == step[dtmimp.OpAction], []string{mayTopic}, topic2urls(mayTopic)).([]string)for j, url := range urls {b := TransBranch{
通过将消息中的url组装成分支事务信息的。
5,最后一步服务发现的注册的实现单独提供了一个包。
github.com/dtm-labs/dtmdriver@v0.0.6/driver-mgr.go

