基于 gRPC 的服务间通信示例

示例说明,存在两个服务,订单服务和产品服务。其中:

  • 订单服务提供 HTTP 接口,用于完成订单查询。订单中包含产品信息,要利用 grpc 从产品服务获取产品信息
  • 产品服务提供 grpc 接口,用于响应微服务内部产品信息查询

本例中,对于 grpc 来说,产品服务为服务端、订单服务为客户端。

同时不考虑其他业务逻辑,例如产品服务也需要对外提供 http 接口等,仅在乎 grpc 的通信示例。同时不考虑服务发现和网关等。

编码实现:

一:基于之前定义的 .proto 文件生成 pb.go 文件

注意,客户端和服务端,都需要使用生成的 pb.go 文件

二:实现订单服务

orderService/httpService.go

package main

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "log"
    "net/http"
    "orderService/protos/codes"
    "time"
)

var (
    // 目标 grpc 服务器地址
    gRPCAddr = flag.String("grpc", "localhost:50051", "the address to connect to")
    // http 命令行参数
    addr = flag.String("addr", "127.0.0.1", "The Address for listen. Default is 127.0.0.1")
    port = flag.Int("port", 8080, "The Port for listen. Default is 8080.")
)

func main() {
    flag.Parse()
    // 连接 grpc 服务器
    conn, err := grpc.Dial(*gRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    // 实例化 grpc 客户端
    c := codes.NewProductClient(conn)

    // 定义业务逻辑服务,假设为产品服务
    service := http.NewServeMux()
    service.HandleFunc("/orders", func(writer http.ResponseWriter, request *http.Request) {
        // 调用 grpc 方法,完成对服务器资源请求
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        r, err := c.ProductInfo(ctx, &codes.ProductInfoRequest{
            Int64: 42,
        })
        if err != nil {
            log.Fatalln(err)
        }

        resp := struct {
            ID       int                          `json:"id"`
            Quantity int                          `json:"quantity"`
            Products []*codes.ProductInfoResponse `json:"products"`
        }{
            9527, 1,
            []*codes.ProductInfoResponse{
                r,
            },
        }
        respJson, err := json.Marshal(resp)
        if err != nil {
            log.Fatalln(err)
        }
        writer.Header().Set("Content-Type", "application/json")
        _, err = fmt.Fprintf(writer, "%s", string(respJson))
        if err != nil {
            log.Fatalln(err)
        }
    })

    // 启动监听
    address := fmt.Sprintf("%s:%d", *addr, *port)
    fmt.Printf("Order service is listening on %s.\n", address)
    log.Fatalln(http.ListenAndServe(address, service))
}

三,实现产品服务

productService/grpcService.go

package main

import (
    "context"
    "flag"
    "fmt"
    "google.golang.org/grpc"
    "log"
    "net"
    "productService/protos/compiles"
)

//grpc 监听端口
var port = flag.Int("port", 50051, "The server port")

// ProductServer 实现 UnimplementedProductServer
type ProductServer struct {
    compiles.UnimplementedProductServer
}

func (ProductServer) ProductInfo(ctx context.Context, pr *compiles.ProductInfoRequest) (*compiles.ProductInfoResponse, error) {
    return &compiles.ProductInfoResponse{
        Name:   "马士兵 Go 云原生",
        Int64:  42,
        IsSale: true,
    }, nil
}

func main() {
    flag.Parse()
    //设置 tcp 监听器
    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // 新建 grpc Server
    s := grpc.NewServer()
    // 将 ProductServer 注册到 grpc Server 中
    compiles.RegisterProductServer(s, ProductServer{})
    log.Printf("server listening at %v", lis.Addr())
    // 启动监听
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

测试,访问 order 的 http 接口。获取订单信息中,包含产品信息。

gRPC 核心概念

服务定义

与许多 RPC 系统一样,gRPC 基于定义服务的思想,指定可以远程调用的方法及其参数和返回类型。默认情况下,gRPC 使用 Protocol Buffer 作为接口定义语言 (IDL) 来描述服务接口和有效负载消息的结构。

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}

gRPC 支持定义四种服务方法:

  • 一元 RPC,其中客户端向服务器发送单个请求并获得单个响应,就像正常的函数调用一样。
rpc SayHello(HelloRequest) returns (HelloResponse);
  • 服务器流式 RPC,其中客户端向服务器发送请求并获取流以读回一系列消息。客户端从返回的流中读取,直到没有更多消息为止。 gRPC 保证单个 RPC 调用中的消息顺序。
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
  • 客户端流式 RPC,其中客户端写入一系列消息并将它们发送到服务器,再次使用提供的流。一旦客户端完成了消息的写入,它就会等待服务器读取它们并返回它的响应。 gRPC 再次保证了单个 RPC 调用中的消息顺序。
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
  • 双向流式 RPC,双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照他们喜欢的任何顺序读取和写入:例如,服务器可以在写入响应之前等待接收所有客户端消息,或者它可以交替读取消息然后写入消息,或其他一些读取和写入的组合。保留每个流中消息的顺序。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
使用 API
.proto
  • 在服务端,服务端实现服务声明的方法,并运行一个 gRPC 服务器来处理客户端调用。 gRPC 基础架构解码传入请求、执行服务方法并编码服务响应。
  • 在客户端,客户端有一个称为存根的本地对象(对于某些语言,首选术语是客户端),它实现与服务相同的方法。然后客户端可以在本地对象上调用这些方法,将调用的参数包装在适当的协议缓冲区消息类型中——gRPC 负责将请求发送到服务器并返回服务器的协议缓冲区响应。
同步与异步

在接收到服务端响应之前阻塞的同步 RPC 调用最接近 RPC 所希望的过程调用的抽象。另一方面,网络本质上是异步的,在许多情况下,能够在不阻塞当前线程的情况下启动 RPC 是很有用的。

大多数语言中的 gRPC 编程 API 有同步和异步两种风格。

gRPC 生命周期

生命周期指的是 gRPC 客户端调用 gRPC 服务端方法的过程。区别于不同的4种服务定义,过程如下:

一元 RPC

首先考虑最简单的 RPC 类型,其中客户端发送单个请求并返回单个响应。

  1. 一旦客户端调用了一个存根方法,服务器就会被通知该 RPC 已被调用,其中包含该调用的客户端元数据、方法名称和指定的截止日期(如果适用)。
  2. 然后,服务器可以立即发回自己的初始元数据(必须在任何响应之前发送),或者等待客户端的请求消息。首先发生的是特定于应用程序的。
  3. 一旦服务器收到客户端的请求消息,它就会执行任何必要的工作来创建和填充响应。然后将响应连同状态详细信息(状态代码和可选状态消息)和可选尾随元数据一起返回(如果成功)给客户端。
  4. 如果响应状态为 OK,则客户端得到响应,从而完成客户端的调用。
服务器流式 RPC

服务器流式 RPC 类似于一元 RPC,除了服务器返回消息流以响应客户端的请求。发送所有消息后,服务器的状态详细信息(状态代码和可选状态消息)和可选的尾随元数据将发送到客户端。这样就完成了服务器端的处理。客户端在拥有所有服务器消息后完成。

客户端流式 RPC

客户端流式 RPC 类似于一元 RPC,不同之处在于客户端向服务器发送消息流而不是单个消息。服务器响应一条消息(连同其状态详细信息和可选的尾随元数据),通常但不一定是在它收到所有客户端的消息之后。

双向流式 RPC

在双向流式 RPC 中,调用由调用方法的客户端和接收客户端元数据、方法名称和截止日期的服务器发起。服务器可以选择发回其初始元数据或等待客户端开始流式传输消息。

客户端和服务器端流处理是特定于应用程序的。由于这两个流是独立的,客户端和服务器可以以任意顺序读写消息。例如,服务器可以等到它收到客户端的所有消息后再写入它的消息,或者服务器和客户端可以玩 “ping-pong”——服务器收到请求,然后发回响应,然后客户端发送基于响应的另一个请求,依此类推。

截止日期/超时

gRPC 允许客户端指定在 RPC 因 DEADLINE_EXCEEDED 错误而终止之前,他们愿意等待 RPC 完成多长时间。在服务器端,服务器可以查询特定的 RPC 是否已超时,或者还剩多少时间来完成 RPC。

指定期限或超时是特定于语言的:一些语言 API 根据超时(持续时间)工作,而一些语言 API 根据期限(固定时间点)工作,可能有也可能没有默认期限。

RPC 终止

在 gRPC 中,客户端和服务器都对调用是否成功做出独立的本地判断,并且它们的结论可能不匹配。这意味着,例如,您可能有一个 RPC 在服务器端成功完成(“我已经发送了所有响应!”)但在客户端失败(“响应在我的截止日期之后到达!”)。服务器也可以在客户端发送所有请求之前决定完成。

取消 RPC

客户端或服务器都可以随时取消 RPC。取消会立即终止 RPC,以便不再进行任何工作。

Protocol buffer 语法参考

消息类型定义

以一个简单的请求消息为例:

syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}

首先指定版本 proto3,否则编译器默认为 proto2。版本指定为文件的第一非空白、非注释行。

message 关键字用于定义消息,需要指定消息类型的名称。

消息由多个名称/值对组成,称为字段,每个字段要指定名字和类型。string、int32 是典型的标量类型,除了标量类型 protobuf 还支持构造类型,例如枚举或其他消息类型。

应该为每个字段分配唯一的字段序号,用于在二进制编码中标识该字段。序号范围1-15会消耗1个字节的存储,16-2047 会消耗2个字节。因此应该将常用的字段分配1-15字段序号。编号全部的范围是1到2^29-1,其中19000到19999是 proto编译器保留序号,不要使用。

消息的字段分为单一和重复两种规则:

  • 单一 Singular,proto3 中字段的默认规则。一个消息中仅可以包含0或1个该字段,就是字段不能重复。
  • 重复的 repeated,该规则说明此字段可以重复多次(包含0次)。重复值的顺序是保留的。
message SearchRequest {
  // 同上略
  repeated string keywords = 4
}
.proto///* ... */

标量类型

标量消息字段可以具有以下类型之一。该表显示了 .proto 文件中指定的类型,以及自动生成的类中的相应类型:

.proto Type 说明 Go Type
double float64
float float32
int32 变长编码,对负数进行编码效率低下。若字段可能有负值,请改用 sint32 int32
int64 变长编码,对负数进行编码效率低下。若字段可能有负值,请改用 sint64 int64
uint32 变长编码 uint32
uint64 变长编码 uint64
sint32 变长编码,带符号的 int 值。这些比常规 int32 更有效地编码负数 int32
sint64 变长编码,带符号的 int 值。这些比常规 int64 更有效地编码负数 int64
fixed32 固定4个字节,如果值通常大于 2^28,则比 uint32 更有效 uint32
fixed64 固定8个字节,如果值通常大于 2^56,则比 uint64 更有效 uint64
sfixed32 固定4个字节 int32
sfixed64 固定8个字节 int64
bool bool
string 始终包含 UTF-8 编码或 7 位 ASCII 文本,并且长度不能超过 2^32 string
bytes 可以包含不超过 2^32 的任意字节序列 []byte

解析消息时,如果编码的消息不包含特定元素,则解析对象中的相应字段将设置为该字段的默认值。这些默认值是基于类型的:

  • 对于字符串,默认值为空字符串。
  • 对于字节,默认值为空字节。
  • 对于布尔值,默认值为 false。
  • 对于数字类型,默认值为零。
  • 对于枚举,默认值是第一个定义的枚举值,必须为 0。
  • 对于消息字段,未设置该字段。它的确切值取决于语言。有关详细信息,请参阅生成的代码指南。
  • 重复字段的默认值为空(通常是相应语言的空列表)。

枚举值

corpus
message SearchRequest {
  // 同上略
  enum Corpus {
    UNIVERSAL = 0;
    WEB = 1;
    IMAGES = 2;
    LOCAL = 3;
    NEWS = 4;
    PRODUCTS = 5;
    VIDEO = 6;
  }
  Corpus corpus = 4;
}

枚举值列表的第一常量值必须为0,这样可以更好的处理默认值。(也为了向下兼容)

option allow_alias = true
message MyMessage1 {
  enum EnumAllowingAlias {
    option allow_alias = true;
    UNKNOWN = 0;
    STARTED = 1;
    RUNNING = 1;
  }
}

保留值

reserved
enum Foo {
  reserved 2, 15, 9 to 11, 40 to max;
  reserved "FOO", "BAR";
}

这样,以上的序号和字段名就不能后续使用了,避免了逻辑混乱。

使用其他消息类型

可以使用其他消息类型作为字段类型。例如:

message SearchResponse {
  repeated Result results = 1;
}

message Result {
  string url = 1;
  string title = 2;
  repeated string snippets = 3;
}

可以将不同类型的消息定义在不同的 .proto 文件中,需要时导入进来:

import "myproject/other_protos.proto";

未知字段

未知字段是格式良好的 Protocol Buffer 序列化数据,表示解析器无法识别的字段。例如,当旧二进制文件用新字段解析新二进制文件发送的数据时,这些新字段将成为旧二进制文件中的未知字段。

最初,proto3 消息在解析过程中总是丢弃未知字段,但在 3.5 版本中,我们重新引入了保留未知字段以匹配 proto2 行为。在 3.5 及更高版本中,未知字段在解析期间保留并包含在序列化输出中。

Any

Bytes
import "google/protobuf/any.proto";

message ErrorStatus {
  string message = 1;
  repeated google.protobuf.Any details = 2;
}

Oneof

如果您有一条包含多个字段的消息,并且最多同时设置一个字段,您可以强制执行此行为并使用 oneof 功能节省内存。

message SampleMessage {
  oneof test_oneof {
    string name = 4;
    SubMessage sub_message = 9;
  }
}

Map

如果您想创建关联映射作为数据定义的一部分,protocol buffers 提供了一种方便的快捷语法:

map<key_type, value_type> map_field = N;

例如:

map<string, Project> projects = 3;

Packages

package
package foo.bar;
message Open { ... }
option go_package

服务定义

如果您想在 RPC(远程过程调用)系统中使用您的消息类型,您可以在 .proto 文件中定义一个 RPC 服务接口,并且协议缓冲区编译器将以您选择的语言生成服务接口代码和存根。因此,例如,如果您想使用获取 SearchRequest 并返回 SearchResponse 的方法定义 RPC 服务,您可以在 .proto 文件中定义它,如下所示:

service SearchService {
  rpc Search(SearchRequest) returns (SearchResponse);
}

与 Proto Buffers 一起使用的最直接的 RPC 系统是 gRPC:由 Google 开发的一种语言和平台中立的开源 RPC 系统。 gRPC 特别适用于协议缓冲区,并允许您使用特殊的协议缓冲区编译器插件直接从 .proto 文件生成相关的 RPC 代码。

选项

google/protobuf/descriptor.proto

例如我们使用 option go_package 选项来控制生成的 go 代码所在的 package。

Protocol buffer 语法指导

消息队列

事件驱动

API 网关

API 网关介绍

API 网关是客户端访问服务的统一入口,API 网关封装了后端服务。核心功能是转发请求,基于客户端请求的 Host、Method、Path 将请求转发到目标服务上。可以解决客户端直接访问后端服务的入口不统一的问题,尤其是在微服务时代,网关尤其重要,否则如下图所示:

现代的 API 网关,除了具备基本的转发功能外,通常还具有:

  • 多协议支持:tcp,http、https、websock、gRPC
  • 负载均衡
  • 身份验证
  • 监控、日志
  • 缓存
  • 熔断、限流

等核心功能。目前市场上比较知名的 API 网关有:

  • Netflix Zuul,spring cloud 的一个推荐组件,java 系首选
  • Kong 是基于Nginx+Lua 进行二次开发的实现,社区比较活跃
  • Tyk Go编写,社区版相对薄弱