什么是 Go kit?
Go是一种很棒的通用语言,但是微服务需要一定量的专业支持。 RPC安全性,系统可观察性,基础结构集成甚至程序设计。Go kit 填补了标准库留下的空白,并使 Go 成为在任何组织中编写微服务的一流语言
下面基于go-kit实现用户微服务,用户的登陆,注册和获取用户信息,使用jwt方式来加密和解码,本文事例比较简单,主要是实现微服务和理解微服务开发,通信使用grpc服务来编码和解码
涉及到技术的如下:
1.grpc
2.consul 服务注册与发现
3.golang/rate 服务限流
4.中间价
5.身份验证jwt
6.ctx context.Context 上下文传递全局参数
代码结构如下:
1.首先定义proto文件
我们需要实现三个方法分别是 RegistAccount,LoginAccount,GetUserInfoByToken 分别是注册,登陆和通过token获取用户,和其他请求体,相信代码如下:
syntax = "proto3";
package pb;
service UserServiceExt {
// 注册用户
rpc RegistAccount(RegistAccountReq) returns(RegistAccountRsp) {}
// 用户登录
rpc LoginAccount(LoginAccountReq) returns(LoginAccountRsp) {}
// 通过token 获取用户信息
rpc GetUserInfoByToken(GetUserInfoByTokenRequest) returns(GetUserInfoByTokenResponse) {}
}
message RegistAccountReq {
string email = 1 ;
string userName = 2 ;
string password = 3 ;
}
message RegistAccountRsp {
int64 code = 1;
}
message LoginAccountReq {
string email = 1 ;
string password = 2 ;
}
message UserInfo {
int64 user_id=1;
string userName=2;
string email=3;
string phone=4;
}
message LoginAccountRsp {
int64 code = 1;
string token = 2;
string uid = 3;
}
message GetUserInfoByTokenRequest {
string token=1;
}
message GetUserInfoByTokenResponse {
int64 code=1;
UserInfo userInfo=3;
}
定义好ptoto 文件后 使用ptotoc工具生产服务端和客服端代码 我是放在/common/pb文件下,你可以定义你自己的生成路径:protoc --proto_path=./pb --go_out=plugins=grpc:./common/pb ./pb/*.proto
2.接下来我们来实现接口
在 service 文件下定义 并实现接口,详细代码如下
package service
import (
"context"
"fmt"
"go.uber.org/zap"
"mSystem/src/common/db"
"mSystem/src/common/errors"
"mSystem/src/common/pb"
"mSystem/src/utils"
)
type UserService interface {
RegistAccount(ctx context.Context, req *pb.RegistAccountReq) (*pb.RegistAccountRsp, error)
LoginAccount(ctx context.Context, req *pb.LoginAccountReq) (*pb.LoginAccountRsp, error)
GetUserInfoByToken(ctx context.Context, req *pb.GetUserInfoByTokenRequest) (*pb.GetUserInfoByTokenResponse, error)
}
type baseServer struct {
logger *zap.Logger
}
func NewUserService(log *zap.Logger) UserService {
var server UserService
server = &baseServer{
logger: log,
}
//server = NewLogMiddlewareServer(log)(server)
return server
}
// 账户注册
func (u baseServer) RegistAccount(ctx context.Context, req *pb.RegistAccountReq) (*pb.RegistAccountRsp, error) {
userName := req.UserName
password := req.Password
email := req.Email
user, err := db.SelectUserByEmail(email)
if err != nil {
u.logger.Error("error", zap.Error(err))
return &pb.RegistAccountRsp{
Code: -1,
}, errors.ErrorUserFailed
}
if user != nil {
return &pb.RegistAccountRsp{
Code: -1,
}, errors.ErrorUserAlready
}
err = db.InsertUser(userName, password, email)
if err != nil {
u.logger.Error("error", zap.Error(err))
return &pb.RegistAccountRsp{
Code: -1,
}, errors.ErrorUserFailed
}
return &pb.RegistAccountRsp{
Code: 0,
}, nil
}
func (u baseServer) LoginAccount(ctx context.Context, req *pb.LoginAccountReq) (*pb.LoginAccountRsp, error) {
email := req.Email
password := req.Password
user, err := db.SelectUserByPasswordName(email, password)
if err != nil {
u.logger.Error("error", zap.Error(err))
return &pb.LoginAccountRsp{
}, errors.ErrorUserFailed
}
if user == nil {
return &pb.LoginAccountRsp{}, errors.ErrorUserLoginFailed
}
// jwt 加密
Token, err := utils.CreateJwtToken(user.UserName, int(user.UserId))
return &pb.LoginAccountRsp{
Token: Token,
Code: 0,
Uid: fmt.Sprint(ctx.Value(ContextReqUUid)),
}, err
}
func (u baseServer) GetUserInfoByToken(ctx context.Context, req *pb.GetUserInfoByTokenRequest) (*pb.GetUserInfoByTokenResponse, error) {
token := req.Token
if token == "" {
return &pb.GetUserInfoByTokenResponse{
Code: -1,
}, errors.ErrorTokenEmpty
}
MapClaims, err := utils.ParseToken(token)
fmt.Println("MapClaims", MapClaims, MapClaims["DcId"], MapClaims["Name"],err)
if err != nil {
return &pb.GetUserInfoByTokenResponse{
Code: -2,
}, errors.FormatError("user",err.Error())
}
user_id := MapClaims["DcId"].(float64)
username := MapClaims["Name"].(string)
fmt.Println("user-info", user_id, username)
user_info, e := db.SelectUserById(user_id, username)
if e != nil {
return &pb.GetUserInfoByTokenResponse{
Code: -2,
}, errors.ErrorUserLoginFailed
}
return &pb.GetUserInfoByTokenResponse{
Code: 0,
UserInfo: &pb.UserInfo{
UserId: user_info.UserId,
UserName: user_info.UserName,
Email: user_info.Email,
Phone: user_info.Phone,
},
}, nil
}
3,接下来实现endpoint ,它是微服务内部的桥梁,它把用户的请求的参数,组装成grpc需要的数据类型,再转发到我们具体的服务实现,在endpoint 可以限流,添加中间价等操作,代码如下
package endpoint
import (
"context"
"github.com/go-kit/kit/endpoint"
"go.uber.org/zap"
"golang.org/x/time/rate"
pb "mSystem/src/common/pb"
"mSystem/src/user/encode"
"mSystem/src/user/service"
)
type RegistRequest struct {
Email string `json:"email"`
UserName string `json:"userName"`
Password string `json:"password"`
}
type RegistResponse struct {
Email string `json:"email"`
UserName string `json:"userName"`
Password string `json:"password"`
}
type LoginRequest struct {
Email string `json:"email"`
Password string `json:"password"`
}
type GetTokenRequest struct {
Token string `json:"token"`
}
// y有几个函数就定义几个 endpoint
type Endpoints struct {
RegistAccount endpoint.Endpoint
LoginAccount endpoint.Endpoint
GetUserInfoByToken endpoint.Endpoint
}
func NewEndpoint(s service.UserService, log *zap.Logger, limit *rate.Limiter) Endpoints {
var RegistEndPoint endpoint.Endpoint
RegistEndPoint = MakeRegistEndPoint(s)
var LoginEndPoint endpoint.Endpoint
LoginEndPoint = MakeLoginEndPoint(s)
LoginEndPoint = LoggingMiddleware(log)(LoginEndPoint) // 登陆中间价
LoginEndPoint = NewGolangRateAllowMiddleware(limit)(LoginEndPoint) //限流
var GetUserInfoByToken endpoint.Endpoint
GetUserInfoByToken = MakeTokenEndPoint(s)
return Endpoints{RegistAccount: RegistEndPoint, LoginAccount: LoginEndPoint, GetUserInfoByToken: GetUserInfoByToken}
}
// 实现请求转发
func MakeRegistEndPoint(s service.UserService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(*RegistRequest) / 获取请求参数
val, err := s.RegistAccount(ctx, &pb.RegistAccountReq{ // 组装请求参数到servce
Email: req.Email,
Password: req.Password,
UserName: req.UserName,
})
return encode.Response{
Error: err,
Data: val,
}, err
}
}
// 实现请求转发
func MakeLoginEndPoint(s service.UserService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(*LoginRequest) /// 获取请求参数
val, err := s.LoginAccount(ctx, &pb.LoginAccountReq{ //组装请求参数到servce
Email: req.Email,
Password: req.Password,
})
return encode.Response{
Error: err,
Data: val,
}, err
}
}
func MakeTokenEndPoint(s service.UserService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(*GetTokenRequest) /// 获取请求参数
val, err := s.GetUserInfoByToken(ctx, &pb.GetUserInfoByTokenRequest{
Token: req.Token,
})
return encode.Response{
Error: err,
Data: val,
}, err
}
}
4.完成了具体服务实现和endpoint,接下来就可以把我们的服务暴露给外调用了,transport的任务就是暴露接口给外部使用,详细代码如下:
package transport
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/go-kit/kit/log"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/gorilla/mux"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
"io/ioutil"
"mSystem/src/user/encode"
"mSystem/src/user/endpoint"
"net/http"
)
var (
ErrorBadRequest = errors.New("invalid request parameter")
)
const ContextReqUUid = "req_uuid"
// MakeHttpHandler make http handler use mux
func MakeHttpHandler(ctx context.Context, endpoints endpoint.Endpoints, logger log.Logger) http.Handler {
r := mux.NewRouter()
options := []kithttp.ServerOption{
kithttp.ServerErrorLogger(logger),
kithttp.ServerErrorEncoder(kithttp.DefaultErrorEncoder),
kithttp.ServerErrorEncoder(func(ctx context.Context, err error, w http.ResponseWriter) {
logger.Log(fmt.Sprint(ctx.Value(ContextReqUUid)))
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(err)
}),
kithttp.ServerBefore(func(ctx context.Context, request *http.Request) context.Context {
UUID := uuid.NewV5(uuid.Must(uuid.NewV4(),nil), "req_uuid").String()
logger.Log("给请求添加uuid", zap.Any("UUID", UUID))
ctx = context.WithValue(ctx, ContextReqUUid, UUID)
return ctx
}),
}
// 暴露具体的 endpoint
r.Methods("POST").Path("/register").Handler(kithttp.NewServer(
endpoints.RegistAccount,
decodeRegisterRequest, // 请求参数
encode.JsonResponse,
options...,
))
r.Methods("POST").Path("/login").Handler(kithttp.NewServer(
endpoints.LoginAccount,
decodeLoginRequest, // 请求参数
encode.JsonResponse,
options...,
))
r.Methods("POST").Path("/userInfo").Handler(kithttp.NewServer(
endpoints.GetUserInfoByToken,
decodeGetTokenRequest, // 请求参数
encode.JsonResponse,
options...,
))
return r
}
// decodeStringRequest decode request params to struct
func decodeRegisterRequest(ctx context.Context, r *http.Request) (interface{}, error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("read body err, %v\n", err)
return nil, err
}
println("json:", string(body))
var rhe endpoint.RegistRequest
if err = json.Unmarshal(body, &rhe); err != nil {
fmt.Printf("Unmarshal err, %v\n", err)
return nil, err
}
return &endpoint.RegistRequest{
Email: rhe.Email,
UserName: rhe.UserName,
Password: rhe.Password,
}, nil
}
// 注册 decodeStringRequest decode request params to struct
func decodeLoginRequest(ctx context.Context, r *http.Request) (interface{}, error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("read body err, %v\n", err)
return nil, err
}
println("json:", string(body))
var rhe endpoint.LoginRequest
if err = json.Unmarshal(body, &rhe); err != nil {
fmt.Printf("Unmarshal err, %v\n", err)
return nil, err
}
return &endpoint.LoginRequest{
Email: rhe.Email,
Password: rhe.Password,
}, nil
}
func decodeGetTokenRequest(ctx context.Context, r *http.Request) (interface{}, error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Printf("read body err, %v\n", err)
return nil, err
}
println("json:", string(body))
var rhe endpoint.GetTokenRequest
if err = json.Unmarshal(body, &rhe); err != nil {
fmt.Printf("Unmarshal err, %v\n", err)
return nil, err
}
return &endpoint.GetTokenRequest{
Token: rhe.Token,
}, nil
}
可以看到 我们暴露了3个地址分别是 /register ,/login,/userInfo 也就是我们之前实现的3个接口。大功告成了 接下来实现main函数,启动http和grpc服务,就可以访问到我们的服务了
main 函数如下:
package main
import (
"context"
"flag"
"fmt"
"github.com/go-kit/kit/log"
"golang.org/x/time/rate"
"google.golang.org/grpc"
dbconfig "mSystem/src/common/db"
"mSystem/src/common/pb"
register "mSystem/src/user/compone"
edpts "mSystem/src/user/endpoint"
"mSystem/src/user/service"
transport "mSystem/src/user/transport"
"mSystem/src/utils"
"net"
"net/http"
"os"
"os/signal"
"syscall"
)
func main() {
var (
consulHost = flag.String("consul.host", "127.0.0.1", "consul ip address")
consulPort = flag.String("consul.port", "8500", "consul port")
serviceHost = flag.String("service.host", "localhost", "service ip address")
servicePort = flag.String("service.port", "9001", "service port")
grpcAddr = flag.String("grpc", ":8001", "gRPC listen address.")
)
flag.Parse()
ctx := context.Background()
errChan := make(chan error)
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
// 接口定义
// 具体服务实现了
svc := service.NewUserService(utils.GetLogger())
//创建Endpoint
utils.NewLoggerServer()
golangLimit := rate.NewLimiter(10, 1)
endpoint := edpts.NewEndpoint(svc,utils.GetLogger(),golangLimit)
//创建http.Handler
r := transport.MakeHttpHandler(ctx, endpoint, logger)
//创建注册对象
registar := register.Register(*consulHost, *consulPort, *serviceHost, *servicePort, logger)
// http 服务
go func() {
fmt.Println("Http Server start at port:" + *servicePort)
//启动前执行注册
registar.Register()
errChan <- http.ListenAndServe(":"+*servicePort, r)
}()
// 数据库连接初始化。。。。。。
dbconfig.InitDB()
//grpc server
go func() {
fmt.Println("grpc Server start at port" + *grpcAddr)
listener, err := net.Listen("tcp", *grpcAddr)
if err != nil {
errChan <- err
return
}
baseServer := grpc.NewServer()
pb.RegisterUserServiceExtServer(baseServer, svc)
errChan <- baseServer.Serve(listener)
}()
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errChan <- fmt.Errorf("%s", <-c)
}()
error := <-errChan
//服务退出取消注册
registar.Deregister()
fmt.Println(error)
}