etcd类似java系的zookeeper,是一个高可用键值存储系统,用于共享配置和服务发现,并通过raft算法保证一致性。
1. 安装
1.1 Windows环境安装
打开etcd官网,选择进入github页面,点击Releases下载二进制文件,下载到本地,解压
etcd是服务端程序,双击
可以看出etcd监听了2379端口,是用raft工作
默认是使用v2的api,如果想使用v3 api每次打开窗口都需要敲一行命令
set ETCDCTL_API=3
put、get操作,注意不同版本的api取值也不同
1.2 Centos 7安装
curl -L https://github.com/coreos/etcd/releases/download/v3.3.2/etcd-v3.3.2-linux-amd64.tar.gz -o etcd-v3.3.2-linux-amd64.tar.gz
tar zxf etcd-v3.3.2-linux-amd64.tar.gz
启动的 etcd 成员在 localhost:2379 监听客户端请求。
通过使用 etcdctl 来和已经启动的集群交互:
使用etcdctlv3的版本时,需设置环境变量ETCDCTL_API=3
在`/etc/profile`文件中添加环境变量
vi /etc/profile
...
ETCDCTL_API=3
...
source /etc/profile
2. golang操作etcd
安装etcd包:
- 直接从https://github.com/etcd-io/etcd上下载etcd的压缩包
- 然后在src目录下创建go.etcd.io文件目录,将etcd解压到该目录下
2.1 put/get/delete
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
getResp *clientv3.GetResponse
)
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// 用于读写etcd的键值对
kv = clientv3.NewKV(client)
// 写入
kv.Put(context.TODO(), "name1", "lesroad")
kv.Put(context.TODO(), "name2", "haha")
// 读取name为前缀的所有key
if getResp, err = kv.Get(context.TODO(), "name", clientv3.WithPrefix()); err != nil {
fmt.Println(err)
return
} else {
// 获取成功
fmt.Println(getResp.Kvs)
}
// 删除name为前缀的所有key
if _, err = kv.Delete(context.TODO(), "name", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
return
}
}
2.2 lease
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
var (
config clientv3.Config
client *clientv3.Client
err error
lease clientv3.Lease
leaseGrantResp *clientv3.LeaseGrantResponse
leaseId clientv3.LeaseID
putResp *clientv3.PutResponse
kv clientv3.KV
getResp *clientv3.GetResponse
)
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// 申请一个lease(租约)
lease = clientv3.NewLease(client)
// 申请一个10秒的lease
if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
fmt.Println(err)
return
}
// 拿到租约id
leaseId = leaseGrantResp.ID
// 获得kv api子集
kv = clientv3.NewKV(client)
// put一个kv,让它与租约关联起来
if putResp, err = kv.Put(context.TODO(), "name", "lbwnb", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}
fmt.Println("写入成功", putResp.Header.Revision)
// 定时看下key过期了没有
for {
if getResp, err = kv.Get(context.TODO(), "name"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv过期")
break
}
fmt.Println("还没过期", getResp.Kvs)
time.Sleep(2 * time.Second)
}
}
2.3 watch
应用场景:
- 配置有更新的时候,etcd都会实时通知订阅者,以此达到获取最新配置信息的目的。
- 分布式日志收集,监控应用(主题)目录下所有信息的变动。
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
)
func main() {
var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
watchStartRevision int64
watcher clientv3.Watcher
watchRespChan <-chan clientv3.WatchResponse
watchResp clientv3.WatchResponse
event *clientv3.Event
)
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// 用于读写etcd的键值对
kv = clientv3.NewKV(client)
// 模拟etcd中kv的变化
go func() {
for {
kv.Put(context.TODO(), "name", "lesroad")
kv.Delete(context.TODO(), "name")
time.Sleep(1 * time.Second)
}
}()
// 创建一个监听器
watcher = clientv3.NewWatcher(client)
// 启动监听 5秒后关闭
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5*time.Second, func() {
cancelFunc()
})
watchRespChan = watcher.Watch(ctx, "name", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件
for watchResp = range watchRespChan {
for _, event = range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为", string(event.Kv.Value))
case mvccpb.DELETE:
fmt.Println("删除了", string(event.Kv.Key))
}
}
}
}
2.4 op操作替代get、put
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
putOp clientv3.Op
getOp clientv3.Op
opResp clientv3.OpResponse
)
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
kv = clientv3.NewKV(client)
// 创建Op :operator
putOp = clientv3.OpPut("op", "replace")
// 执行Op 用kv.Do取代 kv.Put kv.Get...
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
fmt.Println(err)
return
}
fmt.Println("写入Revision", opResp.Put().Header.Revision)
// 创建Op
getOp = clientv3.OpGet("op")
// 执行Op
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
fmt.Println(err)
return
}
fmt.Println("数据Revision", opResp.Get().Kvs[0].ModRevision)
fmt.Println("数据value", string(opResp.Get().Kvs[0].Value))
}
2.5 事务txn实现分布式锁
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
lease clientv3.Lease
leaseGrantResp *clientv3.LeaseGrantResponse
leaseId clientv3.LeaseID
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
keepResp *clientv3.LeaseKeepAliveResponse
ctx context.Context
cancelFunc context.CancelFunc
txn clientv3.Txn
txnResp *clientv3.TxnResponse
)
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// lease实现锁自动过期
// op操作
// txn事务: if else then
// 1 上锁 创建租约 自动续租 拿着租约去抢占一个key
lease = clientv3.NewLease(client)
// 申请一个5秒的lease
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}
// 拿到租约id
leaseId = leaseGrantResp.ID
// 准备一个用于取消自动续租的context
ctx, cancelFunc = context.WithCancel(context.TODO())
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
fmt.Println(err)
return
}
// 确保函数退出后,取消自动续租
defer cancelFunc() // 取消续租
defer lease.Revoke(context.TODO(), leaseId) // 释放租约
// 处理续租应答的协程
go func() {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约失效")
goto END
} else {
fmt.Println("收到自动续租应答", keepResp.ID)
}
}
END:
}()
// if 不存在key then 设置它 else 抢锁失败
kv = clientv3.NewKV(client)
// 创建事务
txn = kv.Txn(context.TODO())
// 定义事务
// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("mutex"), "=", 0)).
Then(clientv3.OpPut("mutex", "yes", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("mutex")) // 否则抢锁失败
// 提交事务
if txnResp, err = txn.Commit(); err != nil {
fmt.Println(err)
return
}
// 判断是否抢到了锁
if !txnResp.Succeeded {
fmt.Println("锁被占用", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2 处理业务
fmt.Println("处理任务")
time.Sleep(5 * time.Second)
// 在锁内 很安全
// 3 释放锁 取消自动续租 释放租约
// defer 会把租约释放掉,关联的kv就被删除了
}