golang操作etcd简单讲解

关于etcd的详细介绍和实战清查看以下文章

本文仅对go操作etcd的put,get,watch过程做一下简单介绍。

安装第三方库

go get go.etcd.io/etcd/clientv3

由于go版本等问题,可能会在安装时报错,具体解决方案可以查看以下文章:

put操作

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
    fmt.Println("connect to etcd success")
	defer cli.Close()
	// put
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, "q1mi", "dsb")
	cancel()
	if err != nil {
		fmt.Printf("put to etcd failed, err:%v\n", err)
		return
	}
}

上述代码的逻辑步骤是:

  1. 调用clientv3包中的New函数,传入参数配置对象,返回一个客户端的client(结构体)。
  2. 构造上下文context
  3. client调用里面字段kv接口的方法put将数据存到etcd

client的数据结构为:

type Client struct {
    Cluster
    KV
    Lease
    Watcher
    Auth
    Maintenance

    // Username is a user name for authentication.
    Username string
    // Password is a password for authentication.
    Password string
    // contains filtered or unexported fields
}

kv接口的数据结构:

type KV

type KV interface {  
    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
    Do(ctx context.Context, op Op) (OpResponse, error)
    Txn(ctx context.Context) Txn
}

get操作

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
    fmt.Println("connect to etcd success")
	defer cli.Close()
	// get
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, "q1mi")
	cancel()
	if err != nil {
		fmt.Printf("get from etcd failed, err:%v\n", err)
		return
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
	}
}

上述代码的逻辑步骤是:

  1. 同普通一样,先对client初始化
  2. 创建上下文
  3. 直接调用cli.Get

这里说一下返回值得数据结构:*GetResponse

type RangeResponse struct {
   Header *ResponseHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
   // kvs is the list of key-value pairs matched by the range request.
   // kvs is empty when count is requested.
   Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs,proto3" json:"kvs,omitempty"`
   // more indicates if there are more keys to return in the requested range.
   More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
   // count is set to the number of keys within the range when requested.
   Count                int64    `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
   XXX_NoUnkeyedLiteral struct{} `json:"-"`
   XXX_unrecognized     []byte   `json:"-"`
   XXX_sizecache        int32    `json:"-"`
}

get返回的信息会存储在RangeResponse结构体的Kvs字段,这是一个KeyValue结构体的指针类型切片,因此取值时要遍历这个切片。

watch操作

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// watch demo

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	// watch key:q1mi change
	rch := cli.Watch(context.Background(), "q1mi") // <-chan WatchResponse
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

上述代码的逻辑步骤是:

  1. 同普通一样,先对client初始化
  2. 创建上下文注意这里并没有用with系列函数创建上下文,而是直接用根节点context,这么做可以保证一直监控,知道有变化发生。
// If the context is "context.Background/TODO", returned "WatchChan" will
// not be closed and block until event is triggered, except when server
// returns a non-recoverable error (e.g. ErrCompacted).
  1. 直接调用cli.Watch

对返回值说明:

cli.Watch返回一个WatchResponse类型通道。

type WatchChan <-chan WatchResponse
type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, before the channel is closed,
	// the channel sends a final response that has Canceled set to true with a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool
	// contains filtered or unexported fields
}

返回值存储在WatchResponse例的Event字段中,这个一个指针类型的切片,因此取值时要遍历此切片。