引言
假设我们的某个业务会涉及数据更新,同时在实际场景中有较大并发量。流程:读取->修改->保存,在不考虑基于DB层的并发处理情况下,这种场景可能对部分数据造成不可预期的执行结果,此时可以考虑使用分布式锁来解决该问题
需要解决的问题
GETDEL
代码
目录结构:
│ main.go
│
└─demo
lock.go
lock.go
package demo
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"math/rand"
"time"
)
// 重试次数
var retryTimes = 5
// 重试频率
var retryInterval = time.Millisecond * 50
var rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
// 锁的默认过期时间
var expiration time.Duration
// 模拟分布式业务加锁场景
func MockTest(tag string) {
var ctx, cancel = context.WithCancel(context.Background())
defer func() {
// 停止goroutine
cancel()
}()
// 随机value
lockV := getRandValue()
lockK := "EXAMPLE_LOCK"
// 默认过期时间
expiration = time.Millisecond * 200
fmt.Println(tag + "尝试加锁")
set, err := rdb.SetNX(ctx, lockK, lockV, expiration).Result()
if err != nil {
panic(err.Error())
}
// 加锁失败,重试
if set == false && retry(ctx, rdb, lockK, lockV, expiration, tag) == false {
fmt.Println(tag + " server unavailable, try again later")
return
}
fmt.Println(tag + "成功加锁")
// 加锁成功,新增守护线程
go watchDog(ctx, rdb, lockK, expiration, tag)
// 处理业务(通过随机时间延迟模拟)
fmt.Println(tag + "等待业务处理完成...")
time.Sleep(getRandDuration())
// 业务处理完成
// 释放锁
val := delByKeyWhenValueEquals(ctx, rdb, lockK, lockV)
fmt.Println(tag+"释放结果:", val)
}
// 释放锁
func delByKeyWhenValueEquals(ctx context.Context, rdb *redis.Client, key string, value interface{}) bool {
lua := `
-- 如果当前值与锁值一致,删除key
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
`
scriptKeys := []string{key}
val, err := rdb.Eval(ctx, lua, scriptKeys, value).Result()
if err != nil {
panic(err.Error())
}
return val == int64(1)
}
// 生成随机时间
func getRandDuration() time.Duration {
rand.Seed(time.Now().UnixNano())
min := 50
max := 100
return time.Duration(rand.Intn(max-min)+min) * time.Millisecond
}
// 生成随机值
func getRandValue() int {
rand.Seed(time.Now().UnixNano())
return rand.Int()
}
// 守护线程
func watchDog(ctx context.Context, rdb *redis.Client, key string, expiration time.Duration, tag string) {
for {
select {
// 业务完成
case <-ctx.Done():
fmt.Printf("%s任务完成,关闭%s的自动续期\n", tag, key)
return
// 业务未完成
default:
// 自动续期
rdb.PExpire(ctx, key, expiration)
// 继续等待
time.Sleep(expiration / 2)
}
}
}
// 重试
func retry(ctx context.Context, rdb *redis.Client, key string, value interface{}, expiration time.Duration, tag string) bool {
i := 1
for i <= retryTimes {
fmt.Printf(tag+"第%d次尝试加锁中...\n", i)
set, err := rdb.SetNX(ctx, key, value, expiration).Result()
if err != nil {
panic(err.Error())
}
if set == true {
return true
}
time.Sleep(retryInterval)
i++
}
return false
}
流程说明
MockTest
contextwatchDogluavaluecancelwatchDog
应对场景
expireexpirewatchDog
测试
main.go
package main
import (
"play/demo"
"time"
)
func main() {
go demo.MockTest("A")
go demo.MockTest("B")
go demo.MockTest("C")
go demo.MockTest("D")
go demo.MockTest("E")
// 用于测试goroutine接收到ctx.Done()信号后的打印
time.Sleep(time.Second * 2)
}
结果:
$ go run main.go
A尝试加锁
D尝试加锁
E尝试加锁
B尝试加锁
C尝试加锁
D成功加锁
D等待业务处理完成...
B第1次尝试加锁中...
E第1次尝试加锁中...
A第1次尝试加锁中...
C第1次尝试加锁中...
B第2次尝试加锁中...
D释放结果: true
B成功加锁
E第2次尝试加锁中...
B等待业务处理完成...
C第2次尝试加锁中...
A第2次尝试加锁中...
D任务完成,关闭EXAMPLE_LOCK的自动续期
A第3次尝试加锁中...
C第3次尝试加锁中...
E第3次尝试加锁中...
B释放结果: true
A成功加锁
A等待业务处理完成...
B任务完成,关闭EXAMPLE_LOCK的自动续期
E第4次尝试加锁中...
C第4次尝试加锁中...
A释放结果: true
A任务完成,关闭EXAMPLE_LOCK的自动续期
C第5次尝试加锁中...
E第5次尝试加锁中...
C成功加锁
C等待业务处理完成...
E server unavailable, try again later
C释放结果: true
C任务完成,关闭EXAMPLE_LOCK的自动续期
偷懒就没写单元测试了?