etcd/clientv3

如果还不熟悉etcd推荐先阅读

Let's get started now!

安装package

gogetetcd clinet v3
go get github.com/coreos/etcd/clientv3
$GOPATH/src/github.com/coreos/etcd/clientv3protobufgrpc

官方文档地址:https://godoc.org/github.com/coreos/etcd/clientv3

文档中列出了Go官方实现的etcd client中支持的所有方法,方法还是很多的,我们主要梳理一下使用etcd时经常用到的主要API并进行演示。

连接客户端

用程序访问etcd首先要创建client,它需要传入一个Config配置,这里传了2个选项:

  • Endpoints:etcd的多个节点服务地址。
  • DialTimeout:创建client的首次连接超时时间,这里传了5秒,如果5秒都没有连接成功就会返回err;一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。
cli, err := clientv3.New(clientv3.Config{   Endpoints:[]string{"localhost:2379"},
   //Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}
   DialTimeout: 5 * time.Second,
})
client
type Client struct {
    Cluster
    KV
    Lease
    Watcher
    Auth
    Maintenance
    // Username is a user name for authentication.
    Username string
    // Password is a password for authentication.
    Password string
    // contains filtered or unexported fields
}

类型中的成员是etcd客户端几何核心功能模块的具体实现,它们分别用于:

  • Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
  • KV:我们主要使用的功能,即K-V键值库的操作。
  • Lease:租约相关操作,比如申请一个TTL=10秒的租约(应用给key可以实现键值的自动过期)。
  • Watcher:观察订阅,从而监听最新的数据变化。
  • Auth:管理etcd的用户和权限,属于管理员操作。
  • Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。

我们需要使用什么功能,就去client里获取对应的成员即可。

是一个
type KV interface {

    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

    // Delete deletes a key, or optionally using WithRange(end), [key, end).
    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

    // Compact compacts etcd KV history before the given rev.
    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

    Do(ctx context.Context, op Op) (OpResponse, error)

    // Txn creates a transaction.
    Txn(ctx context.Context) Txn
}
clientv3.NewKV()
kv := clientv3.NewKV(cli)
kv

Put

putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
goroutineContext

Put函数的声明如下:

// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

除了上面例子中的三个的参数,还支持一个变长参数,可以传递一些控制项来影响Put的行为,例如可以携带一个lease ID来支持key过期。

Put操作返回的是PutResponse,不同的KV操作对应不同的response结构,所有KV操作返回的response结构如下:

type (
   CompactResponse pb.CompactionResponse
   PutResponse     pb.PutResponse
   GetResponse     pb.RangeResponse
   DeleteResponse  pb.DeleteRangeResponse
   TxnResponse     pb.TxnResponse
)
clientv3PutResponse
type PutResponse struct {
   Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
   // if prev_kv is set in the request, the previous key-value pair will be returned.
   PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
PutResponse
fmt.Printf("PutResponse: %v, err: %v", putResp, err)

// output
// PutResponse: &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:3 raft_term:7  <nil>}, err: <nil>%

我们需要判断err来确定操作是否成功。

我们再Put其他2个key,用于后续演示:

kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再写一个同前缀的干扰项
kv.Put(context.TODO(), "/testspam", "spam")

现在/test目录下有两个键: key1和key2, 而/testspam并不归属于/test目录

Get

Get
getResp, err := kv.Get(context.TODO(), "/test/key1")

其函数声明如下:

// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

和Put类似,函数注释里提示我们可以传递一些控制参数来影响Get的行为,比如:WithFromKey表示读取从参数key开始递增的所有key,而不是读取单个key。

在上面的例子中,我没有传递opOption,所以就是获取key=/test/key1的最新版本数据。

这里err并不能反馈出key是否存在(只能反馈出本次操作因为各种原因异常了),我们需要通过GetResponse(实际上是pb.RangeResponse)判断key是否存在:

type RangeResponse struct {
    Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
    // kvs is the list of key-value pairs matched by the range request.
    // kvs is empty when count is requested.
    Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
    // more indicates if there are more keys to return in the requested range.
    More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
    // count is set to the number of keys within the range when requested.
    Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}

Kvs字段,保存了本次Get查询到的所有k-v对,因为上述例子只Get了一个单key,所以只需要判断一下len(Kvs)是否等于1即可知道key是否存在。

RangeResponse.MoreCountwithLimit()Get

接下来,我们通过给Get查询增加WithPrefix选项,获取/test目录下的所有子元素:

rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())
WithPrefix()/test/
etcd
withPrefix()/test/[“/test/”,“/test0”)/0/test0/test/
/testspam/test/Get/test/testspam

打印rangeResp.Kvs可以看到获得了两个键值:

[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!"  key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!" ]

Lease

etcd客户端的Lease对象可以通过以下的代码获取到

lease := clientv3.NewLease(cli)

lease对象是Lease接口的实现,Lease接口的声明如下:

type Lease interface {
    // Grant 创建一个新租约
    Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

    // Revoke 销毁给定租约ID的租约
    Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

    // TimeToLive retrieves the lease information of the given lease ID.
    TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

    // Leases retrieves all leases.
    Leases(ctx context.Context) (*LeaseLeasesResponse, error)

    // KeepAlive keeps the given lease alive forever.
    KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

    // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
    // should be used instead of KeepAliveOnce.
    KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

    // Close releases all resources Lease keeps for efficient communication
    // with the etcd server.
    Close() error
}

Lease提供了以下功能:

  • Grant:分配一个租约。
  • Revoke:释放一个租约。
  • TimeToLive:获取剩余TTL时间。
  • Leases:列举所有etcd中的租约。
  • KeepAlive:自动定时的续约某个租约。
  • KeepAliveOnce:为某个租约续约一次。
  • Close:释放当前客户端建立的所有租约。

要想实现key自动过期,首先得创建一个租约,下面的代码创建一个TTL为10秒的租约:

grantResp, err := lease.Grant(context.TODO(), 10)

返回的grantResponse的结构体声明如下:

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
    *pb.ResponseHeader
    ID    LeaseID
    TTL   int64
    Error string
}

在应用程序代码中主要使用到的是租约ID。

接下来我们用这个Lease往etcd中存储一个10秒过期的key:

kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))

这里特别需要注意,有一种情况是在Put之前Lease已经过期了,那么这个Put操作会返回error,此时你需要重新分配Lease。

当我们实现服务注册时,需要主动给Lease进行续约,通常是以小于TTL的间隔循环调用Lease的KeepAliveOnce()方法对租约进行续期,一旦某个服务节点出错无法完成租约的续期,等key过期后客户端即无法在查询服务时获得对应节点的服务,这样就通过租约到期实现了服务的错误隔离。

keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
KeepAlive()<-chan *LeaseKeepAliveResponseKeepAlive()

KeepAlive和Put一样,如果在执行之前Lease就已经过期了,那么需要重新分配Lease。etcd并没有提供API来实现原子的Put with Lease,需要我们自己判断err重新分配Lease。

Op

Op字面意思就是”操作”,Get和Put都属于Op,只是为了简化用户开发而开放的特殊API。

KV对象有一个Do方法接受一个Op:

// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)

其参数Op是一个抽象的操作,可以是Put/Get/Delete…;而OpResponse是一个抽象的结果,可以是PutResponse/GetResponse…

可以通过Client中定义的一些方法来创建Op:

  • func OpDelete(key string, opts …OpOption) Op
  • func OpGet(key string, opts …OpOption) Op
  • func OpPut(key, val string, opts …OpOption) Op
  • func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op

其实和直接调用KV.Put,KV.GET没什么区别。

下面是一个例子:

cli, err := clientv3.New(clientv3.Config{
    Endpoints:   endpoints,
    DialTimeout: dialTimeout,
})
if err != nil {
    log.Fatal(err)
}
defer cli.Close()

ops := []clientv3.Op{
    clientv3.OpPut("put-key", "123"),
    clientv3.OpGet("put-key"),
    clientv3.OpPut("put-key", "456")}

for _, op := range ops {
    if _, err := cli.Do(context.TODO(), op); err != nil {
        log.Fatal(err)
    }
}

把Op交给Do方法执行,返回的opResp结构如下:

type OpResponse struct {
    put *PutResponse
    get *GetResponse
    del *DeleteResponse
    txn *TxnResponse
}

你的操作是什么类型,你就用哪个指针来访问对应的结果。

Txn事务

etcd中事务是原子执行的,只支持if … then … else …这种表达。首先来看一下Txn中定义的方法:

type Txn interface {
    // If takes a list of comparison. If all comparisons passed in succeed,
    // the operations passed into Then() will be executed. Or the operations
    // passed into Else() will be executed.
    If(cs ...Cmp) Txn

    // Then takes a list of operations. The Ops list will be executed, if the
    // comparisons passed in If() succeed.
    Then(ops ...Op) Txn

    // Else takes a list of operations. The Ops list will be executed, if the
    // comparisons passed in If() fail.
    Else(ops ...Op) Txn

    // Commit tries to commit the transaction.
    Commit() (*TxnResponse, error)
}

Txn必须是这样使用的:If(满足条件) Then(执行若干Op) Else(执行若干Op)。

If中支持传入多个Cmp比较条件,如果所有条件满足,则执行Then中的Op(上一节介绍过Op),否则执行Else中的Op。

首先,我们需要开启一个事务,这是通过KV对象的方法实现的:

txn := kv.Txn(context.TODO())

下面的测试程序,判断如果k1的值大于v1并且k1的版本号是2,则Put 键值k2和k3,否则Put键值k4和k5。

kv.Txn(context.TODO()).If(
 clientv3.Compare(clientv3.Value(k1), ">", v1),
 clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(
 clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(
 clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()

类似于clientv3.Value()用于指定key属性的,有这么几个方法:

  • func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
  • func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
  • func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
  • func Value(key string) Cmp:key=xxx的创建值必须满足…
  • func Version(key string) Cmp:key=xxx的累计更新次数必须满足…

Watch

WatchWatchChan
type WatchChan <-chan WatchResponse

type WatchResponse struct {
    Header pb.ResponseHeader
    Events []*Event

    CompactRevision int64

    Canceled bool

    Created bool
}
WatchChanWatchResponse
type AppConfig struct {
  config1 string
  config2 string
}

var appConfig Appconfig

func watchConfig(clt *clientv3.Client, key string, ss interface{}) {
    watchCh := clt.Watch(context.TODO(), key)
    go func() {
        for res := range watchCh {
            value := res.Events[0].Kv.Value
            if err := json.Unmarshal(value, ss); err != nil {
                fmt.Println("now", time.Now(), "watchConfig err", err)
                continue
            }
            fmt.Println("now", time.Now(), "watchConfig", ss)
        }
    }()
}

watchConfig(client, "config_key", &appConfig)

golang etcd clientv3的主要功能就是这些,希望能帮大家梳理出学习脉络,这样工作中应用到etcd时再看官方文档就会容易很多。