在单机单应用体系下,为了消除多线程对共享变量的访问时产生的竞态问题,通常需要对临界区加锁。在集群场景下,应用会被负载均衡到多台机器上进行部署,此时也会产生一系列并发安全问题。比如,多台机器上的应用需要共同维护与执行同一组定时任务,当定时时间来临,在不加任何额外处理的情况下,定时任务将会被多台机器重复执行,可能造成意想不到的后果。因此,在多机部署场景下,也需要一套跨机器的互斥解决方案,保证同一个操作在同一时刻只能被一台机器所执行。
分布式锁的设计理念
单机场景中的锁不适于多机场景下使用,多机场景下自然需要一种分布式形态的锁,允许多机共同抢占。这种锁同样需要实现并满足锁的一般特点:
1.必须要保证互斥机制的稳定性。
2.加锁与解锁的操作尽量高性能。
3.可以根据具体情境考虑实现抢锁失败是否阻塞。
4.最好具备可重入与失效机制,避免死锁。
实现方式:
1.基于MySQL数据库实现,可以借助MySQL的唯一索引,进行 insert 等操作,以操作的成功与否决定是否抢锁成功。也可以借助MySQL的排它锁 for update ,对增改操作加锁,实现互斥。另外也可以通过记录版本号信息,实现乐观锁。但是对于MySQL分布式锁的实现,其加解锁需要在磁盘上进行读写,性能上可能不太如意,需要根据实际场景考虑是否采用该方式。
2.基于Redis实现,该方式也是本文将要实现的方式。通常可以借助Redis中的 SETNX 操作,实现加锁的互斥性。而Redis作为一种单工作线程的内存数据库,其所有命令操作具有天然原子性,这保证了其并发的安全性。另外,其采用的IO多路复用机制保证了其高吞吐特性。因此,在高并发场景下,通常可以采用基于Redis实现的分布式锁。
基于go-redis的设计与实现
本文将基于go语言,使用了一个常用的go Redis客户端 go-redis库 (https://github.com/go-redis/redis) , 一步一步探索与实现一个简单的Redis分布式锁。
1.基于 SETNX 的锁初步实现
SETNX 命令用于在Redis中设置某个不存在的键的值。如果该键不存在,则设置成功,如果该键存在,则设置失败,不作任何动作。基于此可以实现一种简单的抢占机制。
首先配置连接Redis:
var (
client *redis.Client
lockKey = "my_lock"
)
func init() {
client = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0, redis默认拥有16个db(0~15),且默认连接db0
})
}
上述创建了一个Redis数据库连接,并定义了一个 lockKey 变量,作为锁的标记。
既然是锁,那么必须要有加锁与解锁的实现,如下为加锁代码实现:
func Lock() {
var resp *redis.BoolCmd
for {
resp = client.SetNX(lockKey, 1, time.Second*10) //返回执行结果
lockSuccess, err := resp.Result()
if err == nil && lockSuccess {
fmt.Println("lock success!")
return
} else {
//抢锁失败,继续自旋
fmt.Println("lock failed!", err)
}
}
}
Lock()函数封装了对锁的抢占操作,这种抢占采用了自旋的方式,进行锁状态的轮询,是一种同步非阻塞的锁实现。client.SetNX(lockKey, 1, time.Second*10) 这行代码本质上执行了如下Redis操作命令:
set my_lock 1 ex 10 nx
该命令为 my_lock 键以 NX 方式设置了值。
如果持有锁的进程万一挂了,那么该键将永远存在与Redis中,其他竞争者无法进行 SETNX 操作,形成死锁。为了防范这种情况发生,这里顺便设置了过期时间为10s,这样即便持锁者挂了,锁在一定时间后依然后自动释放。这里整个 set 操作是原子性的,并对该操作的返回结果作了判断,如果成功设置,说明抢占锁成功,则函数返回,进入临界区可以继续执行下面的代码。如果设置失败,则继续for循环自旋。另外,抢锁失败时,可以适当进行线程休眠,以降低自旋空转对CPU的占用。
接下来看看,在加锁成功并完成了临界区代码后,如何进行解锁操作:
func Unlock() {
delResp := client.Del(lockKey)
unlockSuccess, err := delResp.Result()
if err == nil && unlockSuccess > 0 {
fmt.Println("unlock success!")
} else {
fmt.Println("unlock failed!", err)
}
}
上述代码中,client.Del(lockKey) 对Redis中的 my_lock 键进行了删除操作,当删除后,其他竞争者才有机会对该键进行 SETNX操作。
2.锁的防误删实现
这样就使用Redis实现了一个简单的分布式锁,但是仍然存在一个问题:如果键过期了,其持有者仍未完成任务,那么锁可能会被其他竞争者抢走,待原持有者完成任务进行解锁操作时,解除的将是当前其他持有者的锁,即发生误删。
为了解决这种问题,持有者可以给锁添加一个唯一标识,使之只能删除自己的锁。因此需要完善一下加解锁操作:
func Lock() {
var resp *redis.BoolCmd
for {
goId := utils.GetCurrentGoroutineId()
resp = client.SetNX(lockKey, goId, time.Second*10) //返回执行结果
lockSuccess, err := resp.Result()
if err == nil && lockSuccess {
fmt.Println("lock success!")
return
} else {
fmt.Println("lock failed!", err)
}
//抢锁失败,继续自旋
}
}
修改之处在于第4、5行,这里我使用了Goroutine的id为my_lock 键设置值,来作为锁的唯一标识。
另外顺便提一下,众所周知,Go语言官方并没有提供任何Goroutine id的获取接口。但我们注意到,Goroutine的堆栈信息具有以下特征:
goroutine 1 [running]:
main.main()
D:/albert/gopath/src/go_coding/main.go:12 +0x82
其中 goroutine 1 [running] 中蕴含了该Goroutine的id信息,因此这里我使用了这么一个技巧,通过查看Goroutine的堆栈信息来解析获取其Goroutine id,代码如下:
func GetCurrentGoroutineId() int {
buf := make([]byte, 128)
buf = buf[:runtime.Stack(buf, false)]
stackInfo := string(buf)
goIdStr := strings.TrimSpace(strings.Split(strings.Split(stackInfo, "[running]")[0], "goroutine")[1])
goId, err := strconv.Atoi(goIdStr)
if err != nil {
fmt.Println("err=", err)
return 0
}
return goId
}
另外,除了Goroutine id外,唯一标识还可以通过其他方式实现,如随机数或者时间戳等。
在解锁时,需要先判断锁的唯一标识值是否是与当前执行者相匹配:
func Unlock() {
if value, err := client.Get(lockKey).Result(); err != nil || value == "" {
fmt.Println("lock not exist")
return
} else {
if value != strconv.Itoa(utils.GetCurrentGoroutineId()) {
return
}
}
//确认当前锁是自己的锁后,进行删除锁
//确认当前锁是自己的锁之后,删除锁之前,这段时间内,锁可能会恰巧过期释放且被其他竞争者抢占
//那么继续删除锁则是删除别人的锁
//因此需要将整个解锁过程原子化,使得在解锁期间,其他竞争者的任何操作不能被redis执行
delResp := client.Del(lockKey)
unlockSuccess, err := delResp.Result()
if err == nil && unlockSuccess > 0 {
fmt.Println("unlock success!")
} else {
fmt.Println("unlock failed!", err)
}
}
上述代码第5行,将获取的键值与当前的Goroutine id比较之后,再决定是否执行下面的删除锁操作。
3.解锁的原子化实现
上述解锁操作中,仍存在一个问题:在确认当前锁是自己的锁之后,删除锁之前,这段时间内,锁可能会恰巧过期释放且被其他竞争者抢占,那么继续删除则删除的是别人的锁,又会出现误删问题。
因此需要将整个解锁过程原子化,使得在解锁期间,其他竞争者的任何操作不能被Redis执行。
这里我采用了Lua脚本,封装了判断标识与删除键的整个操作,通过KEYS与ARGV 数组将键与值传入:
func Unlock() {
script := redis.NewScript(`
if redis.call('get', KEYS[1]) == ARGV[1]
then
return redis.call('del', KEYS[1])
else
return 0
end
`)
resp := script.Run(client, []string{lockKey}, utils.GetCurrentGoroutineId())
if result, err := resp.Result(); err != nil || result == 0 {
fmt.Println("unlock failed:", err)
}
}
这样一来,确认锁与删除锁的整体操作进行了原子化,便可以防止上述存在的误删问题。
4.锁续期的看门狗实现
以上完成与解决了锁的期限、唯一性等问题,仍存在一个问题:当锁的持有者任务未完成,但是锁已过期。此时虽然他仍可以将任务继续完成,并且也不会误删其他持锁者的锁,但是此时可能会存在多个执行者同时执行临界区代码,使得数据的一致性难以保证,造成意外的后果,分布式锁就失去了意义。
因此,需要一个锁的自动续期机制,分布式锁框架Redission中就有这么一个看门狗,专门为将要到期的锁进行续期。这里我们也来实现一个简单的看门狗吧:
var (
unlockCh = make(chan struct{}, 0) //用户解锁通知通道
)
//自动续期看门狗
func watchDog(goId int) {
// 创建一个定时器NewTicker, 每隔8s触发一次
expTicker := time.NewTicker(time.Second * 8)
//确认锁与锁续期打包原子化
script := redis.NewScript(`
if redis.call('get', KEYS[1]) == ARGV[1]
then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
`)
for {
select {
case <-expTicker.C: //因为上边是用NewTicker创建的定时器,所以每隔8s都会触发
resp := script.Run(client, []string{lockKey}, goId, 10)
if result, err := resp.Result(); err != nil || result == int64(0) {
log.Println("expire lock failed", err)
}
case <-unlockCh: //任务完成后用户解锁通知看门狗退出
return
}
}
}
上述代码实现了一个简单看门狗,鉴于我们的锁的默认期限是10s,因此看门狗将每隔8s触发一次,这里我们使用了go语言标准库中的Ticker实现。在select 语句中,每隔8s就会触发一次 Expire 操作进行续期,将 my_lock 键的过期时间重置为10s。注意,这里使用Lua脚本封装了确认锁与锁续期的操作,以防止误续期了其他持有者的锁。因此需要将Goroutine id传入看门狗函数中,而且不可以在看门狗函数中获取Goroutine id,因为这将获取的是看门狗线程的Goroutine id。
我们只需要在加锁成功时,以启动看门狗线程即可,如下第8行所示:
func Lock() {
var resp *redis.BoolCmd
for {
goId := utils.GetCurrentGoroutineId()
resp = client.SetNX(lockKey, goId, time.Millisecond*60) //返回执行结果
lockSuccess, err := resp.Result()
if err == nil && lockSuccess { //抢锁成功,开启看门狗 并跳出,否则失败继续自旋
go watchDog(goId)
return
} else {
fmt.Println("lock failed!", err)
}
}
}
另外,当任务完成后,进行解锁操作时需要通知看门狗退出,这里使用了一个unlockCh 通道,当解锁时会向 unlockCh 发送一个信号,让select 去选择执行,使得看门狗线程return退出。因此,我们只需要在删除锁成功时,发送信号通知看门狗退出即可,如下第15行所示:
func Unlock() {
script := redis.NewScript(`
if redis.call('get', KEYS[1]) == ARGV[1]
then
return redis.call('del', KEYS[1])
else
return 0
end
`)
resp := script.Run(client, []string{lockKey}, utils.GetCurrentGoroutineId())
if result, err := resp.Result(); err != nil || result == 0 {
fmt.Println("unlock failed:", err)
} else {
//删锁成功后,通知看门狗退出
unlockCh <- struct{}{}
}
}
在分布式系统中,各个机器上的应用在自己的内存空间中独立维护自己的看门狗以及unlockCh 通道,因此看门狗不存在竞态问题,也不存在误通知其他应用的看门狗退出的现象发生。
最终
至此,我们简单完成了一个Redis分布式锁的实现,也尝试解决了一些可能存在的问题。当然,这只是分布式锁的一种粗略实现,在实际场景中,还需要结合具体业务,考虑锁的过期时间、续期时间、可重入性、公平性、阻塞方式等问题。最后,附上完整的最终代码以供参考,有疏漏错误之处,还望予以指出:
package main
import (
"fmt"
"github.com/go-redis/redis"
"go_coding/utils"
"log"
"sync"
"time"
)
var (
client *redis.Client
lockKey = "my_lock"
unlockCh = make(chan struct{}, 0) //用户解锁通知通道
)
func init() {
client = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0, //redis默认拥有16个db(0~15),且默认连接db0
})
}
func Lock() {
var resp *redis.BoolCmd
for {
goId := utils.GetCurrentGoroutineId()
resp = client.SetNX(lockKey, goId, time.Second*10) //返回执行结果
lockSuccess, err := resp.Result()
if err == nil && lockSuccess {
//抢锁成功,开启看门狗 并跳出,否则失败继续自旋
go watchDog(goId)
return
}else {
//time.Sleep(time.Millisecond*30) //可以适当休眠
}
}
}
//自动续期看门狗
func watchDog(goId int) {
// 创建一个定时器NewTicker, 每隔2秒触发一次,类似于闹钟
expTicker := time.NewTicker(time.Second * 8)
//确认锁与锁续期打包原子化
script := redis.NewScript(`
if redis.call('get', KEYS[1]) == ARGV[1]
then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
`)
for {
select {
case <-expTicker.C: //因为上边是用NewTicker创建的定时器,所以每隔8s都会触发
resp := script.Run(client, []string{lockKey}, goId, 10)
if result, err := resp.Result(); err != nil || result == int64(0) {
//续期失败
log.Println("expire lock failed", err)
}
case <-unlockCh: //任务完成后用户解锁通知看门狗退出
return
}
}
}
func Unlock() {
script := redis.NewScript(`
if redis.call('get', KEYS[1]) == ARGV[1]
then
return redis.call('del', KEYS[1])
else
return 0
end
`)
resp := script.Run(client, []string{lockKey}, utils.GetCurrentGoroutineId())
if result, err := resp.Result(); err != nil || result == 0 {
fmt.Println("unlock failed:", err)
} else {
//删锁成功后,通知看门狗退出
unlockCh <- struct{}{}
}
}