内容提要
在上一篇文章中分享了如何从零开始搭建一个 RPC 框架,并完成了 P2P 版本功能,本章继续完善增加服务注册发现和负载均衡实现集群能力。
传送门:
本文主要内容包括:
RPC 接入服务注册中心 服务端实现平滑启停
客户端实现服务发现
客户端实现负载均衡
客户端实现失败策略
服务注册发现
在 P2P 版本 RPC 中,客户端要知道服务端的地址,并发起点对点连接,虽然满足了服务调用的能力,但其弊端也显而易见。为了保障服务高可用,通常会冗余部署多个服务端实例,而客户端如何知道每一个服务实例的调用地址,服务端实例上下线又如何告知客户端,这就需要引入服务自动注册发现的能力。
注册发现是指客户端具备动态发现服务端实例的能力,一般借助服务注册中心来实现,开源注册中心有“Eurake”或“Nacos”等,本人之前专门有文章讲过其实现,对应项目为 “service_discovery”,这里将以它为服务注册中心,完成客户端接入。
具体参阅:
首先定义客户端接口,既要满足服务提供者注册/下线的能力,又要满足服务消费者发现/观察的能力。
type Registry interface {
Register(context.Context, *Instance) (context.CancelFunc, error)
Fetch(context.Context, string) ([]*Instance, bool)
Close() error
}
naming/naming.go
type Discovery struct {
once *sync.Once
conf *Config
ctx context.Context
cancelFunc context.CancelFunc
//local cache
mutex sync.RWMutex
apps map[string]*FetchData
registry map[string]struct{}
//registry center node
idx uint64 //node index
node atomic.Value //node list
}
func New(conf *Config) *Discovery {
if len(conf.Nodes) == 0 {
panic("conf nodes empty!")
}
ctx, cancel := context.WithCancel(context.Background())
dis := &Discovery{
ctx: ctx,
cancelFunc: cancel,
conf: conf,
apps: map[string]*FetchData{},
registry: map[string]struct{}{},
}
//from conf get node list
dis.node.Store(conf.Nodes)
go dis.updateNode()
return dis
}
naming/discovery.go
func (dis *Discovery) updateNode() {
ticker := time.NewTicker(NodeInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
uri := fmt.Sprintf(_nodesURL, dis.pickNode())
log.Println("discovery - request and update node, url:" + uri)
params := make(map[string]interface{})
params["env"] = dis.conf.Env
resp, err := HttpPost(uri, params)
if err != nil {
log.Println(err)
continue
}
res := ResponseFetch{}
err = json.Unmarshal([]byte(resp), &res)
if err != nil {
log.Println(err)
continue
}
newNodes := []string{}
for _, ins := range res.Data.Instances {
for _, addr := range ins.Addrs {
newNodes = append(newNodes, strings.TrimPrefix(addr, "http://"))
}
}
if len(newNodes) == 0 {
continue
}
curNodes := dis.node.Load().([]string)
if !compareNodes(curNodes, newNodes) {
dis.node.Store(newNodes)
log.Println("nodes list changed!", newNodes)
log.Println(newNodes)
} else {
log.Println("nodes list not change:", curNodes)
}
}
}
}
naming/discovery.go
//对比两个数据是否完全相等
func compareNodes(a, b []string) bool {
if len(a) != len(b) {
return false
}
mapB := make(map[string]struct{}, len(b))
for _, node := range b {
mapB[node] = struct{}{}
}
for _, node := range a {
if _, ok := mapB[node]; !ok {
return false
}
}
return true
}
naming/discovery.go
实现服务注册能力,先检测本地缓存查看是否已注册,没有则请求注册中心并发起注册,异步维护一个定时任务来维持心跳(续约),如果发生终止则会调用取消接口从注册中心注销。
func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
{
var err error
//check local cache
dis.mutex.Lock()
if _, ok := dis.registry[instance.AppId]; ok {
err = errors.New("instance duplicate register")
} else {
dis.registry[instance.AppId] = struct{}{} //register local cache
}
dis.mutex.Unlock()
if err != nil {
return nil, err
}
//http register
ctx, cancel := context.WithCancel(dis.ctx)
if err = dis.register(instance); err != nil {
//fail
dis.mutex.Lock()
delete(dis.registry, instance.AppId)
dis.mutex.Unlock()
return cancel, err
}
ch := make(chan struct{}, 1)
cancelFunc := context.CancelFunc(func() {
cancel()
<-ch
})
//renew&cancel
go func() {
ticker := time.NewTicker(RenewInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := dis.renew(instance); err != nil {
dis.register(instance)
}
case <-ctx.Done():
dis.cancel(instance)
ch <- struct{}{}
}
}
}()
return cancelFunc, nil
}
naming/discovery.go
func (dis *Discovery) Fetch(ctx context.Context, appId string) ([]*Instance, bool) {
//from local
dis.mutex.RLock()
fetchData, ok := dis.apps[appId]
dis.mutex.RUnlock()
if ok {
log.Println("get data from local memory, appid:" + appId)
return fetchData.Instances, ok
}
//from remote
uri := fmt.Sprintf(_fetchURL, dis.pickNode())
params := make(map[string]interface{})
params["env"] = dis.conf.Env
params["appid"] = appId
params["status"] = 1 //up
resp, err := HttpPost(uri, params)
if err != nil {
dis.switchNode()
return nil, false
}
res := ResponseFetch{}
err = json.Unmarshal([]byte(resp), &res)
if res.Code != 200 {
return nil, false
}
if err != nil {
log.Println(err)
return nil, false
}
var result []*Instance
for _, ins := range res.Data.Instances {
result = append(result, ins)
}
if len(result) > 0 {
ok = true
dis.mutex.Lock()
dis.apps[appId] = &res.Data
dis.mutex.Unlock()
}
return result, ok
}
naming/discovery.go
服务端改造
服务端与注册中心的交互包括服务启动时会将自身服务信息(监听地址和端口)写入注册中心,开启定时续约,在服务关闭退出时会注销自身的注册信息。
服务启动注册
type RPCServer struct {
listener Listener
++ registry naming.Registry
}
func NewRPCServer(option Option, registry naming.Registry) *RPCServer {
return &RPCServer{
listener: NewRPCListener(option),
++ registry: registry,
option: option,
}
}
provider/server.go
func main() {
//服务注册中心
conf := &naming.Config{Nodes: config.RegistryAddrs, Env: config.Env}
discovery := naming.New(conf)
//注入依赖
srv := provider.NewRPCServer(option, discovery)
}
demo/server/server.go
func (svr *RPCServer) Run() {
//先启动后暴露服务
err := svr.listener.Run()
if err != nil {
panic(err)
}
//register in discovery,注册失败(重试失败)退出服务
err = svr.registerToNaming()
if err != nil {
svr.Close() //注册失败关闭服务
panic(err)
}
}
func (svr *RPCServer) registerToNaming() error {
instance := &naming.Instance{
Env: svr.option.Env,
AppId: svr.option.AppId,
Hostname: svr.option.Hostname,
Addrs: svr.listener.GetAddrs(),
}
retries := maxRegisterRetry
for retries > 0 {
retries--
cancel, err := svr.registry.Register(context.Background(), instance)
if err == nil {
svr.cancelFunc = cancel
return nil
}
}
return errors.New("register to naming server fail")
}
provider/server.go
做个测试,先启动服务注册中心(service_discovery),再运行 demo/server,通过配置不同端口和hostname,启动两个服务,从服务注册中心可以看到其结果。
服务退出注销
服务端从注册中心注销后,客户端从注册中心感知服务下线,就不再发送新连接和请求到该服务端实例。
这里也可能有些问题,由于客户端缓存机制导致客户端感知服务端变化滞后,仍会有少许时间新连接和请求提交到当前服务端。目前由于还未使用长链接管理,无法知晓有哪些客户端连接。如果此时服务仍存活就正常处理返回,如果失败可以返回“特殊失败码“,告知客户端不要再请求了,服务端关闭了。
func (svr *RPCServer) Close() {
//从服务注册中心注销
if svr.cancelFunc != nil {
svr.cancelFunc()
}
//关闭当前服务
if svr.listener != nil {
svr.listener.Close()
}
}
func (svr *RPCServer) registerToNaming() error {
++ cancel, err := svr.registry.Register(context.Background(), instance)
++ svr.cancelFunc = cancel
}
//注册中心注册 (naming/discovery.go)
func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
{
ctx, cancel := context.WithCancel(dis.ctx)
ch := make(chan struct{}, 1)
cancelFunc := context.CancelFunc(func() {
cancel()
<-ch
})
for {
select {
case <-ctx.Done():
dis.cancel(instance) //服务注销
ch <- struct{}{}
}
}
return cancelFunc, nil
}
服务关闭时,除了不再接受新请求外,还需要考虑处理中的请求,不能因为服务关闭而强制中断所有处理中的请求。根据请求所处阶段不同,可以分别设置“挡板”,告知服务调用方当前服务处于关闭流程,不再接受请求了。
func main() {
//...
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-quit
srv.Shutdown()
}
demo/server/server.go
func (svr *RPCServer) Shutdown() {
//从服务注册中心注销
if svr.cancelFunc != nil {
svr.cancelFunc()
}
//关闭当前服务
if svr.listener != nil {
svr.listener.Shutdown()
}
}
provider/server.go
(1)首先是服务端接收到客户端连接阶段。如果此时发现服务关闭,设置挡板不再往下执行,直接返回。
func (l *RPCListener) Run() error {
//... listen ...
++ go l.acceptConn() //accept conn
}
func (l *RPCListener) acceptConn() {
for {
conn, err := l.nl.Accept()
if err != nil {
select {
case <-l.getDoneChan(): //挡板:server closed done
return
default:
}
return
}
go l.handleConn(conn) //处理连接
}
}
type RPCListener struct {
++ doneChan chan struct{} //控制结束
}
func (l *RPCListener) getDoneChan() <-chan struct{} {
return l.doneChan
}
//关闭时关闭通道
func (l *RPCListener) Shutdown() {
l.closeDoneChan()
}
//关闭通道
func (l *RPCListener) closeDoneChan() {
select {
case <-l.doneChan:
default:
close(l.doneChan)
}
}
provider/listener.go
func (l *RPCListener) handleConn(conn net.Conn) {
//关闭挡板
++ if l.isShutdown() {
++ return
++ }
for {
++ if l.isShutdown() {
++ return
++ }
//handle ...
}
}
type RPCListener struct {
++ shutdown int32 //关闭处理中标识位
}
//判断是否关闭
func (l *RPCListener) isShutdown() bool {
return atomic.LoadInt32(&l.shutdown) == 1
}
//关闭逻辑
func (l *RPCListener) Shutdown() {
atomic.CompareAndSwapInt32(&l.shutdown, 0, 1)
}
provider/listener.go
(3)最后请求已进入服务实际处理阶段。此时无法简单设置挡板了,因为已经是处理中,就应该将请求处理完成。但我们需要确认有多少处理中的请求,并且确保这些请求全部执行完成,然后就可以安全退出了。这有点像 WaitGroup 计数器,我们也维护一个处理中任务计数来达到目的。
type RPCListener struct {
++ handlingNum int32 //处理中任务数
}
func (l *RPCListener) handleConn(conn net.Conn) {
//...
//处理中任务数+1
++ atomic.AddInt32(&l.handlingNum, 1)
//任意退出都会导致处理中任务数-1
++ defer atomic.AddInt32(&l.handlingNum, -1)
//read from network
//decode
//call local func
//encode
//send result
}
func (l *RPCListener) Shutdown() {
atomic.CompareAndSwapInt32(&l.shutdown, 0, 1)
++ for {
++ if atomic.LoadInt32(&l.handlingNum) == 0 {
++ break
++ }
++ }
l.closeDoneChan()
}
provider/listener.go
func NewClientProxy(appId string, option Option, registry naming.Registry) ClientProxy {
cp := &RPCClientProxy{
option: option,
failMode: option.FailMode,
registry: registry,
}
servers, err := cp.discoveryService(context.Background(), appId)
if err != nil {
log.Fatal(err)
}
cp.servers = servers
cp.loadBalance = LoadBalanceFactory(option.LoadBalanceMode, cp.servers)
return cp
}
//获取服务列表
func (cp *RPCClientProxy) discoveryService(ctx context.Context, appId string) ([]string, error) {
instances, ok := cp.registry.Fetch(ctx, appId)
if !ok {
return nil, errors.New("service not found")
}
var servers []string
for _, instance := range instances {
servers = append(servers, instance.Addrs...)
}
return servers, nil
}
type LoadBalanceMode int
const (
RandomBalance LoadBalanceMode = iota
RoundRobinBalance
WeightRoundRobinBalance
)
type LoadBalance interface {
Get() string
}
func LoadBalanceFactory(mode LoadBalanceMode, servers []string) LoadBalance {
switch mode {
case RandomBalance:
return newRandomBalance(servers)
case RoundRobinBalance:
return newRoundRobinBalance(servers)
default:
return newRandomBalance(servers)
}
}
type randomBalance struct {
servers []string
}
func newRandomBalance(servers []string) LoadBalance {
return &randomBalance{servers: servers}
}
func (b *randomBalance) Get() string {
rand.Seed(time.Now().Unix())
return b.servers[rand.Intn(len(b.servers))]
}
type roundRobinBalance struct {
servers []string
curIdx int
}
func newRoundRobinBalance(servers []string) LoadBalance {
return &roundRobinBalance{servers: servers, curIdx: 0}
}
func (b *roundRobinBalance) Get() string {
lens := len(b.servers)
if b.curIdx >= lens {
b.curIdx = 0
}
server := b.servers[b.curIdx]
b.curIdx = (b.curIdx + 1) % lens
return server
}
func (cp *RPCClientProxy) getConn() error {
addr := strings.Replace(cp.loadBalance.Get(), cp.option.NetProtocol+"://", "", -1)
err := cp.client.Connect(addr) //长连接管理
if err != nil {
return err
}
return nil
}
type FailMode int
const (
Failover FailMode = iota
Failfast
Failretry
)
func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ..
.interface{}) (interface{}, error) {
service, err := NewService(servicePath)
if err != nil {
return nil, err
}
err := cp.getConn()
if err != nil && cp.failMode == Failfast { //快速失败
return nil, err
}
//失败策略
switch cp.failMode {
case Failretry:
//...
case Failover:
//...
case Failfast:
//...
}
return nil, errors.New("call error")
}
switch cp.failMode {
case Failretry:
retries := cp.option.Retries
for retries > 0 {
retries--
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
}
}
case Failover:
retries := cp.option.Retries
for retries > 0 {
retries--
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
}
err = cp.getConn()
}
case Failfast:
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
return nil, err
}
总结与补充
这一版 RPC 框架具备了集群能力、负载均衡和简单容错能力,当然离一个完善的微服务框架仍有不少距离,所以后续会陆续迭代,希望大家多多支持。