etcd是一个分布式的,一致的 key-value 存储,主要用途是共享配置和服务发现。Etcd 已经在很多分布式系统中得到广泛的使用。
etcd实现服务注册与发现功能,主要使用key-value存储,租约(lease)以及watch功能。
服务注册
1.租约模式,lease用于检测客户生存状态的机制。etcd集群支持具有生命周期的租约。如果etcd集群在给定的TTL周期内未收到客户端的keepAlive,则租约到期。存储在etcd集群中的key-value允许附加一个租约。当租约到期或被撤销时,绑定到该租约的所有key-value键值对将会被删除。
2.相同服务存储的key的前缀需要设置成一样
3.注册服务就是向服务端使用租约模式写入一个key和对应的value
代码实现(etcd_ register.go)
package main
import (
"os"
"log"
"time"
"syscall"
"context"
"os/signal"
"go.etcd.io/etcd/clientv3"
)
//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client //etcd v3 client
leaseID clientv3.LeaseID //租约ID
//租约keepalieve相应chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64,dailTimeout int) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Duration(dailTimeout) * time.Second,
})
if err != nil {
return nil,err
}
ser := &ServiceRegister{
cli: cli,
key: key,
val: val,
}
//申请租约设置时间keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//创建一个新的租约,并设置ttl时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
//注册服务并绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
//设置续租 定期发送需求请求
//KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用,
// 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。
//etcd client会自动发送ttl到etcd server,从而保证该租约一直有效
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
log.Println(s.leaseID)
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
//撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser, err := NewServiceRegister(endpoints, "/server/node1", "localhost:8000", 6 , 5)
if err != nil {
log.Fatalln(err)
}
//监听续租相应chan
go ser.ListenLeaseRespChan()
// 监控系统信号,等待 ctrl + c 系统信号通知服务关闭
c := make(chan os.Signal, 1)
go func() {
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
}()
log.Printf("exit %s", <-c)
ser.Close()
}
服务发现
1.创建一个etcd client
2.匹配到所有相同前缀的 key. 存储信息到本地
3.watch这个key前缀,当有增加或者删除的时候就修改本地
4.本地维护server的列表
代码实现(etcd_discovery.go)
package main
import (
"os"
"log"
"time"
"sync"
"syscall"
"context"
"os/signal"
"go.etcd.io/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
)
//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client //etcd client
serverList map[string]string //服务列表
lock sync.RWMutex
}
//NewServiceDiscovery 新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
//初始化etcd client v3
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
serverList: make(map[string]string),
}
}
//WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
//根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return err
}
//遍历获取到的key和value
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
//监视前缀,修改变更的server
go s.watcher(prefix)
return nil
}
//watcher 监听key的前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //修改或者新增
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = string(val)
log.Println("put key :", key, "val:", val)
}
//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
log.Println("del key:", key)
}
//GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
s.lock.RLock()
defer s.lock.RUnlock()
addrs := make([]string, 0,len(s.serverList))
for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}
//Close 关闭服务
func (s *ServiceDiscovery) Close() error {
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser := NewServiceDiscovery(endpoints)
defer ser.Close()
err := ser.WatchService("/server/")
if err != nil {
log.Fatal(err)
}
// 监控系统信号,等待 ctrl + c 系统信号通知服务关闭
c := make(chan os.Signal, 1)
go func() {
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
}()
for {
select {
case <-time.Tick(10 * time.Second):
log.Println(ser.GetServices())
case <-c:
log.Println("server discovery exit")
return
}
}
}
go run etcd_regiser.go
etcd_register实例1启动
go run etcd_discovery.go
一会儿之后我们将etcd_register中的key改为/server/node2,value改为localhost:8001
go run etcd_register.go(etcd_register实例2)启动成功之后观察etcd_discovery.go日志信息
我们发现etcd_discovery已经watch到新增实例的操作了,也更新了本地实例(服务)列表
然后我们关闭etcd_register实例1,观察etcd_discovery.go日志信息
发现etcd_discoveryo已经watch到删除实例的操作了,也更新了本地实例(服务)列表
感谢:https://www.cnblogs.com/FireworksEasyCool/p/12890649.html