如果您觉得有什么不理解,或者觉得文章有欠缺的地方,请您点击这里提出。我会很感谢您的建议也会解答您的问题。

ETCD golang ClientV3的基本使用

零、搭建单机的ETCD

为了演示,在Linux机器上搭建一个不通过SSL认证的单机ETCD,安装部署步骤如下: 在github上的relese界面找到对应的包,下载到机器上:

ETCD_VER=v3.4.4

GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GITHUB_URL}

rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test

curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz

/tmp/etcd-download-test/etcd --version
/tmp/etcd-download-test/etcdctl version

通过后台运行部署起来:

nohup ./etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 &

不要使用默认的监听地址,因为默认的监听的是localhost,通过外部无法访问。

./etcdctl version

一、etcdctl的基本使用

1.1 关于数据的CRUD + Watch

新增一条数据

./etcdctl put "/school/class/name" "helios"

获取一条数据

[root@dajiahao03 etcd]# ./etcdctl get "/school/class/name"
/school/class/name
helios

得到一组数据

[root@dajiahao03 etcd]# ./etcdctl get "/school/class/" --prefix
/school/class/name
helios1
/school/class/name1
helios

得到所有的key

[root@dajiahao03 etcd]# ./etcdctl --prefix --keys-only=true get /
/school/class/name

/school/class/name1

删除一条数据

[root@dajiahao03 etcd]# ./etcdctl del "/school/class/name2"
1

watch的功能,这个功能要开两个终端哟

# 第一个终端:
./etcdctl watch "/school/class" --prefix
# 第二个终端
[root@dajiahao03 etcd]# ./etcdctl put "/school/class/name2" "helios2"
OK
# 第一个终端的变化
[root@dajiahao03 etcd]# ./etcdctl watch "/school/class" --prefix
PUT
/school/class/name2
helios2

1.2 关于集群的操作

查看集群状态(如果单机的可以不用指定ENDPOINTS,如果是集群的话,通过分号的形式加到ENDPOINTS后面)

[root@dajiahao03 etcd]# export ENDPOINTS="172.27.143.50:2379"
[root@dajiahao03 etcd]# ./etcdctl --write-out=table --endpoints=$ENDPOINTS endpoint status
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
|      ENDPOINT      |        ID        | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| 172.27.143.50:2379 | 8e9e05c52164694d |   3.4.4 |   20 kB |      true |      false |         3 |         13 |                 13 |        |
+--------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+

查看集群成员

[root@dajiahao03 etcd]# ./etcdctl --write-out=table --endpoints=$ENDPOINTS member list
+------------------+---------+---------+-----------------------+---------------------+------------+
|        ID        | STATUS  |  NAME   |      PEER ADDRS       |    CLIENT ADDRS     | IS LEARNER |
+------------------+---------+---------+-----------------------+---------------------+------------+
| 8e9e05c52164694d | started | default | http://localhost:2380 | http://0.0.0.0:2379 |      false |
+------------------+---------+---------+-----------------------+---------------------+------------+

删除集群成员

MEMBER_ID=8e9e05c52164694d
etcdctl --endpoints=$ENDPOINTS member remove ${MEMBER_ID}

添加成员

NEW_ETCD_NAME="new_etcd"
NEW_ETCD_HOST="172.27.140.172"
./etcdctl --endpoints=$ENDPOINTS member add ${NEW_ETCD_NAME} --peer-urls=http://${NEW_ETCD_HOST}:2380

1.3 关于集群的备份和恢复

磁盘碎片整理

[root@dajiahao03 etcd]# ./etcdctl --endpoints=$ENDPOINTS defrag
Finished defragmenting etcd member[172.27.143.50:2379]

备份当前的ETD集群

[root@dajiahao03 etcd]# ./etcdctl snapshot save snapshot.db
{"level":"info","ts":1583651900.406544,"caller":"snapshot/v3_snapshot.go:110","msg":"created temporary db file","path":"snapshot.db.part"}
{"level":"info","ts":1583651900.4077375,"caller":"snapshot/v3_snapshot.go:121","msg":"fetching snapshot","endpoint":"127.0.0.1:2379"}
{"level":"info","ts":1583651900.4105544,"caller":"snapshot/v3_snapshot.go:134","msg":"fetched snapshot","endpoint":"127.0.0.1:2379","took":0.003921237}
{"level":"info","ts":1583651900.410609,"caller":"snapshot/v3_snapshot.go:143","msg":"saved","path":"snapshot.db"}
Snapshot saved at snapshot.db
[root@dajiahao03 etcd]# ll snapshot.db
-rw------- 1 root root 20512 3月   8 15:18 snapshot.db

查看snapshot状态

[root@dajiahao03 etcd]# ./etcdctl snapshot status snapshot.db
21c0c96e, 8, 11, 20 kB

从备份中恢复集群

[root@dajiahao03 etcd]# ./etcdctl snapshot save snapshot.db
{"level":"info","ts":1583652044.0606484,"caller":"snapshot/v3_snapshot.go:110","msg":"created temporary db file","path":"snapshot.db.part"}
{"level":"info","ts":1583652044.0613058,"caller":"snapshot/v3_snapshot.go:121","msg":"fetching snapshot","endpoint":"127.0.0.1:2379"}
{"level":"info","ts":1583652044.0659368,"caller":"snapshot/v3_snapshot.go:134","msg":"fetched snapshot","endpoint":"127.0.0.1:2379","took":0.005182366}
{"level":"info","ts":1583652044.0660565,"caller":"snapshot/v3_snapshot.go:143","msg":"saved","path":"snapshot.db"}
Snapshot saved at snapshot.db

切换leader

# 先看状态
etcdctl endpoint --cluster=true status  -w table
# move-leader
./etcdctl move-leader d6414a7c7c550d29

二、使用etcd client可能会出现的问题

go get go.etcd.io/etcd/clientv3
mkfir -p ${GOPATH}/src/go.etcd.io
git clone git@github.com:etcd-io/etcd.git ${GOPATH}/src/go.etcd.io

在build的时候出现下图的问题的话,可以参考这个博客,但是我尝试了,好想不管用,如果不管用可以看我后面的终极方法。


经过了上述的还是不行,因为我的go版本是13.X,我就干脆降到12.X,就work了。 步骤如下(国内镜像地址:http://mirrors.ustc.edu.cn/golang/):

# 找到匹配的版本的go包(我的是mac的)
wget http://mirrors.ustc.edu.cn/golang/go1.12.4.darwin-amd64.tar.gz
tar -C /usr/local/bin -xzf /home/yourname/Downloads/go1.12.4.linux-amd64.tar.gz
# 修改bashrc重的PATH(我用的是zsh)
vim ~/.zshrc
export PATH=$PATH:/usr/local/bin/go/bin
source ~/.zshrc

三、etcd ClientV3的使用

3.1 连接ETCD

var (
    config clientv3.Config
    client *clientv3.Client
    err error
)
// 客户端配置
config = clientv3.Config{
    Endpoints: []string{"172.27.43.50:2379"},
    DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
    fmt.Println(err)
    return
}

可运行代码请查看etcd-client.go

3.2 写入数据到ETCD

// 实例化一个用于操作ETCD的KV
    kv = clientv3.NewKV(client)
    if putResp, err = kv.Put(context.TODO(), "/school/class/students", "helios0", clientv3.WithPrevKV()); err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(putResp.Header.Revision)
    if putResp.PrevKv != nil {
        fmt.Printf("prev Value: %s \n CreateRevision : %d \n ModRevision: %d \n Version: %d \n",
            string(putResp.PrevKv.Value), putResp.PrevKv.CreateRevision, putResp.PrevKv.ModRevision, putResp.PrevKv.Version)
    }

输出为:

20
prev Value: helios0
 CreateRevision : 9
 ModRevision: 19
 Version: 11

可运行代码请查看etcd-put.go。

这里注意kv.Put的最后一个参数表示,返回上一次的数据,是个可选参数。在etcd的client中有很多这样的可选参数,更多的参数可以参考这个文档clientv3 option

3.3 获取ETCD里面的数据

// 实例化一个用于操作ETCD的KV
kv = clientv3.NewKV(client)
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
    fmt.Println(err)
    return
}
// 输出本次的Revision
fmt.Printf("Key is s %s \n Value is %s \n", getResp.Kvs[0].Key, getResp.Kvs[0].Value)

输出为:

Key is s /school/class/students
Value is helios0
clientv3.WithPrefix()

可运行代码请查看etcd-get.go

3.4 删除ETCD里面的数据

kv = clientv3.NewKV(client)
_, err = kv.Put(context.TODO(), "/school/class/students", "helios1")

if delResp, err = kv.Delete(context.TODO(), "/school/class/students", clientv3.WithPrevKV()); err != nil {
    fmt.Println(err)
    return
}
if len(delResp.PrevKvs) != 0 {
    for _, kvpair = range delResp.PrevKvs {
        fmt.Printf("delete key is: %s \n Value: %s \n", string(kvpair.Key), string(kvpair.Value))
    }
}

输出为:

delete key is: /school/class/students
 Value: helios1

在Delete中还可以增加下面两个option: - clientv3.WithFromKey(): 从这个key开始 - clientv3.WithLimit(n): 先知删除n条

可运行代码请查看etcd-delete.go

3.4 对租约的操作

3.4.1 申请一个10s的租约,定时查看是否过期

// 申请一个租约
lease = clientv3.NewLease(client)
if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
    fmt.Println(err)
    return
}
leaseId = leaseGrantResp.ID

// 获得kv API子集
kv = clientv3.NewKV(client)

if _, err = kv.Put(context.TODO(), "/school/class/students", "h", clientv3.WithLease(leaseId)); err != nil {
    fmt.Println(err)
    return
}

for {
    if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
        fmt.Println(err)
        return
    }
    if getResp.Count == 0 {
        fmt.Println("kv过期了")
        break
    }
    fmt.Println("还没过期:", getResp.Kvs)
    time.Sleep(2 * time.Second)
}

输出:

还没过期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
还没过期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
还没过期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
还没过期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
还没过期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
还没过期: [key:"/school/class/students" create_revision:24 mod_revision:24 version:1 value:"h" lease:7587844869553529889 ]
kv过期了

可运行代码请查看3.4.1.go

3.4.2 自动续租

if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
    fmt.Println(err)
    return
}
go func() {
    for {
        select {
        case keepResp = <- keepRespChan:
            if keepRespChan == nil {
                fmt.Println("租约已经失效了")
                goto END
            } else {    // 每秒会续租一次, 所以就会受到一次应答
                fmt.Println("收到自动续租应答:", keepResp.ID)
            }
        }
    }
END:
}()

可运行代码请查看3.4.2.go

3.5 watch功能

对某一个key进行监听5s:

kv = clientv3.NewKV(client)

// 模拟KV的变化
go func() {
    for {
        _ , err = kv.Put(context.TODO(), "/school/class/students", "helios1")
        _, err = kv.Delete(context.TODO(), "/school/class/students")
        time.Sleep(1 * time.Second)
    }
}()

// 先GET到当前的值,并监听后续变化
if getResp, err = kv.Get(context.TODO(), "/school/class/students"); err != nil {
    fmt.Println(err)
    return
}

// 现在key是存在的
if len(getResp.Kvs) != 0 {
    fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}

// 获得当前revision
watchStartRevision = getResp.Header.Revision + 1
// 创建一个watcher
watcher = clientv3.NewWatcher(client)
fmt.Println("从该版本向后监听:", watchStartRevision)

ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5 * time.Second, func() {
    cancelFunc()
})

watchRespChan = watcher.Watch(ctx, "/school/class/students", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件
for watchResp = range watchRespChan {
    for _, event = range watchResp.Events {
        switch event.Type {
        case mvccpb.PUT:
            fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
        case mvccpb.DELETE:
            fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
        }
    }
}

可运行代码请查看3.5.go

3.6 通过op方法代替kv.Get/kv.Put/kv.Delete

putOp = clientv3.OpPut("/school/class/students", "helios")

if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
    panic(err)
}
fmt.Println("写入Revision:", opResp.Put().Header.Revision)

getOp = clientv3.OpGet("/school/class/students")

if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
    panic(err)
}
fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision)
fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))

输出为:

写入Revision: 46
数据Revision: 46
数据value: helios

可运行代码请查看3.6.go

3.7(选看) 通过txn实现分布式锁

ETCD中的txn通过简单的"If-Then-Else"实现了原子操作。

实现分布式锁主要分为三个步骤: 1. 上锁,包括创建租约、自动续约、在租约时间内去抢一个key 2. 抢到锁后执行业务逻辑,没有抢到退出 3. 释放租约

接下来我们看代码:

// 1. 上锁
// 1.1 创建租约
lease = clientv3.NewLease(client)

if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
    panic(err)
}
leaseId = leaseGrantResp.ID

// 1.2 自动续约
// 创建一个可取消的租约,主要是为了退出的时候能够释放
ctx, cancelFunc = context.WithCancel(context.TODO())

// 3. 释放租约
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)

if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
    panic(err)
}
// 续约应答
go func() {
    for {
        select {
        case keepResp = <- keepRespChan:
            if keepRespChan == nil {
                fmt.Println("租约已经失效了")
                goto END
            } else {    // 每秒会续租一次, 所以就会受到一次应答
                fmt.Println("收到自动续租应答:", keepResp.ID)
            }
        }
    }
END:
}()

// 1.3 在租约时间内去抢锁(etcd里面的锁就是一个key)
kv = clientv3.NewKV(client)

// 创建事物
txn = kv.Txn(context.TODO())

//if 不存在key, then 设置它, else 抢锁失败
txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
    Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))).
    Else(clientv3.OpGet("lock"))

// 提交事务
if txnResp, err = txn.Commit(); err != nil {
    panic(err)
}

if !txnResp.Succeeded {
    fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
    return
}

// 2. 抢到锁后执行业务逻辑,没有抢到退出
fmt.Println("处理任务")
time.Sleep(5 * time.Second)

// 3. 释放锁,步骤在上面的defer,当defer租约关掉的时候,对应的key被回收了

可运行代码请查看3.7.go

如果您觉得有什么不理解,或者觉得文章有欠缺的地方,请您点击这里提出。我会很感谢您的建议也会解答您的问题。