1. 安装 Golang 的 Etcd 包
etcd clientgo getetcd clinet v3
go get -v github.com/coreos/etcd/clientv3
$GOPATH/src/github.com/coreos/etcd/clientv3protobufgrpc
etcdAPI
2. 连接客户端
etcdclientConfig
EndpointsetcdDialTimeoutclienterrclientclient
cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"localhost:2379"},// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}DialTimeout: 5 * time.Second,
})
client
type Client struct {ClusterKVLeaseWatcherAuthMaintenance// Username is a user name for authentication.Username string// Password is a password for authentication.Password string// contains filtered or unexported fields
}
etcd
ClusteretcdKVK-VLeaseTTL=10keyWatcherAuthetcdMaintenanceetcdetcdleader
client
Client.KVinterfaceK-V
type KV interface {Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)// Delete deletes a key, or optionally using WithRange(end), [key, end).Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)// Compact compacts etcd KV history before the given rev.Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)Do(ctx context.Context, op Op) (OpResponse, error)// Txn creates a transaction.Txn(ctx context.Context) Txn
}
clientv3.NewKV()KV
kv := clientv3.NewKV(cli)
kvetcd
3. PUT 设置操作
putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
goroutineContextkeyvalueetcdkey=/test/key1
Put
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Putlease IDkey
PutPutResponseKVresponseKVresponse
type (CompactResponse pb.CompactionResponsePutResponse     pb.PutResponseGetResponse     pb.RangeResponseDeleteResponse  pb.DeleteRangeResponseTxnResponse     pb.TxnResponse
)
clientv3VSCodePutResponsePutResponsepb.PutResponseVSCodePutResponse
type PutResponse struct {Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`// if prev_kv is set in the request, the previous key-value pair will be returned.PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
HeaderrevisionPrevKvPutvaluenilPutResponse
fmt.Printf("PutResponse: %v, err: %v", putResp, err)
// output
// PutResponse: &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:3 raft_term:7  <nil>}, err: <nil>%
err
Putkey
kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再写一个同前缀的干扰项
kv.Put(context.TODO(), "/testspam", "spam")
/testkey1key2/testspam/test

代码示例:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"127.0.0.1:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一个客户端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于读写etcd的键值对kv := clientv3.NewKV(client)// clientv3.WithPrevKV() 是一个可选控制项,用于获取在设置当前键值对之前的该键的键值对// 有了该控制项后,putResp 才有 PrevKv 的属性,即获取之前的键值对。// context.TODO() 表示当前还不知道用哪个 context 控制该操作,先用该字段占位putResp, err := kv.Put(context.TODO(), "/demo/A/B", "hello", clientv3.WithPrevKV())if err != nil {fmt.Println(err)}fmt.Println("putResp is ", putResp)fmt.Println("Revision:", putResp.Header.Revision)if putResp.PrevKv != nil {fmt.Println("PrevValue:", string(putResp.PrevKv.Value))}}
4. GET 获取操作
KVGet
getResp, err := kv.Get(context.TODO(), "/test/key1")

其函数声明如下:

// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
PutGetWithFromKeykeykeykey
opOptionkey=/test/key1errkeyGetResponsepb.RangeResponsekey
type RangeResponse struct {Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`// kvs is the list of key-value pairs matched by the range request.// kvs is empty when count is requested.Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`// more indicates if there are more keys to return in the requested range.More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`// count is set to the number of keys within the range when requested.Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}
KvsGetk-vGetkeylen(Kvs)1key
RangeResponse.MoreCountwithLimit()GetGetWithPrefix/test
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())
WithPrefix()/test/key
etcdk-v/test/key
withPrefix()/test/[“/test/”, “/test0”)/0/test0/test/key
Put/testspam/test//Get/test/testspam
rangeResp.Kvs
[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!"  
key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!" ]

代码示例:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一个客户端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于读写etcd的键值对kv := clientv3.NewKV(client)kv.Put(context.TODO(), "/demo/A/B", "BBB", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/C", "CCC", clientv3.WithPrevKV())// 	读取/demo/A/为前缀的所有key// clientv3.WithPrefix() , clientv3.WithCountOnly() 可以有多个并以 逗号分隔即可getResp, err := kv.Get(context.TODO(), "/demo/A/", clientv3.WithPrefix() /*,clientv3.WithCountOnly()*/)if err != nil {fmt.Println(err)}fmt.Println(getResp.Kvs, getResp.Count)for _, resp := range getResp.Kvs {fmt.Printf("key: %s, value:%s\n", string(resp.Key), string(resp.Value))}
}

输出结果为:

[key:"/demo/A/B" create_revision:6 mod_revision:22 version:6 value:"BBB"  
key:"/demo/A/C" create_revision:7 mod_revision:23 version:12 value:"CCC" ] 2
key: /demo/A/B, value:BBB
key: /demo/A/C, value:CCC
5. Delete 操作

示例代码:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一个客户端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于读写etcd的键值对kv := clientv3.NewKV(client)kv.Put(context.TODO(), "/demo/A/B1", "BBB", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/B2", "CCC", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/B3", "DDD", clientv3.WithPrevKV())/*clientv3.WithFromKey() 表示针对的key操作是大于等于当前给定的keyclientv3.WithPrevKV() 表示返回的 response 中含有之前删除的值,否则下面的 delResp.PrevKvs 为空*/delResp, err := kv.Delete(context.TODO(), "/demo/A/B",clientv3.WithFromKey(), clientv3.WithPrevKV())if err != nil {fmt.Println(err)}// 查看被删除的 key 和 value 是什么if delResp.PrevKvs != nil {// if len(delResp.PrevKvs) != 0 {for _, kvpair := range delResp.PrevKvs {fmt.Println("已删除:", string(kvpair.Key), string(kvpair.Value))}}
}

输出结果:

已删除: /demo/A/B1 BBB
已删除: /demo/A/B2 CCC
已删除: /demo/A/B3 DDD
6. Lease 租约操作
etcdLease
lease := clientv3.NewLease(cli)
leaseLeaseLease
type Lease interface {// Grant 创建一个新租约Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)// Revoke 销毁给定租约ID的租约Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)// TimeToLive retrieves the lease information of the given lease ID.TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)// Leases retrieves all leases.Leases(ctx context.Context) (*LeaseLeasesResponse, error)// KeepAlive keeps the given lease alive forever.KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive// should be used instead of KeepAliveOnce.KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)// Close releases all resources Lease keeps for efficient communication// with the etcd server.Close() error
}
Lease
GrantRevokeTimeToLiveLeasesKeepAliveKeepAliveOnceClose
keyTTL
grantResp, err := lease.Grant(context.TODO(), 10)
grantResponse
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {*pb.ResponseHeaderID    LeaseIDTTL   int64Error string
}
ID
Leaseetcdkey
kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))
PutLeasePuterrorLease
LeaseTTLLeaseKeepAliveOnce()key
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
KeepAlive()<-chan *LeaseKeepAliveResponse
KeepAlive()KeepAlivePutLeaseLeaseetcdAPIPut with LeaseerrLease

示例代码

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一个客户端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 创建一个lease(租约)对象lease := clientv3.NewLease(client)// 申请一个10秒的租约leaseGrantResp, err := lease.Grant(context.TODO(), 10)if err != nil {fmt.Println(err)return}// 拿到租约的IDleaseId := leaseGrantResp.ID// 自动永久续租keepRespChan, err := lease.KeepAlive(context.TODO(), leaseId)if err != nil {fmt.Println(err)return}// 处理续约应答的协程go func() {for {select {case keepResp := <-keepRespChan:if keepResp == nil {fmt.Println("租约已经失效了")goto END} else { // 每秒会续租一次, 所以就会受到一次应答fmt.Println("收到自动续租应答:", keepResp.ID)}}}END:}()// 获得kv API子集kv := clientv3.NewKV(client)// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期putResp, err := kv.Put(context.TODO(), "/demo/A/B1", "hello", clientv3.WithLease(leaseId))if err != nil {fmt.Println(err)return}fmt.Println("写入成功:", putResp.Header.Revision)// 定时的看一下key过期了没有for {getResp, err := kv.Get(context.TODO(), "/demo/A/B1")if err != nil {fmt.Println(err)return}if getResp.Count == 0 {fmt.Println("kv过期了")break}fmt.Println("还没过期:", getResp.Kvs)time.Sleep(2 * time.Second)}
}

输出结果:

收到自动续租应答: 8488292048996991588
写入成功: 80
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自动续租应答: 8488292048996991588
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
还没过期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自动续租应答: 8488292048996991588
7. Op 获取设置联合操作
OpGetPutOpAPI
KVDoOp
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
OpPut/Get/Delete…OpResponsePutResponse/GetResponse…
ClientOp
  • func OpDelete(key string, opts …OpOption) Op
  • func OpGet(key string, opts …OpOption) Op
  • func OpPut(key, val string, opts …OpOption) Op
  • func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op
KV.PutKV.GET
cli, err := clientv3.New(clientv3.Config{Endpoints:   endpoints,DialTimeout: dialTimeout,
})
if err != nil {log.Fatal(err)
}
defer cli.Close()
ops := []clientv3.Op{clientv3.OpPut("put-key", "123"),clientv3.OpGet("put-key"),clientv3.OpPut("put-key", "456")}
for _, op := range ops {if _, err := cli.Do(context.TODO(), op); err != nil {log.Fatal(err)}
}
OpDoopResp
type OpResponse struct {put *PutResponseget *GetResponsedel *DeleteResponsetxn *TxnResponse
}

你的操作是什么类型,你就用哪个指针来访问对应的结果。

示例代码:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一个客户端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 获得kv API子集kv := clientv3.NewKV(client)// 创建Op: operationputOp := clientv3.OpPut("/demo/A/B1", "BBBBB")// 执行OP 	// kv.Do(op)opResp, err := kv.Do(context.TODO(), putOp)if err != nil {fmt.Println(err)return}fmt.Println("写入Revision:", opResp.Put().Header.Revision)// 创建OpgetOp := clientv3.OpGet("/demo/A/B1")// 执行OPopResp, err = kv.Do(context.TODO(), getOp)if err != nil {fmt.Println(err)return}// 打印 create rev == mod revfmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision) fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
}

输出结果:

写入Revision: 105
数据Revision: 105
数据value: BBBBB
8. Txn 事务操作
etcdif … then … else …Txn
type Txn interface {// If takes a list of comparison. If all comparisons passed in succeed,// the operations passed into Then() will be executed. Or the operations// passed into Else() will be executed.If(cs ...Cmp) Txn// Then takes a list of operations. The Ops list will be executed, if the// comparisons passed in If() succeed.Then(ops ...Op) Txn// Else takes a list of operations. The Ops list will be executed, if the// comparisons passed in If() fail.Else(ops ...Op) Txn// Commit tries to commit the transaction.Commit() (*TxnResponse, error)
}
Txn
IfCmpThenOpElse中Op
KV
txn := kv.Txn(context.TODO())
k1v1k1Putk2k3Putk4k5
kv.Txn(context.TODO()).If(clientv3.Compare(clientv3.Value(k1), ">", v1),clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()
clientv3.Value()key
  • func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
  • func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
  • func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
  • func Value(key string) Cmp:key=xxx的创建值必须满足…
  • func Version(key string) Cmp:key=xxx的累计更新次数必须满足…
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一个客户端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// lease实现锁自动过期:// op操作// txn事务: if else then// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)lease := clientv3.NewLease(client)// 申请一个5秒的租约leaseGrantResp, err := lease.Grant(context.TODO(), 5)if err != nil {fmt.Println(err)return}// 拿到租约的IDleaseId := leaseGrantResp.ID// 准备一个用于取消自动续租的contextctx, cancelFunc := context.WithCancel(context.TODO())// 确保函数退出后, 自动续租会停止defer cancelFunc()defer lease.Revoke(context.TODO(), leaseId)// 5秒后会取消自动续租keepRespChan, err := lease.KeepAlive(ctx, leaseId)if err != nil {fmt.Println(err)return}// 处理续约应答的协程go func() {for {select {case keepResp := <-keepRespChan:if keepResp == nil {fmt.Println("租约已经失效了")goto END} else { // 每秒会续租一次, 所以就会受到一次应答fmt.Println("收到自动续租应答:", keepResp.ID)}}}END:}()//  if 不存在key, then 设置它, else 抢锁失败kv := clientv3.NewKV(client)// 创建事务txn := kv.Txn(context.TODO())// 定义事务// 如果key不存在txn.If(clientv3.Compare(clientv3.CreateRevision("/demo/A/B1"), "=", 0)).Then(clientv3.OpPut("/demo/A/B1", "xxx", clientv3.WithLease(leaseId))).Else(clientv3.OpGet("/demo/A/B1")) // 否则抢锁失败// 提交事务txnResp, err := txn.Commit()if err != nil {fmt.Println(err)return // 没有问题}// 判断是否抢到了锁if !txnResp.Succeeded {fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))return}// 2, 处理业务fmt.Println("处理任务")time.Sleep(5 * time.Second)// 3, 释放锁(取消自动续租, 释放租约)// defer 会把租约释放掉, 关联的KV就被删除了
}

输出结果:

收到自动续租应答: 8488292048996991680
锁被占用: BBBBB
9. Watch 监听操作
WatchWatchWatchChan
type WatchChan <-chan WatchResponse
type WatchResponse struct {Header pb.ResponseHeaderEvents []*EventCompactRevision int64Canceled boolCreated bool
}
keyWatchChanWatchResponse
Watchetcd keyWatchkeygoroutineWatchChangoroutineappConfig
type AppConfig struct {config1 stringconfig2 string
}var appConfig Appconfigfunc watchConfig(clt *clientv3.Client, key string, ss interface{}) {watchCh := clt.Watch(context.TODO(), key)go func() {for res := range watchCh {value := res.Events[0].Kv.Valueif err := json.Unmarshal(value, ss); err != nil {fmt.Println("now", time.Now(), "watchConfig err", err)continue}fmt.Println("now", time.Now(), "watchConfig", ss)}}()
}
watchConfig(client, "config_key", &appConfig)

完整示例代码:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3""github.com/coreos/etcd/mvcc/mvccpb"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一个客户端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 获得kv API子集kv := clientv3.NewKV(client)// 模拟etcd中KV的变化go func() {for {kv.Put(context.TODO(), "/demo/A/B1", "i am B1")kv.Delete(context.TODO(), "/demo/A/B1")time.Sleep(1 * time.Second)}}()// 先GET到当前的值,并监听后续变化getResp, err := kv.Get(context.TODO(), "/demo/A/B1")if err != nil {fmt.Println(err)return}// 现在key是存在的if len(getResp.Kvs) != 0 {fmt.Println("当前值:", string(getResp.Kvs[0].Value))}// 当前etcd集群事务ID, 单调递增的watchStartRevision := getResp.Header.Revision + 1// 创建一个watcherwatcher := clientv3.NewWatcher(client)// 启动监听fmt.Println("从该版本向后监听:", watchStartRevision)// 创建一个 5s 后取消的上下文ctx, cancelFunc := context.WithCancel(context.TODO())time.AfterFunc(5*time.Second, func() {cancelFunc()})// 该监听动作在 5s 后取消watchRespChan := watcher.Watch(ctx, "/demo/A/B1", clientv3.WithRev(watchStartRevision))// 处理kv变化事件for watchResp := range watchRespChan {for _, event := range watchResp.Events {switch event.Type {case mvccpb.PUT:fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)case mvccpb.DELETE:fmt.Println("删除了", "Revision:", event.Kv.ModRevision)}}}}

输出结果:

从该版本向后监听: 94
修改为: i am B1 Revision: 94 94
删除了 Revision: 95
修改为: i am B1 Revision: 96 96
删除了 Revision: 97
修改为: i am B1 Revision: 98 98
删除了 Revision: 99
修改为: i am B1 Revision: 100 100
删除了 Revision: 101
修改为: i am B1 Revision: 102 102
删除了 Revision: 103
8. 参考资料

https://segmentfault.com/a/1190000020868242?utm_source=tag-newest
https://godoc.org/github.com/coreos/etcd/clientv3
https://pkg.go.dev/go.etcd.io/etcd/clientv3?tab=doc