etcd介绍
"etcd"这个名字源于两个想法,即 unix 的 "/etc" 文件夹和分布式系统("d"istributed)。"/etc" 文件夹是单个系统存储配置数据的地方,而 etcd 存储大规模分布式系统的配置信息。因此,"d"istributed 的 "/etc",即为 "etcd"。
etcd 以一致和容错的方式存储元数据。分布式系统使用 etcd 作为一致性键值存储,用于配置管理,服务发现和协调分布式工作。使用 etcd 的通用分布式模式包括领导选举,分布式锁和监控机器活动。
etcd 设计用于可靠存储不频繁更新(读多写少)的数据,并提供可靠的观察查询。etcd 以持久性 b+tree 键值对的方式存储物理数据。
etcd分层架构图
按照分层模型,etcd 可分为 Client 层、API 网络层、Raft 算法层、功能逻辑层和存储层。这些层的功能如下:
- Client 层:Client 层包括 client v2 和 v3 两个大版本 API 客户端库,提供了简洁易用的 API,同时支持负载均衡、节点间故障自动转移,可极大降低业务使用 etcd 复杂度,提升开发效率、服务可用性。
- API 网络层:API 网络层主要包括 client 访问 server 以及 server 节点之间的通信协议。一方面,client 访问 etcd server 的 API 分为 v2 和 v3 两个大版本:v2 API 使用 HTTP/1.x 协议;v3 API 使用基于 HTTP/2 的 gRPC 协议,同时 v3 通过 etcd grpc-gateway 组件也支持 HTTP/1.x 协议,便于各种语言的服务调用。另一方面,server 之间的通信协议,是指节点间通过 Raft 算法实现数据复制和 Leader 选举等功能时使用的 HTTP 协议。相比 etcd v2 使用的 HTTP/1.x,HTTP/2 基于二进制而不是文本;支持多路复用,多个请求可以在同一连接上并发交错传输,不再需要按序排队、互相阻塞;支持头部压缩以减少包大小;并支持 server push 等特性。因此,基于 HTTP/2 的 gRPC 协议具有低延迟、高性能的特点,有效解决了 etcd v2 中 HTTP/1.x 的性能问题。
- Raft 算法层:Raft 算法层实现了 Leader 选举、日志复制、ReadIndex 等核心算法特性,用于保障 etcd 多个节点间的数据一致性、提升服务可用性等,是 etcd 的基石和亮点。(参考:Raft 协议动画演示)
- 功能逻辑层:etcd 核心特性实现层,如典型的 KVServer 模块、MVCC 模块、Auth 鉴权模块、Lease 租约模块、Compactor 压缩模块等。其中 MVCC 模块主要由 treeIndex(内存树形索引)模块和 boltdb(嵌入式的 KV 持久化存储库)模块组成。treeIndex 模块使用 B-tree 数据结构来保存用户 key 和版本号的映射关系。之所以使用 B-tree,是因为 etcd 支持范围查询,而 hash 表不适合范围查询;从性能上看,B-tree 相对于二叉树层级更矮,查询效率更高。boltdb 是一个基于 B+ tree 实现的 key-value 键值库,支持事务,提供 Get/Put 等简易 API 给 etcd 操作。
- 存储层:存储层包含预写日志 (WAL) 模块、快照 (Snapshot) 模块、boltdb 模块。其中 WAL 可保障 etcd crash 后数据不丢失,boltdb 则保存了集群元数据和用户写入的数据。
clientv3使用
go get github.com/coreos/etcd/clientv3
go get github.com/coreos/etcd/pkg/transport
连接etcd
var (dialTimeout = 5 * time.Secondendpoints = []string{"172.20.42.70:2379"} // 多个节点[]string{"xx", "yy"}
)func main(){cli := getCli()
}func getCli() *clientv3.Client {var err errortlsInfo := transport.TLSInfo{CertFile: "tls/kube-etcd-172-20-42-70.pem", // etcd公钥KeyFile: "tls/kube-etcd-172-20-42-70-key.pem", // etcd私钥TrustedCAFile: "tls/kube-ca.pem", // ca证书}tlsConfig, err = tlsInfo.ClientConfig()if err != nil {log.Fatal(err)}cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints, // etcd集群列表DialTimeout: dialTimeout, // 连接超时时间TLS: tlsConfig, //不使用tls可不用添加Username: "root", //不开启权限认证可不用添加,password同Password: "123",})if err != nil {fmt.Println(err)}return cli
}
key-value操作
添加值 put
func Put(cli *clientv3.Client) {ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)resp, err := cli.Put(ctx, "sample_key", "sample_value")cancel()if err != nil {log.Fatal(err)}fmt.Println("current revision:", resp.Header.Revision) // revision start at 1// current revision: 2
}
取值 get
func Get(cli *clientv3.Client, key string) {ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)resp, err := cli.Get(ctx, key)// 按前缀取//resp, err := cli.Get(ctx, key, clientv3.WithPrefix())cancel()if err != nil {log.Fatal(err)}for _, ev := range resp.Kvs {fmt.Printf("%s : %s\n", ev.Key, ev.Value)}// foo : bar
}
使用 Do 逐个执行任意操作(Op):每次 Do 调用都是一次独立的请求,不是事务;如需在一次请求中原子地执行多个操作,请使用 Txn
// Do 在创建任意操作时很有用
func Do(cli *clientv3.Client) {ops := []clientv3.Op{clientv3.OpPut("put-key", "123"),clientv3.OpGet("put-key"),clientv3.OpGet("put-key"),clientv3.OpPut("put-key", "456"),clientv3.OpPut("aaa", "bbbbbb"),clientv3.OpGet("aaa"),}for _, op := range ops {if resp, err := cli.Do(context.TODO(), op); err != nil {log.Fatal(err)}else {fmt.Println(resp.Get())}}
}
压缩 Compact
// 压缩,Compact方法压缩在etcd键值存储中的事件历史,
// 键值存储应该定期压缩,否则事件历史会无限制的持续增长.
func Compact(cli *clientv3.Client) {ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)resp, err := cli.Get(ctx, "put-key")cancel()if err != nil {log.Fatal(err)}compRev := resp.Header.Revision // specify compact revision of your choicectx, cancel = context.WithTimeout(context.Background(), requestTimeout)_, err = cli.Compact(ctx, compRev)cancel()if err != nil {log.Fatal(err)}
}
删除 Delete
func Delete(cli *clientv3.Client, key string) {ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)defer cancel()// count keys about to be deleted, 根据前缀获取gresp, err := cli.Get(ctx, key, clientv3.WithPrefix())if err != nil {log.Fatal(err)}fmt.Println(gresp.Count)// delete the keys 根据前缀删除dresp, err := cli.Delete(ctx, key, clientv3.WithPrefix())if err != nil {log.Fatal(err)}fmt.Println("Deleted all keys:", int64(len(gresp.Kvs)) == dresp.Deleted)// 精确获取gresp, err = cli.Get(ctx, "fo")if err != nil {log.Fatal(err)}fmt.Println(gresp.Count)// 精确删除dresp, err = cli.Delete(ctx, "fo")if err != nil {log.Fatal(err)}fmt.Println(dresp)// 已被删除Get(cli, "fo")
}
通过修订版本获取值 WithRev
func GetWithRev(cli *clientv3.Client) {presp, err := cli.Put(context.TODO(), "foo", "bar1")if err != nil {log.Fatal(err)}_, err = cli.Put(context.TODO(), "foo", "bar2")if err != nil {log.Fatal(err)}ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)resp, err := cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision))cancel()if err != nil {log.Fatal(err)}for _, ev := range resp.Kvs {fmt.Printf("%s : %s\n", ev.Key, ev.Value) //会获取到bar1,尽管已经被被修改为bar2}
}
将获取到的值按key排序 WithSort
// 按key排序
func GetSortedPrefix(cli *clientv3.Client) {// 先插入一些测试值for i := range make([]int, 3) {ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)_, err := cli.Put(ctx, fmt.Sprintf("key_%d", i), "value")cancel()if err != nil {log.Fatal(err)}}ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)// 降序resp, err := cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))cancel()if err != nil {log.Fatal(err)}for _, ev := range resp.Kvs {fmt.Printf("%s : %s\n", ev.Key, ev.Value)}ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)// 升序resp, err = cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))cancel()if err != nil {log.Fatal(err)}for _, ev := range resp.Kvs {fmt.Printf("%s : %s\n", ev.Key, ev.Value)}}
错误处理
func PutErrorHandling(cli *clientv3.Client) {ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)_, err := cli.Put(ctx, "", "sample_value")cancel()if err != nil {switch err {case context.Canceled:fmt.Printf("ctx is canceled by another routine: %v\n", err)case context.DeadlineExceeded:fmt.Printf("ctx is attached with a deadline is exceeded: %v\n", err)case rpctypes.ErrEmptyKey:fmt.Printf("client-side error: %v\n", err)default:fmt.Printf("bad cluster endpoints, which are not etcd servers: %v\n", err)}}
}
事务Txn
// Transaction via If/Then/Else: compare the current value of "key" with "abc"
// (value comparisons are lexical) and atomically write "XYZ" or "ABC".
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = kvc.Txn(ctx).
	If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
	// "Then" runs when the comparison holds, e.g. when "key" holds "xyz" ("xyz" > "abc").
	Then(clientv3.OpPut("key", "XYZ")).
	// "Else" runs when the comparison does not hold.
	Else(clientv3.OpPut("key", "ABC")).
	Commit()
cancel()
if err != nil {
	log.Fatal(err)
}

// Read the result back under its own timeout, so the second cancel is meaningful.
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
gresp, err := kvc.Get(ctx, "key")
cancel()
if err != nil {
	log.Fatal(err)
}
for _, ev := range gresp.Kvs {
	fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
权限操作
package auth

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/pkg/transport" // NOTE(review): unused in this file as shown; remove if nothing else needs it
)

var (
	dialTimeout = 5 * time.Second
	endpoints   = []string{"172.20.42.70:2379"}
)

// Auth bundles an administrative client (authenticated as root once auth is
// enabled) and the TLS config reused for the per-user clients it creates.
type Auth struct {
	Cli       *clientv3.Client
	TlsConfig *tls.Config
}

// AuthExample demonstrates role-based access control for a normal user and for
// root. It disables authentication again when it returns.
func (a *Auth) AuthExample() {
	// If a role already exists, re-running the program cannot change its
	// permissions; delete it first and recreate it, e.g.:
	// _, _ = a.Cli.RoleDelete(context.TODO(), "role1")
	fmt.Println("example")
	// Turn authentication off again when the example finishes.
	defer func() {
		if _, err := a.Cli.AuthDisable(context.TODO()); err != nil {
			fmt.Println("AuthDisable", err)
		}
	}()
	a.NormalUser()
	a.Root()
}

// NormalUser creates user1 bound to role1 and grants role1 read/write on the
// key range ["foo", "goo"). It then enables auth and shows three things:
// user1 can write foo, user1 can no longer write foo after the grant is
// revoked, and user1 can never disable auth.
func (a *Auth) NormalUser() {
	a.AddUser("user1", "123")
	a.AddRole("role1")
	a.UserBindRole("user1", "role1")
	// role1 may read/write keys in ["foo", "goo"); user1 inherits this through role1.
	a.GrantPermission("role1", "foo", "goo", clientv3.PermissionType(clientv3.PermReadWrite))
	// Enable auth; afterwards every connection must present credentials.
	// etcd refuses to enable auth unless a root user with the root role exists,
	// in which case the rest of this demo would be meaningless.
	if _, err := a.Cli.AuthEnable(context.TODO()); err != nil {
		fmt.Println("AuthEnable", err)
		return
	}
	// Connect as the normal user.
	userCli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
		TLS:         a.TlsConfig,
		Username:    "user1",
		Password:    "123",
	})
	if err != nil {
		log.Fatal("client new ", err)
	}
	defer userCli.Close()
	if _, err = userCli.Put(context.TODO(), "foo", "bar"); err != nil {
		fmt.Println("r Put foo bar", err)
	} else {
		fmt.Println("r Put foo bar success")
	}
	// Revoke role1's permission; key and rangeEnd must match the original grant ("foo", "goo").
	_, err = a.Cli.RoleRevokePermission(context.TODO(), "role1", "foo", "goo")
	fmt.Println("RoleRevokePermission", err)
	// user1 no longer has permission, so this Put is expected to fail.
	if _, err = userCli.Put(context.TODO(), "foo", "bar"); err != nil {
		fmt.Println("r Put foo bar", err)
	} else {
		fmt.Println("r Put foo bar success")
	}
	// A normal user may not disable auth, so this is expected to fail.
	_, err = userCli.AuthDisable(context.TODO())
	fmt.Println("user cliAuth.AuthDisable ", err)
}

// Root: the role named "root" is recognized by etcd as having all permissions.
func (a *Auth)Root() {a.AddUser("root", "123")a.AddRole("root")a.UserBindRole("root", "root")rootCli, err := clientv3.New(clientv3.Config{Endpoints: endpoints,DialTimeout: dialTimeout,TLS: a.TlsConfig,Username: "root",Password: "123",})if err != nil {log.Fatal(err)}defer rootCli.Close()resp, err := rootCli.RoleGet(context.TODO(), "role1")if err != nil {fmt.Println("root get role", err)}fmt.Printf("user r permission: key %q, range end %q\n", resp.Perm, resp.Perm)
}// 判断已存在,如果已存在直接创建会报错
func (a *Auth)IsAlreadyExists(err error) bool {if strings.Contains(fmt.Sprintf("%s", err), "already exists"){return true}return false
}// 添加用户
func (a *Auth) AddUser(user, passwd string) {
	_, err := a.Cli.UserAdd(context.TODO(), user, passwd)
	if err == nil || a.IsAlreadyExists(err) {
		return
	}
	log.Fatal("UserAdd ", err)
}

// AddRole creates a role, tolerating an "already exists" error.
func (a *Auth) AddRole(role string) {
	_, err := a.Cli.RoleAdd(context.TODO(), role)
	if err == nil || a.IsAlreadyExists(err) {
		return
	}
	log.Fatal("RoleAdd ", err)
}

// UserBindRole grants role to user, tolerating an "already exists" error.
func (a *Auth) UserBindRole(user, role string) {
	_, err := a.Cli.UserGrantRole(context.TODO(), user, role)
	if err == nil || a.IsAlreadyExists(err) {
		return
	}
	log.Fatal("UserGrantRole ", err)
}

// GrantPermission grants a permission on the key range [key, rangeEnd) to a role.
func (a *Auth)GrantPermission(user, key, rangeEnd string, permission clientv3.PermissionType) {if resp, err := a.Cli.RoleGrantPermission(context.TODO(),user, // role namekey, // keyrangeEnd, // range endpermission,); err != nil {log.Fatal("RoleGrantPermission ", err)}else {fmt.Printf("RoleGrantPermission resp %v\n", resp)}
}
租约操作
package lease

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

// Lease wraps the client used by the lease examples.
type Lease struct {
	Cli *clientv3.Client
}

// LeaseExample runs one lease demo; uncomment the others to try them.
func (l *Lease) LeaseExample() {
	// l.grant()
	// l.keepAlived()
	// l.keepAliveOnce()
	l.revoke()
}

// grant requests a lease and attaches a key to it.
func (l *Lease) grant() {
	// The server enforces a minimum TTL (the upstream example cites 5 seconds),
	// so asking for 1 second may yield a longer lease; the granted TTL is printed.
	resp, err := l.Cli.Grant(context.TODO(), 1)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("granted ttl:", resp.TTL)
	// When the lease expires, key "aa" is removed with it.
	if _, err = l.Cli.Put(context.TODO(), "aa", "bar", clientv3.WithLease(resp.ID)); err != nil {
		log.Fatal(err)
	}
	rsp, err := l.Cli.Get(context.TODO(), "aa")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(rsp.Kvs)
	time.Sleep(10 * time.Second)
	// The lease has expired by now, so "aa" should be gone.
	rsp, err = l.Cli.Get(context.TODO(), "aa")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("it is none", rsp.Kvs)
}

// keepAlived keeps a lease alive for as long as the client lives.
func (l *Lease) keepAlived() {
	// Request a 5-second lease.
	resp, err := l.Cli.Grant(context.TODO(), 5)
	if err != nil {
		log.Fatal(err)
	}
	if _, err = l.Cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)); err != nil {
		log.Fatal(err)
	}
	// KeepAlive renews the lease in the background for as long as the client
	// (and this context) lives, so "foo" is kept until deleted manually.
	ch, kaerr := l.Cli.KeepAlive(context.TODO(), resp.ID)
	if kaerr != nil {
		log.Fatal(kaerr)
	}
	ka, ok := <-ch
	if !ok {
		log.Fatal("keepalive channel closed before the first response")
	}
	fmt.Println("ttl:", ka.TTL)
	// Keep consuming responses: clientv3 drops keepalive responses when the
	// channel is full. The loop ends when the channel closes.
	go func() {
		for range ch {
		}
	}()
	// The initial lease was 5 seconds, but foo is still present after 10.
	time.Sleep(10 * time.Second)
	rsp, err := l.Cli.Get(context.TODO(), "foo")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(rsp)
}

// keepAliveOnce renews a lease exactly once.
func (l *Lease)keepAliveOnce() {resp, err := l.Cli.Grant(context.TODO(), 5)if err != nil {log.Fatal(err)}_, err = l.Cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))if err != nil {log.Fatal(err)}// to renew the lease only once, 只会续约一次,5+5=10秒存活时间ka, kaerr := l.Cli.KeepAliveOnce(context.TODO(), resp.ID)if kaerr != nil {log.Fatal(kaerr)}fmt.Println("ttl:", ka.TTL)// 还在time.Sleep(7*time.Second)rsp, _ := l.Cli.Get(context.TODO(), "foo")fmt.Println(rsp)fmt.Println(ka.TTL)// 还在time.Sleep(1*time.Second)rsp, _ = l.Cli.Get(context.TODO(), "foo")fmt.Println(rsp)fmt.Println(ka.TTL)// 已删除time.Sleep(3*time.Second)rsp, _ = l.Cli.Get(context.TODO(), "foo")fmt.Println(rsp)fmt.Println(ka.TTL)
}// 删除租约
func (l *Lease) revoke() {
	// A 5-second lease: left alone, foo would disappear after 5 seconds.
	grantResp, err := l.Cli.Grant(context.TODO(), 5)
	if err != nil {
		log.Fatal(err)
	}
	if _, err = l.Cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(grantResp.ID)); err != nil {
		log.Fatal(err)
	}

	printCount := func() {
		countResp, getErr := l.Cli.Get(context.TODO(), "foo")
		if getErr != nil {
			log.Fatal(getErr)
		}
		fmt.Println("number of keys:", len(countResp.Kvs))
	}

	// foo is still readable here.
	printCount()

	// Revoking the lease deletes every key attached to it immediately.
	rsp, err := l.Cli.Revoke(context.TODO(), grantResp.ID)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(rsp)

	// Less than 5 seconds have passed, yet foo is already gone.
	printCount()
}
Watch操作
package watch

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

// Watch wraps the client used by the watch examples.
type Watch struct {
	Cli *clientv3.Client
}

// WatchExample runs one watch demo; uncomment the others to try them.
func (w *Watch) WatchExample() {
	// w.watch()
	// w.watchWithPrefix()
	w.progressNotify()
}

// printEvents prints every event received on rch until the channel closes.
func printEvents(rch clientv3.WatchChan) {
	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			fmt.Println("watch error:", err)
			continue
		}
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

// watch prints every event on key "foo". It never returns.
func (w *Watch) watch() {
	// Writes are issued one second apart from a single goroutine so that their
	// order is deterministic. Errors are ignored: these are best-effort demo writes.
	go func() {
		time.Sleep(1 * time.Second)
		_, _ = w.Cli.Put(context.TODO(), "foo", "bar")
		time.Sleep(1 * time.Second)
		_, _ = w.Cli.Put(context.TODO(), "foo", "bar2")
	}()
	rch := w.Cli.Watch(context.Background(), "foo")
	// Keeps watching forever.
	printEvents(rch)
}

// watchWithPrefix prints every event on keys with prefix "foo". It never returns.
func (w *Watch) watchWithPrefix() {
	// Put foo1, put foo2, then delete foo2, in that order. Concurrent goroutines
	// with equal sleeps could delete foo2 before it was written.
	// Errors are ignored: these are best-effort demo writes.
	go func() {
		time.Sleep(1 * time.Second)
		_, _ = w.Cli.Put(context.TODO(), "foo1", "bar")
		time.Sleep(1 * time.Second)
		_, _ = w.Cli.Put(context.TODO(), "foo2", "bar2")
		time.Sleep(1 * time.Second)
		_, _ = w.Cli.Delete(context.TODO(), "foo2")
	}()
	// Prefix watch.
	rch := w.Cli.Watch(context.Background(), "foo", clientv3.WithPrefix())
	// Range watch alternative:
	// rch := w.Cli.Watch(context.Background(), "foo1", clientv3.WithRange("foo4"))
	// Keeps watching forever.
	printEvents(rch)
}

// progressNotify watches "foo" with progress notifications enabled. It never returns.
func (w *Watch) progressNotify() {
	// Errors are ignored: these are best-effort demo writes.
	go func() {
		time.Sleep(1 * time.Second)
		_, _ = w.Cli.Put(context.TODO(), "foo", "bar")
		time.Sleep(1 * time.Second)
		_, _ = w.Cli.Delete(context.TODO(), "foo")
	}()
	// WithProgressNotify asks the server to send periodic responses when no
	// events occur. Those responses carry no events, so they must be detected
	// outside the per-event loop. NOTE(review): the server-side interval is
	// typically long (etcd v3.4 default: 10 minutes, see
	// --experimental-watch-progress-notify-interval), so expect to wait.
	rch := w.Cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify())
	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			fmt.Println("watch error:", err)
			continue
		}
		if wresp.IsProgressNotify() {
			fmt.Printf("wresp.Header.Revision: %d\n", wresp.Header.Revision)
			fmt.Println("wresp.IsProgressNotify:", true)
			continue
		}
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			fmt.Printf("wresp.Header.Revision: %d\n", wresp.Header.Revision)
			fmt.Println("wresp.IsProgressNotify:", wresp.IsProgressNotify())
		}
	}
}
leader选举
package election

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

// Election wraps the client used by the leader-election example.
type Election struct {
	Cli *clientv3.Client
}

// ElectionExample runs three candidates on the "/my-election/" prefix. e2 and
// e3 campaign immediately while e1 waits 3 seconds. The first and second
// leaders are therefore e2 and e3 (in either order), and the third is e1.
// Every leader except the last resigns so that the next candidate can win.
func (el *Election) ElectionExample() {
	// One session per candidate, so the candidates compete independently.
	s1, err := concurrency.NewSession(el.Cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s1.Close()
	e1 := concurrency.NewElection(s1, "/my-election/")

	s2, err := concurrency.NewSession(el.Cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s2.Close()
	e2 := concurrency.NewElection(s2, "/my-election/")

	s3, err := concurrency.NewSession(el.Cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s3.Close()
	e3 := concurrency.NewElection(s3, "/my-election/")

	// Competing candidates; e1 initially loses to e2 or e3.
	var wg sync.WaitGroup
	wg.Add(3)
	electc := make(chan *concurrency.Election, 3)
	go func() {
		defer wg.Done()
		// Delay e1's candidacy so that e2 or e3 wins first.
		time.Sleep(3 * time.Second)
		if err := e1.Campaign(context.Background(), "e1"); err != nil {
			log.Fatal(err)
		}
		electc <- e1
	}()
	go func() {
		defer wg.Done()
		if err := e2.Campaign(context.Background(), "e2"); err != nil {
			log.Fatal(err)
		}
		electc <- e2
	}()
	go func() {
		defer wg.Done()
		// Campaign blocks until elected, until an error occurs, or until ctx is canceled.
		if err := e3.Campaign(context.Background(), "e3"); err != nil {
			log.Fatal(err)
		}
		electc <- e3
	}()

	cctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	rounds := []string{"first", "second", "third"}
	for i, round := range rounds {
		e := <-electc
		// The first value observed is the current leader's proposal.
		fmt.Println("completed "+round+" election with", string((<-e.Observe(cctx)).Kvs[0].Value))
		if i == len(rounds)-1 {
			break // the last leader (e1) keeps leadership until its session closes
		}
		// Resign so the next candidate can be elected.
		if err := e.Resign(context.TODO()); err != nil {
			log.Fatal(err)
		}
	}
	wg.Wait()
}
数据交换
package transfer

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"strconv"
	"sync"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

// Transfer wraps the client used by the STM balance-transfer example.
type Transfer struct {
	Cli *clientv3.Client
}

// BalancesTransfer seeds 5 accounts with 100 each and runs 10 concurrent
// random transfers, each inside its own STM transaction. Because every
// transfer is atomic, the final sum is still 500.
func (t *Transfer) BalancesTransfer() {
	// Set up the "accounts": 5 keys, each holding "100".
	const totalAccounts = 5
	for i := 0; i < totalAccounts; i++ {
		k := fmt.Sprintf("accts/%d", i)
		if _, err := t.Cli.Put(context.TODO(), k, "100"); err != nil {
			log.Fatal(err)
		}
	}

	// exchange moves half of a random account's balance to another random account.
	exchange := func(stm concurrency.STM) error {
		from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
		if from == to {
			// Nothing to do.
			return nil
		}
		fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
		fromV, toV := stm.Get(fromK), stm.Get(toK)
		// Balances are stored as decimal strings; a malformed one must not be
		// treated as 0, or the total would silently change.
		fromInt, err := strconv.Atoi(fromV)
		if err != nil {
			return fmt.Errorf("parsing balance of %s: %w", fromK, err)
		}
		toInt, err := strconv.Atoi(toV)
		if err != nil {
			return fmt.Errorf("parsing balance of %s: %w", toK, err)
		}
		// Transfer half of the sender's balance to the receiver.
		xfer := fromInt / 2
		stm.Put(fromK, strconv.Itoa(fromInt-xfer))
		stm.Put(toK, strconv.Itoa(toInt+xfer))
		return nil
	}

	// Run 10 random transfers concurrently; each one is an atomic STM transaction.
	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			if _, serr := concurrency.NewSTM(t.Cli, exchange); serr != nil {
				log.Fatal(serr)
			}
		}()
	}
	wg.Wait()

	// Confirm that the account sum still matches the starting sum (500).
	sum := 0
	accts, err := t.Cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
	if err != nil {
		log.Fatal(err)
	}
	for _, kv := range accts.Kvs {
		v, err := strconv.Atoi(string(kv.Value))
		if err != nil {
			log.Fatal(err)
		}
		sum += v
	}
	fmt.Println("account sum is", sum)
}
分布式锁
Mutex锁
package lock

import (
	"context"
	"fmt"
	"log"
	"time" // needed by LockKey (time.Now)

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

// Lock wraps the client used by the distributed-lock examples.
type Lock struct {
	Cli *clientv3.Client
}

// LockExample shows two sessions contending for one mutex, each using its
// session's own lease rather than one supplied by the caller.
func (l *Lock)LockExample() {// create two separate sessions for lock competitions1, err := concurrency.NewSession(l.Cli)if err != nil {log.Fatal(err)}defer s1.Close()m1 := concurrency.NewMutex(s1, "/my-lock/")s2, err := concurrency.NewSession(l.Cli)if err != nil {log.Fatal(err)}defer s2.Close()m2 := concurrency.NewMutex(s2, "/my-lock/")// acquire lock for s1, 先让s1获取到锁if err := m1.Lock(context.TODO()); err != nil {log.Fatal(err)}fmt.Println("acquired lock for s1")m2Locked := make(chan struct{})go func() {defer close(m2Locked)// wait until s1 is locks /my-lock/,阻塞直到取到锁if err := m2.Lock(context.TODO()); err != nil {log.Fatal(err)}}()// s1释放锁让s2去获取if err := m1.Unlock(context.TODO()); err != nil {log.Fatal(err)}fmt.Println("released lock for s1")// 等待防止进程退出<-m2Lockedfmt.Println("acquired lock for s2")}// 带租约的锁
func (l *Lock)LockKey(id int, ttl int64) {now := time.Now().Unix()fmt.Println(now)//创建一个租约lease, err := l.Cli.Grant(context.Background(), ttl)if err != nil{log.Fatal(err)}//租约与session绑定s, err := concurrency.NewSession(l.Cli, concurrency.WithLease(lease.ID))if err != nil {log.Fatal(err)}//close将不会等待租期到期即会被其他程序获取到锁,即立即释放锁//defer s.Close()//Orphan会在租期到期时释放锁defer s.Orphan()m := concurrency.NewMutex(s, "leaseLock2")// acquire lock for s1fmt.Println("WAIT LOCK", id)if err := m.Lock(context.Background()); err != nil {log.Fatal(err)}fmt.Printf("acquired lock for s%d\n", id)end := time.Now().Unix()fmt.Printf("wait time %d\n", end-now)
}
Mutex锁源码解析
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
	s := m.s
	client := m.s.Client()
	// The concurrency package builds Session on top of a lease; s.Lease() is the
	// session's 64-bit lease ID. Appending it in hex to the prefix gives each
	// session its own key under m.pfx.
	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	// CreateRevision is the revision at which a key was created; it is 0 when
	// the key does not exist.
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put self in lock waiters via myKey; oldest waiter holds lock
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	// If myKey does not exist yet (CreateRevision == 0), create it; otherwise
	// read the existing key. Both branches also fetch the current owner, which is
	// Responses[1] in either case.
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return err
	}
	// If the key was just created, its create revision is this txn's revision.
	m.myRev = resp.Header.Revision
	// Otherwise use the create revision of the key that already existed.
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	// The lock is ours when no key exists under the prefix, or when our key has
	// the smallest create revision under the prefix.
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	// wait for deletion revisions prior to myKey
	hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
	} else {
		m.hdr = hdr
	}
	return werr
}

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision are deleted.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
	// WithLastCreate selects the key under pfx with the largest create revision.
	// WithMaxCreateRev excludes keys created after maxCreateRev, which the caller
	// passes as its own create revision - 1. Together they pick the newest waiter
	// queued just ahead of this one.
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	// Repeatedly wait for the closest earlier key to be deleted. That key may be
	// deleted without ever having held the lock (e.g. its session expired), so
	// query again each time. Once no earlier key remains, every waiter ahead of
	// us is gone and the lock is ours.
	for {
		resp, err := client.Get(ctx, pfx, getOpts...)
		if err != nil {
			return nil, err
		}
		if len(resp.Kvs) == 0 {
			return resp.Header, nil
		}
		lastKey := string(resp.Kvs[0].Key)
		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
			return nil, err
		}
	}
}

// waitDelete watches key starting at revision rev. It returns nil on the first
// DELETE event. If the watch channel closes first, it returns the watch error,
// the context error, or a "lost watcher" error, in that order of preference.
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()
	var wr v3.WatchResponse
	wch := client.Watch(cctx, key, v3.WithRev(rev))
	for wr = range wch {
		for _, ev := range wr.Events {
			// A delete event means the waiter ahead of us is gone.
			if ev.Type == mvccpb.DELETE {
				return nil
			}
		}
	}
	if err := wr.Err(); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("lost watcher waiting for delete")
}

// Unlock releases the lock by deleting this session's key. It then resets
// myKey and myRev to sentinel values.
func (m *Mutex) Unlock(ctx context.Context) error {
	client := m.s.Client()
	// Unlocking is simply deleting this session's key.
	if _, err := client.Delete(ctx, m.myKey); err != nil {
		return err
	}
	m.myKey = "\x00"
	m.myRev = -1
	return nil
}
使用Txn模拟实现乐观锁
// main runs the optimistic-lock demo.
func main() {
	... // NOTE(review): elided in the article; presumably creates cli, e.g. cli := getCli()
	ChangeKey(cli)
}

// Txn updates "key" with a compare-and-swap style transaction (optimistic locking).
func (kv *KV)Txn(cli *clientv3.Client) bool {key := "key"k, _ := cli.Get(context.TODO(), key)fmt.Println(k)// 模拟耗时操作,等待值被其他人修改time.Sleep(5*time.Second)ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)var txrsp *clientv3.TxnResponsevar err errorput := clientv3.OpPut(key, "XYZ")if len(k.Kvs) == 0{// len为0说明获取的时候还没有这个key存在,为了防止在这期间其他客户端创建了这个key,需要以下操作// CreateRevision是这个key创建时被分配的这个序号,当key不存在时,createRevision是0cmpExists := clientv3.Compare(clientv3.CreateRevision(key), "=", 0)txrsp, err = cli.Txn(ctx).If(cmpExists).Then(put).Else().Commit()}else {// ModRevision是修改的revision,每修改一次值加1,如果当前的revision和前面获取到的revision一致,说明没有被修改cmpModVersion := clientv3.Compare(clientv3.ModRevision(key), "=", k.Kvs[0].ModRevision)txrsp, err = cli.Txn(ctx).If(cmpModVersion).Then(put).Else().Commit()}cancel()if err != nil {log.Fatal(err)}// 如果上面IF判断成功,succeeded为true,否则为falsefmt.Println(txrsp.Succeeded)// 模拟值被修改后此处为omg,否则为XYZkv.Get(cli, key)return txrsp.Succeeded
}func ChangeKey(cli *clientv3.Client) {go func() {time.Sleep(2*time.Second)rsp, err := cli.Put(context.TODO(), "key", "omg")fmt.Println(rsp, err)fmt.Println("change to omg")}()for{succeed := Txn(cli)// 修改成功则退出if succeed{break}// 否则一秒后重新尝试time.Sleep(1*time.Second)}
}
问题
go mod tidy时的两个问题
go: etcd imports
        github.com/coreos/etcd/clientv3 tested by
        github.com/coreos/etcd/clientv3.test imports
        github.com/coreos/etcd/auth imports
        github.com/coreos/etcd/mvcc/backend imports
        github.com/coreos/bbolt: github.com/coreos/bbolt@v1.3.6: parsing go.mod:
        module declares its path as: go.etcd.io/bbolt
                but was required as: github.com/coreos/bbolt
go: finding module for package google.golang.org/grpc/naming
etcd imports
        github.com/coreos/etcd/clientv3 tested by
        github.com/coreos/etcd/clientv3.test imports
        github.com/coreos/etcd/integration imports
        github.com/coreos/etcd/proxy/grpcproxy imports
        google.golang.org/grpc/naming: module google.golang.org/grpc@latest found (v1.41.0), but does not contain package google.golang.org/grpc/naming
解决:
go mod init
// 前一个问题(bbolt 的模块路径已改为 go.etcd.io/bbolt),需要替换
go mod edit -replace github.com/coreos/bbolt@v1.3.6=go.etcd.io/bbolt@v1.3.6
// 后一个问题,grpc版本过高,需要降级
go mod edit -replace google.golang.org/grpc@v1.41.0=google.golang.org/grpc@v1.26.0
go mod tidy