关于etcd的详细介绍和实战清查看以下文章
本文仅对go操作etcd的put,get,watch过程做一下简单介绍。
安装第三方库
go get go.etcd.io/etcd/clientv3
由于go版本等问题,可能会在安装时报错,具体解决方案可以查看以下文章:
put操作
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "q1mi", "dsb")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
}
上述代码的逻辑步骤是:
- 调用clientv3包中的New函数,传入参数配置对象,返回一个客户端的client(结构体)。
- 构造上下文context
- client调用里面字段kv接口的方法put将数据存到etcd
client的数据结构为:
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
// contains filtered or unexported fields
}
kv接口的数据结构:
type KV
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
Txn(ctx context.Context) Txn
}
get操作
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "q1mi")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}
上述代码的逻辑步骤是:
- 同普通一样,先对client初始化
- 创建上下文
- 直接调用cli.Get
这里说一下返回值得数据结构:*GetResponse
type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs,proto3" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
get返回的信息会存储在RangeResponse结构体的Kvs字段,这是一个KeyValue结构体的指针类型切片,因此取值时要遍历这个切片。
watch操作
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
// watch demo
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// watch key:q1mi change
rch := cli.Watch(context.Background(), "q1mi") // <-chan WatchResponse
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
上述代码的逻辑步骤是:
- 同普通一样,先对client初始化
创建上下文注意这里并没有用with系列函数创建上下文,而是直接用根节点context,这么做可以保证一直监控,知道有变化发生。
// If the context is "context.Background/TODO", returned "WatchChan" will
// not be closed and block until event is triggered, except when server
// returns a non-recoverable error (e.g. ErrCompacted).
- 直接调用cli.Watch
对返回值说明:
cli.Watch返回一个WatchResponse类型通道。
type WatchChan <-chan WatchResponse
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
// CompactRevision is the minimum revision the watcher may receive.
CompactRevision int64
// Canceled is used to indicate watch failure.
// If the watch failed and the stream was about to close, before the channel is closed,
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// Created is used to indicate the creation of the watcher.
Created bool
// contains filtered or unexported fields
}
返回值存储在WatchResponse例的Event字段中,这个一个指针类型的切片,因此取值时要遍历此切片。