1.前言
RPC(Remote Procedure Call)远程过程调用,它可以使一台主机上的进程调用另一台主机的进程,由以访为其他若干个主机提供服务,也就是我们常说的C/S服务,Server与Client之间通过rpc方式进行通信。
2.如何实现
Golang官方提供的net/rpc库使用encoding/gob进行编解码,支持TCP或HTTP数据传输方式,由于其它语言不支持gob编解码方式,因此使用net/rpc库实现的RPC方法是没有办法进行跨语言调用。(重点介绍这个包)
但是官方还提供了net/rpc/jsonrpc库实现rpc方法,jsonrpc采用的是json格式进行数据编码,因此支持跨语言调用,目前jsonrpc是基于tcp协议实现的,暂不支持http传输方式。
3.RPC Service
3.1注册服务要求
- 服务端注册一个对象,使其作为一个服务被暴露,服务的名字是该对象的类型名,注册后对象导出的方法就可以被远程访问。
- 服务端可以注册多个不同类型的对象(服务),禁止注册具有相同类型的多个对象。
- Golang的RPC函数需要满足以下条件才能被远程调用,不然会被忽略:
func (t *T) MethodName(argType T1, replyType *T2) error
- 结构体T字段首字母要大写,可以别人调用
- 函数名必须首字母大写
- 函数第一参数是接收参数,第二个参数是返回给客户端的参数,必须是指针类型
- 函数还必须有一个返回值error
将本地方法放在服务端,注册服务器定义的方法。
服务端需要提供一个套接字服务,监听客户端发送的请求(Listen()),解析获得客户端的请求参数。
服务端会创建一个网络监听器然后调用Accept,服务端连接上调用ServeConn管理请求。或者,对于HTTP监听器,,调用HandleHTTP和http.ListenAndServe。
获取请求参数执行服务器的调用函数,将返回结果交给客户端。
3.2tcp service和 http service
package main
import (
"net"
"net/rpc"
)
type HelloService struct{}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
return nil
}
func main() {
//tcp
listen, _ := net.Listen("tcp", ":8000")
//2. 注册处理逻辑 handler
_ = rpc.RegisterName("HelloService", &HelloService{})
//3. 启动服务
conn, _ := listen.Accept() //当一个新的连接进来的时候,
rpc.ServeConn(conn)
/**
//http
rpc.Register(new(HelloService))
//注册用于处理RPC消息的HTTP处理器
rpc.HandleHTTP()
//监听端口等待RPC请求
err = http.ListenAndServe(":8000", nil)
if err!=nil {
log.Fatal("ERROR:", err)
}
*/
}
4.rpc包主要方法和对象
4.1server对象
//RPC服务结构体
type Server struct {
serviceMap sync.Map // 服务列表Map
reqLock sync.Mutex // 保护读取请求缓冲区的互斥锁
freeReq *Request // 空闲Request地址,用于内存复用
respLock sync.Mutex // 保护写入响应缓冲区的互斥锁
freeResp *Response // 空闲Response地址,用于内存复用
}
var DefaultServer = NewServer()
4.1.1 注册方法
- func (server *Server) Register(rcvr interface{}) error
- Register用来向Server服务端注册RPC服务
- 默认server.Register()会将方法接收者(revr,receiver)的类型名作为方法名前缀。
- func (server *Server) RegisterName(name string, rcvr interface{}) error
- RegisterName用来向Server服务端注册RPC服务
- 设置RPC方法名前缀
4.1.2监听器
- func (server *Server) Accept(lis net.Listener)
- Accept接收监听器l获取的连接,然后服务每一个连接
4.1.3服务端处理请求的相关方法
-
func (server *Server) ServeConn(conn io.ReadWriteCloser)
- ServeConn在该连接使用gob(参见encoding/gob包)有线格式。
- ServeConn在单个连接上执行server。
-
func (server *Server) ServeCodec(codec ServerCodec)
- ServeCodec类似ServeConn,但使用指定的编解码器,以编码请求主体和解码回复主体。
-
func (server *Server) ServeRequest(codec ServerCodec) error
- ServeRequest类似ServeCodec,但异步的服务单个请求。它不会在调用结束后关闭codec。
ServeHTTP方法
- ServeRequest类似ServeCodec,但异步的服务单个请求。它不会在调用结束后关闭codec。
-
func (server *Server) HandleHTTP(rpcPath, debugPath string)
- HandleHTTP会注册到http.DefaultServeMux。之后,仍需要调用http.Serve()
- HandleHTTP注册server的RPC信息HTTP处理器对应到rpcPath,注册server的debug信息HTTP处理器对应到debugPath。
注意:
var DefaultServer = NewServer()
上述方法,net/rpc进行了二次封装,由DefaultServer 默认的服务对象调用具体的实现。
4.1.4 实例:
type HelloService struct{}
func (s *HelloService) Hello(request string, reply *string) error {
//返回值是通过修改reply的值
*reply = "hello, " + request
return nil
}
//不可跨语言,默认go序列化
func tcpServer(){
//1. 实例化一个server
listener, _ := net.Listen("tcp", ":8000")
//2. 注册处理逻辑 handler
_ = rpc.RegisterName("HelloService", &HelloService{})
//3. 启动服务
for {
conn, _ := listen.Accept() //当一个新的连接进来的时候,
rpc.ServeConn(conn)
}
}
//可跨语言
func tcpJsonServer(){
//1. 实例化一个server
listener, _ := net.Listen("tcp", ":8000")
//2. 注册处理逻辑 handler
_ = rpc.RegisterName("HelloService", &HelloService{})
//3. 启动服务
for {
conn, _ := listener.Accept() //当一个新的连接进来的时候,
go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
func httpServer(){
_ = rpc.RegisterName("HelloService", &HelloService{})
rpc.HandleHTTP()
http.ListenAndServe(":8000", nil)
}
func main(){
tcpServer()
tcpJsonServer()
httpServer()
}
4.1.5 Golang中开启HTTP服务
func (server Server) ServeHTTP(w http.ResponseWriter, req http.Request)
golang中只要实现接口http.Handler就可以处理HTTP请求,http.Handler接口仅有一个方法ServeHTTP(ResponseWriter, *Request),定义了处理HTTP请求的逻辑。
Server对象实现了接口http.Handler**
再通过http.Handle(pattern string, handler Handler)方法注册http.Handler到指定的pattern(即URL)。
最后通过http.Serve(l net.Listener, handler Handler)方法指定监听的端口开启HTTP服务即可。
4.2 client
// Client类型代表RPC客户端。同一个客户端可能有多个未返回的调用,也可能被多个go程同时使用。
type Client struct {
codec ClientCodec //消息编解码方式
reqMutex sync.Mutex // protects following
request Request
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call //记录请求序列号到请求内容的映射 请求的内容由Call结构体表 pending表示待处理的请求
closing bool // user has called Close
shutdown bool // server has told us to stop
}
4.2.1 与服务端建立连接
- func Dial(network, address string) (*Client, error)
- rpc.Dial方法:根据传入的传输层协议名称以及目的地址建立连接,并初始化一个rpc客户端
- func DialHTTP(network, address string) (*Client, error)
- 通过指定的网络和地址与在默认HTTP RPC路径监听的HTTP RPC服务端连接。
- func DialHTTPPath(network, address, path string) (*Client, error)
- 通过在指定的网络、地址和路径与HTTP RPC服务端连接。
- func NewClient(conn io.ReadWriteCloser) *Client
- 编码方式默认使用gob,其中gobClientCodec指定了客户端的编解码方式,与之前介绍服务端时的初始化方式以及定义的方法一样,只不过客户端是写请求读响应,而服务端是读请求写响应。
- func NewClientWithCodec(codec ClientCodec) *Client
- NewClientWithCodec方法:根据消息编解码方式创建客户端,抽象出这个方法是为了扩展支持其他消息编解码方式。
- 参数类型ClientCodec是接口类型,任何实现了ClientCodec接口的结构体都可以作为客户端的编解码方式。
4.2.2举例
对应4.1.4的server
func TcpClient(){
//1. 建立连接
client, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
panic("连接失败")
}
var reply string //string有默认值
//调用连接。
err = client.Call("HelloService.Hello", "bobby", &reply)
if err != nil {
fmt.Println(err)
//panic("调用失败")
}
fmt.Println(reply)
}
func HttpClient(){
//1. 建立连接
client, err := rpc.DialHTTP("tcp", "localhost:1234")
if err != nil {
panic("连接失败")
}
var reply string //string有默认值
//调用连接。
err = client.Call("HelloService.Hello", "bobby", &reply)
if err != nil {
fmt.Println(err)
//panic("调用失败")
}
fmt.Println(reply)
}
func TcpJsonClient(){
//1. 建立连接
conn, err := net.Dial("tcp", "localhost:1234")
if err != nil {
panic("连接失败")
}
var reply string //string有默认值
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
err = client.Call("HelloService.Hello", "bobby", &reply)
if err != nil {
panic("调用失败")
}
fmt.Println(reply)
}
func main() {
TcpClient()
HttpClient()
TcpJsonClient()
}
4.2.3如何表示一个请求
// Call represents an active RPC.
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call //Done作为一个channel来传递调用请求成功或者失败的信号
}
请求内容至少需要包括:
- 请求的服务以及方法名
- 请求参数
- 记录响应对象的指针
- 请求出错时返回的错误信息
同步与异步
- func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error
- Call同步调用会阻塞当前程序直到结果返回
- func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
- 异步调用通过调用<-call.Done阻塞当前程序直到RPC调用结束
Go方法,比Call方法多了一个参数done chan *Call,用于传递调用结束信号。done需要带缓冲区,防止阻塞(下面会说call.done()时会说到)
5.总结
使用时,服务端和客户端的请求和响应类型应保持一致,否则会触发panic
net/rpc通过反射解析服务结构体、调用方法、请求参数、服务端响应
通过加锁保护服务端的 Request 和 Response 结构体,通过重用已经创建的对象实例来减少堆内存申请
通过加锁保护并发读写请求和响应缓冲区
可以通过 mtype.numCalls 获得服务端每个方法的从启动开始的累计调用次数。