序言

我们通过一个系列文章跟大家详细展示一个 go-zero 微服务示例,整个系列分十篇文章,目录结构如下:

  1. 环境搭建
  2. 服务拆分
  3. 用户服务
  4. 产品服务
  5. 订单服务
  6. 支付服务
  7. RPC 服务 Auth 验证
  8. 服务监控
  9. 链路追踪
  10. 分布式事务(本文)

期望通过本系列带你在本机利用 Docker 环境利用 go-zero 快速开发一个商城系统,让你快速上手微服务。

首先,我们来看一下整体的服务拆分图:

DTM
golang 

绝大多数的订单系统的事务都会跨服务,因此都有更新数据一致性的需求,都可以通过 DTM 大幅简化架构,形成一个优雅的解决方案。

而且 DTM 已经深度合作,原生的支持go-zero中的分布式事务,下面就来详细的讲解如何用 DTM 来帮助我们的订单系统解决一致性问题

go-zeroDTM
order rpcCreateOrderModelproduct rpcUpdate
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {

	// 查询用户是否存在

	_, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{

		Id: in.Uid,

	})

	if err != nil {

		return nil, err

	}



	// 查询产品是否存在

	productRes, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{

		Id: in.Pid,

	})

	if err != nil {

		return nil, err

	}

	// 判断产品库存是否充足

	if productRes.Stock <= 0 {

		return nil, status.Error(500, "产品库存不足")

	}



	newOrder := model.Order{

		Uid:    in.Uid,

		Pid:    in.Pid,

		Amount: in.Amount,

		Status: 0,

	}



	res, err := l.svcCtx.OrderModel.Insert(&newOrder)

	if err != nil {

		return nil, status.Error(500, err.Error())

	}



	newOrder.Id, err = res.LastInsertId()

	if err != nil {

		return nil, status.Error(500, err.Error())

	}



	_, err = l.svcCtx.ProductRpc.Update(l.ctx, &product.UpdateRequest{

		Id:     productRes.Id,

		Name:   productRes.Name,

		Desc:   productRes.Desc,

		Stock:  productRes.Stock - 1,

		Amount: productRes.Amount,

		Status: productRes.Status,

	})

	if err != nil {

		return nil, err

	}



	return &order.CreateResponse{

		Id: newOrder.Id,

	}, nil

}

之前我们说过,这里处理逻辑存在数据一致性问题,有可能订单创建成功了,但是在更新产品库存的时候可能会发生失败,这时候就会存在订单创建成功,产品库存没有减少的情况。

DTMSAGA
DTM
DTM
dtm->config.ymlMicroServiceTargetEndPointdtmetcd
# ......



# 微服务

MicroService:

  Driver: 'dtm-driver-gozero'           # 要处理注册/发现的驱动程序的名称

  Target: 'etcd://etcd:2379/dtmservice' # 注册 dtm 服务的 etcd 地址

  EndPoint: 'dtm:36790'



# ......
dtm_barrier
DTM
DTM
create database if not exists dtm_barrier

/*!40100 DEFAULT CHARACTER SET utf8mb4 */

;

drop table if exists dtm_barrier.barrier;

create table if not exists dtm_barrier.barrier(

  id bigint(22) PRIMARY KEY AUTO_INCREMENT,

  trans_type varchar(45) default '',

  gid varchar(128) default '',

  branch_id varchar(128) default '',

  op varchar(45) default '',

  barrier_id varchar(45) default '',

  reason varchar(45) default '' comment 'the branch type who insert this record',

  create_time datetime DEFAULT now(),

  update_time datetime DEFAULT now(),

  key(create_time),

  key(update_time),

  UNIQUE key(gid, branch_id, op, barrier_id)

);
dtmcli.SetBarrierTableName
OrderModelProductModel
modelDTM
$ vim mall/service/order/model/ordermodel.go
package model



......



type (

	OrderModel interface {

		TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)

		TxUpdate(tx *sql.Tx, data *Order) error

	}

)



......



func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) {

	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet)

	ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status)



	return ret, err

}



func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error {

	productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id)

	_, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {

		query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)

		return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id)

	}, productIdKey)

	return err

}



func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) {

	var resp Order



	query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table)

	err := m.QueryRowNoCache(&resp, query, uid)



	switch err {

	case nil:

		return &resp, nil

	case sqlc.ErrNotFound:

		return nil, ErrNotFound

	default:

		return nil, err

	}

}
$ vim mall/service/product/model/productmodel.go
package model



......



type (

	ProductModel interface {

		TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error)

	}

)



......



func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) {

	productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id)

	return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {

		query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)

		return tx.Exec(query, delta, delta, id)

	}, productIdKey)

}
product rpc
DecrStockDecrStockRevertproduct rpcDecrStockDecrStockRevert
$ vim mall/service/product/rpc/product.proto
syntax = "proto3";



package productclient;



option go_package = "product";



......



// 减产品库存

message DecrStockRequest {

    int64 id = 1;

    int64 num = 2;

}

message DecrStockResponse {

}

// 减产品库存



service Product {

    ......

    rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);

    rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);

}

提示:修改后使用 goctl 工具重新生成下代码。

DecrStock
$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logic



import (

	"context"

	"database/sql"



	"mall/service/product/rpc/internal/svc"

	"mall/service/product/rpc/product"



	"github.com/dtm-labs/dtmcli"

	"github.com/dtm-labs/dtmgrpc"

	"github.com/tal-tech/go-zero/core/logx"

	"github.com/tal-tech/go-zero/core/stores/sqlx"

	"google.golang.org/grpc/codes"

	"google.golang.org/grpc/status"

)



type DecrStockLogic struct {

	ctx    context.Context

	svcCtx *svc.ServiceContext

	logx.Logger

}



func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic {

	return &DecrStockLogic{

		ctx:    ctx,

		svcCtx: svcCtx,

		Logger: logx.WithContext(ctx),

	}

}



func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {

	// 获取 RawDB

	db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()

	if err != nil {

		return nil, status.Error(500, err.Error())

	}



	// 获取子事务屏障对象

	barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)

	if err != nil {

		return nil, status.Error(500, err.Error())

	}

	// 开启子事务屏障

	err = barrier.CallWithDB(db, func(tx *sql.Tx) error {

		// 更新产品库存

		result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1)

		if err != nil {

			return err

		}



		affected, err := result.RowsAffected()

		// 库存不足,返回子事务失败

		if err == nil && affected == 0 {

			return dtmcli.ErrFailure

		}



		return err

	})



	// 这种情况是库存不足,不再重试,走回滚

	if err == dtmcli.ErrFailure {

		return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)

	}



	if err != nil {

		return nil, err

	}



	return &product.DecrStockResponse{}, nil

}
DecrStockRevertDecrStockDecrStock
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logic



import (

	"context"

	"database/sql"



	"mall/service/product/rpc/internal/svc"

	"mall/service/product/rpc/product"



	"github.com/dtm-labs/dtmcli"

	"github.com/dtm-labs/dtmgrpc"

	"github.com/tal-tech/go-zero/core/logx"

	"github.com/tal-tech/go-zero/core/stores/sqlx"

	"google.golang.org/grpc/status"

)



type DecrStockRevertLogic struct {

	ctx    context.Context

	svcCtx *svc.ServiceContext

	logx.Logger

}



func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic {

	return &DecrStockRevertLogic{

		ctx:    ctx,

		svcCtx: svcCtx,

		Logger: logx.WithContext(ctx),

	}

}



func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {

	// 获取 RawDB

	db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()

	if err != nil {

		return nil, status.Error(500, err.Error())

	}



	// 获取子事务屏障对象

	barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)

	if err != nil {

		return nil, status.Error(500, err.Error())

	}

	// 开启子事务屏障

	err = barrier.CallWithDB(db, func(tx *sql.Tx) error {

		// 更新产品库存

		_, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1)

		return err

	})



	if err != nil {

		return nil, err

	}



	return &product.DecrStockResponse{}, nil

}
order rpc
CreateRevertorder rpcCreateDecrStockRevert
$ vim mall/service/order/rpc/order.proto
syntax = "proto3";



package orderclient;



option go_package = "order";



......



service Order {

    rpc Create(CreateRequest) returns(CreateResponse);

    rpc CreateRevert(CreateRequest) returns(CreateResponse);

    ......

}

提示:修改后使用 goctl 工具重新生成下代码。

CreateCreateproduct rpcDecrStock
$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logic



import (

	"context"

	"database/sql"

	"fmt"



	"mall/service/order/model"

	"mall/service/order/rpc/internal/svc"

	"mall/service/order/rpc/order"

	"mall/service/user/rpc/user"



	"github.com/dtm-labs/dtmgrpc"

	"github.com/tal-tech/go-zero/core/logx"

	"github.com/tal-tech/go-zero/core/stores/sqlx"

	"google.golang.org/grpc/status"

)



type CreateLogic struct {

	ctx    context.Context

	svcCtx *svc.ServiceContext

	logx.Logger

}



func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {

	return &CreateLogic{

		ctx:    ctx,

		svcCtx: svcCtx,

		Logger: logx.WithContext(ctx),

	}

}



func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {

	// 获取 RawDB

	db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()

	if err != nil {

		return nil, status.Error(500, err.Error())

	}



	// 获取子事务屏障对象

	barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)

	if err != nil {

		return nil, status.Error(500, err.Error())

	}

	// 开启子事务屏障

	if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {

		// 查询用户是否存在

		_, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{

			Id: in.Uid,

		})

		if err != nil {

			return fmt.Errorf("用户不存在")

		}



		newOrder := model.Order{

			Uid:    in.Uid,

			Pid:    in.Pid,

			Amount: in.Amount,

			Status: 0,

		}

		// 创建订单

		_, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder)

		if err != nil {

			return fmt.Errorf("订单创建失败")

		}



		return nil

	}); err != nil {

		return nil, status.Error(500, err.Error())

	}



	return &order.CreateResponse{}, nil

}
CreateRevert9(无效状态)
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logic



import (

	"context"

	"database/sql"

	"fmt"



	"mall/service/order/rpc/internal/svc"

	"mall/service/order/rpc/order"

	"mall/service/user/rpc/user"



	"github.com/dtm-labs/dtmgrpc"

	"github.com/tal-tech/go-zero/core/logx"

	"github.com/tal-tech/go-zero/core/stores/sqlx"

	"google.golang.org/grpc/status"

)



type CreateRevertLogic struct {

	ctx    context.Context

	svcCtx *svc.ServiceContext

	logx.Logger

}



func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic {

	return &CreateRevertLogic{

		ctx:    ctx,

		svcCtx: svcCtx,

		Logger: logx.WithContext(ctx),

	}

}



func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) {

	// 获取 RawDB

	db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()

	if err != nil {

		return nil, status.Error(500, err.Error())

	}



	// 获取子事务屏障对象

	barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)

	if err != nil {

		return nil, status.Error(500, err.Error())

	}

	// 开启子事务屏障

	if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {

		// 查询用户是否存在

		_, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{

			Id: in.Uid,

		})

		if err != nil {

			return fmt.Errorf("用户不存在")

		}

		// 查询用户最新创建的订单

		resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid)

		if err != nil {

			return fmt.Errorf("订单不存在")

		}

		// 修改订单状态9,标识订单已失效,并更新订单

		resOrder.Status = 9

		err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder)

		if err != nil {

			return fmt.Errorf("订单更新失败")

		}



		return nil

	}); err != nil {

		return nil, status.Error(500, err.Error())

	}



	return &order.CreateResponse{}, nil

}
order api
order rpcCreateCreateRevertproduct rpcDecrStockDecrStockRevertorder apiSAGA事务模式
pproduct rpc
$ vim mall/service/order/api/etc/order.yaml
Name: Order

Host: 0.0.0.0

Port: 8002



......



OrderRpc:

  Etcd:

    Hosts:

    - etcd:2379

    Key: order.rpc



ProductRpc:

  Etcd:

    Hosts:

    - etcd:2379

    Key: product.rpc
pproduct rpc
$ vim mall/service/order/api/internal/config/config.go
package config



import (

	"github.com/tal-tech/go-zero/rest"

	"github.com/tal-tech/go-zero/zrpc"

)



type Config struct {

	rest.RestConf



	Auth struct {

		AccessSecret string

		AccessExpire int64

	}



	OrderRpc   zrpc.RpcClientConf

	ProductRpc zrpc.RpcClientConf

}
pproduct rpc
$ vim mall/service/order/api/internal/svc/servicecontext.go
package svc



import (

	"mall/service/order/api/internal/config"

	"mall/service/order/rpc/orderclient"

	"mall/service/product/rpc/productclient"



	"github.com/tal-tech/go-zero/zrpc"

)



type ServiceContext struct {

	Config config.Config



	OrderRpc   orderclient.Order

	ProductRpc productclient.Product

}



func NewServiceContext(c config.Config) *ServiceContext {

	return &ServiceContext{

		Config:     c,

		OrderRpc:   orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)),

		ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)),

	}

}
gozerodtm
$ vim mall/service/order/api/order.go
package main



import (

	......



	_ "github.com/dtm-labs/driver-gozero" // 添加导入 `gozero` 的 `dtm` 驱动

)



var configFile = flag.String("f", "etc/order.yaml", "the config file")



func main() {

	......

}
order apiCreate
$ vim mall/service/order/api/internal/logic/createlogic.go
package logic



import (

	"context"



	"mall/service/order/api/internal/svc"

	"mall/service/order/api/internal/types"

	"mall/service/order/rpc/order"

	"mall/service/product/rpc/product"



	"github.com/dtm-labs/dtmgrpc"

	"github.com/tal-tech/go-zero/core/logx"

	"google.golang.org/grpc/status"

)



type CreateLogic struct {

	logx.Logger

	ctx    context.Context

	svcCtx *svc.ServiceContext

}



func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic {

	return CreateLogic{

		Logger: logx.WithContext(ctx),

		ctx:    ctx,

		svcCtx: svcCtx,

	}

}



func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) {

	// 获取 OrderRpc BuildTarget

	orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget()

	if err != nil {

		return nil, status.Error(100, "订单创建异常")

	}



	// 获取 ProductRpc BuildTarget

	productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget()

	if err != nil {

		return nil, status.Error(100, "订单创建异常")

	}



	// dtm 服务的 etcd 注册地址

	var dtmServer = "etcd://etcd:2379/dtmservice"

	// 创建一个gid

	gid := dtmgrpc.MustGenGid(dtmServer)

	// 创建一个saga协议的事务

	saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).

		Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert", &order.CreateRequest{

			Uid:    req.Uid,

			Pid:    req.Pid,

			Amount: req.Amount,

			Status: 0,

		}).

		Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert", &product.DecrStockRequest{

			Id:  req.Pid,

			Num: 1,

		})



	// 事务提交

	err = saga.Submit()

	if err != nil {

		return nil, status.Error(500, err.Error())

	}



	return &types.CreateResponse{}, nil

}
SagaGrpc.Addactiongrpcmall/service/order/rpc/order/order.pb.gomall/service/product/rpc/product/product.pb.goInvoke

go-zeroDTM

10.3.1 测试分布式事务正常流程

postman/api/product/createstock1

postman/api/order/createpid1

10

barrier

10.3.2 测试分布式事务失败流程1

10postman/api/order/create

219

barrier(gid = fqYS8CbYbK8GkL8SCuTRUF)(branch_id = 01)(branch_id = 02)

DTMorder rpcCreateDTMproduct rpcDecrStockpidDTMorder rpcCreateRevertDTMproduct rpcDecrStockRevertproduct rpcDecrStockDecrStockRevert

10.3.3 测试分布式事务失败流程2

1product rpcDecrStock

postman/api/order/createpid1

391100

barrier(gid = ZbjYHv2jNra7RMwyWjB5Lc)(branch_id = 01)(branch_id = 02)product rpcDecrStock

DTM

子事务屏障会自动识别正向操作是否已执行,失败流程1未执行业务操作,所以补偿时,也不会执行补偿的业务操作;失败流程2执行了业务操作,所以补偿时,也会执行补偿的业务操作。

项目地址

go-zero

微信交流群

关注『微服务实践』公众号并点击 交流群 获取社区群二维码。