获取示例代码
- 在 .proto 文件定义一个 service
- 使用 protocol buffer 编译器生成服务端、客户端代码
- 使用 Go gRPC API 为你的 service 编写一个简单的客户端、服务端
$ git clone -b v1.46.0 --depth 1 https://github.com/grpc/grpc-go
$ cd grpc-go/examples/route_guide
定义 service、message
service
service RouteGuide {
...
}
// 获取给定位置的特征
rpc GetFeature(Point) returns (Feature) {}
// 获取给定矩形内可用的特征。
// 结果是流式传输而不是立即返回(例如,在具有重复字段的响应消息中),因为矩形可能覆盖大面积并包含大量特征。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端、服务端代码
在 examples/route_guide 目录中,运行以下命令:
$ protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
routeguide/route_guide.proto
运行此命令会在 routeguide 目录中生成以下文件:
- route_guide.pb.go
其中包含用于填充、序列化、检索请求和响应消息类型 的所有 protocol buffer 代码 - route_guide_grpc.pb.go
- 一个接口类型(或存根),包含客户端可以调用的 RouteGuide 服务中定义的方法。
- 服务端要实现的接口类型,包含 RouteGuide 服务中定义的方法。
让我们的 RouteGuide 服务完成它的工作,有两个部分:
- 实现 proto 服务定义中生成的服务接口:doing the actual “work” of our service.
- 运行 gRPC 服务器 以侦听来自客户端的请求,并将它们分派到正确的服务实现。
可以在 server/server.go 中找到示例 RouteGuide 服务器。 让我们仔细看看它是如何工作的。
实现 RouteGuide
如您所见,我们的服务器有一个 routeGuideServer 结构类型,它实现了生成的 RouteGuideServer 接口:
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
简单 RPC
routeGuideServer 实现了我们所有的服务方法。
我们先来看最简单的类型:GetFeature,它只是从客户端获取一个Point,并从其数据库中返回对应的特征信息,通过 Feature 返回。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// No feature was found, return an unnamed feature
return &pb.Feature{Location: point}, nil
}
服务端流 RPC
现在让我们看看我们的一个流式 RPC。
ListFeatures 是一个服务器端流式 RPC,因此我们需要将多个 Feature 发送回我们的客户端。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
正如你所看到的,这次我们没有在我们的方法参数中获取简单的请求和响应对象,而是获取了一个请求对象(Rectangle,我们的客户想要在其中找到特征)和一个特殊的 RouteGuide_ListFeaturesServer 对象来写我们的响应。
Send()
客户端流 RPC
现在让我们看一些更复杂的东西:客户端流式传输方法 RecordRoute。
我们从客户端获取 Point 流,并返回一个包含旅行信息的 RouteSummary。
Recv()SendAndClose()
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
Recv()重复读入
- 如果这是 nil,流仍然是好的,它可以继续阅读;
- 如果是 io.EOF,则消息流已经结束,服务器可以返回其 RouteSummary;
- 如果它有任何其他值,我们会“按原样”返回错误,以便 gRPC 层将其转换为 RPC 状态。
双向流 RPC
最后,让我们看看我们的双向流式 RPC RouteChat()。
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // look for notes to be sent to client
for _, note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
这次我们得到一个 RouteGuide_RouteChatServer 流,就像在我们的客户端流示例中一样,可用于读取和写入消息。 但是,这一次我们通过方法的流返回值,而客户端仍在将消息写入其消息流。
Send()
启动服务端
一旦我们实现了所有方法,我们还需要启动一个 gRPC 服务器,以便客户端可以实际使用我们的服务。 以下片段显示了我们如何为 RouteGuide 服务执行此操作:
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
...
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
指定我们要用来监听客户端请求的端口:
lis, err := net.Listen(...).使用 grpc.NewServer(...) 创建一个 gRPC 服务器实例。
向 gRPC 服务器注册我们的服务实现
在服务器上调用 Serve() 以进行阻塞等待,直到进程被杀死或调用 Stop()
在本节中,我们将着眼于为我们的 RouteGuide 服务创建一个 Go 客户端。
可以在 grpc-go/examples/route_guide/client/client.go 中看到完整的客户端示例代码
创建 stub
要调用服务方法,我们首先需要创建一个 gRPC 通道来与服务器通信。
我们通过将服务器地址和端口号传递给 grpc.Dial() 来创建它,如下所示:
var opts []grpc.DialOption
...
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
...
}
defer conn.Close()
当服务需要时,您可以使用 DialOptions 在 grpc.Dial 中设置身份验证凭据(例如,TLS、GCE 凭据或 JWT 凭据)。 RouteGuide 服务不需要任何凭据。
设置 gRPC 通道后,我们需要一个客户端存根来执行 RPC。
我们使用从示例 .proto 文件生成的 pb 包 提供的 NewRouteGuideClient 方法获取它。
client := pb.NewRouteGuideClient(conn)
调用服务方法
现在让我们看看我们如何调用我们的服务方法。
请注意,在 gRPC-Go 中,RPC 以 阻塞/同步 模式运行,这意味着 RPC 调用等待服务器响应,并且将返回响应或错误。
简单 RPC
调用简单的 RPC GetFeature 几乎与调用本地方法一样简单。
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
如您所见,我们在之前获得的 存根 上调用该方法。
在我们的方法参数中,我们创建并填充了一个请求 protocol buffer 对象(在我们的例子中是 Point)。 我们还传递了一个 context.Context 对象,它允许我们在必要时更改 RPC 的行为,例如 timeout / cancel 运行中的 RPC。
如果调用没有返回错误,那么我们可以从第一个返回值中读取服务器的响应信息。
log.Println(feature)
服务端流 RPC
这里是我们调用服务器端流方法 ListFeatures 的地方,它返回地理特征流。
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
就像在简单的 RPC 中一样,我们向方法传递一个上下文和一个请求。 但是,我们没有返回响应对象,而是返回 RouteGuide_ListFeaturesClient 的实例。 客户端可以使用 RouteGuide_ListFeaturesClient 流来读取服务器的响应。
Recv()重复读入
- 如果为 nil,则流仍然是好的,它可以继续读;
- 如果是 io.EOF 则消息流结束;
- 否则一定有RPC错误,通过err传递过来。
客户端流 RPC
客户端流方法 RecordRoute 类似于服务器端方法,不同之处在于我们只向该方法传递一个上下文并返回一个 RouteGuide_RecordRouteClient 流,我们可以使用它来写入和读取消息。
// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
Send()CloseAndRecv()
双向流 RPC
最后,让我们看看我们的双向流式 RPC RouteChat()。
与 RecordRoute 的情况一样,我们只向该方法传递一个上下文对象,并返回一个我们可以用来写入和读取消息的流。 但是,这一次我们通过方法的流 返回值,而服务器仍在将消息写入其消息流。
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
CloseSend()
测试
- 运行server
go run server/server.go
- 运行客户端
go run client/client.go