本篇为【写给go开发者的gRPC教程系列】第二篇

  • 第二篇:通信模式

上一篇介绍了如何编写 protobuf 的 idl,并使用 idl 生成了 gRPC 的代码,现在来看看如何编写客户端和服务端的代码

Simple RPC (Unary RPC)

syntax = "proto3";

package ecommerce;

import "google/protobuf/wrappers.proto";

option go_package = "ecommerce/";

message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}

service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}

定义如上的 idl,需要关注几个事项

protobufsyntax = "proto3";protoc-gen-gooption go_package = "ecommerce/";methodmessageimportgoogle/protobuf/wrappers.proto

生成 go 和 grpc 的代码

$ protoc -I ./pb \
  --go_out ./ecommerce --go_opt paths=source_relative \
  --go-grpc_out ./ecommerce --go-grpc_opt paths=source_relative \
  ./pb/product.proto
ecommerce
├── product.pb.go
└── product_grpc.pb.go
pb
└── product.proto

server 实现

1、由 pb 文件生成的 gRPC 代码中包含了 service 的接口定义,它和我们定义的 idl 是吻合的

service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementServer interface {
 GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
 mustEmbedUnimplementedOrderManagementServer()
}

2、我们的业务逻辑就是实现这个接口

package main

import (
 "context"
 "log"

 pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"
 "google.golang.org/grpc/codes"
 "google.golang.org/grpc/status"
 "google.golang.org/protobuf/types/known/wrapperspb"
)

var _ pb.OrderManagementServer = &OrderManagementImpl{}

var orders = make(map[string]pb.Order)

type OrderManagementImpl struct {
 pb.UnimplementedOrderManagementServer
}

// Simple RPC
func (s *OrderManagementImpl) GetOrder(ctx context.Context, orderId *wrapperspb.StringValue) (*pb.Order, error) {
 ord, exists := orders[orderId.Value]
 if exists {
  return &ord, status.New(codes.OK, "").Err()
 }

 return nil, status.Errorf(codes.NotFound, "Order does not exist. : ", orderId)
}

3、在实现完业务逻辑之后,我们可以创建并启动服务

package main

import (
 "net"

 pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"
 "google.golang.org/grpc"
)

func main() {
 s := grpc.NewServer()

 pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})

 lis, err := net.Listen("tcp", ":8009")
 if err != nil {
  panic(err)
 }

 if err := s.Serve(lis); err != nil {
  panic(err)
 }
}

服务端代码实现的流程如下

client 实现

1、由 pb 文件生成的 gRPC 代码中包含了 client 的实现,它和我们定义的 idl 也是吻合的

service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type orderManagementClient struct {
 cc grpc.ClientConnInterface
}

func NewOrderManagementClient(cc grpc.ClientConnInterface) OrderManagementClient {
 return &orderManagementClient{cc}
}

func (c *orderManagementClient) GetOrder(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error) {
 out := new(Order)
 err := c.cc.Invoke(ctx, "/ecommerce.OrderManagement/getOrder", in, out, opts...)
 if err != nil {
  return nil, err
 }
 return out, nil
}

2、直接使用 client 来进行 rpc 调用

package main

import (
 "context"
 "log"
 "time"

 pb "github.com/liangwt/note/grpc/unary_rpc_example/ecommerce"
 "google.golang.org/grpc"
 "google.golang.org/protobuf/types/known/wrapperspb"
)

func main() {
 conn, err := grpc.Dial("127.0.0.1:8009", grpc.WithInsecure())
 if err != nil {
  panic(err)
 }
 defer conn.Close()

 client := pb.NewOrderManagementClient(conn)

 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 defer cancel()

 // Get Order
 retrievedOrder, err := client.GetOrder(ctx, &wrapperspb.StringValue{Value: "101"})
 if err != nil {
  panic(err)
 }

 log.Print("GetOrder Response -> : ", retrievedOrder)
}

客户端代码实现的流程如下

小总结

protobuf协议
http1.xSimple RPCUnary RPC)
Simple RPC (Unary RPC)

Server-Streaming RPC 服务器端流式 RPC

服务器端流式 RPC

简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。大致如图:

Server-Streaming RPC

pb 定义

syntax = "proto3";

package ecommerce;

option go_package = "ecommerce/";

import "google/protobuf/wrappers.proto";

message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}

service OrderManagement {
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
}

server 实现

Simple RPCstream OrderManagement_SearchOrdersServer
Send(...)
nilerror
func (s *server) SearchOrders(query *wrapperspb.StringValue,
                              stream pb.OrderManagement_SearchOrdersServer) error {
 for _, order := range orders {
  for _, str := range order.Items {
   if strings.Contains(str, query.Value) {
    err := stream.Send(&order)
    if err != nil {
     return fmt.Errorf("error send: %v", err)
    }
   }
  }
 }

 return nil
}

client 实现

Simple RPCstream
streamRecv
Recvio.EOF
c := pb.NewOrderManagementClient(conn)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

stream, err := c.SearchOrders(ctx, &wrapperspb.StringValue{Value: "Google"})
if err != nil{
  panic(err)
}

for{
  order, err := stream.Recv()
  if err == io.EOF{
    break
  }

  log.Println("Search Result: ", order)
}

小总结

通信模式-服务端流RPC

Client-Streaming RPC 客户端流式 RPC

客户端流式 RPC

服务端没有必要等到客户端发送完所有请求再响应,可以在收到部分请求之后就响应

Client-Streaming RPC

pb 定义

syntax = "proto3";

package ecommerce;

option go_package = "ecommerce/";

import "google/protobuf/wrappers.proto";

message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}

service OrderManagement {
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
}

server 实现

Simple RPCstream OrderManagement_UpdateOrdersServer
stream OrderManagement_UpdateOrdersServerRecv
Recvio.EOF
stream OrderManagement_UpdateOrdersServerSendAndClose
// 在这段程序中,我们对每一个 Recv 都进行了处理
// 当发现 io.EOF (流关闭) 后,需要将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
 ordersStr := "Updated Order IDs : "
 for {
  order, err := stream.Recv()
  if err == io.EOF {
   // Finished reading the order stream.
   return stream.SendAndClose(
    &wrapperspb.StringValue{Value: "Orders processed " + ordersStr})
  }
  // Update order
  orders[order.Id] = *order

  log.Println("Order ID ", order.Id, ": Updated")
  ordersStr += order.Id + ", "
 }
}

Client 实现

Simple RPCstream
Send(...)
streamCloseAndRecv
c := pb.NewOrderManagementClient(conn)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

stream, err := c.UpdateOrders(ctx)
if err != nil {
  panic(err)
}

if err := stream.Send(&pb.Order{
  Id:          "00",
  Items:       []string{"A", "B"},
  Description: "A with B",
  Price:       0.11,
  Destination: "ABC",
}); err != nil {
  panic(err)
}

if err := stream.Send(&pb.Order{
  Id:          "01",
  Items:       []string{"C", "D"},
  Description: "C with D",
  Price:       1.11,
  Destination: "ABCDEFG",
}); err != nil {
  panic(err)
}

res, err := stream.CloseAndRecv()
if err != nil {
  panic(err)
}

log.Printf("Update Orders Res : %s", res)

小总结

通信模式-客户端流RPC

Bidirectional-Streaming RPC 双向流式 RPC

双向流式 RPC

首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)

假设该双向流是按顺序发送的话,大致如图:

Bidirectional-Streaming RPC

pb 定义

syntax = "proto3";

package ecommerce;

option go_package = "ecommerce/";

import "google/protobuf/wrappers.proto";

message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}

message CombinedShipment {
string id = 1;
string status = 2;
repeated Order orderList = 3;
}

service OrderManagement {
rpc processOrders(stream google.protobuf.StringValue)
returns (stream CombinedShipment);
}

server 实现

OrderManagement_ProcessOrdersServer
Send(...)
Recv(...)Recvio.EOF
nilerror
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {

 batchMarker := 1
 var combinedShipmentMap = make(map[string]pb.CombinedShipment)
 for {
  orderId, err := stream.Recv()
  log.Printf("Reading Proc order : %s", orderId)
  if err == io.EOF {
   log.Printf("EOF : %s", orderId)
   for _, shipment := range combinedShipmentMap {
    if err := stream.Send(&shipment); err != nil {
     return err
    }
   }
   return nil
  }
  if err != nil {
   log.Println(err)
   return err
  }

  destination := orders[orderId.GetValue()].Destination
  shipment, found := combinedShipmentMap[destination]

  if found {
   ord := orders[orderId.GetValue()]
   shipment.OrderList = append(shipment.OrderList, &ord)
   combinedShipmentMap[destination] = shipment
  } else {
   comShip := pb.CombinedShipment{Id: "cmb - " + (orders[orderId.GetValue()].Destination), Status: "Processed!"}
   ord := orders[orderId.GetValue()]
   comShip.OrderList = append(shipment.OrderList, &ord)
   combinedShipmentMap[destination] = comShip
   log.Print(len(comShip.OrderList), comShip.GetId())
  }

  if batchMarker == orderBatchSize {
   for _, comb := range combinedShipmentMap {
    log.Printf("Shipping : %v -> %v", comb.Id, len(comb.OrderList))
    if err := stream.Send(&comb); err != nil {
     return err
    }
   }
   batchMarker = 0
   combinedShipmentMap = make(map[string]pb.CombinedShipment)
  } else {
   batchMarker++
  }
 }
}

Client 实现

OrderManagement_ProcessOrdersClient
Send(...)
Recv(...)Recvio.EOF
c := pb.NewOrderManagementClient(conn)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

stream, err := c.ProcessOrders(ctx)
if err != nil {
  panic(err)
}

go func() {
  if err := stream.Send(&wrapperspb.StringValue{Value: "101"}); err != nil {
    panic(err)
  }

  if err := stream.Send(&wrapperspb.StringValue{Value: "102"}); err != nil {
    panic(err)
  }

  if err := stream.CloseSend(); err != nil {
    panic(err)
  }
}()

for {
  combinedShipment, err := stream.Recv()
  if err == io.EOF {
    break
  }
  log.Println("Combined shipment : ", combinedShipment.OrderList)
}

小总结

双向流相对还是比较复杂的,大部分场景都是使用事件机制进行异步交互,需要精心的设计

通信模式-双向流RPC

示例代码

https://github.com/liangwt/grpc-example