client := goredislib.NewClient(&goredislib.Options{
   Addr: "10.211.55.6:6379",
})
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
rs := redsync.New(pool)

//根据需求设置锁名称
mutexName := "goods-1"
var wg sync.WaitGroup
wg.Add(20)
//根据需求设置锁
mutexName := "goods-1"
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 20; i++ {
   go func() {
      defer wg.Done()
      var goods Goods
      mutex := rs.NewMutex(mutexName)
      fmt.Println("开始获取锁")
      if err := mutex.Lock(); err != nil {
         fmt.Println("获取锁异常")
         panic(err)
      }
      fmt.Println("获取锁成功")
      db.Where(Goods{ProductId: 1}).First(&goods)
      result := db.Model(&Goods{}).Where("product_id=?", 1).Updates(Goods{Inventory: goods.Inventory - 1})
      if result.RowsAffected == 0 {
         fmt.Println("更新失败")
      }
      fmt.Println("开始释放锁")
      if ok, err := mutex.Unlock(); !ok || err != nil {
         panic("unlock failed")
      }
      fmt.Println("锁释放成功")
   }()
}
wg.Wait()

redsync源码解读

使用Redis Setnx 命令:

在指定的 key 不存在时,为 key 设置指定的值。设置成功,返回 1 。 设置失败,返回 0 。(将获取和设置值变成原子性操作)

redis 127.0.0.1:6379> SETNX KEY_NAME VALUE

避免执行过程服务挂掉,释放锁失败,出现死锁。

func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
   conn, err := pool.Get(ctx)
   if err != nil {
      return false, err
   }
   defer conn.Close()
   reply, err := conn.SetNX(m.name, value, m.expiry)  //过期时间为8秒
   if err != nil {
      return false, err
   }
   return reply, nil
}

业务还没有执行完,时间就过期了?

1.过期前刷新一下过期时间
2.需要自己启动协程完成延时工作,避免服务hung住一直申请延长时间,导致其他服务拿不到锁

var touchScript = redis.NewScript(1, `
   if redis.call("GET", KEYS[1]) == ARGV[1] then
      return redis.call("PEXPIRE", KEYS[1], ARGV[2])
   else
      return 0
   end
`)

分布式锁需要解决的问题:–lua脚本实现

1.互斥性 -setnx
2.避免死锁 -过期时间
3.安全性 -锁只能被持有者删除,不能被其他用户删除。通过value值去判断,只有当前g知道value的值,删除的时候取出值对比。

Redlock(红锁) 算法

在分布式版本的算法里我们假设我们有N个Redis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调算法。我们已经描述了如何在单节点环境下安全地获取和释放锁。因此我们理所当然地应当用这个方法在每个单节点里来获取和释放锁。在我们的例子里面我们把N设成5,这个数字是一个相对比较合理的数值,因此我们需要在不同的计算机或者虚拟机上运行5个master节点来保证他们大多数情况下都不会同时宕机。一个客户端需要做如下操作来获取锁:
1.获取当前时间(单位是毫秒)。
2.轮流用相同的key和随机值在N个节点上请求锁,在这一步里,客户端在每个master上请求锁时,会有一个和总的锁释放时间相比小的多的超时时间。比如如果锁自动释放时间是10秒钟,那每个节点锁请求的超时时间可能是5-50毫秒的范围,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,我们应该尽快尝试下一个master节点。
3.客户端计算第二步中获取锁所花的时间,只有当客户端在大多数master节点上成功获取了锁(在这里是3个),而且总共消耗的时间不超过锁释放时间,这个锁就认为是获取成功了。
4.如果锁获取成功了,那现在锁自动释放时间就是最初的锁释放时间减去之前获取锁所消耗的时间。
5.如果锁获取失败了,不管是因为获取成功的锁不超过一半(N/2+1)还是因为总消耗时间超过了锁释放时间,客户端都会到每个master节点上释放锁,即便是那些他认为没有获取成功的锁。

本作品采用《CC 协议》,转载必须注明作者和本文链接