我们继续上一篇golang源码分析:dtm分布式事务(3)分析api服务的源码,位置位于dtmsvr/svr.go:

  func StartSvr() *gin.Engine {
   dtmcli.GetRestyClient().SetTimeout
  app := dtmutil.GetGinApp()
app = httpMetrics(app)
addRoute(app)
addJrpcRouter(app)

go func() {
err := app.Run(fmt.Sprintf(":%d", conf.HTTPPort))
  dtmgpb.RegisterDtmServer(s, &dtmServer{})
  
  go func() {
err := s.Serve(lis)
for i := 0; i < int(conf.UpdateBranchAsyncGoroutineNum); i++ {
go updateBranchAsync()
  updateTopicsMap()
go CronUpdateTopicsMap()

err = dtmdriver.GetDriver().RegisterService(conf.MicroService.Target, conf.MicroService.EndPoint)

首先启动一个gin服务器,然后注册一个grpcserver :dtmServer,然后通过一个协程启动grpc服务,然后启动协程,每200ms一次将分支信息同步到存储。然后检查下kv存储里"topics"的值的版本,存储到topicsMap,紧接着启动一个协程任务在后台执行上面的kv存储到内存的更新。最后把我们的server注册到服务发现,一般是通过环境变量控制的

#   Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
# EndPoint: 'localhost:36790'

总结下就5件事

1,启动http服务

2,启动grpc服务

3,将分支的更新同步到存储

4,将kc里面存储的topics数据同步到内存。

5,将服务注册到服务发现注册中心。

其中,启动http服务包括三部分

A,为监控注册路由

B,为http服务注册路由

C,为json-rpc注册路由

      1,  其中http路由就是简单的gin路由注册,位于dtmsvr/api_http.go

func addRoute(engine *gin.Engine) {
engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid))
engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare))
engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort))

提供了常见申请全局事务ID,prepare,abort,commit等事务执行动作。

func prepare(c *gin.Context) interface{} {
return svcPrepare(TransFromContext(c))
}

其中prepare需要获取全局事务的分支事务

dbt := GetTransGlobal(t.Gid)
trans := GetStore().FindTransGlobalStore(gid)

默认这些信息存储在boltdb里面dtmsvr/storage/boltdb/boltdb.go,可以看下获取全局存储的过程

func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
     trans = tGetGlobal(t, gid)
bs := t.Bucket(bucketGlobal).Get([]byte(gid))

        全局存储的定义如下:

type TransGlobalStore struct {
dtmutil.ModelBase
Gid string `json:"gid,omitempty"`
TransType string `json:"trans_type,omitempty"`
Steps []map[string]string `json:"steps,omitempty" gorm:"-"`
Payloads []string `json:"payloads,omitempty" gorm:"-"`
BinPayloads [][]byte `json:"-" gorm:"-"`
Status string `json:"status,omitempty"`
QueryPrepared string `json:"query_prepared,omitempty"`
Protocol string `json:"protocol,omitempty"`
FinishTime *time.Time `json:"finish_time,omitempty"`
RollbackTime *time.Time `json:"rollback_time,omitempty"`
Result string `json:"result,omitempty"`
RollbackReason string `json:"rollback_reason,omitempty"`
Options string `json:"options,omitempty"`
CustomData string `json:"custom_data,omitempty"`
NextCronInterval int64 `json:"next_cron_interval,omitempty"`
NextCronTime *time.Time `json:"next_cron_time,omitempty"`
Owner string `json:"owner,omitempty"`
Ext TransGlobalExt `json:"-" gorm:"-"`
ExtData string `json:"ext_data,omitempty"` // storage of ext. a db field to store many values. like Options
dtmcli.TransOptions
}

对于json_rpc也类似,就是通过gin的http路由,http协议的内容传输的是json数据,实现位于dtmsvr/api_json_rpc.go

func addJrpcRouter(engine *gin.Engine) {
engine.POST("/api/json-rpc", func(c *gin.Context) {
return handlers[req.Method](req.Params)

真实的路由是定义在一个map里面的

handlers := map[string]jrpcFunc{
"newGid": jrpcNewGid,
"prepare": jrpcPrepare,
"submit": jrpcSubmit,
"abort": jrpcAbort,
"registerBranch": jrpcRegisterBranch,
}

比如其中的获取全局事务id最终实现,和http协议是一样的

func jrpcNewGid(interface{}) interface{} {
return map[string]interface{}{"gid": GenGid()}
}

       2, 然后我们看下grpc的实现dtmsvr/api_grpc.go

func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
r := svcAbort(TransFromDtmRequest(ctx, in))

中断事务的执行过程是通过全局事务id获取事务的元数据,修改全局事务的状态为中断,然后通过事务id获取所有的分支事务 ,最后处理分支事务:

func svcAbort(t *TransGlobal) interface{} {
  dbt := GetTransGlobal(t.Gid)
  dbt.changeStatus(dtmcli.StatusAborting, withRollbackReason(t.RollbackReason))
branches := GetStore().FindBranches(t.Gid)
return dbt.Process(branches)

        处理过程位于dtmsvr/trans_process.go

func (t *TransGlobal) process(branches []TransBranch) error {
rerr = t.getProcessor().ProcessOnce(branches)

    每一种分布式事务模型的处理逻辑都不一样,以saga为例dtmsvr/trans_type_saga.go

func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
    t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))

它修改了boltdb里面的状态,dtmsvr/storage/boltdb/boltdb.go

func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
     if finished {
tDelIndex(t, g.NextCronTime.Unix(), g.Gid)
      tPutGlobal(t, global)
     err := t.Bucket(bucketGlobal).Put([]byte(global.Gid), bs)

如果状态是完成状态,会删除相应的记录。

        3,然后,我们看下第二部分,同步分支状态到存储,dtmsvr/svr.go

k := updateBranch.gid + updateBranch.branchID + "-" + updateBranch.op
rowAffected, err := GetStore().UpdateBranches(updates, []string{"status", "finish_time", "update_time"})
for { // flush branches every 200ms
flushBranchs()
}

对于存储过的状态不用再次操作,所以本地用了一个k来去重,它是通过全局事务id,分支id以及分支的处理动作名字来做的唯一键。以存储介质为mysql,动作为创建分支的操作来说,它的代码实现位于

dtmsvr/storage/sql/sql.go

db := dbGet().Clauses(clause.OnConflict{
OnConstraint: "gid_branch_uniq",
DoUpdates: clause.AssignmentColumns(updates),
}).Create(branches)

4,第三部分加载topics到内存的操作流程如下: dtmsvr/cron.go

cronUpdateTopicsMapOnce()

dtmsvr/topics.go

func updateTopicsMap() {
kvs := GetStore().FindKV(topicsCat, "")
topicsMap[kv.K] = newTopic
func topic2urls(topic string) []string {
for k, subscriber := range topicsMap[topic].Subscribers {
urls[k] = subscriber.URL

事务模型是一致性消息的时候dtmsvr/trans_type_msg.go

func (t *transMsgProcessor) GenBranches() []TransBranch {
for i, step := range t.Steps {
urls := dtmimp.If(mayTopic == step[dtmimp.OpAction], []string{mayTopic}, topic2urls(mayTopic)).([]string)
for j, url := range urls {
b := TransBranch{

通过将消息中的url组装成分支事务信息的。

      5,最后一步服务发现的注册的实现单独提供了一个包。

github.com/dtm-labs/dtmdriver@v0.0.6/driver-mgr.go