Consul 实现分布式锁非常简单,api包内部已经给我们提供了很简单的接口来访问。

包:

go get -u github.com/hashicorp/consul/api


安装consul 请参考文档:


启动 consul

consul agent  -data-dir ./data/node1 -dev -ui

上代码:

package main

import (
	"github.com/hashicorp/consul/api"
	"log"
	"sync"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			val, _, err := client.KV().Get("key", nil)
			if err != nil {
				log.Println(err)
				return
			}
			var v string
			if val == nil {
				v = "0"
			} else {
				v = string(val.Value) + "0"
			}
			_, err = client.KV().Put(&api.KVPair{
				Key:   "key",
				Value: []byte(v),
			}, nil)
			if err != nil {
				log.Fatal(err)
				return
			}
		}()
	}

	wg.Wait()

	val, _, err := client.KV().Get("key", nil)
	if err != nil {
		log.Println(err)
		return
	}

	log.Println(string(val.Value))
}

上述示例代码,使用了10个goroutine同时向consul中获取并且更新同一个key,在分布式系统中经常见到的,比如很多个副本程序操作同一个数据。那么该程序最终的输出结果可能是一个0到十个0之间,并不能保证最终的结果是十个0,按道理来说我起了十个线程,先读取再追加,可能读取的时候另一个线程已经把自己的结果写入了,造成了读取了脏数据,再更新数据就丢了。


将上述代码引入分布式锁,在读取--写入之间锁住程序,让期间的步骤按照顺序进行,有且只有一个线程执行,执行完了,将锁释放,另一个线程抢占住锁然后继续操作,这样就不会有脏数据了。

package main

import (
	"github.com/hashicorp/consul/api"
	"log"
	"sync"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// 1. 获取锁
			// 锁具备默认的超时时间
			// 如果想自定义超时时间以及一些额外的配置可以用 LockOpts
			// client.LockOpts()
			lock, err := client.LockKey("lock")
			if err != nil {
				log.Println(err)
				return
			}
			// 2. 尝试锁住
			ch := make(chan struct{})
			defer close(ch)

			_, err = lock.Lock(ch)
			if err != nil {
				log.Println(err)
				return
			}
			defer lock.Unlock()

			// 3. 操作需要的资源,如修改consul里面的数据
			// .. 此处的操作为原子性的.
			// 更新某个key.
			val, _, err := client.KV().Get("key", nil)
			if err != nil {
				log.Println(err)
				return
			}
			var v string
			if val == nil {
				v = "0"
			} else {
				v = string(val.Value) + "0"
			}
			_, err = client.KV().Put(&api.KVPair{
				Key:   "key",
				Value: []byte(v),
			}, nil)
			if err != nil {
				log.Fatal(err)
				return
			}
		}()
	}

	wg.Wait()

	val, _, err := client.KV().Get("key", nil)
	if err != nil {
		log.Println(err)
		return
	}

	log.Println(string(val.Value))
}


上述代码输出将会是期望的 十个0


注意的地方

  1. 分布式锁一定需要使用超时机制,否则一旦程序发生错误,该锁将无法释放。
  2. 大家可能注意到我上述例子使用的 lock的key和我想更新的key不一样,lock的key只是用来获取到锁,跟需要操作的key是不一样的,因此大家需要有一套机制来实现锁的key的生成。
  3. 大家注意到生成锁的地方使用了一个channel,这个channel的作用是自己来控制程序的阻塞与非阻塞,比如说锁的超时时间30秒,但是我期望程序最多5秒就返回,因此可以加个CancelContext来控制. 可以用如下程序来改造:


			// .........
                        // 2. 尝试锁住
			ch := make(chan struct{})
			go func() {
				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
				defer cancel()

				select {
				case <-ctx.Done():
					close(ch)
					log.Println("cancel")
				}
			}()

			stopCh, err := lock.Lock(ch)
			if err != nil {
				log.Println(err)
				return
			}
			time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
			if stopCh == nil {
				// 自动取消了锁
				log.Println("auto unlocked")
				return
			}
			defer lock.Unlock()
                        // ......