etcd是一个分布式的,一致的 key-value 存储,主要用途是共享配置和服务发现。Etcd 已经在很多分布式系统中得到广泛的使用。

etcd实现服务注册与发现功能,主要使用key-value存储,租约(lease)以及watch功能。

服务注册

1.租约模式,lease用于检测客户生存状态的机制。etcd集群支持具有生命周期的租约。如果etcd集群在给定的TTL周期内未收到客户端的keepAlive,则租约到期。存储在etcd集群中的key-value允许附加一个租约。当租约到期或被撤销时,绑定到该租约的所有key-value键值对将会被删除。

2.相同服务存储的key的前缀需要设置成一样

3.注册服务就是向服务端使用租约模式写入一个key和对应的value

 

代码实现(etcd_ register.go)

package main

import (
	"os"
	"log"
	"time"
	"syscall"
	"context"
	"os/signal"

	"go.etcd.io/etcd/clientv3"
)

//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
	cli     *clientv3.Client //etcd v3 client
	leaseID clientv3.LeaseID //租约ID
	//租约keepalieve相应chan
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string //key
	val           string //value
}

//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64,dailTimeout  int) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: time.Duration(dailTimeout) * time.Second,
	})
	if err != nil {
		return nil,err
	}

	ser := &ServiceRegister{
		cli: cli,
		key: key,
		val: val,
	}

	//申请租约设置时间keepalive
	if err := ser.putKeyWithLease(lease); err != nil {
		return nil, err
	}

	return ser, nil
}

//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	//创建一个新的租约,并设置ttl时间
	resp, err := s.cli.Grant(context.Background(), lease)
	if err != nil {
		return err
	}

	//注册服务并绑定租约
	_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//设置续租 定期发送需求请求
	//KeepAlive使给定的租约永远有效。 如果发布到通道的keepalive响应没有立即被使用,
	// 则租约客户端将至少每秒钟继续向etcd服务器发送保持活动请求,直到获取最新的响应为止。
	//etcd client会自动发送ttl到etcd server,从而保证该租约一直有效
	leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
	if err != nil {
		return err
	}

	s.leaseID = resp.ID
	log.Println(s.leaseID)
	s.keepAliveChan = leaseRespChan
	log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
	return nil
}

//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
	for leaseKeepResp := range s.keepAliveChan {
		log.Println("续约成功", leaseKeepResp)
	}
	log.Println("关闭续租")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
	//撤销租约
	if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
		return err
	}
	log.Println("撤销租约")
	return s.cli.Close()
}

func main() {
	var endpoints = []string{"localhost:2379"}

	ser, err := NewServiceRegister(endpoints, "/server/node1", "localhost:8000", 6 , 5)
	if err != nil {
		log.Fatalln(err)
	}
	//监听续租相应chan
	go ser.ListenLeaseRespChan()

	// 监控系统信号,等待 ctrl + c 系统信号通知服务关闭
	c := make(chan os.Signal, 1)
	go func() {
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	}()
	log.Printf("exit %s", <-c)
	ser.Close()
}

 

服务发现

1.创建一个etcd client

2.匹配到所有相同前缀的 key. 存储信息到本地

3.watch这个key前缀,当有增加或者删除的时候就修改本地

4.本地维护server的列表

 

代码实现(etcd_discovery.go)

package main

import (
	"os"
	"log"
	"time"
	"sync"
	"syscall"
	"context"
	"os/signal"

	"go.etcd.io/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
	cli        *clientv3.Client  //etcd client
	serverList map[string]string //服务列表
	lock       sync.RWMutex
}

//NewServiceDiscovery  新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
	//初始化etcd client v3
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})

	if err != nil {
		log.Fatal(err)
	}

	return &ServiceDiscovery{
		cli:        cli,
		serverList: make(map[string]string),
	}
}

//WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
	//根据前缀获取现有的key
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	//遍历获取到的key和value
	for _, ev := range resp.Kvs {
		s.SetServiceList(string(ev.Key), string(ev.Value))
	}

	//监视前缀,修改变更的server
	go s.watcher(prefix)
	return nil
}

//watcher 监听key的前缀
func (s *ServiceDiscovery) watcher(prefix string) {
	rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: //修改或者新增
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: //删除
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.serverList[key] = string(val)
	log.Println("put key :", key, "val:", val)
}

//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.serverList, key)
	log.Println("del key:", key)
}

//GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
	s.lock.RLock()
	defer s.lock.RUnlock()
	addrs := make([]string, 0,len(s.serverList))

	for _, v := range s.serverList {
		addrs = append(addrs, v)
	}
	return addrs
}

//Close 关闭服务
func (s *ServiceDiscovery) Close() error {
	return s.cli.Close()
}

func main() {
	var endpoints = []string{"localhost:2379"}
	ser := NewServiceDiscovery(endpoints)
	defer ser.Close()

	err := ser.WatchService("/server/")
	if err != nil {
		log.Fatal(err)
	}


	// 监控系统信号,等待 ctrl + c 系统信号通知服务关闭
	c := make(chan os.Signal, 1)
	go func() {
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	}()

	for {
		select {
		case <-time.Tick(10 * time.Second):
			log.Println(ser.GetServices())
		case <-c:
			log.Println("server discovery exit")
			return
		}
	}
}

 

go run etcd_regiser.go

etcd_register实例1启动

 

go run etcd_discovery.go

 

一会儿之后我们将etcd_register中的key改为/server/node2,value改为localhost:8001

go run etcd_register.go(etcd_register实例2)启动成功之后观察etcd_discovery.go日志信息

我们发现etcd_discovery已经watch到新增实例的操作了,也更新了本地实例(服务)列表

 

然后我们关闭etcd_register实例1,观察etcd_discovery.go日志信息

发现etcd_discoveryo已经watch到删除实例的操作了,也更新了本地实例(服务)列表

 

 

感谢:https://www.cnblogs.com/FireworksEasyCool/p/12890649.html