本教程提供了Go使用gRPC的基础教程
在教程中你将会学到如何:
.proto
proto3
为什么使用gRPC
我们的示例是一个简单的路线图应用,客户端可以获取路线特征信息、创建他们的路线摘要,还可以与服务器或者其他客户端交换比如交通状态更新这样的路线信息。
.proto
安装
安装grpc包
examples
$ go get google.golang.org/grpc
`grpc-go/examples/route_guide
$ cd $GOPATH/src/google.golang.org/grpc/examples/route_guide
安装相关工具和插件
- 安装protocol buffer编译器
Mac Os
PATHprotocPATH
- 安装protoc编译器插件
$ go get -u github.com/golang/protobuf/protoc-gen-go
protoc-gen-go$GOBIN$GOPATH/binprotoc$PATH
$ export PATH=$PATH:$GOPATH/bin
定义服务
examples/route_guide/routeguide/route_guide.proto.proto
.protoservice
service RouteGuide {
...
}
rpcRouteGuide
- 一个简单的RPC,客户端使用存根将请求发送到服务器,然后等待响应返回,就像普通的函数调用一样。
// 获得给定位置的特征
rpc GetFeature(Point) returns (Feature) {}
- 服务器端流式RPC,客户端向服务器发送请求,并获取流以读取回一系列消息。客户端从返回的流中读取,直到没有更多消息为止。如我们的示例所示,可以通过将stream关键字放在响应类型之前来指定服务器端流方法。
//获得给定Rectangle中可用的特征。结果是
//流式传输而不是立即返回
//因为矩形可能会覆盖较大的区域并包含大量特征。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- 客户端流式RPC,其中客户端使用gRPC提供的流写入一系列消息并将其发送到服务器。客户端写完消息后,它将等待服务器读取所有消息并返回其响应。通过将stream关键字放在请求类型之前,可以指定客户端流方法。
// 接收路线上被穿过的一系列点位, 当行程结束时
// 服务端会返回一个RouteSummary类型的消息.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 双向流式RPC,双方都使用读写流发送一系列消息。这两个流是独立运行的,因此客户端和服务器可以按照自己喜欢的顺序进行读写:例如,服务器可以在写响应之前等待接收所有客户端消息,或者可以先读取消息再写入消息,或其他一些读写组合。每个流中的消息顺序都会保留。您可以通过在请求和响应之前都放置stream关键字来指定这种类型的方法。
//接收路线行进中发送过来的一系列RouteNotes类型的消息,同时也接收其他RouteNotes(例如:来自其他用户)
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
.protoPoint
// Points被表示为E7表示形式中的经度-纬度对。
//(度数乘以10 ** 7并四舍五入为最接近的整数)。
// 纬度应在+/- 90度范围内,而经度应在
// 范围+/- 180度(含)
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务端代码
.protoprotoc
route_guide
protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide
route_guiderouteguideroute_guide.pb.go
pb.go
RouteGuideRouteGuideServerRouteGuide
创建gRPC服务端
RouteGuideRouteGuide
- 实现我们从服务定义生成的服务接口:做服务实际要做的事情。
- 运行一个gRPC服务器监听客户端的请求然后把请求派发给正确的服务实现。
你可以在刚才安装的gPRC包的grpc-go/examples/route_guide/server/server.go找到我们示例中RouteGuide`服务的实现代码。下面让我们看看他是怎么工作的。
实现RouteGuide
routeGuideServerprotocpb.goRouteGuideServer
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
普通PRC
routeGuideServerGetFeaturePointFeatureFeature
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// No feature was found, return an unnamed feature
return &pb.Feature{"", point}, nil
}
PointFeatureFeaturenil
服务端流式RPC
ListFeaturesFeature
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
FeatureRectangleRouteGuide_ListFeaturesServe
FeatureSend()RouteGuide_ListFeaturesServernilnil
客户端流式RPC
RecordRouteRouteSummaryrequestRouteGuide_RecordRouteServerRecv()SendAndClose()
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
RouteGuide_RecordRouteServerRecv()PointRecv()nil
双向流式RPC
RouteChat()
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key], in)
// Note: this copy prevents blocking other clients while serving this one.
// We don't need to do a deep copy, because elements in the slice are
// insert-only and never modified.
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn, s.routeNotes[key])
s.mu.Unlock()
for _, note := range rn {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
RouteGuide_RouteChatServer
Send()SendAndClose()
启动服务器
RouteGuide
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
... // determine whether to use TLS
grpcServer.Serve(lis)
为了构建和启动服务器我们需要:
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))。grpc.NewServer()Serve()Stop()
创建客户端
RouteGuide
创建客户端存根
grpc.Dial()
conn, err := grpc.Dial(*serverAddr)
if err != nil {
...
}
defer conn.Close()
grpc.DialDialOptionsRouteGuide
.protopbNewRouteGuideClient
client := pb.NewRouteGuideClient(conn)
pb.goRouteGuideClientclient
type RouteGuideClient interface {
GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error)
ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error)
RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error)
RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error)
}
每个实现方法会再去请求gRPC服务端相对应的方法获取服务端的响应,比如:
func (c *routeGuideClient) GetFeature(ctx context.Context, in *Point, opts ...grpc.CallOption) (*Feature, error) {
out := new(Feature)
err := c.cc.Invoke(ctx, "/routeguide.RouteGuide/GetFeature", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
RouteGuideClientpb.go
调用服务的方法
现在让我们看看如何调用服务的方法。注意在gRPC-Go中,PRC是在阻塞/同步模式下的运行的,也就是说RPC调用会等待服务端响应,服务端将返回响应或者是错误。
普通RPC
GetFeature
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
context.Context
服务端流式RPC
ListFeatures
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
RouteGuide_ListFeaturesClientRouteGuide_ListFeaturesClient
RouteGuide_ListFeaturesClientRecv()FeatureRecv()errnilio.EOFerr
客户端流式RPC
RecordRouteRouteGuide_RecordRouteClient
// 随机的创建一些Points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())// 调用服务中定义的客户端流式RPC方法
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {// 向流中写入多个请求消息
if err == io.EOF {
break
}
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()// 从流中取回服务器的响应
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
RouteGuide_RecordRouteClientSend()Send()写入流完成后,我们需要在流上调用方法让gRPC知道我们已经完成了请求的写入并且期望得到一个响应。我们从方法返回的中可以获得RPC状态。如果状态是,
双向流式RPC
RouteChat()RecordRoute
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
CloseSend()
启动应用
$ GOPATH/src/google.golang.org/grpc/examples/route_guide
$ go run server/server.go
同样,运行客户端:
$ go run client/client.go