etcd是一个高可用、强一致的键值仓库,其中典型的应用场景就是服务发现。它专注于以下四点:
简单:易于部署,易使用。基于HTTP+JSON的API让你用curl就可以轻松访问
安全:可选SSL客户认证机制
快速:每个实例每秒支持一千次写操作
可信:使用一致性raft算法充分实现了分布式。
etcd的场景模式处理的都是系统中的控制数据,应该尽量只存储系统服务的配置信息,对于应用数据只推荐数据量小,访问和更新频次都很高的数据存储在etcd中。
服务发现(service Discovery)
服务发现目的即在用一个分布式集群中的进程或服务,如何才能找到对方并且建立连接。
而其有三大支柱,缺一不可:
一个强一致性、高可用的服务存储目录。基于raft算法的etcd就是一个强一致性高可用的服务存储目录。
一种注册服务和监控服务监控状态的机制。用户可以在etcd中注册服务,并且对注册的服务设置 key TTL, 定时保持服务的心跳以达到健康监控的效果。
一种查找和连接服务的机制。通过在etcd指定的主题(由服务名称构成的服务目录)下注册的服务也能在对应的主题下找到。
etcd的核心组件
etcd可以分为四个部分:
HTTP Server:用于处理用户发送的API请求以及其他etcd节点的同步与心跳信息请求。
Store:用于处理etcd支持的各类功能的事物,包括数据索引、节点状态变更、监控与反馈、事件处理与执行等等,是etcd对用户提供的大多数API功能的具体实现。
Raft:Raft强一致性算法的具体实现。
WAL:Write Ahead Log(预写日志),是etcd的数据存储方式。除了在内存中存有所有数据的状态以及节点的索引以外,etcd通过WAL进行持久化存储。Snapshot是为了防止数据过多而进行的状态快照,Entry表示储存的具体日志内容。
通常,一个用户的请求发送过来,会经由HTTP Server 转发给Store进行具体的事物处理,如果涉及到节点的修改,则交给Raft模块进行状态的变更、日志的记录,然后再同步给别的etcd节点以确认数据提交,最后进行数据的提交,再次同步。
用户从集群中那个节点读写数据?
为了保证数据的强一致性,etcd集群中所有的数据流向都是一个方向,从leader(主节点)流向follower(从节点),也就是所有的follower必须和leader相同。
etcd集群会选取出leader(主节点),如果是写入,则先写leader,然后同步给follower,如果写入的信息来自其他follower,则转发给leader,由leader写入后再分发给其他的follower。
如何选举leader节点
如果在启动之初没有选出leader,则raft算法使用随机timer来初始化选举流程,每个timer都是随机的,那个节点先完成了timer,随后向其他的节点发送成为leader的请求,其他节点收到请求后回应。成为leader之后,会以固定时间向其他节点发送通知,确保自己是leader。如果有些时候leader宕机或者时区连接,那么重复选出新的leader。
判断写入是否成功
只要leader写入成功,就认为写入成功。
简单etcd指令(可以查看官网介绍为主)etcd的安装自行安装吧,这里不做介绍。
etcdctl是一个命令行客户端,感觉和redis模式差不多。通过etcdctl -h 来查看支持的命令
put(set)(我的用put,很多博客都是set,不知是否新旧版本或者操作系统有关)
get 获取指定键的值
--sort 对结果排序
--consistent 将请求发送给主节点,保证获取内容一致性
update 更新给定键中的值
同样支持ttl的配置
rm 删除给定的件
setdir 创建一个目录,无论存在与否
updaterdir 更新一个已经存在的目录
backup 备份etcd数据
watch 检测一个键值的变化,一点发生变化,就会输出新值并且退出
exec-watch 监听一个键值变化,一但键值发生更新,就执行指定命令
安装package
go get github.com/coreos/etcd/clientv3
连接客户端
用程序访问etcd首先要创建client,他需要传入一个Config配置,这里传入俩个选项:
Endpoints:etcd的多个节点服务地址。
DialTimeout:创建client的首次连接超时时间,若果超时没连接成功,就会err,一但链接成功,后续的状态,client内部会重连
_, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://127.0.0.1:2379"},
DialTimeout: 2 * time.Second
})
// etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
if err == context.DeadlineExceeded {
// handle errors
}
// etcd clientv3 <= v3.2.9, grpc/grpc-go <= v1.2.1
if err == grpc.ErrClientConnTimeout {
// handle errors
}
// -----------------------------------------------
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
}
defer cli.Close()
返回的client,类型如下:
// Client provides and manages an etcd v3 client session.
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
conn *grpc.ClientConn
cfg Config
creds grpccredentials.TransportCredentials
resolverGroup *endpoint.ResolverGroup
mu *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
authTokenBundle credentials.Bundle
callOpts []grpc.CallOption
lg *zap.Logger
}
几个比较常用的:
Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
KV:我们主要使用的功能,即K-V键值库的操作。
Lease:租约相关操作,比如申请一个TTL=10秒的租约(应用给key可以实现键值的自动过期)。
Watcher:观察订阅,从而监听最新的数据变化。
Auth:管理etcd的用户和权限,属于管理员操作。
Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。
确保客户端使用后,并且关闭它
ctx, cancel := context.WithTimeout(context.Background(), timeout)
resp, err := kvc.Put(ctx, "sample_key", "sample_value")
cancel()
if err != nil {
// handle error!
}
// use the response
一个简单的使用案例:
package main
import (
"context"
"log"
"os"
"time"
"github.com/coreos/etcd/clientv3"
"google.golang.org/grpc/grpclog"
)
var (
dialTimeout = 5 * time.Second
endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"}
)
func main() {
clientv3.SetLogger(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close() // make sure to close the client
_, err = cli.Put(context.TODO(), "foo", "bar")
if err != nil {
log.Fatal(err)
}
}
基本操作
kv
put
client.Put(context.TODO(), "/test/key1", "hello etcd")
get
client.Get(context.TODO(), "/test/")
client.Get(context.TODO(), "/test/", clientv3.WithPrefix())
get不指定第三个参数则是精确获取,也可以指定第三个参数来进行影响获取结果,比如使用前缀匹配。
lease
lease提供了以下功能:
Grant:分配一个租约
Revoke:释放一个租约
TimeToLive:获取剩余TTL时间
leases:列举出所有etcd中的租约
KeepAlive:自动定时的续约某个租约
KeepAliveOnce:为某个租约续约一次
Close:释放当前客户端建立的所有租约
想要实现key自动过期,首先得到一个租约,并设置TTL为10秒,同时设置一个10秒过期的kv
grant, err := client.Grant(context.TODO(), 10)
client.Put(context.TODO(), "tt/10", "10秒过期", clientv3.WithLease(grant.ID))
这里主要使用的是grant.ID,有一种情况是如果在put之前Lease已经过期了,那么这个put就会返回error。这时需要再过期时间之内,为其续约
client.KeepAliveOnce(context.TODO(), grant.ID)
op
op一次就是操作,只为了简化用户开发,和直接调用put,get没啥区别
ops := []clientv3.Op{
clientv3.OpPut("put-key", "1223"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456"),
}
for _, op := range ops{
if _, err := client.Do(context.TODO(), op); err != nil {
log.Fatal(err)
}
}
Txn 事务
etcd中的事物都是原子执行的,只支持if...then...else...这种表达。
Txn必须是这样使用的:If(满足条件) Then(执行若干Op) Else(执行若干Op)。
下面的测试程序,判断如果k1的值大于v1并且k1的版本号是2,则Put 键值k2和k3,否则Put键值k4和k5。
client.Txn(context.TODO()).If(
clientv3.Compare(clientv3.Value("k1"), ">", "v1"),
clientv3.Compare(clientv3.Version("k1"), "=", 2),
).Then(
clientv3.OpPut("k2", "v2"), clientv3.OpPut("k3", "v3"),
).Else(
clientv3.OpPut("k4", "v4"), clientv3.OpPut("k5", "v5"),
).Commit()
Watch
Watch用于监听某个键的变化, Watch调用后返回一个 WatchChan
当监听的key有变化后会向 WatchChan发送 WatchResponse。Watch的典型应用场景是应用于系统配置的热加载,我们可以在系统读取到存储在etcd key中的配置后,用Watch监听key的变化。在单独的goroutine中接收WatchChan发送过来的数据,并将更新应用到系统设置的配置变量中,比如像下面这样在goroutine中更新变量appConfig,这样系统就实现了配置变量的热加载。
Golang etcd 服务注册与发现demo服务注册
如果没有对应版本,会报错的
go get -u google.golang.org/grpc@v1.26.0
package main
import (
"context"
"go.etcd.io/etcd/clientv3"
"log"
"os"
"os/signal"
"syscall"
"time"
)
// ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client // etcd v3 client
leaseID clientv3.LeaseID // 租约ID
// 租约keepalive相应的chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string // key
val string // value
}
// NewServiceRegister 新建服务注册
func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Duration(dailTimeout) * time.Second,
})
if err != nil {
return nil, err
}
ser := &ServiceRegister{
cli: cli,
key: key,
val: val,
}
// 申请租约设置时间keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
// 设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
// 创建一个租约,并且设置ttl时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
// 注册服务并且绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(s.leaseID))
if err != nil {
return err
}
// 设置续租 定期发送需求请求
// KeepAlive 使给定的租约永远有效。如果发布到通道的keepalive响应没有立即使用,
// 则租约客户端至少每秒钟向etcd服务器发送保持活动请求,知道获取最新响应为止
// etcd client 会自动发送ttl到etcd server, 从而保证租约一直有效
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
log.Println(s.leaseID)
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
// ListenLeaseRespChan 监听续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭租约")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
// 撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser, err := NewServiceRegister(endpoints, "/server/node1", "localhost:8000", 6, 5)
if err != nil {
log.Fatalln(err)
}
// 监听相应的租约
go ser.ListenLeaseRespChan()
// 监控系统型号,等待 ctrl + c 系统信号通知关闭
c := make(chan os.Signal, 1)
go func() {
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
}()
log.Printf("exit %s", <-c)
ser.Close()
}
服务发现
package main
import (
"context"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client // etcd client
serverList map[string]string // 服务列表
lock sync.RWMutex
}
// NewServiceDiscovery 新建服务发现
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
// 初始化etcd client
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Duration(5) * time.Second,
})
if err != nil {
log.Fatalln(err)
}
return &ServiceDiscovery{
cli: cli,
serverList: make(map[string]string),
}
}
// WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
// 根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return err
}
// 遍历获取得到的k和v
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
// 监视前缀,修改变更server
go s.watcher(prefix)
return nil
}
// watcher 监听Key的前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: // 修改或者新增
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: // 删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = string(val)
log.Println("put key:", key, "val:", val)
}
func (s *ServiceDiscovery) DelServiceList(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
log.Println("Del key :", key)
}
// GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
s.lock.RLock()
defer s.lock.RUnlock()
addrs := make([]string, 0, len(s.serverList))
for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}
// Close 关闭服务
func (s *ServiceDiscovery) Close() error {
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser := NewServiceDiscovery(endpoints)
defer ser.Close()
err := ser.WatchService("/server/")
if err != nil {
log.Fatal(err)
}
// 监控系统信号,等待 ctrl + c 系统信号通知服务关闭
c := make(chan os.Signal, 1)
go func() {
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
}()
for {
select {
case <-time.Tick(10 * time.Second):
log.Println(ser.GetServices())
case <-c:
log.Println("server discovery exit")
return
}
}
}
参考: