要做一个 golang 版的分布式事务框架,首先需要解决的一个问题就是如何实现 RPC 通信。dubbo-go 就是摆在眼前很好的一个例子,遂开始研究 dubbo-go 的底层 getty。
如何基于 getty 实现 RPC 通信
getty 框架的整体模型图如下:
下面结合相关代码,详述 seata-golang 的 RPC 通信过程。
1. 建立连接
实现 RPC 通信,首先要建立网络连接,这里先从client.go 开始看起。
connect方法通过 dial方法得到了一个 session 连接,进入 dial方法:
我们关注的是 TCP 连接,所以继续进入 c.dialTCP方法:
至此,我们知道了 getty 如何建立 TCP 连接,并返回 TCPSession。
2. 收发报文
那它是怎么收发报文的呢,我们回到 connection 方法接着往下看,有这样一行 ss.(*session).run,在这行代码之后,代码都是很简单的操作,我们猜测这行代码运行的逻辑里面一定包含收发报文的逻辑,接着进入 run方法:
这里起了两个 goroutine: handleLoop和 handlePackage,看字面意思符合我们的猜想,进入 handleLoop方法:
通过上面的代码,我们不难发现, handleLoop方法处理的是发送报文的逻辑,RPC 需要发送的消息首先由 s.writer编码成二进制比特,然后通过建立的 TCP 连接发送出去。这个 s.writer对应的 Writer 接口是 RPC 框架必须要实现的一个接口。
继续看 handlePackage方法:
进入 handleTCPPackage方法:
从上面的代码逻辑我们分析出,RPC 消费端需要将从 TCP 连接收到的二进制比特报文解码成 RPC 能消费的消息,这个工作由 s.reader 实现,所以,我们要构建 RPC 通信层也需要实现 s.reader 对应的 Reader 接口。
3. 底层处理网络报文的逻辑如何与业务逻辑解耦
我们都知道,netty 通过 boss 线程和 worker 线程实现了底层网络逻辑和业务逻辑的解耦。那么,getty 是如何实现的呢?
在handlePackage方法最后,我们看到,收到的消息被放入了 s.addTask(pkg)这个方法,接着往下分析:
pkg参数传递到了一个匿名方法,这个方法最终放入了 taskPool。这个方法很关键,在我后来写 seata-golang 代码的时候,就遇到了一个坑,这个坑后面分析。
接着我们看一下taskPool 的定义:
构建了一个缓冲大小为 size (默认为 runtime.NumCPU * 100) 的 channel sem。再看方法 AddTaskAlways(t task):
select {case p.work
加入的任务,会先由 len(p.sem) 个 goroutine 去消费,如果没有 goroutine 空闲,则会启动一个临时的 goroutine 去运行 t。相当于有 len(p.sem) 个 goroutine 组成了 goroutine pool,pool 中的 goroutine 去处理业务逻辑,而不是由处理网络报文的 goroutine 去运行业务逻辑,从而实现了解耦。写 seata-golang 时遇到的一个坑,就是忘记设置 taskPool 造成了处理业务逻辑和处理底层网络报文逻辑的 goroutine 是同一个,我在业务逻辑中阻塞等待一个任务完成时,阻塞了整个 goroutine,使得阻塞期间收不到任何报文。
4. 具体实现
下面的代码见getty.go :
通过对整个 getty 代码的分析,我们只要实现 ReadWriter来对 RPC 消息编解码,再实现 EventListener来处理 RPC 消息的对应的具体逻辑,将 ReadWriter实现和 EventLister实现注入到 RPC 的 Client 和 Server 端,则可实现 RPC 通信。
1)编解码协议实现
下面是 seata 协议的定义:
在 ReadWriter 接口的实现RpcPackageHandler 中,调用 Codec 方法对消息体按照上面的格式编解码:
2)Client 端实现
再来看 client 端EventListener的实现RpcRemotingClient :
3)Server 端 Transaction Coordinator 实现
代码见DefaultCoordinator :
coordinator.OnRegTmMessage(rpcMessage, session)注册 Transaction Manager, coordinator.OnRegRmMessage(rpcMessage, session)注册 Resource Manager。具体逻辑分析见下文。
消息进入coordinator.OnTrxMessage(rpcMessage, session)方法,将按照消息的类型码路由到具体的逻辑当中:
4)session manager 分析
Client 端同 Transaction Coordinator 建立连接起连接后,通过 clientSessionManager.RegisterGettySession(session)将连接保存在 serverSessions = sync.Map{}这个 map 中。map 的 key 为从 session 中获取的 RemoteAddress 即 Transaction Coordinator 的地址,value 为 session。这样,Client 端就可以通过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见getty_client_session_manager.go 。
Transaction Manager 和 Resource Manager 注册到 Transaction Coordinator 后,一个连接既有可能用来发送 TM 消息也有可能用来发送 RM 消息。我们通过 RpcContext 来标识一个连接信息:
当收到事务消息时,我们需要构造这样一个 RpcContext 供后续事务处理逻辑使用。所以,我们会构造下列 map 来缓存映射关系:
这样,Transaction Manager 和 Resource Manager 分别通过coordinator.OnRegTmMessage(rpcMessage, session)和 coordinator.OnRegRmMessage(rpcMessage, session)注册到 Transaction Coordinator 时,会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系,在client_resources map 中缓存 applicationId 与 resourceIds(一个应用可能存在多个 Resource Manager) 的关系。
在需要时,我们就可以通过上述映射关系构造一个 RpcContext。这部分的实现和 java 版 seata 有很大的不同,感兴趣的可以深入了解一下。具体代码见getty_session_manager.go 。
至此,我们就分析完了seata-golang 整个 RPC 通信模型的机制。
seata-golang 的未来
seata-golang 从今年 4 月份开始开发,到 8 月份基本实现和 java 版seata 1.2 协议的互通,对 mysql 数据库实现了 AT 模式(自动协调分布式事务的提交回滚),实现了 TCC 模式,TC 端使用 mysql 存储数据,使 TC 变成一个无状态应用支持高可用部署。下图展示了 AT 模式的原理:
参考资料
seata 官方 : https://seata.io
java 版 seata : https://github.com/seata/seata
seata-golang 项目地址 : https://github.com/opentrx/seata-golang