Consul 实现分布式锁非常简单,api包内部已经给我们提供了很简单的接口来访问。
包:
go get -u github.com/hashicorp/consul/api
安装consul 请参考文档:
启动 consul
consul agent -data-dir ./data/node1 -dev -ui
上代码:
package main
import (
"github.com/hashicorp/consul/api"
"log"
"sync"
)
func main() {
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
log.Fatal(err)
}
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
val, _, err := client.KV().Get("key", nil)
if err != nil {
log.Println(err)
return
}
var v string
if val == nil {
v = "0"
} else {
v = string(val.Value) + "0"
}
_, err = client.KV().Put(&api.KVPair{
Key: "key",
Value: []byte(v),
}, nil)
if err != nil {
log.Fatal(err)
return
}
}()
}
wg.Wait()
val, _, err := client.KV().Get("key", nil)
if err != nil {
log.Println(err)
return
}
log.Println(string(val.Value))
}
上述示例代码,使用了10个goroutine同时向consul中获取并且更新同一个key,在分布式系统中经常见到的,比如很多个副本程序操作同一个数据。那么该程序最终的输出结果可能是一个0到十个0之间,并不能保证最终的结果是十个0,按道理来说我起了十个线程,先读取再追加,可能读取的时候另一个线程已经把自己的结果写入了,造成了读取了脏数据,再更新数据就丢了。
将上述代码引入分布式锁,在读取--写入之间锁住程序,让期间的步骤按照顺序进行,有且只有一个线程执行,执行完了,将锁释放,另一个线程抢占住锁然后继续操作,这样就不会有脏数据了。
package main
import (
"github.com/hashicorp/consul/api"
"log"
"sync"
)
func main() {
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
log.Fatal(err)
}
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 1. 获取锁
// 锁具备默认的超时时间
// 如果想自定义超时时间以及一些额外的配置可以用 LockOpts
// client.LockOpts()
lock, err := client.LockKey("lock")
if err != nil {
log.Println(err)
return
}
// 2. 尝试锁住
ch := make(chan struct{})
defer close(ch)
_, err = lock.Lock(ch)
if err != nil {
log.Println(err)
return
}
defer lock.Unlock()
// 3. 操作需要的资源,如修改consul里面的数据
// .. 此处的操作为原子性的.
// 更新某个key.
val, _, err := client.KV().Get("key", nil)
if err != nil {
log.Println(err)
return
}
var v string
if val == nil {
v = "0"
} else {
v = string(val.Value) + "0"
}
_, err = client.KV().Put(&api.KVPair{
Key: "key",
Value: []byte(v),
}, nil)
if err != nil {
log.Fatal(err)
return
}
}()
}
wg.Wait()
val, _, err := client.KV().Get("key", nil)
if err != nil {
log.Println(err)
return
}
log.Println(string(val.Value))
}
上述代码输出将会是期望的 十个0
注意的地方
- 分布式锁一定需要使用超时机制,否则一旦程序发生错误,该锁将无法释放。
- 大家可能注意到我上述例子使用的 lock的key和我想更新的key不一样,lock的key只是用来获取到锁,跟需要操作的key是不一样的,因此大家需要有一套机制来实现锁的key的生成。
- 大家注意到生成锁的地方使用了一个channel,这个channel的作用是自己来控制程序的阻塞与非阻塞,比如说锁的超时时间30秒,但是我期望程序最多5秒就返回,因此可以加个CancelContext来控制. 可以用如下程序来改造:
// .........
// 2. 尝试锁住
ch := make(chan struct{})
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
select {
case <-ctx.Done():
close(ch)
log.Println("cancel")
}
}()
stopCh, err := lock.Lock(ch)
if err != nil {
log.Println(err)
return
}
time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
if stopCh == nil {
// 自动取消了锁
log.Println("auto unlocked")
return
}
defer lock.Unlock()
// ......