1.简介<br /> &emsp;&emsp;最近在做服务拆分,目标就是先抽出一个流量解析模块,主要是负责流量的接入、协议的转化,将所有媒体请求都以统一接口访问后续服务,因为时间紧迫,服务间访问、负载均衡采用的是比较传统的处理方案——新服务通过内网LB访问后续服务,该种方案的最大问题就是在原有上、下层服务之间又加了一层,即数据链路又增加了一环,增加性能开销;后来利用周末时间,研究了软负载策略(又称客户端负载策略 ),该方案的特点就是将LB放在服务消费方,服务消费方通过LB组件获知可以提供服务的目标机器,然后与之建立连接、直接进行访问。<br /> &emsp;&emsp;服务发现与负载均衡策略参考 : https://www.jianshu.com/p/f49c3a29b5dc?from=timeline<br /> 2.实现<br /> &emsp;&emsp;由于平时工作也比较忙,本文比较偏实践、没有详细介绍一些理论知识,期间提到的内容,如有不了解的,还需要同学自己搜资料了解下,Go语言中,GRPC和ETCD就能很好的实现服务发现和负载均衡的处理,下面就开始讲下实现过程。<br /><br /> 2.1准备<br /> &emsp;&emsp;正所谓“工欲善其事,必先利其器”,开始之前,先把工具准备,为了处理的方便,建立先把Go升级到高版本,比如:1.14版本,主要是利用它的module功能,然后就是下载etcd并启动etcd服务,剩下的就是要安装下protobuf,后面要处理proto文件;这几块处理都比较简单,基本不会遇到啥大问题,基本网上搜搜资料都可以解决,下面说下比较麻烦的问题,就是GRPC和ETCD版本兼容的问题,之前这块被搞的吐血,基本有两个方案:<br /> &emsp;&emsp;1)向上对齐,修改etcd源码,参考地址是 : https://github.com/etcd-io/etcd/pull/11564/files<br /> &emsp;&emsp;2)向下对齐,GPRC和protoc都使用降低版本,参考地址是 : https://learnku.com/articles/43758<br /> &emsp;&emsp;我这边采用的是方案二,建议准备好环境后再开工;关注ETCD这块,最近还在研究,后面会详细介绍它的灵魂——Raft协议。<br /> 2.2方案<br /> &emsp;&emsp;本方案的处理方式是,客户端经自身LB组件知道服务端地址列表,然后建立连接、进行访问;客户端怎么获取服务端地址列表呢?这就需要借助ETCD了,ETCD是客户端与服务端的桥梁——服务端向ETCD注册服务地址、维持心跳,客户端负责监听变化、更新地址列表,下面围绕这几个点细讨论下<br /> 2.2.1服务注册<br /> &emsp;&emsp;按照理想情况,服务启动后将自身服务地址注册到ETCD上即可,看似处理过程很简单,但如果考虑到实际情况,就不可能这样简单处理了,线上实际情况是——服务器存在各种各样的断电、断网风险,比如:网线被工程挖断、被工作人员踢掉,机柜断电,服务宕机等,所以服务在ETCD上注册地址是个技术活,要考虑各种可能发现的异常情况,总的来说,要考虑以下几种情况:<br /> &emsp;&emsp;1)服务是正常的,但是,没注册上、导致无法为客户端提供服务;<br /> &emsp;&emsp;2)服务是不正常的,但是,没取消注册、导致客户端还在继续请求;<br /> &emsp;&emsp;综上所述,服务端在注册时,不能永久注册,注册地址是有过期时间的,对此,服务端要有定时检测机制,类似心跳检测,查看是否注册成功,如果服务注册失败,还要进行补注册,而补注册后,还要支持对数据的“续命”操作,即数据过期后自动续约。<br /> 2.2.2服务发现<br /> &emsp;&emsp;客户端做的事情主要是获取地址列表、与服务端建立连接,考虑到服务端地址存在变化的情况,故客户端还要支持地址监测、及时变更服务地址的处理,上述处理可以借助GRPC的命名解析功能;负载均衡的策略也是在客户端处理的,拨号时会指定策略,一般采取轮询策略。<br /> 2.3代码<br /> &emsp;&emsp;围绕方案实现处理代码,本部分主要是看下注册与发现的代码,完整的代码处理可以去Git上下载,地址是 : <br /> &emsp;&emsp;https://github.com/JackBelief/go_module_test.git <br /> &emsp;&emsp;本部分代码结构是:<br /> ![image.png](https://static.studygolang.com/200719/e81198286712e0597a0b16fbf13d5b14.png) &emsp;&emsp;其中,server.go是服务端代码,client.go是客户端代码,config是用于服务启动参数解析(引入了viper),etcd_proc完成对ETCD的各种操作处理,protoes定义GRPC契约,service完成GRPC服务结构定义。<br /> 2.3.1服务端代码<br /> &emsp;&emsp;服务端代码会启动两个协程,一个用于注册的心跳检测;另一个用于注册的续约处理,可以细看注释:<br /> ``` func RegisterETCDServer(addr string) { // 服务注册 registerServer(addr) } func registerServer(addr string) { var err error // 创建ETCD的客户端 if GClient == nil { GClient, err = newETCDClient() if err != nil { fmt.Println("ectd 客户端创建失败 error=", err.Error()) return } } fmt.Println("ectd 客户端创建成功") // 定时循环检测,查看向ETCD注册服务是否正常 // 每台服务向ETCD注册自己的IP地址,定时检测注册内容是否还在 ticker := time.NewTicker(time.Second * time.Duration(5)) go func() { for { getResp, err := GClient.Get(context.Background(), ETCDServerPrefix+addr) if err != nil { fmt.Println("etcd出现异常,key获取异常,key=", ETCDServerPrefix+addr, " error=", err.Error()) } else if getResp.Count == 0 { fmt.Println("etcd没有目标数据,需要补数据,key=", ETCDServerPrefix+addr) putData(ETCDServerPrefix+addr, addr) } else { fmt.Println("etcd目标数据正常,key=", ETCDServerPrefix+addr) } <-ticker.C } }() return } func newETCDClient() (*clientv3.Client, error) { config := clientv3.Config{ Endpoints: []string{"121.42.161.154:2379"}, DialTimeout: 5 * time.Second, } return clientv3.New(config) } func putData(key, value string) { leaseResp, err := GClient.Grant(context.Background(), 5) if err != nil { fmt.Println("etcd申请租约失败 key=", key, " error=", err.Error()) return } _, err = GClient.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID)) if err != nil { fmt.Println("etcd写入数据失败 key=", key, " error=", err.Error()) return } kaRespChan, err := GClient.KeepAlive(context.Background(), leaseResp.ID) if err != nil { fmt.Println("etcd租约续约失败 key=", key, "id=", leaseResp.ID, " error=", err.Error()) return } // 定期查看续约结果 go func() { for { select { case respData := <-kaRespChan: if kaRespChan == nil { fmt.Println("管道关闭,出现异常,退出 key=", key) return } else { if respData == nil { fmt.Println("没有数据,可能是etcd关闭、也可能是网络异常,退出 key=", key) return } else { fmt.Println("续约成功 key=", key) } } } time.Sleep(1 * time.Second) } }() return } func UnRegisterETCDServer(addr string) { // 服务取消注册 unRegisterServer(addr) } func unRegisterServer(addr string) { var err error // 创建ETCD的客户端 if GClient == nil { GClient, err = newETCDClient() if err != nil { fmt.Println("ectd 客户端创建失败 error=", err.Error()) return } } // 删除服务注册数据 _, err = GClient.Delete(context.Background(), ETCDServerPrefix+addr) if err != nil { fmt.Println("服务关闭,etcd删除数据失败 key=", ETCDServerPrefix+addr, " error=", err.Error()) return } else { fmt.Println("服务关闭,etcd成功删除数据 key=", ETCDServerPrefix+addr) return } } ``` 2.3.2客户端代码<br /> &emsp;&emsp;客户端除了发起GRPC调用处理业务外,还要实现服务发现、监控变化的处理,具体可以细看注释:<br /> ``` /***************************************************************************************************** Builder 是接口类型,用于创建命名解析器,可监视命名空间是否发生变化,其方法有: 1) Scheme() string // 返回解析器支持的方案 2) Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) // 创建解析器 Resolver 是接口类型,用于监控目标变化,当目标发生变化时,会相应地更新地址、服务配置,其方法有: 1) Close() // 关闭解析器 2) ResolveNow(ResolveNowOptions) // 备用接口,GRPC可以再次调用用于目标的解析 客户端要实现以上接口,从而实现服务发现、变更 *****************************************************************************************************/ func NewResolver() resolver.Builder { return &ETCDResolver{rawAddr: "121.42.161.154:2379"} } type ETCDResolver struct { rawAddr string // etcd服务地址,多个地址要使用分隔符 resolverConn resolver.ClientConn // 解析器链接对象 } // 实现Builder接口类型 func (er *ETCDResolver) Scheme() string { return ETCDSchema } func (er *ETCDResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { // 构建解析器,解析器只负责对目标的更新,而对目标的监控由用户部分完成, var err error if GClient == nil { GClient, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(er.rawAddr, ";"), DialTimeout: 5 * time.Second, }) if err != nil { return nil, err } } // 解析器监控变化 er.resolverConn = cc fmt.Println("resolver create success") go er.watch("/" + target.Scheme + "/" + target.Endpoint + "/") return er, nil } func (er *ETCDResolver) watch(keyPrefix string) { for { er.watchETCD(keyPrefix) time.Sleep(1 * time.Second) } } func (er *ETCDResolver) watchETCD(keyPrefix string) { defer func() { if err := recover(); err != nil { fmt.Println("watch error =", err) } }() er.watchETCDKey(keyPrefix) } func (er *ETCDResolver) watchETCDKey(keyPrefix string) { var addrList []resolver.Address // 读取ETCD,获取IP列表 getResp, err := GClient.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { fmt.Println("解析器读取ETCD,获取IP列表失败 err=", err.Error()) } else { for index := range getResp.Kvs { fmt.Println("初始IP地址是:", strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix)) addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix)}) } } er.resolverConn.NewAddress(addrList) // er.resolverConn.UpdateState(resolver.State{Addresses:addrList}) // 监控ETCD中目标数据的变化 watchChan := GClient.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for chanEle := range watchChan { for _, ev := range chanEle.Events { // 根据IP变化情况,解析器更新IP地址列表 addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix) switch ev.Type { case mvccpb.PUT: if !exist(addrList, addr) { addrList = append(addrList, resolver.Address{Addr: addr}) er.resolverConn.NewAddress(addrList) fmt.Println("插入新地址 address=", addr) } case mvccpb.DELETE: if s, ok := remove(addrList, addr); ok { addrList = s er.resolverConn.NewAddress(addrList) fmt.Println("删除老地址 address=", addr) } } } } } func exist(l []resolver.Address, addr string) bool { for i := range l { if l[i].Addr == addr { return true } } return false } func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false } // 实现Resolver接口类型 func (er *ETCDResolver) ResolveNow(rn resolver.ResolveNowOptions) { fmt.Println("ETCDResolver ResolveNow") } func (er *ETCDResolver) Close() { fmt.Println("ETCDResolver Close") } ``` 3.调测<br /> &emsp;&emsp;下载代码后,分别编译服务端和客户端代码,然后,启动多个服务端和一个客户端,通过关闭、重启服务端的方式查看客户端的服务发现、监控,另外,感兴趣的同学还可以看下断网、ETCD关闭等情况下客户端运行状态,本文不再细述。