什么是 Go kit?

Go 是一种很棒的通用语言,但是微服务需要一定量的专业支持,例如 RPC 安全性、系统可观察性、基础设施集成,甚至程序设计。Go kit 填补了标准库留下的空白,并使 Go 成为在任何组织中编写微服务的一流语言。

   下面基于 go-kit 实现用户微服务,包括用户的登录、注册和获取用户信息,使用 jwt 来签发和解析 token。本文示例比较简单,主要目的是实现微服务并理解微服务开发;通信使用 grpc 服务来编码和解码。

涉及到技术的如下:

1.grpc

2.consul 服务注册与发现

3.golang/rate 服务限流

4.中间件

5.身份验证jwt

6.ctx context.Context 上下文传递全局参数

代码结构如下:

1.首先定义proto文件

我们需要实现三个方法,分别是 RegistAccount、LoginAccount 和 GetUserInfoByToken,对应注册、登录和通过 token 获取用户信息;此外还要定义各自的请求体和响应体。详细代码如下:

syntax = "proto3";
package pb;
// UserServiceExt exposes the user-account RPCs: registration, login and
// token-based user lookup.
service UserServiceExt {
   // Register a new user account.
   rpc RegistAccount(RegistAccountReq) returns(RegistAccountRsp) {}
   // Log a user in.
   rpc LoginAccount(LoginAccountReq) returns(LoginAccountRsp) {}
   // Fetch user information by token.
    rpc GetUserInfoByToken(GetUserInfoByTokenRequest) returns(GetUserInfoByTokenResponse) {}
}
// RegistAccountReq carries the fields needed to create an account.
message RegistAccountReq {
    string email = 1 ;
    string userName = 2 ;
    string password = 3 ;
}
// RegistAccountRsp reports the outcome of a registration as a status code.
// The Go service in this document sets 0 on success and -1 on failure.
message RegistAccountRsp {
    int64 code = 1;
}
// LoginAccountReq carries the credentials for a login attempt.
message LoginAccountReq {
    string email = 1 ;
    string password = 2 ;
}

// UserInfo is the user profile returned by GetUserInfoByToken.
message UserInfo {
    int64 user_id=1;
    string userName=2;
    string email=3;
    string phone=4;
}
// LoginAccountRsp is the login reply: a status code, the issued token and uid.
// NOTE(review): the Go service in this document fills uid with the per-request
// UUID taken from the context, not with a user identifier — confirm intent.
message LoginAccountRsp {
    int64 code = 1;
    string token = 2;
    string uid = 3;
}
// GetUserInfoByTokenRequest carries the token to resolve into a user.
message GetUserInfoByTokenRequest {
    string token=1;
}

// GetUserInfoByTokenResponse returns a status code and the resolved user.
// Field number 2 is not used; userInfo is field 3.
message GetUserInfoByTokenResponse {
    int64 code=1;
    UserInfo userInfo=3;
}

定义好 proto 文件后,使用 protoc 工具生成服务端和客户端代码。我是放在 /common/pb 目录下,你可以定义你自己的生成路径:protoc --proto_path=./pb --go_out=plugins=grpc:./common/pb ./pb/*.proto

2.接下来我们来实现接口

在 service 文件下定义 并实现接口,详细代码如下

package service

import (
	"context"
	"fmt"
	"go.uber.org/zap"
	"mSystem/src/common/db"
	"mSystem/src/common/errors"
	"mSystem/src/common/pb"
	"mSystem/src/utils"
)

// UserService is the business-level contract of the user microservice. Its
// methods take and return the generated pb message types directly.
type UserService interface {
	// RegistAccount creates a new account from the request fields.
	RegistAccount(ctx context.Context, req *pb.RegistAccountReq) (*pb.RegistAccountRsp, error)
	// LoginAccount checks the credentials in req and returns a token on success.
	LoginAccount(ctx context.Context, req *pb.LoginAccountReq) (*pb.LoginAccountRsp, error)
	// GetUserInfoByToken resolves the token in req to the user it was issued for.
	GetUserInfoByToken(ctx context.Context, req *pb.GetUserInfoByTokenRequest) (*pb.GetUserInfoByTokenResponse, error)
}
// baseServer is the default UserService implementation. It holds only the
// logger used to report failures.
type baseServer struct {
	logger *zap.Logger
}

// NewUserService returns the default UserService implementation, a baseServer
// that reports through log.
func NewUserService(log *zap.Logger) UserService {
	// A logging wrapper was once applied here and is currently disabled:
	//   server = NewLogMiddlewareServer(log)(server)
	return &baseServer{logger: log}
}

// RegistAccount registers a new account.
//
// It first looks the e-mail up; a lookup error yields errors.ErrorUserFailed
// and an existing user yields errors.ErrorUserAlready. Otherwise the user is
// inserted, and an insert error again yields errors.ErrorUserFailed. Every
// failure reply carries Code -1; success carries Code 0 and a nil error.
func (u baseServer) RegistAccount(ctx context.Context, req *pb.RegistAccountReq) (*pb.RegistAccountRsp, error) {
	// fail builds the shared failure reply (Code -1) paired with cause.
	fail := func(cause error) (*pb.RegistAccountRsp, error) {
		return &pb.RegistAccountRsp{Code: -1}, cause
	}

	existing, lookupErr := db.SelectUserByEmail(req.Email)
	switch {
	case lookupErr != nil:
		u.logger.Error("error", zap.Error(lookupErr))
		return fail(errors.ErrorUserFailed)
	case existing != nil:
		return fail(errors.ErrorUserAlready)
	}

	if insertErr := db.InsertUser(req.UserName, req.Password, req.Email); insertErr != nil {
		u.logger.Error("error", zap.Error(insertErr))
		return fail(errors.ErrorUserFailed)
	}
	return &pb.RegistAccountRsp{Code: 0}, nil
}

// LoginAccount authenticates a user by e-mail and password and, on success,
// returns a signed JWT.
//
// Failure modes, each returned with an empty response:
//   - the credential lookup fails: errors.ErrorUserFailed (cause is logged)
//   - no user matches: errors.ErrorUserLoginFailed
//   - the token cannot be created: the error from utils.CreateJwtToken
//
// On success the reply carries Code 0, the token, and Uid set to the request
// UUID stored in ctx under ContextReqUUid. Uid is left empty when ctx carries
// no such value, instead of the literal "<nil>" that fmt.Sprint would produce.
func (u baseServer) LoginAccount(ctx context.Context, req *pb.LoginAccountReq) (*pb.LoginAccountRsp, error) {
	user, err := db.SelectUserByPasswordName(req.Email, req.Password)
	if err != nil {
		u.logger.Error("error", zap.Error(err))
		return &pb.LoginAccountRsp{}, errors.ErrorUserFailed
	}
	if user == nil {
		return &pb.LoginAccountRsp{}, errors.ErrorUserLoginFailed
	}

	// Sign the JWT. The error must be checked before a success reply is built,
	// otherwise the caller receives Code 0 alongside a non-nil error.
	token, err := utils.CreateJwtToken(user.UserName, int(user.UserId))
	if err != nil {
		u.logger.Error("create jwt token failed", zap.Error(err))
		return &pb.LoginAccountRsp{}, err
	}

	var uid string
	if v := ctx.Value(ContextReqUUid); v != nil {
		uid = fmt.Sprint(v)
	}
	return &pb.LoginAccountRsp{
		Token: token,
		Code:  0,
		Uid:   uid,
	}, nil
}

// GetUserInfoByToken resolves a JWT to the user it was issued for.
//
// Replies:
//   - empty token: Code -1 with errors.ErrorTokenEmpty
//   - token fails to parse: Code -2 with a formatted "user" error
//   - the "DcId" or "Name" claim is missing or has an unexpected type:
//     Code -2 with a formatted "user" error (previously this panicked)
//   - the user lookup fails: Code -2 with errors.ErrorUserLoginFailed
//   - success: Code 0 with the populated UserInfo
//
// Token claims are no longer printed to stdout.
func (u baseServer) GetUserInfoByToken(ctx context.Context, req *pb.GetUserInfoByTokenRequest) (*pb.GetUserInfoByTokenResponse, error) {
	if req.Token == "" {
		return &pb.GetUserInfoByTokenResponse{Code: -1}, errors.ErrorTokenEmpty
	}

	claims, err := utils.ParseToken(req.Token)
	if err != nil {
		return &pb.GetUserInfoByTokenResponse{Code: -2}, errors.FormatError("user", err.Error())
	}

	// Claims come from client-supplied data, so use comma-ok assertions; a
	// bare assertion would panic on a token with missing or mistyped claims.
	userID, idOK := claims["DcId"].(float64)
	userName, nameOK := claims["Name"].(string)
	if !idOK || !nameOK {
		return &pb.GetUserInfoByTokenResponse{Code: -2}, errors.FormatError("user", "invalid token claims")
	}

	// NOTE(review): if db.SelectUserById can return a nil user together with a
	// nil error (as the other lookups in this package appear to), the field
	// accesses below would dereference nil — confirm and add a guard if so.
	info, err := db.SelectUserById(userID, userName)
	if err != nil {
		u.logger.Error("error", zap.Error(err))
		return &pb.GetUserInfoByTokenResponse{Code: -2}, errors.ErrorUserLoginFailed
	}

	return &pb.GetUserInfoByTokenResponse{
		Code: 0,
		UserInfo: &pb.UserInfo{
			UserId:   info.UserId,
			UserName: info.UserName,
			Email:    info.Email,
			Phone:    info.Phone,
		},
	}, nil
}

3.接下来实现 endpoint。它是微服务内部的桥梁:把用户请求的参数组装成 grpc 需要的数据类型,再转发到我们具体的服务实现。在 endpoint 层可以进行限流、添加中间件等操作,代码如下:

package endpoint

import (
	"context"
	"github.com/go-kit/kit/endpoint"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
	pb "mSystem/src/common/pb"
	"mSystem/src/user/encode"
	"mSystem/src/user/service"
)

// RegistRequest is the JSON payload accepted by the register endpoint.
type RegistRequest struct {
	Email    string `json:"email"`
	UserName string `json:"userName"`
	Password string `json:"password"`
}

// RegistResponse mirrors the fields of RegistRequest.
// NOTE(review): it is not referenced by any code visible here, and it would
// echo the password back if ever serialized — confirm whether it is needed.
type RegistResponse struct {
	Email    string `json:"email"`
	UserName string `json:"userName"`
	Password string `json:"password"`
}

// LoginRequest is the JSON payload accepted by the login endpoint.
type LoginRequest struct {
	Email    string `json:"email"`
	Password string `json:"password"`
}

// GetTokenRequest is the JSON payload accepted by the user-info endpoint.
type GetTokenRequest struct {
	Token string `json:"token"`
}

// Endpoints bundles one go-kit endpoint per service method: define as many
// endpoints here as the service has functions.
type Endpoints struct {
	RegistAccount      endpoint.Endpoint
	LoginAccount       endpoint.Endpoint
	GetUserInfoByToken endpoint.Endpoint
}

// NewEndpoint assembles the Endpoints set for s.
//
// Only the login endpoint is decorated: it is wrapped first by the logging
// middleware and then by the rate limiter, making the limiter the outermost
// layer. The register and token endpoints are used as built.
func NewEndpoint(s service.UserService, log *zap.Logger, limit *rate.Limiter) Endpoints {
	registEP := MakeRegistEndPoint(s)

	loginEP := MakeLoginEndPoint(s)
	loginEP = LoggingMiddleware(log)(loginEP)              // logging middleware
	loginEP = NewGolangRateAllowMiddleware(limit)(loginEP) // rate limiting

	tokenEP := MakeTokenEndPoint(s)

	return Endpoints{
		RegistAccount:      registEP,
		LoginAccount:       loginEP,
		GetUserInfoByToken: tokenEP,
	}
}

// MakeRegistEndPoint adapts UserService.RegistAccount to a go-kit endpoint.
//
// The endpoint expects request to be a *RegistRequest (it panics on any other
// type), converts it to the pb request type, calls the service, and returns
// the result wrapped in encode.Response together with the service error.
func MakeRegistEndPoint(s service.UserService) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		req := request.(*RegistRequest) // extract the decoded request parameters

		// Assemble the service-level request and forward it to the service.
		val, err := s.RegistAccount(ctx, &pb.RegistAccountReq{
			Email:    req.Email,
			Password: req.Password,
			UserName: req.UserName,
		})
		return encode.Response{
			Error: err,
			Data:  val,
		}, err
	}
}

// MakeLoginEndPoint adapts UserService.LoginAccount to a go-kit endpoint.
//
// The endpoint expects request to be a *LoginRequest (it panics otherwise),
// forwards the credentials to the service, and returns the service reply
// wrapped in encode.Response together with the service error.
func MakeLoginEndPoint(s service.UserService) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		in := request.(*LoginRequest) // decoded request parameters

		// Build the service-level request, then forward it.
		svcReq := &pb.LoginAccountReq{
			Email:    in.Email,
			Password: in.Password,
		}
		out, callErr := s.LoginAccount(ctx, svcReq)

		return encode.Response{Error: callErr, Data: out}, callErr
	}
}
// MakeTokenEndPoint adapts UserService.GetUserInfoByToken to a go-kit endpoint.
//
// The endpoint expects request to be a *GetTokenRequest (it panics otherwise),
// forwards the token to the service, and returns the service reply wrapped in
// encode.Response together with the service error.
func MakeTokenEndPoint(s service.UserService) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		in := request.(*GetTokenRequest) // decoded request parameters

		svcReq := &pb.GetUserInfoByTokenRequest{Token: in.Token}
		out, callErr := s.GetUserInfoByToken(ctx, svcReq)

		return encode.Response{Error: callErr, Data: out}, callErr
	}
}

4.完成了具体服务实现和endpoint,接下来就可以把我们的服务暴露给外调用了,transport的任务就是暴露接口给外部使用,详细代码如下:

package transport

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/go-kit/kit/log"
	kithttp "github.com/go-kit/kit/transport/http"
	"github.com/gorilla/mux"
	uuid "github.com/satori/go.uuid"
	"go.uber.org/zap"
	"io/ioutil"
	"mSystem/src/user/encode"
	"mSystem/src/user/endpoint"
	"net/http"
)

var (
	// ErrorBadRequest signals that a request carried invalid parameters.
	ErrorBadRequest = errors.New("invalid request parameter")
)

// ContextReqUUid is the context key under which the per-request UUID is stored
// by MakeHttpHandler.
// NOTE(review): this is an untyped string constant used as a context key;
// go vet recommends a dedicated key type to avoid collisions. Changing it
// would require updating every reader of the key at the same time.
const ContextReqUUid = "req_uuid"

// MakeHttpHandler builds the HTTP router that exposes the user endpoints.
//
// Routes (all POST): /register, /login and /userInfo. Every route shares the
// same server options:
//   - transport errors are logged through logger
//   - errors are written as a JSON body with HTTP status 200
//   - a fresh UUID is stored in the request context under ContextReqUUid
//     before the endpoint runs
//
// The ctx parameter is currently unused; each request gets its own context
// from go-kit.
func MakeHttpHandler(ctx context.Context, endpoints endpoint.Endpoints, logger log.Logger) http.Handler {
	r := mux.NewRouter()

	options := []kithttp.ServerOption{
		kithttp.ServerErrorLogger(logger),
		// A single error encoder: a later ServerErrorEncoder option replaces an
		// earlier one, so also registering the default encoder had no effect.
		kithttp.ServerErrorEncoder(func(ctx context.Context, err error, w http.ResponseWriter) {
			reqID := fmt.Sprint(ctx.Value(ContextReqUUid))
			// Log takes alternating key/value pairs; pass complete pairs and
			// include the error itself.
			logger.Log("req_uuid", reqID, "err", err)

			w.Header().Set("Content-Type", "application/json; charset=utf-8")
			w.WriteHeader(http.StatusOK)
			// NOTE(review): an error value without exported fields or a
			// MarshalJSON method is encoded as "{}" — confirm that the
			// project's error types serialize usefully.
			if encErr := json.NewEncoder(w).Encode(err); encErr != nil {
				logger.Log("req_uuid", reqID, "msg", "encode error response failed", "err", encErr)
			}
		}),
		kithttp.ServerBefore(func(ctx context.Context, request *http.Request) context.Context {
			// Attach a request UUID to the context for use further down the chain.
			UUID := uuid.NewV5(uuid.Must(uuid.NewV4(), nil), "req_uuid").String()
			logger.Log("给请求添加uuid", zap.Any("UUID", UUID))
			ctx = context.WithValue(ctx, ContextReqUUid, UUID)
			return ctx
		}),
	}

	// Expose the concrete endpoints.
	r.Methods("POST").Path("/register").Handler(kithttp.NewServer(
		endpoints.RegistAccount,
		decodeRegisterRequest, // request decoder
		encode.JsonResponse,
		options...,
	))

	r.Methods("POST").Path("/login").Handler(kithttp.NewServer(
		endpoints.LoginAccount,
		decodeLoginRequest, // request decoder
		encode.JsonResponse,
		options...,
	))
	r.Methods("POST").Path("/userInfo").Handler(kithttp.NewServer(
		endpoints.GetUserInfoByToken,
		decodeGetTokenRequest, // request decoder
		encode.JsonResponse,
		options...,
	))
	return r
}

// decodeRegisterRequest reads the JSON body of a /register request and
// decodes it into a *endpoint.RegistRequest.
//
// It returns the underlying read or unmarshal error unchanged. The raw body is
// deliberately not printed, because it contains the user's password.
func decodeRegisterRequest(ctx context.Context, r *http.Request) (interface{}, error) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		fmt.Printf("read body err, %v\n", err)
		return nil, err
	}

	var req endpoint.RegistRequest
	if err = json.Unmarshal(body, &req); err != nil {
		fmt.Printf("Unmarshal err, %v\n", err)
		return nil, err
	}
	return &req, nil
}
// decodeLoginRequest reads the JSON body of a /login request and decodes it
// into a *endpoint.LoginRequest.
//
// It returns the underlying read or unmarshal error unchanged. The raw body is
// deliberately not printed, because it contains the user's password.
func decodeLoginRequest(ctx context.Context, r *http.Request) (interface{}, error) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		fmt.Printf("read body err, %v\n", err)
		return nil, err
	}

	var req endpoint.LoginRequest
	if err = json.Unmarshal(body, &req); err != nil {
		fmt.Printf("Unmarshal err, %v\n", err)
		return nil, err
	}
	return &req, nil
}
// decodeGetTokenRequest reads the JSON body of a /userInfo request and decodes
// it into a *endpoint.GetTokenRequest.
//
// It returns the underlying read or unmarshal error unchanged. The raw body is
// deliberately not printed, because it contains the caller's token.
func decodeGetTokenRequest(ctx context.Context, r *http.Request) (interface{}, error) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		fmt.Printf("read body err, %v\n", err)
		return nil, err
	}

	var req endpoint.GetTokenRequest
	if err = json.Unmarshal(body, &req); err != nil {
		fmt.Printf("Unmarshal err, %v\n", err)
		return nil, err
	}
	return &req, nil
}

可以看到 我们暴露了3个地址分别是 /register ,/login,/userInfo 也就是我们之前实现的3个接口。大功告成了 接下来实现main函数,启动http和grpc服务,就可以访问到我们的服务了

main 函数如下:

package main

import (
	"context"
	"flag"
	"fmt"
	"github.com/go-kit/kit/log"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
	dbconfig "mSystem/src/common/db"
	"mSystem/src/common/pb"
	register "mSystem/src/user/compone"
	edpts "mSystem/src/user/endpoint"
	"mSystem/src/user/service"
	transport "mSystem/src/user/transport"
	"mSystem/src/utils"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
)

// main wires up and runs the user microservice.
//
// It parses the consul/service/gRPC flags, initializes logging and the
// database, builds the service, endpoints and HTTP handler, and then starts
// three goroutines: the HTTP server (which registers with consul first), the
// gRPC server, and a SIGINT/SIGTERM watcher. The first error or signal
// reported by any of them makes main deregister from consul and exit.
func main() {
	var (
		consulHost = flag.String("consul.host", "127.0.0.1", "consul ip address")
		consulPort = flag.String("consul.port", "8500", "consul port")

		serviceHost = flag.String("service.host", "localhost", "service ip address")
		servicePort = flag.String("service.port", "9001", "service port")

		grpcAddr = flag.String("grpc", ":8001", "gRPC listen address.")
	)
	flag.Parse()
	ctx := context.Background()
	errChan := make(chan error)

	var logger log.Logger
	{
		logger = log.NewLogfmtLogger(os.Stderr)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	}

	// Initialize shared infrastructure before anything uses it.
	// Presumably NewLoggerServer sets up the logger that GetLogger returns, so
	// it must run before the first GetLogger call — TODO confirm.
	utils.NewLoggerServer()
	// Open the database connection before any server can accept requests.
	dbconfig.InitDB()

	// Concrete service implementation.
	svc := service.NewUserService(utils.GetLogger())

	// Endpoints, with a limiter of 10 events per second and a burst of 1.
	golangLimit := rate.NewLimiter(10, 1)
	endpoint := edpts.NewEndpoint(svc, utils.GetLogger(), golangLimit)

	// HTTP handler.
	r := transport.MakeHttpHandler(ctx, endpoint, logger)

	// Consul registration object.
	registar := register.Register(*consulHost, *consulPort, *serviceHost, *servicePort, logger)

	// HTTP server.
	go func() {
		fmt.Println("Http Server start at port:" + *servicePort)
		// Register with consul before starting to serve.
		registar.Register()

		errChan <- http.ListenAndServe(":"+*servicePort, r)
	}()

	// gRPC server.
	go func() {
		fmt.Println("grpc Server start at port" + *grpcAddr)
		listener, err := net.Listen("tcp", *grpcAddr)
		if err != nil {
			errChan <- err
			return
		}
		baseServer := grpc.NewServer()
		pb.RegisterUserServiceExtServer(baseServer, svc)
		errChan <- baseServer.Serve(listener)
	}()

	// Turn SIGINT/SIGTERM into a shutdown reason.
	go func() {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		errChan <- fmt.Errorf("%s", <-c)
	}()

	// Block until the first failure or signal. The variable is named err
	// rather than error so the builtin type is not shadowed.
	err := <-errChan
	// Deregister from consul on exit.
	registar.Deregister()
	fmt.Println(err)
}

需要注意的是:consul 使用默认的 8500 端口(对应 consul.port 参数),在本地访问 8500 端口就可以看到注册的服务实例;9001 端口(service.port 参数的默认值)提供 http 服务,也是服务的入口;8001 端口开启 grpc 服务。使用 golang.org/x/time/rate 来实现简单的限流功能,通过

Register方法把微服务注册到consul上,

dbconfig.InitDB() 初始化数据库连接,使用mysql来存储数据

最后是启动服务即可

启动成功如下: