RPC(Remote Procedure Call),主要是帮助我们屏蔽网络编程细节 ,是我们更专注于业务逻辑,实现调用远程方法就像调用本地方法一样。
RPC通信过程
RPC通信过程如下图所示:
由服务提供者给出业务接口声明,在调用方的程序里面,RPC 框架根据调用的服务接口提前生成动态代理实现类,并通过依赖注入等技术注入到声明了该接口的相关业务逻辑里面。该代理实现类会拦截所有的方法调用,在提供的方法处理逻辑里面完成一整套的远程调用,并把远程调用结果返回给调用方,这样调用方在调用远程方法的时候就获得了像调用本地接口一样的体验。
RPC设计组成
以下是对RPC的四种角色的解释和说明:
-
客户端(Client): 服务调用发起方,也称为服务消费者。
-
客户端存根(Client Stub): 该程序运行在客户端所在的计算机机器上,主要用来存储要调用的服务器的地址,另外,该程序还负责将客户端请求远端服务器程序的数据信息打包成数据包,通过网络发送给服务端Stub程序;其次,还要接收服务端Stub程序发送的调用结果数据包,并解析返回给客户端。
-
服务端(Server): 远端的计算机机器上运行的程序,其中有客户端要调用的方法。
-
服务端存根(Server Stub): 接收客户Stub程序通过网络发送的请求消息数据包,并调用服务端中真正的程序功能方法,完成功能调用;其次,将服务端执行调用的结果进行数据处理打包发送给客户端Stub程序。
RPC原理和调用步骤
实际上,如果我们想要在网络中的任意两台计算机上实现远程调用过程,要解决很多问题,比如:
-
两台物理机器在网络中要建立稳定可靠的通信连接。
-
两台服务器的通信协议的定义问题,即两台服务器上的程序如何识别对方的请求和返回结果。也就是说两台计算机必须都能够识别对方发来的信息,并且能够识别出其中的请求含义和返回含义,然后才能进行处理。这其实就是通信协议所要完成的工作。
我们来看看RPC具体是如何解决这些问题的,RPC具体的调用步骤图如下:
在上述图中,通过1-10的步骤图解的形式,说明了RPC每一步的调用过程。具体描述为:
-
1、客户端想要发起一个远程过程调用,首先通过调用本地客户端Stub程序的方式调用想要使用的功能方法名;
-
2、客户端Stub程序接收到了客户端的功能调用请求,将客户端请求调用的方法名,携带的参数等信息做序列化操作,并打包成数据包。
-
3、客户端Stub查找到远程服务器程序的IP地址,调用Socket通信协议,通过网络发送给服务端。
-
4、服务端Stub程序接收到客户端发送的数据包信息,并通过约定好的协议将数据进行反序列化,得到请求的方法名和请求参数等信息。
-
5、服务端Stub程序准备相关数据,调用本地Server对应的功能方法进行,并传入相应的参数,进行业务处理。
-
6、服务端程序根据已有业务逻辑执行调用过程,待业务执行结束,将执行结果返回给服务端Stub程序。
-
7、服务端Stub程序将程序调用结果按照约定的协议进行序列化, 并通过网络发送回客户端Stub程序。
-
8、客户端Stub程序接收到服务端Stub发送的返回数据,对数据进行反序列化操作, 并将调用返回的数据传递给客户端请求发起者。
-
9、客户端请求发起者得到调用结果,整个RPC调用过程结束。
在对RPC进行简单介绍之后,我们先看一个简单的RPC服务注册和调用的demo,之后会分别从server和client的核心代码出发,深入到底层去理解RPC库的具体实现,并在学习的过程中思考实现一个RPC协议所需要做的工作以及如何基于RPC基础库进行进一步开发。
server.go
服务端代码如下:
package main
import (
"log"
"net"
"net/rpc"
)
func main() {
rpc.RegisterName("HelloService", new(HelloService))
listener, err := net.Listen("tcp", ":8888")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
rpc.Accept(listener)
}
type HelloService struct{}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
log.Println("got req", request)
return nil
}
复制代码
client.go
调用端代码如下:
package main
import (
"fmt"
"log"
"net/rpc"
)
func main() {
client, err := rpc.Dial("tcp", "localhost:8888")
if err != nil {
log.Fatal("dialing:", err)
}
var reply string
err = client.Call("HelloService.Hello", "RPC", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
复制代码
启动server.go 之后启动client.go ,RPC调用成功,控制台分别打印:
// server
got req RPC
// client
hello:RPC
复制代码
接下来我们分别从服务端和客户端两个角度对其原理进行解析。
服务端在代码中,可以看到服务端方法主要包括RegisterName、Listen和Accept, 我将服务端主要的工作流程分为三个步骤:
-
服务方法注册
-
监听和参数处理
-
调用 RPC 方法
服务方法注册
首先我们来看注册过程,注册过程中主要调用的方法是server.register方法,其实主要需要注册的就是服务对象和服务方法,接下来通过代码来看是如何为实现的:
相关结构体:
type service struct {// 表示服务的结构体,用于注册服务对象
name string // 服务名
rcvr reflect.Value // 注册服务的结构体实例
typ reflect.Type // 注册服务的结构体类型
method map[string]*methodType // 方法名与方法的映射列表
}
type methodType struct {// 表示方法的结构体,用于注册服务方法
sync.Mutex
method reflect.Method // 方法名
ArgType reflect.Type // 参数类型
ReplyType reflect.Type // 返回值类型
numCalls uint // 调用次数
}
复制代码
注册主要流程:
func (server *Server) RegisterName(name string, rcvr interface{}) error {
return server.register(rcvr, name, true)
}
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
// 1\. 服务相关字段的注册
s := new(service)
s.typ = reflect.TypeOf(rcvr) // 获取实例类型
s.rcvr = reflect.ValueOf(rcvr) // 获取实例本身
// 当Type为指针时Name()返回空字符串,所以要先通过Indirect取指针的值
sname := reflect.Indirect(s.rcvr).Type().Name() // 取结构体类型名
// 是否指定服务名的处理:useName==true默认使用参数中的name
if useName {
sname = name
}
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Print(s)
return errors.New(s)
}
// IsExported:判断字符串首字符是否为大写字母
if !token.IsExported(sname) && !useName {
s := "rpc.Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
s.name = sname
// 2\. 注册RPC方法
s.method = suitableMethods(s.typ, true)
if len(s.method) == 0 {
str := ""
// reflect.PtrTo(s.typ)获取s.typ的指针类型 提醒用户修改注册服务的代码
method := suitableMethods(reflect.PtrTo(s.typ), false)
if len(method) != 0 {// 提醒使用方注册服务时传递结构体实例指针
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
}
log.Print(str)
return errors.New(str)
}
// 存入一个sync.map
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
复制代码
这个方法的主要逻辑就是将传入的方法名和结构体进行校验解析之后整理成结构体service存入一个map(serviceMap) 中
其中方法的注册主要通过suitableMethods函数处理,主要就是通过大量的反射方法对注册方法的参数、返回值等进行合法性校验:
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m) // 获取实例类型对应的方法
mtype := method.Type // 获取方法类型
mname := method.Name // 获取方法名
// 通过PkgPath判断方法是否可导出
if method.PkgPath != "" {
continue
}
// 一个方法必须有且仅有三个参数:结构体实例、rpc参数、rpc响应值
if mtype.NumIn() != 3 { // NumIn()获取参数个数
if reportErr {
log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
}
continue
}
// 下标从0开始,第一个参数是实例本身
// 对于Object.Method(req, resp),第一个参数是Object
argType := mtype.In(1) // 参数类型
if !isExportedOrBuiltinType(argType) {// rpc参数必须为可导出或者内置类型
if reportErr {
log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
}
continue
}
replyType := mtype.In(2) // resp必须为指针
if replyType.Kind() != reflect.Ptr {
if reportErr {
log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
}
continue
}
// 响应值必须为可导出或者内置类型
if !isExportedOrBuiltinType(replyType) {
if reportErr {
log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
}
continue
}
if mtype.NumOut() != 1 { // 方法返回值个数
if reportErr {
log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
}
continue
}
// 方法返回值必须为error
if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr {
log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
}
continue
}
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
复制代码
在方法注册完成后,后面服务端接收到客户端的请求之后,会通过读取map(serviceMap) ,来实现方法的调用和处理,详情可参考后面解析。
监听和参数处理
介绍完注册过程,重点介绍Accept方法即服务端接收到客户端之后请求的处理过程。代码如下:
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
}
}
复制代码
Accept方法主要是一个不断接受新的请求连接的for循环 ,一旦监听器接收了一个连接,之后为每个连接开一个go协程调用ServerConn 进行处理。
接下来我们看下ServerConn方法的主要逻辑,代码如下:
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
复制代码
ServerConn 方法也很简单,首先构建一个Codec结构体,去处理RPC协议,参数包括连接、序列化反序列化方法以及Writer,标准库的默认序列化方式为GobServeCodec。
golang官方还提供了net/ rpc /jsonrpc库实现RPC方法,JSON RPC采用JSON进行数据编解码,因而支持跨语言调用。有兴趣的同学可以自行查看。
在这里我们可以看到server.ServeCodec方法的参数为一个接口, 如果你要自己实现一个rpc协议的话,只需要实现ServerCodec接口对应的的方法就可以进行个性化开发了。
后面代码解读中会发现,处理过程中主要调用的就是这个接口的方法。
接口如下:
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(interface{}) error
WriteResponse(*Response, interface{}) error
// Close can be called multiple times and must be idempotent.
Close() error
}
默认序列化方式GobServeCodec的方法实现如下:
// 读取请求头信息,将c.dec中的数据写入到r
func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
return c.dec.Decode(r)
}
func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
return c.dec.Decode(body)
}
// 依次编码响应头和响应内容
func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
// 序列化响应头
if err = c.enc.Encode(r); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: gob error encoding response:", err)
c.Close()
}
return
}
// 序列化响应体
if err = c.enc.Encode(body); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: gob error encoding body:", err)
c.Close()
}
return
}
// 将buffer中的数据写入writer
return c.encBuf.Flush()
}
func (c *gobServerCodec) Close() error {
if c.closed {
return nil
}
c.closed = true
return c.rwc.Close()
}
接下来我们来看ServeCodec方法:
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
// 解析请求信息
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
// 如果无法正确解析请求头部信息则keepReading==false 退出循环,关闭连接
if !keepReading {
break
}
// 发送解析请求信息出错的响应信息 invalidRequest
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error()) // 释放req对象回链表
server.freeRequest(req)
}
continue
}
wg.Add(1)
// 调用对应RPC函数
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
wg.Wait()
codec.Close()
}
ServeCodec方法的主要逻辑是一个for循环,在for循环中主要有两个方法:
-
server.readRequest——读取请求数据并解码
-
server.call——调用客户端要调用的方法,将返回值返回给客户端 接下来我们看server.readRequest方法:
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
// 解析头部信息:若出错且选择跳过这个请求(keepReading==true)等待处理下一个请求,这时需要取出连接中的本次请求的消息主体避免影响对读取下一次请求时出错
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
if err != nil {
if !keepReading {
return
}
// discard body
codec.ReadRequestBody(nil)
return
}
// 解析请求主体信息(即参数):通过要调用的方法对应的参数类型来构造参数实例指针(通过reflect.New),然后再通过ReadRequestBody解码信息。
argIsValue := false // if true, need to indirect before calling.
if mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(mtype.ArgType.Elem())
} else {
argv = reflect.New(mtype.ArgType)
argIsValue = true
}
if err = codec.ReadRequestBody(argv.Interface()); err != nil {
return
}
if argIsValue {
argv = argv.Elem()
}
//构造响应值实例:响应类型为指针,所以需要.Elem()获取具体类型。当响应值为slice或者map时要调用反射的MakeSlice或者 MakeMap方法来申请内存(创建实例)
replyv = reflect.New(mtype.ReplyType.Elem())
switch mtype.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0, 0))
}
return
}
readRequest方法同样包含三个步骤:
-
解析头部信息:若出错且选择跳过本次请求(keepReading==true)等待处理下一个请求,这时需要取出连接中的本次请求的消息主体避免影响对读取下一次请求时出错
-
解析请求主体信息:通过要调用的方法对应的参数类型来构造参数实例指针(通过reflect.New),然后再通过ReadRequestBody解码参数信息。
-
构造响应值实例:调用反射的MakeSlice或者 MakeMap方法来申请内存(创建实例),构造响应实例
在介绍请求头以及请求体的数据解析过程之前我们首先对net/rpc定义的消息格式进行介绍:
net/rpc将消息分为头部和主体两部分,
-
对于request:头部包括请求的序列号、服务名等基础信息;主体为调用RPC方法需要的参数
-
对于response:头部同样包括请求的序列号、服务名等基础信息;主体则为方法返回值
- 请求头部Request
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
- 响应头部Response
type Response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *Response // for free list in Server
}
- 由于每次请求和响应都需要定义Request/Response对象,为了减少内存分配,net/rpc实现了对象的复用,通过链表(freeReq/freeResp)的方式实现了一个对象池。复用流程如图所示:
getRequest和freeRequest方法代码如下:
func (server *Server) getRequest() *Request {
server.reqLock.Lock()
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
}
server.reqLock.Unlock()
return req
}
func (server *Server) freeRequest(req *Request) {
server.reqLock.Lock()
req.next = server.freeReq
serve.freeReq = req
server.reqLock.Unlock()
}
复制代码
我们来看readRequestHeader方法:
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
req = server.getRequest() // 复用request
err = codec.ReadRequestHeader(req) // 读取请求头信息
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
err = errors.New("rpc: server cannot decode request: " + err.Error())
return
}
keepReading = true
// 获取服务名和调用方法
dot := strings.LastIndex(req.ServiceMethod, ".")
if dot < 0 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
return
}
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
svci, ok := server.serviceMap.Load(serviceName)
if !ok {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
return
}
svc = svci.(*service)
mtype = svc.method[methodName]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
}
return
}
复制代码
readRequestHeader方法的主要方法包括:
-
codec.ReadRequestHeader方法读取请求头中信息
-
获取服务名和方法名
-
加载方法注册步骤中存储在map中的service信息
调用RPC方法
通过上面的步骤,我们已经从请求中获取了调用的服务、方法以及参数等信息,接下来就可以通过service.call调用对应的RPC方法处理请求了。
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done()
}
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
复制代码
需要注意的几点包括:
-
反射的Method类型的Func字段记录了调用方法所需的信息,包括方法地址等;
-
调用rpc方法时需要传递参数:调用的方法所属的结构体实例、方法参数、方法响应值;
-
returnValues中的对象是reflect.Value类型,转为interface{}类型再转为确切的类型。
之后调用server.sendResponse方法发送响应给客户端:
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
resp := server.getResponse()
// 编辑响应头部信息
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
resp.Error = errmsg
reply = invalidRequest
}
resp.Seq = req.Seq
sending.Lock()
err := codec.WriteResponse(resp, reply)
if debugLog && err != nil {
log.Println("rpc: writing response:", err)
}
sending.Unlock()
server.freeResponse(resp)
}
复制代码
主要设计包括:
-
通过sending互斥锁防止异步处理请求时,对同一链接写入相应信息造成冲突
-
发送消息后释放Response对象以便复用
在代码中可以看到客户端方法主要包括dial和call两个方法,我将主要流程同样划分为三个步骤:
-
建立 RPC 连接
-
调用 RPC 方法
-
返回值处理
建立RPC连接
Dial方法主要包括两个方法:
-
net.Dial——作用是根据传入的传输层协议和地址建立连接并初始化一个RPC客户端
-
NewClient——初始化client,编码方式默认使用god,与服务端的初始化方式相同
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
// 根据解码方式创建客户端
// 参数类型ClientCodec是接口类型,任何实现了ClientCodec接口的结构体都可以作为客户端的编解码方式。
// 成功创建客户端的同时也异步调用了client.input()用于处理rpc服务端的响应消息
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
type ClientCodec interface {
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error
Close() error
}
复制代码
之前在服务端代码解析中,说过实现ServerCodec接口的方法就可以进行个性化开发,这里也是这样,实现ClientCodec接口,才能进行个性化开发。区别在于客户端是写请求读响应,而服务端是读请求写响应。
在这里需要注意的是,在NewClientWithCodec方法中,通过异步的方式调用了client.input方法,这个方法其实是对返回值的处理,在后面第三步(返回值处理)中会详细解析。
默认gobClientCodec的实现方式如下:
type gobClientCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
}
func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
// 序列化请求头
if err = c.enc.Encode(r); err != nil {
return
}
// 序列化请求体
if err = c.enc.Encode(body); err != nil {
return
}
// 将buffer中数据写入writer
return c.encBuf.Flush()
}
func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
return c.dec.Decode(r)
}
func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
return c.dec.Decode(body)
}
func (c *gobClientCodec) Close() error {
return c.rwc.Close()
}
复制代码
调用RPC方法
请求结构体:
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Receives *Call when Go is complete.
}
复制代码
通过client.Call方法调用指定的RPC方法,属于同步调用,本质上调用了Go方法,然后等待接收调用结束信号,信号由Done传递。同步异步调用的控制也是在这里通过done这个channel来控制的。
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10)
} else {
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
复制代码
client.Go方法主要做的过程就是初始化call结构体,之后通过client.send方法发送请求。
func (client *Client) send(call *Call) {
client.reqMutex.Lock() // 互斥锁,避免写入冲突
defer client.reqMutex.Unlock()
client.mutex.Lock()
if client.shutdown || client.closing { // 客户端是否关闭,主动关闭(closing),被动关闭(shutdowm)
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++// 递增序列号唯一标记请求
client.pending[seq] = call
client.mutex.Unlock()
// 请求头
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
// 发送请求
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)// 请求失败,移除请求
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done() // 发送结束请求信号
}
}
复制代码
返回值处理
在前面,我们提到,在创建客户端的过程中,异步调用了client.input的方法用于对RPC调用的返回值进行处理。代码如下:
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
复制代码
主要流程包括:
-
从socket连接中轮询获取响应消息(消息头+消息体)
-
首先读取消息头,通过序列号seq获取待处理请求
-
读取请求体信息
- Call == nil:此时我们没有pending的call,意味着写请求失败了,并且call已经删除,response是一个error信息,此时我们仍需要去读response body。
- response.Error != “”:RPC方法内部出错,需要将响应消息读取出来但是不需要得到具体的消息内容。call.Error = ServerError(response.Error)设置请求的返回值err,ServerError是string的别名。
- 正常的处理流程
-
处理过程出错,退出循环,关闭连接
- 处理服务端响应是启动一个goroutine进行轮询,为了防止在向服务端发送请求时该goroutine因出错而要关闭连接,因此采用client.reqMutex。
- 使用client.mutex是该逻辑涉及对map的读取,对client一些属性的写入,防止写入/读取冲突
- client.shutdown = true表示客户端异常退出,因此需要处理client.pending中待处理的call,防止一些RPC调用在Call方法处阻塞等待(<-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done)
以上,我们分别从服务端和客户端角度分别对net/rpc库的源码进行了详细的解析,个人认为比较好的设计在于request和response结构体的复用,避免了每次请求都需要创建结构体。