序言
我们通过一个系列文章跟大家详细展示一个 go-zero 微服务示例,整个系列分十篇文章,目录结构如下:
- 环境搭建
- 服务拆分
- 用户服务
- 产品服务
- 订单服务
- 支付服务
- RPC 服务 Auth 验证
- 服务监控
- 链路追踪
- 分布式事务(本文)
期望通过本系列带你在本机的 Docker 环境中,利用 go-zero 快速开发一个商城系统,让你快速上手微服务。
首先,我们来看一下整体的服务拆分图:
DTM
golang
绝大多数的订单系统的事务都会跨服务,因此都有更新数据一致性的需求,都可以通过 DTM 大幅简化架构,形成一个优雅的解决方案。
而且 DTM 已经与 go-zero 深度合作,原生地支持 go-zero 中的分布式事务,下面就来详细地讲解如何用 DTM 来帮助我们的订单系统解决一致性问题。
go-zeroDTM
order rpcCreateOrderModelproduct rpcUpdate
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) { // 查询用户是否存在 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{ Id: in.Uid, }) if err != nil { return nil, err } // 查询产品是否存在 productRes, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{ Id: in.Pid, }) if err != nil { return nil, err } // 判断产品库存是否充足 if productRes.Stock <= 0 { return nil, status.Error(500, "产品库存不足") } newOrder := model.Order{ Uid: in.Uid, Pid: in.Pid, Amount: in.Amount, Status: 0, } res, err := l.svcCtx.OrderModel.Insert(&newOrder) if err != nil { return nil, status.Error(500, err.Error()) } newOrder.Id, err = res.LastInsertId() if err != nil { return nil, status.Error(500, err.Error()) } _, err = l.svcCtx.ProductRpc.Update(l.ctx, &product.UpdateRequest{ Id: productRes.Id, Name: productRes.Name, Desc: productRes.Desc, Stock: productRes.Stock - 1, Amount: productRes.Amount, Status: productRes.Status, }) if err != nil { return nil, err } return &order.CreateResponse{ Id: newOrder.Id, }, nil }
之前我们说过,这里处理逻辑存在数据一致性问题,有可能订单创建成功了,但是在更新产品库存的时候可能会发生失败,这时候就会存在订单创建成功,产品库存没有减少的情况。
DTMSAGA
DTM
DTM
dtm->config.ymlMicroServiceTargetEndPointdtmetcd
# ......
# Microservice settings
MicroService:
  Driver: 'dtm-driver-gozero' # name of the driver that handles register/discover
  Target: 'etcd://etcd:2379/dtmservice' # etcd address under which the dtm service is registered
  EndPoint: 'dtm:36790'
# ......
dtm_barrier
DTM
DTM
-- Creates the dtm_barrier database (if missing) and (re)creates its barrier table.
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
-- NOTE: this drops any existing barrier table together with its rows.
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
  id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default '',
  gid varchar(128) default '',
  branch_id varchar(128) default '',
  op varchar(45) default '',
  barrier_id varchar(45) default '',
  reason varchar(45) default '' comment 'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  -- At most one row may exist per (gid, branch_id, op, barrier_id) combination.
  UNIQUE key(gid, branch_id, op, barrier_id)
);
dtmcli.SetBarrierTableName
OrderModelProductModel
modelDTM
$ vim mall/service/order/model/ordermodel.go
package model

......

type (
	OrderModel interface {
		// TxInsert inserts data as a new order row through the caller's transaction tx.
		TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)
		// TxUpdate rewrites the order row identified by data.Id through tx.
		TxUpdate(tx *sql.Tx, data *Order) error
		// FindOneByUid returns the most recently created order of user uid.
		// It is declared here because callers reach the model through this
		// interface.
		FindOneByUid(uid int64) (*Order, error)
	}
)

......

// TxInsert inserts a new order row (uid, pid, amount, status) using tx, so the
// insert commits or rolls back together with whatever else the caller runs on
// the same transaction.
func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) {
	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet)
	return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status)
}

// TxUpdate updates the order row whose id is data.Id using tx.
//
// The statement is routed through m.Exec together with the order's cache key;
// presumably this invalidates the cached row — confirm against the cached
// connection's Exec documentation.
func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error {
	orderIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id)
	_, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
		query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)
		// Run on tx rather than conn so the update belongs to the caller's transaction.
		return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id)
	}, orderIdKey)
	return err
}

// FindOneByUid returns the order with the latest create_time belonging to uid,
// or ErrNotFound when the user has no order. The query bypasses the cache.
func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) {
	var resp Order
	query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table)
	err := m.QueryRowNoCache(&resp, query, uid)
	switch err {
	case nil:
		return &resp, nil
	case sqlc.ErrNotFound:
		return nil, ErrNotFound
	default:
		return nil, err
	}
}
$ vim mall/service/product/model/productmodel.go
package model

......

type (
	ProductModel interface {
		// TxAdjustStock adds delta to the stock of product id through tx.
		TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error)
	}
)

......

// TxAdjustStock adds delta (negative to deduct) to the stock of product id
// using the caller's transaction tx.
//
// The where clause only matches rows with stock >= -delta, so a deduction
// larger than the current stock affects zero rows instead of driving the stock
// negative; callers detect that case through RowsAffected.
func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) {
	cacheKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id)

	// The statement runs on tx, not on the connection m.Exec supplies.
	adjust := func(_ sqlx.SqlConn) (sql.Result, error) {
		stmt := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)
		return tx.Exec(stmt, delta, delta, id)
	}

	return m.Exec(adjust, cacheKey)
}
product rpc
DecrStockDecrStockRevertproduct rpcDecrStockDecrStockRevert
$ vim mall/service/product/rpc/product.proto
syntax = "proto3";

package productclient;

option go_package = "product";

......

// Request to deduct stock from a product.
message DecrStockRequest {
  int64 id = 1;  // product id
  int64 num = 2; // quantity
}

// Empty response of the stock deduction calls.
message DecrStockResponse {
}

// Product service; the two calls below share the same request and response types.
service Product {
  ......
  rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);
  rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);
}
提示:修改后使用 goctl 工具重新生成下代码。
DecrStock
$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logic

import (
	"context"
	"database/sql"

	"mall/service/product/rpc/internal/svc"
	"mall/service/product/rpc/product"

	"github.com/dtm-labs/dtmcli"
	"github.com/dtm-labs/dtmgrpc"
	"github.com/tal-tech/go-zero/core/logx"
	"github.com/tal-tech/go-zero/core/stores/sqlx"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// DecrStockLogic handles the DecrStock call of the product rpc service.
type DecrStockLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewDecrStockLogic builds a DecrStockLogic bound to ctx and svcCtx.
func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic {
	return &DecrStockLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}

// DecrStock deducts in.Num units from the stock of product in.Id inside a DTM
// sub-transaction barrier.
//
// A request whose Num is zero or negative deducts one unit, which is what this
// method did for every request before it honoured Num.
//
// When the stock is insufficient the call returns codes.Aborted with
// dtmcli.ResultFailure; any other error is returned unchanged.
func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
	// Quantity to deduct; fall back to 1 for requests that do not carry a usable Num.
	num := int(in.Num)
	if num <= 0 {
		num = 1
	}

	// Obtain the raw *sql.DB the barrier needs.
	db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Build the sub-transaction barrier from the incoming gRPC context.
	barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Run the stock update inside the barrier's transaction.
	err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
		result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -num)
		if err != nil {
			return err
		}

		affected, err := result.RowsAffected()
		// No row updated means the stock was too low: fail the sub-transaction.
		if err == nil && affected == 0 {
			return dtmcli.ErrFailure
		}

		return err
	})

	// Insufficient stock is final: report failure so the saga rolls back
	// instead of retrying.
	if err == dtmcli.ErrFailure {
		return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
	}

	if err != nil {
		return nil, err
	}

	return &product.DecrStockResponse{}, nil
}
DecrStockRevertDecrStockDecrStock
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logic

import (
	"context"
	"database/sql"

	"mall/service/product/rpc/internal/svc"
	"mall/service/product/rpc/product"

	"github.com/dtm-labs/dtmcli"
	"github.com/dtm-labs/dtmgrpc"
	"github.com/tal-tech/go-zero/core/logx"
	"github.com/tal-tech/go-zero/core/stores/sqlx"
	"google.golang.org/grpc/status"
)

// DecrStockRevertLogic handles the DecrStockRevert call of the product rpc service.
type DecrStockRevertLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewDecrStockRevertLogic builds a DecrStockRevertLogic bound to ctx and svcCtx.
func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic {
	return &DecrStockRevertLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}

// DecrStockRevert gives in.Num units back to the stock of product in.Id inside
// a DTM sub-transaction barrier.
//
// A request whose Num is zero or negative restores one unit, which is what this
// method did for every request before it honoured Num.
func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
	// Quantity to restore; fall back to 1 for requests that do not carry a usable Num.
	num := int(in.Num)
	if num <= 0 {
		num = 1
	}

	// Obtain the raw *sql.DB the barrier needs.
	db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Build the sub-transaction barrier from the incoming gRPC context.
	barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Run the stock update inside the barrier's transaction.
	err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
		_, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, num)
		return err
	})

	if err != nil {
		return nil, err
	}

	return &product.DecrStockResponse{}, nil
}
order rpc
CreateRevertorder rpcCreateDecrStockRevert
$ vim mall/service/order/rpc/order.proto
syntax = "proto3";

package orderclient;

option go_package = "order";

......

// Order service; Create and CreateRevert share the same request and response types.
service Order {
  rpc Create(CreateRequest) returns(CreateResponse);
  rpc CreateRevert(CreateRequest) returns(CreateResponse);
  ......
}
提示:修改后使用 goctl 工具重新生成下代码。
CreateCreateproduct rpcDecrStock
$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logic

import (
	"context"
	"database/sql"
	"fmt"

	"mall/service/order/model"
	"mall/service/order/rpc/internal/svc"
	"mall/service/order/rpc/order"
	"mall/service/user/rpc/user"

	"github.com/dtm-labs/dtmcli"
	"github.com/dtm-labs/dtmgrpc"
	"github.com/tal-tech/go-zero/core/logx"
	"github.com/tal-tech/go-zero/core/stores/sqlx"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// CreateLogic handles the Create call of the order rpc service.
type CreateLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewCreateLogic builds a CreateLogic bound to ctx and svcCtx.
func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {
	return &CreateLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}

// Create inserts a new order (status 0) for in.Uid / in.Pid inside a DTM
// sub-transaction barrier.
//
// When the user lookup fails the call returns codes.Aborted with
// dtmcli.ResultFailure, so the saga rolls back instead of retrying an order
// that cannot succeed. Every other failure is reported with code 500.
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
	// Obtain the raw *sql.DB the barrier needs.
	db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Build the sub-transaction barrier from the incoming gRPC context.
	barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Run the order creation inside the barrier's transaction.
	err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
		// The order is only valid for an existing user.
		if _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{Id: in.Uid}); err != nil {
			l.Errorf("用户不存在: uid=%d, err=%v", in.Uid, err)
			return dtmcli.ErrFailure
		}

		newOrder := model.Order{
			Uid:    in.Uid,
			Pid:    in.Pid,
			Amount: in.Amount,
			Status: 0,
		}
		if _, err := l.svcCtx.OrderModel.TxInsert(tx, &newOrder); err != nil {
			return fmt.Errorf("订单创建失败")
		}

		return nil
	})

	// A failed user lookup is reported as a final failure so the saga rolls
	// back instead of retrying.
	if err == dtmcli.ErrFailure {
		return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
	}

	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	return &order.CreateResponse{}, nil
}
CreateRevert9(无效状态)
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logic

import (
	"context"
	"database/sql"
	"fmt"

	"mall/service/order/rpc/internal/svc"
	"mall/service/order/rpc/order"

	"github.com/dtm-labs/dtmgrpc"
	"github.com/tal-tech/go-zero/core/logx"
	"github.com/tal-tech/go-zero/core/stores/sqlx"
	"google.golang.org/grpc/status"
)

// CreateRevertLogic handles the CreateRevert call of the order rpc service.
type CreateRevertLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewCreateRevertLogic builds a CreateRevertLogic bound to ctx and svcCtx.
func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic {
	return &CreateRevertLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}

// CreateRevert marks the latest order of in.Uid as invalid (status 9) inside a
// DTM sub-transaction barrier.
//
// The compensation does not call the user service: it only needs the order
// row, and a failing remote lookup here would keep the rollback from ever
// completing.
func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) {
	// Obtain the raw *sql.DB the barrier needs.
	db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Build the sub-transaction barrier from the incoming gRPC context.
	barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Run the compensation inside the barrier's transaction.
	if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
		// NOTE(review): the order is looked up by user id outside tx, so with
		// concurrent orders from the same user this may select a different
		// order than the one being compensated — consider carrying an order
		// identifier in the request.
		resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid)
		if err != nil {
			return fmt.Errorf("订单不存在")
		}

		// Status 9 marks the order as invalid.
		resOrder.Status = 9
		if err := l.svcCtx.OrderModel.TxUpdate(tx, resOrder); err != nil {
			return fmt.Errorf("订单更新失败")
		}

		return nil
	}); err != nil {
		return nil, status.Error(500, err.Error())
	}

	return &order.CreateResponse{}, nil
}
order api
order rpcCreateCreateRevertproduct rpcDecrStockDecrStockRevertorder apiSAGA事务模式
product rpc
$ vim mall/service/order/api/etc/order.yaml
# order api service settings
Name: Order
Host: 0.0.0.0
Port: 8002

......

# order rpc client, resolved through etcd
OrderRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: order.rpc

# product rpc client, resolved through etcd
ProductRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: product.rpc
product rpc
$ vim mall/service/order/api/internal/config/config.go
package config

import (
	"github.com/tal-tech/go-zero/rest"
	"github.com/tal-tech/go-zero/zrpc"
)

// Config is the configuration of the order api service.
type Config struct {
	rest.RestConf

	// Auth carries the access-token settings.
	Auth struct {
		AccessSecret string
		AccessExpire int64
	}

	// OrderRpc configures the client for the order rpc service.
	OrderRpc zrpc.RpcClientConf
	// ProductRpc configures the client for the product rpc service.
	ProductRpc zrpc.RpcClientConf
}
product rpc
$ vim mall/service/order/api/internal/svc/servicecontext.go
package svc import ( "mall/service/order/api/internal/config" "mall/service/order/rpc/orderclient" "mall/service/product/rpc/productclient" "github.com/tal-tech/go-zero/zrpc" ) type ServiceContext struct { Config config.Config OrderRpc orderclient.Order ProductRpc productclient.Product } func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, OrderRpc: orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)), ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)), } }
gozerodtm
$ vim mall/service/order/api/order.go
package main

import (
	......

	_ "github.com/dtm-labs/driver-gozero" // blank import that adds the gozero driver for dtm
)

// configFile is the path of the config file, set with the -f flag.
var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {
	......
}
order apiCreate
$ vim mall/service/order/api/internal/logic/createlogic.go
package logic

import (
	"context"

	"mall/service/order/api/internal/svc"
	"mall/service/order/api/internal/types"
	"mall/service/order/rpc/order"
	"mall/service/product/rpc/product"

	"github.com/dtm-labs/dtmgrpc"
	"github.com/tal-tech/go-zero/core/logx"
	"google.golang.org/grpc/status"
)

// CreateLogic handles the order-creation endpoint of the order api service.
type CreateLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

// NewCreateLogic builds a CreateLogic bound to ctx and svcCtx.
func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic {
	return CreateLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

// Create submits a SAGA transaction with two branches: order creation
// (compensated by CreateRevert) and stock deduction (compensated by
// DecrStockRevert). It returns an empty response once the submit succeeds.
func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) {
	// Resolve the targets of both rpc services from their client configs.
	orderTarget, err := l.svcCtx.Config.OrderRpc.BuildTarget()
	if err != nil {
		return nil, status.Error(100, "订单创建异常")
	}
	productTarget, err := l.svcCtx.Config.ProductRpc.BuildTarget()
	if err != nil {
		return nil, status.Error(100, "订单创建异常")
	}

	// etcd address under which the dtm service is registered.
	const dtmServer = "etcd://etcd:2379/dtmservice"

	// Every global transaction needs its own gid.
	gid := dtmgrpc.MustGenGid(dtmServer)

	// Assemble the saga: each Add pairs an action with its compensation and
	// the payload sent to both.
	saga := dtmgrpc.NewSagaGrpc(dtmServer, gid)
	saga.Add(
		orderTarget+"/orderclient.Order/Create",
		orderTarget+"/orderclient.Order/CreateRevert",
		&order.CreateRequest{
			Uid:    req.Uid,
			Pid:    req.Pid,
			Amount: req.Amount,
			Status: 0,
		},
	)
	saga.Add(
		productTarget+"/productclient.Product/DecrStock",
		productTarget+"/productclient.Product/DecrStockRevert",
		&product.DecrStockRequest{
			Id:  req.Pid,
			Num: 1,
		},
	)

	// Submit the transaction.
	if err = saga.Submit(); err != nil {
		return nil, status.Error(500, err.Error())
	}

	return &types.CreateResponse{}, nil
}
SagaGrpc.Addactiongrpcmall/service/order/rpc/order/order.pb.gomall/service/product/rpc/product/product.pb.goInvoke
go-zeroDTM
10.3.1 测试分布式事务正常流程
postman/api/product/createstock1
postman/api/order/createpid1
10
barrier
10.3.2 测试分布式事务失败流程1
10postman/api/order/create
219
barrier(gid = fqYS8CbYbK8GkL8SCuTRUF)(branch_id = 01)(branch_id = 02)
DTMorder rpcCreateDTMproduct rpcDecrStockpidDTMorder rpcCreateRevertDTMproduct rpcDecrStockRevertproduct rpcDecrStockDecrStockRevert
10.3.3 测试分布式事务失败流程2
1product rpcDecrStock
postman/api/order/createpid1
391100
barrier(gid = ZbjYHv2jNra7RMwyWjB5Lc)(branch_id = 01)(branch_id = 02)product rpcDecrStock
DTM
子事务屏障会自动识别正向操作是否已执行,失败流程1未执行业务操作,所以补偿时,也不会执行补偿的业务操作;失败流程2执行了业务操作,所以补偿时,也会执行补偿的业务操作。
项目地址
go-zero
微信交流群
关注『微服务实践』公众号并点击 交流群 获取社区群二维码。