etcd是用go语言编写的key-value存储中间件组件,能保证多个节点数据的强一致性,适合存储重要数据,但不适合存储大量数据,比较适合做微服务注册中心及分布式锁等。

etcd做服务发现原理简单分析:监听服务注册的key,当对应的值发生变化时,通知grpc更新服务列表地址

etcd做服务注册原理简单分析:向etcd组件注册服务名称及地址,通过租约机制不断续约注册的key以保持服务的存活状态

下面是实现的代码:

目录结构:

client

--main.go

proto

--greet.pb.go

--greet.proto

server

--main.go

各文件代码:

proto/greet.proto

syntax = "proto3";

option go_package = ".;greet";

service Greet {
    rpc Hello(GreetRequest)returns(GreetResponse){}
}

message GreetRequest {
    string name = 1;
}

message GreetResponse {
    string message = 1;
    string from = 2;
}
protoc --go_out=plugins=grpc:. *.proto 生成

client/main.go

package main

import (
	"flag"
	"fmt"
	proto "grpc-etcd/proto"
	"log"
	"strings"
	"time"

	"go.etcd.io/etcd/client/v3"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"
)


var (
	ServiceName = flag.String("ServiceName", "greet_service", "service name")        //服务名称
	EtcdAddr    = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)

var cli *clientv3.Client

//etcd解析器
type etcdResolver struct {
	etcdAddr   string
	clientConn resolver.ClientConn
}

//初始化一个etcd解析器
func newResolver(etcdAddr string) resolver.Builder {
	return &etcdResolver{etcdAddr: etcdAddr}
}

func (r *etcdResolver) Scheme() string {
	return "etcd"
}

//watch有变化以后会调用
func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
	log.Println("ResolveNow")
	fmt.Println(rn)
}

//解析器关闭时调用
func (r *etcdResolver) Close() {
	log.Println("Close")
}

//构建解析器 grpc.Dial()同步调用
func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	var err error
	fmt.Println("call build...")
	//构建etcd client
	if cli == nil {
		cli, err = clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(r.etcdAddr, ";"),
			DialTimeout: 15 * time.Second,
		})
		if err != nil {
			fmt.Printf("连接etcd失败:%s\n", err)
			return nil, err
		}
	}

	r.clientConn = clientConn

	go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")

	return r, nil
}

//监听etcd中某个key前缀的服务地址列表的变化
func (r *etcdResolver) watch(keyPrefix string) {
	//初始化服务地址列表
	var addrList []resolver.Address

	resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
	if err != nil {
		fmt.Println("获取服务地址列表失败:", err)
	} else {
		for i := range resp.Kvs {
			addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
		}
	}

	r.clientConn.NewAddress(addrList)

	//监听服务地址列表的变化
	rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
	for n := range rch {
		for _, ev := range n.Events {
			addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
			switch ev.Type {
			case 0://mvccpb.PUT
				if !exists(addrList, addr) {
					addrList = append(addrList, resolver.Address{Addr: addr})
					r.clientConn.NewAddress(addrList)
				}
				fmt.Println("有新的服务注册:",addr)
			case 1://mvccpb.DELETE
				if s, ok := remove(addrList, addr); ok {
					addrList = s
					r.clientConn.NewAddress(addrList)
				}
				fmt.Println("服务注销:",addr)
			}
		}
	}
}

func exists(l []resolver.Address, addr string) bool {
	for i := range l {
		if l[i].Addr == addr {
			return true
		}
	}
	return false
}

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr == addr {
			s[i] = s[len(s)-1]
			return s[:len(s)-1], true
		}
	}
	return nil, false
}

func main() {
	flag.Parse()

	//注册etcd解析器
	r := newResolver(*EtcdAddr)
	resolver.Register(r)

	//客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
	conn, err := grpc.Dial(r.Scheme()+"://author/"+*ServiceName, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),grpc.WithInsecure())
	if err != nil {
		fmt.Println("连接服务器失败:", err)
	}
	defer conn.Close()

	//获得grpc句柄
	c := proto.NewGreetClient(conn)
	ticker := time.NewTicker(2 * time.Second)
	i:=1
	for range ticker.C {

		resp1, err := c.Hello(
			context.Background(),
			&proto.GreetRequest{Name: fmt.Sprintf("张三%d",i)},
		)
		if err != nil {
			fmt.Println("Hello调用失败:", err)
			return
		}
		fmt.Printf("Hello 响应:%s,来自:%s\n", resp1.Message, resp1.From)

		i++
	}
}

server/main.go

/**
* etcd demo server
* author: JetWu
* date: 2020.05.01
 */
package main

import (
	"flag"
	"fmt"
	proto "grpc-etcd/proto"
	"net"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"go.etcd.io/etcd/client/v3"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)


var host = "127.0.0.1" //服务器主机
var (
	Port        = flag.Int("Port", 3000, "listening port")                           //服务器监听端口
	ServiceName = flag.String("ServiceName", "greet_service", "service name")        //服务名称
	EtcdAddr    = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)
var cli *clientv3.Client

//rpc服务接口
type greetServer struct{}

func (gs *greetServer) Hello(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
	fmt.Printf("Hello 调用: %s\n", req.Name)
	return &proto.GreetResponse{
		Message: "Hello, " + req.Name,
		From:    fmt.Sprintf("127.0.0.1:%d", *Port),
	}, nil
}


//将服务地址注册到etcd中
func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
	var err error

	if cli == nil {
		//构建etcd client
		cli, err = clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(etcdAddr, ";"),
			DialTimeout: 15 * time.Second,
		})
		if err != nil {
			fmt.Printf("连接etcd失败:%s\n", err)
			return err
		}
	}

	//与etcd建立长连接,并保证连接不断(心跳检测)
	ticker := time.NewTicker(time.Second * time.Duration(ttl))
	go func() {
		key := getKey(serviceName,serverAddr)
		for {
			resp, err := cli.Get(context.Background(), key)
			//fmt.Printf("resp:%+v\n", resp)
			if err != nil {
				fmt.Printf("获取服务地址失败:%s", err)
			} else if resp.Count == 0 { //尚未注册
				err = keepAlive(serviceName, serverAddr, ttl)
				if err != nil {
					fmt.Printf("保持连接失败:%s", err)
				}
			}
			<-ticker.C
		}
	}()

	return nil
}

//组装etcd key
func getKey(serviceName,serverAddr string) string {
	return 	fmt.Sprintf("/%s/%s/%s","etcd",serviceName,serverAddr)
}

//保持服务器与etcd的长连接
func keepAlive(serviceName, serverAddr string, ttl int64) error {
	//创建租约
	leaseResp, err := cli.Grant(context.Background(), ttl)
	if err != nil {
		fmt.Printf("创建租期失败:%s\n", err)
		return err
	}

	//将服务地址注册到etcd中
	key := getKey(serviceName,serverAddr)
	_, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))

	if err != nil {
		fmt.Printf("注册服务失败:%s", err)
		return err
	}
	fmt.Printf("etcd服务注册成功,key:%s,value:%s",key,serverAddr)
	//建立长连接
	ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
	if err != nil {
		fmt.Printf("建立长连接失败:%s\n", err)
		return err
	}

	//清空keepAlive返回的channel
	go func() {
		for {
			<-ch
		}
	}()
	return nil
}

//取消注册
func unRegister(serviceName, serverAddr string) {
	if cli != nil {
		key := getKey(serviceName,serverAddr)
		cli.Delete(context.Background(), key)
	}
}

func main() {
	flag.Parse()

	//监听网络
	listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
	if err != nil {
		fmt.Println("监听网络失败:", err)
		return
	}
	defer listener.Close()

	//创建grpc句柄
	srv := grpc.NewServer()
	defer srv.GracefulStop()

	//将greetServer结构体注册到grpc服务中
	proto.RegisterGreetServer(srv, &greetServer{})

	//将服务地址注册到etcd中
	serverAddr := fmt.Sprintf("%s:%d", host, *Port)
	fmt.Printf("greeting server address: %s\n", serverAddr)
	register(*EtcdAddr, *ServiceName, serverAddr, 5)

	//关闭信号处理
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		s := <-ch
		unRegister(*ServiceName, serverAddr)
		if i, ok := s.(syscall.Signal); ok {
			os.Exit(int(i))
		} else {
			os.Exit(0)
		}
	}()

	//监听服务
	err = srv.Serve(listener)
	if err != nil {
		fmt.Println("监听异常:", err)
		return
	}
}

运行多个服务端,然后运行客户端