本文主要介绍了如何使用 Go kit 构建基于 gRPC 的微服务,并额外补充了如何为 gRPC Server编写本地测试代码。

上一篇中,我们完成了一个基本的基于HTTP的 Go kit示例,本篇我们将添加代码使程序支持 gRPC 通信。

基于gRPC通信

proto

定义protobuf

addsrvproto
syntax = "proto3";

package pb;

option go_package="addsrv/pb";


service Add {
  // Sum 对两个数字求和
  rpc Sum (SumRequest) returns (SumResponse) {}

  // Concat 方法拼接两个字符串
  rpc Concat (ConcatRequest) returns (ConcatResponse) {}
}


// Sum方法的请求参数
message SumRequest {
  int64 a = 1;
  int64 b = 2;
}

// Sum方法的响应
message SumResponse {
  int64 v = 1;
  string err = 2;
}

// Concat方法的请求参数
message ConcatRequest {
  string a = 1;
  string b = 2;
}

// Concat方法的响应
message ConcatResponse {
  string v = 1;
  string err = 2;
}
pb/addsrv.proto
protoprotocprotoc-gen-go-grpc
protoc -I=pb \
   --go_out=pb --go_opt=paths=source_relative \
   --go-grpc_out=pb --go-grpc_opt=paths=source_relative \
   pb/addsrv.proto

此时项目目录如下:

├── go.mod
├── go.sum
├── main.go
└── pb
    ├── addsrv.pb.go
    ├── addsrv.proto
    └── addsrv_grpc.pb.go

grpcServer

main.gogrpcServersumconcatgrpctransport.Handler
import grpctransport "github.com/go-kit/kit/transport/grpc"


type grpcServer struct {
	pb.UnimplementedAddServer
	sum    grpctransport.Handler
	concat grpctransport.Handler
}
grpctransport.Handler
// Handler 应该从服务实现的gRPC绑定调用。
// 传入的请求参数和返回的响应参数都是gRPC类型,而不是用户域类型。
type Handler interface {
	ServeGRPC(ctx context.Context, request interface{}) (context.Context, interface{}, error)
}
grpctransport.Handlerhttptransport.Handler

我们先定义好处理请求和响应数据的编解码函数。

// decodeGRPCSumRequest 将Sum方法的gRPC请求参数转为内部的SumRequest
func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
	req := grpcReq.(*pb.SumRequest)
	return SumRequest{A: int(req.A), B: int(req.B)}, nil
}

// decodeGRPCConcatRequest 将Concat方法的gRPC请求参数转为内部的ConcatRequest
func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
	req := grpcReq.(*pb.ConcatRequest)
	return ConcatRequest{A: req.A, B: req.B}, nil
}

// encodeGRPCSumResponse 封装Sum的gRPC响应 
func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
	resp := response.(SumResponse)
	return &pb.SumResponse{V: int64(resp.V), Err: resp.Err}, nil
}

// encodeGRPCConcatResponse 封装Concat的gRPC响应
func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) {
	resp := response.(ConcatResponse)
	return &pb.ConcatResponse{V: resp.V, Err: resp.Err}, nil
}
grpctransport.NewServergrpctransport.Handler
// NewGRPCServer grpcServer构造函数
func NewGRPCServer(svc AddService) pb.AddServer {
	return &grpcServer{
		sum: grpctransport.NewServer(
			makeSumEndpoint(svc),
			decodeGRPCSumRequest,
			encodeGRPCSumResponse,
		),
		concat: grpctransport.NewServer(
			makeConcatEndpoint(svc),
			decodeGRPCConcatRequest,
			encodeGRPCConcatResponse,
		),
	}
}
grpcServer
func (s *grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumResponse, error) {
	_, rep, err := s.sum.ServeGRPC(ctx, req)
	if err != nil {
		return nil, err
	}
	return rep.(*pb.SumResponse), nil
}

func (s *grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatResponse, error) {
	_, rep, err := s.concat.ServeGRPC(ctx, req)
	if err != nil {
		return nil, err
	}
	return rep.(*pb.ConcatResponse), nil
}

启动gRPC服务

svc := addService{}

gs := NewGRPCServer(svc)

listener, err := net.Listen("tcp", ":8972")
if err != nil {
	fmt.Printf("failed to listen: %v", err)
	return
}
s := grpc.NewServer()       // 创建gRPC服务器
pb.RegisterAddServer(s, gs) // 在gRPC服务端注册服务
// 启动服务
err = s.Serve(listener)
if err != nil {
	fmt.Printf("failed to serve: %v", err)
	return
}

测试

SumConcat
// add_test.go
package main

import (
	"context"
	"gokit_demo1/pb"
	"log"
	"net"
	"testing"

	"github.com/stretchr/testify/assert"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/test/bufconn"
)

// 使用bufconn构建测试链接,避免使用实际端口号启动服务

const bufSize = 1024 * 1024

var bufListener *bufconn.Listener

func init() {
	bufListener = bufconn.Listen(bufSize)
	s := grpc.NewServer()
	gs := NewGRPCServer(addService{})
	pb.RegisterAddServer(s, gs)
	go func() {
		if err := s.Serve(bufListener); err != nil {
			log.Fatalf("Server exited with error: %v", err)
		}
	}()
}

func bufDialer(context.Context, string) (net.Conn, error) {
	return bufListener.Dial()
}

func TestSum(t *testing.T) {
	conn, err := grpc.DialContext(
		context.Background(),
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewAddClient(conn)

	resp, err := c.Sum(context.Background(), &pb.SumRequest{
		A: 10,
		B: 2,
	})
	assert.Nil(t, err)
	assert.NotNil(t, resp)
	assert.Equal(t, int64(12), resp.V)
}

func TestConcat(t *testing.T) {
	conn, err := grpc.DialContext(
		context.Background(),
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewAddClient(conn)

	resp, err := c.Concat(context.Background(), &pb.ConcatRequest{
		A: "10",
		B: "2",
	})
	assert.Nil(t, err)
	assert.NotNil(t, resp)
	assert.Equal(t, "102", resp.V)
}

项目目录下执行下面的命令,并查看测试结果。

go test -v ./...
=== RUN   TestSum
--- PASS: TestSum (0.00s)
=== RUN   TestConcat
--- PASS: TestConcat (0.00s)
PASS
ok      addsrv     0.016s