前言
Seata 是一款简单易用,高性能、开源的一站式分布式事务解决方案,由阿里开源。
2020 年 4 月,刘晓敏开始基于 Seata 着手做多语言 golang 项目,经过一年时间的开发,seata-golang 发布了 1.0.0 版本。
Seata架构
● TC:Transaction coordinator,它是一个分布式事务协调器。
● TM:Transaction manager,它是一个事务管理器,负责全局事务的开启、提交和回滚。
● RM:Resource Manager,它是管理分支事务资源的,它加入全局事务组后,向 TC 报告分支事务的执行状态。
● XID:TM 开启全局事务时,会在 TC 创建一个 GlobalSession,GlobalSession 的全局唯一标识即为 XID。
● BranchID:RM 向 TC 注册分支事务后,在 TC 侧生成一个 BranchSession,BranchID 全局唯一标识这个 BranchSession。
当 RM 向 TC 报告分支执行失败时,TC 会标记这个 BranchSession 的状态为失败,然后 TM 发起回滚时,TC 根据 XID 找到所有成功执行的事务分支,通知他们进行回滚。
下面具体是在下单购买业务上的一个完整流程
用户向聚合层发起下单请求
GlobalSessionBranchSessionBranchSessionBranchIdBranchReportRequestGlobalRollbackRequest
把上述流程中涉及到的seata方法抽象出来:
这里可以先忽略SAGA模式,seata-golang中没有去实现
Seata Golang 支持的模式
Seata-Golang支持AT和TCC两种模式。
概括地说:
-
AT依赖于本地数据库事务特性,包括回滚、提交等
-
TCC模式依赖于用户自定义实现对事务的回滚和提交,抽象出三个方法:Try、Confirm、Cancel,这也是为什么叫TCC
AT模式
AT模式的回滚依赖对执行操作的补偿,seata-golang基于TIDB的Parser来辨别用户做了什么操作,构造成Undo Log 之后针对这些操作进行回滚补偿。
TCC模式
依赖开发者实现seata的TCC接口,即自定义Try、Confirm、Cancel的方法
对开发者对约束较多
Seata源码浅析
1.TC 全局事务协调器
type TransactionCoordinator struct {
sync.Mutex
maxCommitRetryTimeout int64
maxRollbackRetryTimeout int64
rollbackDeadSeconds int64
rollbackRetryTimeoutUnlockEnable bool
asyncCommittingRetryPeriod time.Duration
committingRetryPeriod time.Duration
rollingBackRetryPeriod time.Duration
timeoutRetryPeriod time.Duration
streamMessageTimeout time.Duration
holder holder.SessionHolderInterface
resourceDataLocker lock.LockManagerInterface
locker GlobalSessionLocker
idGenerator *atomic.Uint64
futures *sync.Map
activeApplications *sync.Map
callBackMessages *sync.Map
}
Begin()方法开启全局事务
func (tc *TransactionCoordinator) Begin(ctx context.Context, request *apis.GlobalBeginRequest) (*apis.GlobalBeginResponse, error) {
// 生成XID
transactionID := uuid.NextID()
xid := common.GenerateXID(request.Addressing, transactionID)
// 全局事务
gt := model.GlobalTransaction{
// 全局会话
GlobalSession: &apis.GlobalSession{
Addressing: request.Addressing,
XID: xid,
TransactionID: transactionID,
TransactionName: request.TransactionName,
Timeout: request.Timeout,
},
}
// 开启全局事务,实际上是修改了全局事务的状态
gt.Begin()
// 把全局事务加到holder中
err := tc.holder.AddGlobalSession(gt.GlobalSession)
if err != nil {
return &apis.GlobalBeginResponse{
ResultCode: apis.ResultCodeFailed,
ExceptionCode: apis.BeginFailed,
Message: err.Error(),
}, nil
}
// 启动协程,接收全局事务事件
runtime.GoWithRecover(func() {
evt := event.NewGlobalTransactionEvent(gt.TransactionID, event.RoleTC, gt.TransactionName, gt.BeginTime, 0, gt.Status)
event.EventBus.GlobalTransactionEventChannel <- evt
}, nil)
log.Infof("successfully begin global transaction xid = {}", gt.XID)
return &apis.GlobalBeginResponse{
ResultCode: apis.ResultCodeSuccess,
XID: xid,
}, nil
}
TC 全局事务协调器有三大概念
1.1 LockManager
AT模式下,将分支事务上的数据锁到TC上,保证事务数据的隔离
1.2 SessionManager
管理BranchSession,记录事务执行状态、持久化事务数据
目前有两种实现方式:
- FileBasedSessionManager:事务数据保存到内存,并持久化到本地文件
- DBBasedSessionManager:事务数据持久化到数据库中,采用这种方式,TC可做到高可用
后续可能会考虑ETCD或者对TC做Raft,是后续社区会考虑到议题。
1.3 Transaction Manager
即TM,事务管理器,下面会介绍
2.TM 事务管理器
根据前面的描述,TM负责全局的一些操作,包括请求TC开启全局事务、全局提交、全局回滚等操作
type TransactionManagerInterface interface {
// GlobalStatus_Begin a new global transaction.
Begin(ctx context.Context, name string, timeout int32) (string, error)
// Global commit.
Commit(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)
// Global rollback.
Rollback(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)
// Get current status of the give transaction.
GetStatus(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error)
// Global report.
GlobalReport(ctx context.Context, xid string, globalStatus apis.GlobalSession_GlobalStatus) (apis.GlobalSession_GlobalStatus, error)
}
值得注意的是,TransactionManagerInterface中的方法,其实是通知TC去做对应操作
以Rollback方法为例,具体实现就是请求TC进行全局事务的回滚,TC会通知各个RM进行本地分支事务的回滚。
func (manager *TransactionManager) Rollback(ctx context.Context, xid string) (apis.GlobalSession_GlobalStatus, error) {
// RPC请求TC,让其进行全局事务回滚
request := &apis.GlobalRollbackRequest{XID: xid}
resp, err := manager.rpcClient.Rollback(ctx, request)
if err != nil {
return 0, err
}
if resp.ResultCode == apis.ResultCodeSuccess {
return resp.GlobalStatus, nil
}
return 0, &exception.TransactionException{
Code: resp.GetExceptionCode(),
Message: resp.GetMessage(),
}
}
3.RM 资源管理器
RM主要负责本地分支事务的管理
type ResourceManagerInterface interface {
BranchCommit(ctx context.Context, request *apis.BranchCommitRequest) (*apis.BranchCommitResponse, error)
BranchRollback(ctx context.Context, request *apis.BranchRollbackRequest) (*apis.BranchRollbackResponse, error)
// RegisterResource Register a Resource to be managed by Resource Manager.
RegisterResource(resource model.Resource)
// UnregisterResource Unregister a Resource from the Resource Manager.
UnregisterResource(resource model.Resource)
// GetBranchType ...
GetBranchType() apis.BranchSession_BranchType
}
除此之外,RM还负责在全局事务启动的时候,向TC注册事务分支,所以seata-golang抽取出来三个通知TC的接口
type ResourceManagerOutbound interface {
// BranchRegister 向TC注册事务分支
BranchRegister(ctx context.Context, xid string, resourceID string, branchType apis.BranchSession_BranchType,
applicationData []byte, lockKeys string) (int64, error)
// BranchReport 报告分支事务的执行状态
BranchReport(ctx context.Context, xid string, branchID int64, branchType apis.BranchSession_BranchType,
status apis.BranchSession_BranchStatus, applicationData []byte) error
// LockQuery lock resource by lockKeys.
LockQuery(ctx context.Context, xid string, resourceID string, branchType apis.BranchSession_BranchType, lockKeys string) (bool, error)
}