前言
RPC(Remote Procedure Call),翻译过来为“远程过程调用”,是一种分布式系统中服务或节点之间的有效通信机制。通过 RPC,某个节点(或客户端)可以很轻松的调用远端(或服务端)的方法或服务,就像在本地调用一样简单。现有的很多 RPC 框架都要求暴露服务端地址,也就是需要知道服务器的 IP 和 RPC 端口。而本篇文章将介绍一种不需要暴露 IP 地址和端口的 RPC 通信方式。这种方式是基于 Redis BRPOP/BLPOP 操作实现的延迟队列,以及 Golang 中的 goroutine 协程异步机制,整个框架非常简单和易于理解,同时也很高效、稳定和安全。这种方式已经应用到了 Crawlab[1] 中的节点通信当中,成为了各节点即时传输信息的主要方式。下面我们将从 Crawlab 早期节点通信方案 PubSub 开始,介绍当时遇到的问题和解决方案,然后如何过渡到现在的 RPC 解决方案,以及它是如何在 Crawlab 中发挥作用的。
基于 PubSub 的方案
PubSub
早期的 Crawlab 是基于 Redis 的 PubSub,也就是发布订阅模式。这是 Redis 中主要用于一对多的单向通信的方案。其用法非常简单:
SUBSCRIBE channel1 channel2 ...PUBLISH channelx message
PubSubnodes:nodes:master>
以下为节点通信原理示意图。
PubSub
PubSub
通信架构
nodes:masternodes:masternodes:node_id
一个网络请求的简单过程如下:
PubSub
不是所有节点通信都是双向的,也就是说,主节点只会单方面对工作节点通信,工作节点并不会返回响应给主节点,所谓的单向通信。以下是Crawlab的通信类型。
changoroutine
chan
chanchanchan ),该阻塞才会释放,协程进行下一步操作)。在请求响应模式中,如果为双向通信,主节点收到请求后会起生成一个无缓冲通道来阻塞该请求,当收到来自工作节点的消息后,向该无缓冲通道赋值,阻塞释放,返回响应给客户端。
gogoroutinechan
基于延迟队列的方案
PubSub 方案的问题
PubSub 这种消息订阅-发布设计模式是一种有效的实现节点通信的方式,但是它有两个问题:
goroutinechannel
其中,第二个问题是比较棘手的。如果我们希望加入更多的功能,需要写大量的异步代码,这会加大系统模块间的耦合度,造成扩展性很差,而且代码阅读起来很痛苦。
因此,为了解决这个问题,我们采用了基于 Redis 延迟消息队列的 RPC 服务。
延迟队列架构
下图是基于延迟队列架构的 RPC 实现示意图。
每一个节点都有一个客户端(Client)和服务端(Server)。客户端用于发送消息到目标节点(Target Node)并接收其返回的消息,服务端用于接收、处理源节点(Source Node)的消息并返回消息给源节点的客户端。
整个 RPC 通信的流程如下:
LPUSHnodes:BRPOP nodes::BRPOPnodes:MethodLPUSHnodes::nodes::
这样,整个节点的通信流程就通过 Redis 完成了。这样做的好处在于不用暴露 HTTP 的 IP 地址和端口,只需要知道节点 ID 即可完成 RPC 通信。
rpc.ServiceClientHandleServerHandle
BRPOPBRPOP
如果对 Redis 的操作命令不熟悉的,可以参考一下掘金小册《Redis 深度历险:核心原理与应用实践》[2],这本小册深入介绍了 Redis 的原理以及工程实践,对于应用 Redis 到实际开发中非常实用。
代码实践
讲了这么多理论知识,我们还是需要看看代码的。老师常教育我们:“Talk is cheap. Show me the code.”
由于 Crawlab 后端是 Golang 开发的,要理解以下代码需要一些 Golang 的基础知识。
消息数据结构
首先我们需要定一个传输消息的数据结构。代码如下。
package entity
type RpcMessage struct {
Id string `json:"id"` // 消息ID
Method string `json:"method"` // 消息方法
NodeId string `json:"node_id"` // 节点ID
Params map[string]string `json:"params"` // 参数
Timeout int `json:"timeout"` // 超时
Result string `json:"result"` // 结果
Error string `json:"error"` // 错误
}
这里,我们定义了消息 ID、方法、节点 ID、参数等字段。消息 ID 是 UUID,保证了消息 ID 的唯一性。
基础接口
ServerHandleentityRpcMessageClientHandle
// RPC服务基础类
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
客户端通用方法
当我们调用客户端的通用方法的时候,需要实现两个逻辑:
- 发送消息:生成消息 ID,将消息序列化为 JSON,LPUSH 推入 Redis 消息队列;
- 通过 BRPOP 延迟获取返回的消息,返回给调用方。
以下是实现的代码。
// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 请求ID
msg.Id = uuid.NewV4().String()
// 发送RPC消息
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 获取RPC回复消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 如果返回消息有错误,返回错误
if replyMsg.Error != "" {
return replyMsg, errors.New(replyMsg.Error)
}
return
}
}
服务端处理
服务端处理的逻辑如下,大致的逻辑是:
- 在一个循环中,通过 BRPOP 获取该节点对应的消息;
- 当获取到消息时,生成一个 goroutine 异步处理该消息;
- 继续等待。
InitRpcServicehandleMsgGetService
// 获取RPC服务
func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}
return nil
}
// 处理RPC消息
func handleMsg(msgStr string, node model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 获取service
service := GetService(msg)
// 根据Method调用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 初始化服务端RPC服务
func InitRpcService() error {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 处理消息
go handleMsg(msgStr, node)
}
}()
return nil
}
调用方法例子
Crawlab 的节点上经常需要为爬虫安装一些第三方依赖,例如 pymongo、requests 等。而其中,我们也需要直到某个节点上是否已经安装了某个依赖,这需要跨服务器通信,也就是需要在分布式网络中进行双向通信。而这个逻辑是通过 RPC 实现的。主节点向目标节点发起 RPC 调用,目标节点运行被调用方法,将运行结果也就是安装的依赖列表返回给客户端,客户端再返回给调用者。
下面的代码实现了获取目标节点上已安装的依赖的 RPC 方法。
// 获取已安装依赖服务
// 继承Service基础类
type GetInstalledDepsService struct {
msg entity.RpcMessage
}
// 服务端处理方法
// 重载ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
deps, err := GetInstalledDepsLocal(lang)
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
resultStr, _ := json.Marshal(deps)
s.msg.Result = string(resultStr)
return s.msg, nil
}
// 客户端处理方法
// 重载ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
// 反序列化
var output []entity.Dependency
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
发起调用
GetServiceGetInstalledDepsServiceClientHandle
// 获取远端已安装依赖
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetInstalledDepList,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
deps = o.([]entity.Dependency)
return
}
结语
本篇文章主要介绍了一种基于 Redis 延迟队列的 RPC 通信方式,这种方式不用暴露各个节点或服务的 IP 地址或端口,是一种非常安全的方式。而且,这种方式已经用 Golang 在 Crawlab[3] 中实现了双向通信,特别是 Golang 中的天生支持异步的 goroutine,让这种方式的实现变得简单。实际上,这种方式理论上是非常高效的,能够支持高并发数据传输。
但是,在 Crawlab[4] 的实现中还存在一些隐患,也就是它并没有限制服务端的处理并发数量。因此如果传输消息过多时,服务端资源会被占满,导致处理速度变慢甚至宕机的风险。修复方式是在服务端限制并发数量。另外,限于时间的原因,作者还没有来得及测试这种 RPC 通信方式的实际传输效率,容错机制也没有加入。因此总的来说还有很大的提升和优化空间。
虽然如此,这种方式对于 Crawlab[5] 的低并发远程通信来说是足够的了,在实际使用中也没有出现问题,非常稳定。对于隐秘性有要求、希望不暴露地址信息的开发者,我们也推荐将该种方式在实际应用中尝试。
参考
- Crawlab 主页[6]
- Crawlab Demo[7]
- Crawlab 文档[8]
- 通过 Redis 实现 RPC 远程方法调用[9]
参考资料
[1]Crawlab: https://github.com/crawlab-team/crawlab
[2]《Redis 深度历险:核心原理与应用实践》: https://juejin.im/book/5afc2e5f6fb9a07a9b362527
[3]Crawlab: https://github.com/crawlab-team/crawlab
[4]Crawlab: https://github.com/crawlab-team/crawlab
[5]Crawlab: https://github.com/crawlab-team/crawlab
[6]Crawlab 主页: https://github.com/crawlab-team/crawlab
[7]Crawlab Demo: https://crawlab.cn/demo
[8]Crawlab 文档: http://docs.crawlab.cn/
[9]通过 Redis 实现 RPC 远程方法调用: https://www.oschina.net/translate/rpc-using-redis