etcd是用go语言编写的key-value存储中间件组件,能保证多个节点数据的强一致性,适合存储重要数据,但不适合存储大量数据,比较适合做微服务注册中心及分布式锁等。
etcd做服务发现原理简单分析:监听服务注册的key,当对应的值发生变化时,通知grpc更新服务列表地址
etcd做服务注册原理简单分析:向etcd组件注册服务名称及地址,通过租约机制不断续约注册的key以保持服务的存活状态
下面是实现的代码:
目录结构:
client
--main.go
proto
--greet.pb.go
--greet.proto
server
--main.go
各文件代码:
proto/greet.proto
syntax = "proto3";
option go_package = ".;greet";
service Greet {
rpc Hello(GreetRequest)returns(GreetResponse){}
}
message GreetRequest {
string name = 1;
}
message GreetResponse {
string message = 1;
string from = 2;
}
protoc --go_out=plugins=grpc:. *.proto 生成
client/main.go
package main
import (
"flag"
"fmt"
proto "grpc-etcd/proto"
"log"
"strings"
"time"
"go.etcd.io/etcd/client/v3"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
)
var (
ServiceName = flag.String("ServiceName", "greet_service", "service name") //服务名称
EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)
var cli *clientv3.Client
//etcd解析器
type etcdResolver struct {
etcdAddr string
clientConn resolver.ClientConn
}
//初始化一个etcd解析器
func newResolver(etcdAddr string) resolver.Builder {
return &etcdResolver{etcdAddr: etcdAddr}
}
func (r *etcdResolver) Scheme() string {
return "etcd"
}
//watch有变化以后会调用
func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
log.Println("ResolveNow")
fmt.Println(rn)
}
//解析器关闭时调用
func (r *etcdResolver) Close() {
log.Println("Close")
}
//构建解析器 grpc.Dial()同步调用
func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
var err error
fmt.Println("call build...")
//构建etcd client
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(r.etcdAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
fmt.Printf("连接etcd失败:%s\n", err)
return nil, err
}
}
r.clientConn = clientConn
go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return r, nil
}
//监听etcd中某个key前缀的服务地址列表的变化
func (r *etcdResolver) watch(keyPrefix string) {
//初始化服务地址列表
var addrList []resolver.Address
resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Println("获取服务地址列表失败:", err)
} else {
for i := range resp.Kvs {
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
}
}
r.clientConn.NewAddress(addrList)
//监听服务地址列表的变化
rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for n := range rch {
for _, ev := range n.Events {
addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
switch ev.Type {
case 0://mvccpb.PUT
if !exists(addrList, addr) {
addrList = append(addrList, resolver.Address{Addr: addr})
r.clientConn.NewAddress(addrList)
}
fmt.Println("有新的服务注册:",addr)
case 1://mvccpb.DELETE
if s, ok := remove(addrList, addr); ok {
addrList = s
r.clientConn.NewAddress(addrList)
}
fmt.Println("服务注销:",addr)
}
}
}
}
func exists(l []resolver.Address, addr string) bool {
for i := range l {
if l[i].Addr == addr {
return true
}
}
return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr {
s[i] = s[len(s)-1]
return s[:len(s)-1], true
}
}
return nil, false
}
func main() {
flag.Parse()
//注册etcd解析器
r := newResolver(*EtcdAddr)
resolver.Register(r)
//客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
conn, err := grpc.Dial(r.Scheme()+"://author/"+*ServiceName, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),grpc.WithInsecure())
if err != nil {
fmt.Println("连接服务器失败:", err)
}
defer conn.Close()
//获得grpc句柄
c := proto.NewGreetClient(conn)
ticker := time.NewTicker(2 * time.Second)
i:=1
for range ticker.C {
resp1, err := c.Hello(
context.Background(),
&proto.GreetRequest{Name: fmt.Sprintf("张三%d",i)},
)
if err != nil {
fmt.Println("Hello调用失败:", err)
return
}
fmt.Printf("Hello 响应:%s,来自:%s\n", resp1.Message, resp1.From)
i++
}
}
server/main.go
/**
* etcd demo server
* author: JetWu
* date: 2020.05.01
*/
package main
import (
"flag"
"fmt"
proto "grpc-etcd/proto"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"
"go.etcd.io/etcd/client/v3"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
var host = "127.0.0.1" //服务器主机
var (
Port = flag.Int("Port", 3000, "listening port") //服务器监听端口
ServiceName = flag.String("ServiceName", "greet_service", "service name") //服务名称
EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
)
var cli *clientv3.Client
//rpc服务接口
type greetServer struct{}
func (gs *greetServer) Hello(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
fmt.Printf("Hello 调用: %s\n", req.Name)
return &proto.GreetResponse{
Message: "Hello, " + req.Name,
From: fmt.Sprintf("127.0.0.1:%d", *Port),
}, nil
}
//将服务地址注册到etcd中
func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
var err error
if cli == nil {
//构建etcd client
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
fmt.Printf("连接etcd失败:%s\n", err)
return err
}
}
//与etcd建立长连接,并保证连接不断(心跳检测)
ticker := time.NewTicker(time.Second * time.Duration(ttl))
go func() {
key := getKey(serviceName,serverAddr)
for {
resp, err := cli.Get(context.Background(), key)
//fmt.Printf("resp:%+v\n", resp)
if err != nil {
fmt.Printf("获取服务地址失败:%s", err)
} else if resp.Count == 0 { //尚未注册
err = keepAlive(serviceName, serverAddr, ttl)
if err != nil {
fmt.Printf("保持连接失败:%s", err)
}
}
<-ticker.C
}
}()
return nil
}
//组装etcd key
func getKey(serviceName,serverAddr string) string {
return fmt.Sprintf("/%s/%s/%s","etcd",serviceName,serverAddr)
}
//保持服务器与etcd的长连接
func keepAlive(serviceName, serverAddr string, ttl int64) error {
//创建租约
leaseResp, err := cli.Grant(context.Background(), ttl)
if err != nil {
fmt.Printf("创建租期失败:%s\n", err)
return err
}
//将服务地址注册到etcd中
key := getKey(serviceName,serverAddr)
_, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Printf("注册服务失败:%s", err)
return err
}
fmt.Printf("etcd服务注册成功,key:%s,value:%s",key,serverAddr)
//建立长连接
ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
fmt.Printf("建立长连接失败:%s\n", err)
return err
}
//清空keepAlive返回的channel
go func() {
for {
<-ch
}
}()
return nil
}
//取消注册
func unRegister(serviceName, serverAddr string) {
if cli != nil {
key := getKey(serviceName,serverAddr)
cli.Delete(context.Background(), key)
}
}
func main() {
flag.Parse()
//监听网络
listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
if err != nil {
fmt.Println("监听网络失败:", err)
return
}
defer listener.Close()
//创建grpc句柄
srv := grpc.NewServer()
defer srv.GracefulStop()
//将greetServer结构体注册到grpc服务中
proto.RegisterGreetServer(srv, &greetServer{})
//将服务地址注册到etcd中
serverAddr := fmt.Sprintf("%s:%d", host, *Port)
fmt.Printf("greeting server address: %s\n", serverAddr)
register(*EtcdAddr, *ServiceName, serverAddr, 5)
//关闭信号处理
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
s := <-ch
unRegister(*ServiceName, serverAddr)
if i, ok := s.(syscall.Signal); ok {
os.Exit(int(i))
} else {
os.Exit(0)
}
}()
//监听服务
err = srv.Serve(listener)
if err != nil {
fmt.Println("监听异常:", err)
return
}
}
运行多个服务端,然后运行客户端