二、Golang代码操作etcd
2.1、etcd安装
# 官网:
https://github.com/etcd-io/etcd/tree/main/client/v3
https://pkg.go.dev/github.com/coreos/etcd/clientv3#pkg-index
# 安装依赖
go get go.etcd.io/etcd/client/v3
# 安装etcd
[root@node01 ~]# yum install -y etcd
# 设置开机自启动
systemctl enable etcd
# 启动etcd
systemctl start etcd
# 查看etcd运行状态
systemctl status etcd
# systemd配置
从systemctl status etcd命令的输出可以看到,etcd的 systemd配置文件位于/usr/lib/systemd/system/etcd.service,该配置文件的内容如下:
$ cat /usr/lib/systemd/system/etcd.service
[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target
[Service]
Type=notify
WorkingDirectory=/var/lib/etcd/
EnvironmentFile=-/etc/etcd/etcd.conf
User=etcd
# set GOMAXPROCS to number of processors
ExecStart=/bin/bash -c "GOMAXPROCS=$(nproc) /usr/bin/etcd --name=\"${ETCD_NAME}\" --data-dir=\"${ETCD_DATA_DIR}\" --listen-client-urls=\"${ETCD_LISTEN_CLIENT_URLS}\""
Restart=on-failure
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
# 从上面的配置中可以看到,etcd的配置文件位于/etc/etcd/etcd.conf,如果我们想要修改某些配置项,可以编辑该文件。
# 远程访问
etcd安装完成后,默认只能本地访问,如果需要开启远程访问,还需要修改/etc/etcd/etcd.conf中的配置。例如,本实例中我安装etcd的机器IP是10.103.18.41,我尝试通过自己的机器远程访问10.103.18.41上安装的etcd的2379端口,结果访问被拒绝:
# 修改/etc/etcd/etcd.conf配置:
ETCD_LISTEN_CLIENT_URLS="http://10.103.18.41:2379,http://localhost:2379"
# 然后重启
systemctl restart etcd
2.2、代码操作
- 连接etcd
package main
import (
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
client *clientv3.Client
err error
)
func main() {
// ETCD客户端连接信息
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"}, // 节点信息
DialTimeout: 5 * time.Second, // 超时时间
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
fmt.Println(client)
}
- 操作etcd
- 相关理论
RevisionCreateRevisionModRevisionVersion
关于 watch 哪个版本:
ModRevisionRevision
- 增加一个key、查询一个key、删除一个key
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
client *clientv3.Client
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
delResp *clientv3.DeleteResponse
kv clientv3.KV
err error
)
func main() {
// ETCD客户端连接信息
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"}, // 节点信息
DialTimeout: 5 * time.Second, // 超时时间
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
// 用于读写ETCD的键值对
kv = clientv3.NewKV(client)
// 操作etcd,context.TODO() 这是一个上下文,如果这上下文不知道选那种,就选这个万精油;clientv3.WithPrevKV()加这参数获取前一个kv的值
if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "1008611", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
// Revision: 作用域为集群,逻辑时间戳,全局单调递增,任何 key 修改都会使其自增
fmt.Println("Revision is:", putResp.Header.Revision)
if putResp.PrevKv != nil {
// 查看被更新的K V
fmt.Println("更新的Key是:", string(putResp.PrevKv.Key))
fmt.Println("被更新的Value是:", string(putResp.PrevKv.Value))
}
// 读取ETCD数据
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1"); err != nil {
fmt.Println(err)
return
}
fmt.Println(getResp.Kvs)
// 读取ETCD数据,获取前缀相同的WithPrefix()
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
fmt.Println(err)
return
}
fmt.Println(getResp.Kvs)
// 删除ETCD数据;WithPrevKV--->赋值数据给delResp.PrevKvs,方便后续判断
// 删除多个key:kv.Delete(context.TODO(), "/cron/jobs/", clientv3.WithPrefix())
if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
// 打印被删除之前的kv
if len(delResp.PrevKvs) != 0 {
for _, kvpx := range delResp.PrevKvs {
fmt.Println("被删除的数据是: ", string(kvpx.Key), string(kvpx.Value))
}
}
}
- 租约、自动租约、lease
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
leaseID clientv3.LeaseID
client *clientv3.Client
LeaseGrantResp *clientv3.LeaseGrantResponse
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse // 只读管道
kv clientv3.KV
err error
)
func main() {
// 连接客户端配置文件
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("conect to etcd faild, err:%v\n", err)
return
} else {
fmt.Println("connect to etcd success")
}
// 获取kv API子集
kv = clientv3.NewKV(client)
// 申请一个租约 lease
lease := clientv3.Lease(client)
// 申请一个10s的租约
if LeaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
fmt.Println("租约申请失败", err)
return
}
// 租约ID
leaseID = LeaseGrantResp.ID
// 自动续租
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseID); err != nil {
fmt.Println("自动续租失败", err)
return
}
/*
10s后自动过期
ctx, canceFunc := context.WithCancel(context.TODO())
// 自动续租
if keepRespChan, err = lease.KeepAlive(ctx, leaseID); err != nil {
fmt.Println("自动续租失败", err)
return
}
canceFunc()
*/
// 处理续约应答的协程 消费keepRespChan
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else {
// KeepAlive每秒会续租一次,所以就会收到一次应答
fmt.Println("收到应答,租约ID是:", keepResp.ID)
}
}
}
END:
}()
// put一个kv,让他与租约关联起来,从而实现10s后自动过期,key就会被删除; 关联用的是clientv3.WithLease(leaseID)
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job3", "3", clientv3.WithLease(leaseID)); err != nil {
fmt.Println(err)
return
}
fmt.Println("写入成功:", putResp.Header.Revision)
// 判断key是否过期
for {
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job3"); err != nil {
fmt.Println(err)
return
}
// 如果等于0,说明过期了
if getResp.Count == 0 {
fmt.Println("kv过期了")
break
} else {
fmt.Println("没过期", getResp.Kvs)
}
time.Sleep(2 * time.Second)
}
}
-
watch操作
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
leaseID clientv3.LeaseID
watcher clientv3.Watcher
kv clientv3.KV
watchResp clientv3.WatchResponse
event *clientv3.Event
client *clientv3.Client
LeaseGrantResp *clientv3.LeaseGrantResponse
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse // 只读管道
watchRespChan <-chan clientv3.WatchResponse
watchStartRevision int64
err error
)
func main() {
// 连接客户端配置文件
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("conect to etcd faild, err:%v\n", err)
return
} else {
fmt.Println("connect to etcd success")
}
// 获取kv API子集
kv = clientv3.NewKV(client)
// 模拟etcd中数据的变化
go func() {
for {
kv.Put(context.TODO(), "/cron/jobs/job18", "I am 18")
kv.Delete(context.TODO(), "/cron/jobs/job18")
time.Sleep(1 * time.Second)
}
}()
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job18"); err != nil {
fmt.Printf("getResp err:%v\n", err)
return
}
if len(getResp.Kvs) != 0 {
fmt.Println(getResp.Kvs[0].Value)
}
// 当前etcd集群事务ID,单调递增的
watchStartRevision = getResp.Header.Revision + 1
// 创建个 watcher
watcher = clientv3.NewWatcher(client)
// 启动监听
fmt.Println("从该Revision版本向后监听:", watchStartRevision)
// 一直监听
// watchRespChan = watcher.Watch(context.TODO(), "/cron/jobs/job18", clientv3.WithRev(watchStartRevision))
// 自动关闭监听,调用canceFunc()函数即可取消
xtc, canceFunc := context.WithCancel(context.TODO())
// xx秒后干什么事--->time.AfterFunc,执行匿名函数
time.AfterFunc(5*time.Second, func() {
canceFunc()
})
//启动监听
watchRespChan = watcher.Watch(xtc, "/cron/jobs/job18", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件
for watchResp = range watchRespChan {
for _, event = range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "CreateRevision is:", event.Kv.CreateRevision, "ModRevision is:", event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了:", "Revision is", event.Kv.ModRevision)
}
}
}
}
// xx秒后干什么事--->time.AfterFunc,执行匿名函数
time.AfterFunc(5*time.Second, func() {
fmt.Println("1")
})
-
OP的方式PUT、GET数据
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
kv clientv3.KV
putOp clientv3.Op
getOp clientv3.Op
opResp clientv3.OpResponse
client *clientv3.Client
err error
)
func main() {
// 连接客户端配置文件
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("conect to etcd faild, err:%v\n", err)
return
} else {
fmt.Println("connect to etcd success")
}
// 获取kv API子集
kv = clientv3.NewKV(client)
// 创建OP---> k v 对象
putOp = clientv3.OpPut("/cron/jobs/job19", "19")
// 执行OP
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
fmt.Printf("执行OP faild, err:%v\n", err)
return
}
fmt.Println("Revision is:", opResp.Put().Header.Revision)
// 创建OP---> k v 对象
getOp = clientv3.OpGet("/cron/jobs/job19")
// 执行OP
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
fmt.Printf("执行OP faild, err:%v\n", err)
return
}
// 打印数据
fmt.Println("数据ModRevision", opResp.Get().Kvs[0].ModRevision)
fmt.Println("数据Value", string(opResp.Get().Kvs[0].Value))
}
-
分布式锁
- 同时运行两次,验证代码
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
config clientv3.Config
leaseID clientv3.LeaseID
ctx context.Context
canceFunc context.CancelFunc
txn clientv3.Txn
client *clientv3.Client
LeaseGrantResp *clientv3.LeaseGrantResponse
keepResp *clientv3.LeaseKeepAliveResponse
txnResp *clientv3.TxnResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse // 只读管道
kv clientv3.KV
err error
)
/*
lease实现锁自动过期
op操着
txn事务:if else then
*/
func main() {
// 连接客户端配置文件
config = clientv3.Config{
Endpoints: []string{"192.168.1.210:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Printf("conect to etcd faild, err:%v\n", err)
return
} else {
fmt.Println("connect to etcd success")
}
// 1、上锁(创建租约、自动续租、拿着租约去抢占一个key)
// 申请一个租约 lease
lease := clientv3.Lease(client)
// 申请一个5s的租约
if LeaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println("租约申请失败", err)
return
}
// 租约ID
leaseID = LeaseGrantResp.ID
// 准备一个用于取消的自动续租的context; cancanceFunc 取消续租调用这个函数即可
ctx, canceFunc = context.WithCancel(context.TODO())
// 确保函数退出后,自动续约会停止
defer canceFunc()
defer lease.Revoke(context.TODO(), leaseID)
// 自动续租
if keepRespChan, err = lease.KeepAlive(ctx, leaseID); err != nil {
fmt.Println("自动续租失败", err)
return
}
// 判断续约应答的协程
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else {
// KeepAlive每秒会续租一次,所以就会收到一次应答
fmt.Println("收到应答,租约ID是:", keepResp.ID)
}
}
}
END:
}()
// ***拿着租约去抢占一个key***
// 获取kv API子集
kv = clientv3.NewKV(client)
// 创建事务
txn = kv.Txn(context.TODO())
// 定义事务
// 如果key不存在;关联用的是clientv3.WithLease(leaseID)
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job19"), "=", 0)).
// 不存在就put一个key
Then(clientv3.OpPut("/cron/lock/job19", "xxx", clientv3.WithLease(leaseID))).
// 否则枪锁失败
Else(clientv3.OpGet("/cron/lock/job19"))
// 提交事务
if txnResp, err = txn.Commit(); err != nil {
fmt.Println("txn err", err)
return
}
// 判断释放抢到锁
if !txnResp.Succeeded {
fmt.Println("锁被占用", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2、处理业务
fmt.Println("处理任务")
time.Sleep(50 * time.Second)
// 3、释放锁(取消自动续租、释放租约)
/*
defer canceFunc()
defer lease.Revoke(context.TODO(), leaseID)
上面这个释放了租约,关联的kv会被删除,从而达到释放锁
*/
}