1.前言

RPC(Remote Procedure Call)远程过程调用,它可以使一台主机上的进程调用另一台主机的进程,由以访为其他若干个主机提供服务,也就是我们常说的C/S服务,Server与Client之间通过rpc方式进行通信。

2.如何实现

Golang官方提供的net/rpc库使用encoding/gob进行编解码,支持TCP或HTTP数据传输方式,由于其它语言不支持gob编解码方式,因此使用net/rpc库实现的RPC方法是没有办法进行跨语言调用。(重点介绍这个包)
但是官方还提供了net/rpc/jsonrpc库实现rpc方法,jsonrpc采用的是json格式进行数据编码,因此支持跨语言调用,目前jsonrpc是基于tcp协议实现的,暂不支持http传输方式。

3.RPC Service

3.1注册服务要求

  • 服务端注册一个对象,使其作为一个服务被暴露,服务的名字是该对象的类型名,注册后对象导出的方法就可以被远程访问。
  • 服务端可以注册多个不同类型的对象(服务),禁止注册具有相同类型的多个对象。
  • Golang的RPC函数需要满足以下条件才能被远程调用,不然会被忽略:
 func (t *T) MethodName(argType T1, replyType *T2) error
  • 结构体T字段首字母要大写,可以别人调用
  • 函数名必须首字母大写
  • 函数第一参数是接收参数,第二个参数是返回给客户端的参数,必须是指针类型
  • 函数还必须有一个返回值error

将本地方法放在服务端,注册服务器定义的方法。
服务端需要提供一个套接字服务,监听客户端发送的请求(Listen()),解析获得客户端的请求参数。
服务端会创建一个网络监听器然后调用Accept,服务端连接上调用ServeConn管理请求。或者,对于HTTP监听器,,调用HandleHTTP和http.ListenAndServe。
获取请求参数执行服务器的调用函数,将返回结果交给客户端。

3.2tcp service和 http service

package main

import (
	"net"
	"net/rpc"
)

type HelloService struct{}
func (p *HelloService) Hello(request string, reply *string) error {
	*reply = "hello:" + request
	return nil
}

func main() {
	//tcp
	listen, _ := net.Listen("tcp", ":8000")
	//2. 注册处理逻辑 handler
	_ = rpc.RegisterName("HelloService", &HelloService{})
	//3. 启动服务
	conn, _ := listen.Accept() //当一个新的连接进来的时候,
	rpc.ServeConn(conn)
  
   /**
    //http
    rpc.Register(new(HelloService))
    //注册用于处理RPC消息的HTTP处理器
    rpc.HandleHTTP()
    //监听端口等待RPC请求
    err = http.ListenAndServe(":8000", nil)
    if err!=nil {
        log.Fatal("ERROR:", err)
    }
    */
}

4.rpc包主要方法和对象

4.1server对象

//RPC服务结构体
type Server struct {
	serviceMap sync.Map   // 服务列表Map
	reqLock    sync.Mutex // 保护读取请求缓冲区的互斥锁
	freeReq    *Request   // 空闲Request地址,用于内存复用
	respLock   sync.Mutex // 保护写入响应缓冲区的互斥锁
	freeResp   *Response  // 空闲Response地址,用于内存复用
}
var DefaultServer = NewServer()

4.1.1 注册方法

  • func (server *Server) Register(rcvr interface{}) error
    • Register用来向Server服务端注册RPC服务
    • 默认server.Register()会将方法接收者(revr,receiver)的类型名作为方法名前缀。
  • func (server *Server) RegisterName(name string, rcvr interface{}) error
    • RegisterName用来向Server服务端注册RPC服务
    • 设置RPC方法名前缀

4.1.2监听器

  • func (server *Server) Accept(lis net.Listener)
    • Accept接收监听器l获取的连接,然后服务每一个连接

4.1.3服务端处理请求的相关方法

  • func (server *Server) ServeConn(conn io.ReadWriteCloser)

    • ServeConn在该连接使用gob(参见encoding/gob包)有线格式。
    • ServeConn在单个连接上执行server。
  • func (server *Server) ServeCodec(codec ServerCodec)

    • ServeCodec类似ServeConn,但使用指定的编解码器,以编码请求主体和解码回复主体。
  • func (server *Server) ServeRequest(codec ServerCodec) error

    • ServeRequest类似ServeCodec,但异步的服务单个请求。它不会在调用结束后关闭codec。
      ServeHTTP方法
  • func (server *Server) HandleHTTP(rpcPath, debugPath string)

    • HandleHTTP会注册到http.DefaultServeMux。之后,仍需要调用http.Serve()
    • HandleHTTP注册server的RPC信息HTTP处理器对应到rpcPath,注册server的debug信息HTTP处理器对应到debugPath。

注意:

var DefaultServer = NewServer()

上述方法,net/rpc进行了二次封装,由DefaultServer 默认的服务对象调用具体的实现。

4.1.4 实例:

type HelloService struct{}

func (s *HelloService) Hello(request string, reply *string) error {
	//返回值是通过修改reply的值
	*reply = "hello, " + request
	return nil
}
//不可跨语言,默认go序列化
func tcpServer(){
	//1. 实例化一个server
	listener, _ := net.Listen("tcp", ":8000")
	//2. 注册处理逻辑 handler
	_ = rpc.RegisterName("HelloService", &HelloService{})
	//3. 启动服务
	for {
		conn, _ := listen.Accept() //当一个新的连接进来的时候,
		rpc.ServeConn(conn)
	}
}
//可跨语言
func tcpJsonServer(){
	//1. 实例化一个server
	listener, _ := net.Listen("tcp", ":8000")
	//2. 注册处理逻辑 handler
	_ = rpc.RegisterName("HelloService", &HelloService{})
	//3. 启动服务
	for {
		conn, _ := listener.Accept() //当一个新的连接进来的时候,
		go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
	}
}
func httpServer(){
	_ = rpc.RegisterName("HelloService", &HelloService{})
	rpc.HandleHTTP()
	http.ListenAndServe(":8000", nil)
}
func main(){
	tcpServer()
 	tcpJsonServer()
 	httpServer()
}

4.1.5 Golang中开启HTTP服务

func (server Server) ServeHTTP(w http.ResponseWriter, req http.Request)
golang中只要实现接口
http.Handler就可以处理HTTP请求,http.Handler接口仅有一个方法ServeHTTP(ResponseWriter, *Request),定义了处理HTTP请求的逻辑。
Server对象实现了接口
http.Handler**
再通过http.Handle(pattern string, handler Handler)方法注册http.Handler到指定的pattern(即URL)。
最后通过http.Serve(l net.Listener, handler Handler)方法指定监听的端口开启HTTP服务即可。

4.2 client

// Client类型代表RPC客户端。同一个客户端可能有多个未返回的调用,也可能被多个go程同时使用。
type Client struct {
	codec ClientCodec //消息编解码方式
	reqMutex sync.Mutex // protects following
	request  Request
	mutex    sync.Mutex // protects following
	seq      uint64
	pending  map[uint64]*Call //记录请求序列号到请求内容的映射 请求的内容由Call结构体表 pending表示待处理的请求
	closing  bool // user has called Close
	shutdown bool // server has told us to stop
}

4.2.1 与服务端建立连接

  • func Dial(network, address string) (*Client, error)
    • rpc.Dial方法:根据传入的传输层协议名称以及目的地址建立连接,并初始化一个rpc客户端
  • func DialHTTP(network, address string) (*Client, error)
    • 通过指定的网络和地址与在默认HTTP RPC路径监听的HTTP RPC服务端连接。
  • func DialHTTPPath(network, address, path string) (*Client, error)
    • 通过在指定的网络、地址和路径与HTTP RPC服务端连接。
  • func NewClient(conn io.ReadWriteCloser) *Client
    • 编码方式默认使用gob,其中gobClientCodec指定了客户端的编解码方式,与之前介绍服务端时的初始化方式以及定义的方法一样,只不过客户端是写请求读响应,而服务端是读请求写响应。
  • func NewClientWithCodec(codec ClientCodec) *Client
    • NewClientWithCodec方法:根据消息编解码方式创建客户端,抽象出这个方法是为了扩展支持其他消息编解码方式。
    • 参数类型ClientCodec是接口类型,任何实现了ClientCodec接口的结构体都可以作为客户端的编解码方式。

4.2.2举例

对应4.1.4的server

func TcpClient(){
	//1. 建立连接
	client, err := rpc.Dial("tcp", "localhost:1234")
	if err != nil {
		panic("连接失败")
	}
	var reply string //string有默认值
	//调用连接。
	err = client.Call("HelloService.Hello", "bobby", &reply)
	if err != nil {
		fmt.Println(err)
		//panic("调用失败")
	}
	fmt.Println(reply)
}
func HttpClient(){
	//1. 建立连接
	client, err := rpc.DialHTTP("tcp", "localhost:1234")
	if err != nil {
		panic("连接失败")
	}
	var reply string //string有默认值
	//调用连接。
	err = client.Call("HelloService.Hello", "bobby", &reply)
	if err != nil {
		fmt.Println(err)
		//panic("调用失败")
	}
	fmt.Println(reply)
}
func TcpJsonClient(){
	//1. 建立连接
	conn, err := net.Dial("tcp", "localhost:1234")
	if err != nil {
		panic("连接失败")
	}
	var reply string //string有默认值
	client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
	err = client.Call("HelloService.Hello", "bobby", &reply)
	if err != nil {
		panic("调用失败")
	}
	fmt.Println(reply)
}
func main() {
	TcpClient()
	HttpClient()
	TcpJsonClient()
}

4.2.3如何表示一个请求

// Call represents an active RPC.

type Call struct {
   ServiceMethod string      // The name of the service and method to call.
   Args          interface{} // The argument to the function (*struct).
   Reply         interface{} // The reply from the function (*struct).
   Error         error       // After completion, the error status.
   Done          chan *Call  //Done作为一个channel来传递调用请求成功或者失败的信号
}

请求内容至少需要包括:

  • 请求的服务以及方法名
  • 请求参数
  • 记录响应对象的指针
  • 请求出错时返回的错误信息

同步与异步

  • func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error
    • Call同步调用会阻塞当前程序直到结果返回
  • func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
    • 异步调用通过调用<-call.Done阻塞当前程序直到RPC调用结束

Go方法,比Call方法多了一个参数done chan *Call,用于传递调用结束信号。done需要带缓冲区,防止阻塞(下面会说call.done()时会说到)

5.总结

使用时,服务端和客户端的请求和响应类型应保持一致,否则会触发panic
net/rpc通过反射解析服务结构体、调用方法、请求参数、服务端响应
通过加锁保护服务端的 Request 和 Response 结构体,通过重用已经创建的对象实例来减少堆内存申请
通过加锁保护并发读写请求和响应缓冲区
可以通过 mtype.numCalls 获得服务端每个方法的从启动开始的累计调用次数。