1. go实现分布式锁

通过 golang 实现一个简单的分布式锁,包括锁续约、重试机制、singleflght机制的使用

1.1 redis_lock.go

package redis_lockimport ("context"_ "embed""errors""github.com/go-redis/redis/v9""github.com/google/uuid""golang.org/x/sync/singleflight""time"
)// go:embed 可以直接解析出文件中的字符串
var (//go:embed lua_unlock.lualuaUnlock string//go:embed refresh.lualuaRefresh string//go:embed lock.lualuaLock string//定义好两个异常信息ErrLockNotHold         = errors.New("未持有锁")ErrFailedToPreemptLock = errors.New("加锁失败")
)type Client struct {//采用公共的接口,后续实例通过传入的方式client redis.Cmdable// singleflight 用于在一个实例的多个携程中只需要竞争出一个携程s singleflight.Group
}func NewClient(c redis.Cmdable) *Client {return &Client{client: c,}
}func (c *Client) SingleflightLock(ctx context.Context,key string,expire time.Duration,retry RetryStrategy,timeout time.Duration) (*Lock, error) {for {flag := falseresCh := c.s.DoChan(key, func() (interface{}, error) {flag = truereturn c.Lock(ctx, key, expire, retry, timeout)})select {case res := <-resCh:if flag {if res.Err != nil {return nil, res.Err}//返回锁对象return res.Val.(*Lock), nil}case <-ctx.Done():return nil, ctx.Err()}}
}//Lock 加锁方法,根据重试机制进行重新获取
func (c *Client) Lock(ctx context.Context,key string,expire time.Duration,retry RetryStrategy,timeout time.Duration) (*Lock, error) {var timer *time.Timerdefer func() {if timer != nil {timer.Stop()}}()for {//设置超时lct, cancel := context.WithTimeout(ctx, timeout)//获取到uuidvalue := uuid.New().String()//执行lua脚本进行加锁result, err := c.client.Eval(lct, luaLock, []string{key}, value, expire).Bool()//用于主动释放资源cancel()if err != nil && !errors.Is(err, context.DeadlineExceeded) {return nil, err}if result {return newLock(c.client, key, value), nil}//可以不传重试机制if retry != nil {//通过重试机制获取重试的策略interval, ok := retry.Next()if !ok {//不用重试return nil, ErrFailedToPreemptLock}if timer == nil {timer = time.NewTimer(interval)}timer.Reset(interval)select {case <-timer.C: //睡眠时间超时了return nil, ctx.Err()case <-ctx.Done(): //整个调用的超时return nil, ctx.Err()}}}
}// TryLock 尝试加锁
func (c *Client) TryLock(ctx context.Context, key string, expire time.Duration) (*Lock, error) {return c.Lock(ctx, key, expire, nil, 0)
}// NewLock 创建一个锁结构体
func newLock(client redis.Cmdable, key string, value string) *Lock {return &Lock{client:     client,key:        key,value:      value,unLockChan: make(chan struct{}, 1), //设置1个缓存数据,用于解锁的信号量}
}// Lock 结构体对象
type Lock struct {client redis.Cmdablekey    stringvalue  stringexpire time.Duration//在解锁成功之后发送信号来取消续约unLockChan chan struct{}
}// AutoRefresh 自动续约
func (l *Lock) AutoRefresh(interval time.Duration, timeout time.Duration) error {//设计一个管道,如果失败了,就发送数据到管道之中,通知进行重试retry := make(chan struct{}, 1)//方法返回时关闭closedefer close(retry)ticker := time.NewTicker(interval)for {select {//接收到结束的信号时,直接returncase <-l.unLockChan:return nil//监听重试的管道case <-retry:ctx, cancel := context.WithTimeout(context.Background(), timeout)err := l.Refresh(ctx)//主动调用释放资源cancel()if err == context.DeadlineExceeded {// 执行重试往管道中发送一个信号retry <- struct{}{}continue}if err != nil {return err}case <-ticker.C:ctx, cancel := context.WithTimeout(context.Background(), timeout)err := l.Refresh(ctx)//主动调用释放资源cancel()if err == context.DeadlineExceeded {// 执行重试往管道中发送一个信号retry <- struct{}{}continue}if err != nil {return err}}}
}// Refresh 续约
func (l *Lock) Refresh(ctx context.Context) error {//执行lua脚本,对锁进行续约i, err := l.client.Eval(ctx, luaRefresh, []string{l.key}, l.value, l.expire.Milliseconds()).Int64()if err == redis.Nil {return ErrLockNotHold}if err != nil {return err}if i == 0 {return ErrLockNotHold}return nil
}// Unlock 解锁
func (l *Lock) Unlock(ctx context.Context) error {//解锁时,退出方法需要发送一个信号让自动续约的goroutine停止defer func() {l.unLockChan <- struct{}{}close(l.unLockChan)}()//判断返回的结果result, err := l.client.Eval(ctx, luaUnlock, []string{l.key}, l.value).Int64()if err == redis.Nil {return ErrLockNotHold}if err != nil {return err}//lua脚本返回的结果如果为0,也是代表当前锁不是自己的if result == 0 {return ErrLockNotHold}return nil
}

1.2 retry.go

package redis_lockimport "time"// RetryStrategy 重试策略
type RetryStrategy interface {// Next 下一次重试的时间是多久,返回两个参数 time 时间,bool 是否直接重试Next() (time.Duration, bool)
}

1.3 lock.lua

lua脚本原子化加锁

--[[ 获取到对应的value是否跟当前的一样 ]]
if redis.call("get", KEYS[1]) == ARGV[1]
then
-- 如果一样直接对其时间进行续约return redis.call("pexpire", KEYS[1], ARGV[2])
else
-- 如果不一样调用setnx命令对其进行设置值return redis.call("set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])

1.4 lua_unlock.lua

lua脚本原子化解锁

if redis.call("get", KEYS[1]) == ARGV[1] then-- 返回0,代表key不在return redis.call("del", KEYS[1])
else-- key在,但是值不对return 0
end

1.5 refresh.lua

lua脚本续约

if redis.call("get", KEYS[1]) == ARGV[1] then-- 返回0,代表key不在return redis.call("pexpire", KEYS[1], ARGV[2])
else-- key在,但是值不对return 0
end

1.6 单元测试

使用go-mock工具生成本地的单元测试,不需要再单独的搭建一个 redis 的服务端

项目根目录下安装mockgen工具

go install github.com/golang/mock/mockgen@latest

添加依赖

go get github.com/golang/mock/mockgen/model

生成redis客户端接口

mockgen -package=mocks -destination=mocks/redis_cmdable.mock.go github.com/go-redis/redis/v9 Cmdable

  • package:指定包
  • destination:生成路径名称
  • 剩下的是指定使用redis包下面的 Cmdable接口生成代码

在这里插入图片描述

测试类

func TestClient_TryLock(t *testing.T) {ctrl := gomock.NewController(t)defer ctrl.Finish()testCase := []struct {//测试的场景name string//输入key        stringexpiration time.Duration//返回一个mock数据mock func() redis.Cmdable//期望的返回的错误值wantError error//期望返回的锁wantLock *Lock}{{name:       "locked",key:        "locked-key",expiration: time.Minute,mock: func() redis.Cmdable {rdb := mocks.NewMockCmdable(ctrl)res := redis.NewBoolResult(true, nil)i := []interface{}{gomock.Any(), time.Minute}rdb.EXPECT().Eval(gomock.Any(), luaLock, []string{"locked-key"}, i...).Return(res)return rdb},wantLock: &Lock{key: "locked-key",},},}for _, tc := range testCase {t.Run(tc.name, func(t *testing.T) {var c = NewClient(tc.mock())l, err := c.TryLock(context.Background(), tc.key, tc.expiration)assert.Equal(t, tc.wantError, err)if err != nil {return}//判断返回的key是否跟期望的一样assert.Equal(t, tc.key, l.key)assert.Equal(t, tc.wantLock.key, l.key)assert.NotEmpty(t, l.value)})}
}