内容提要

在上一篇文章中分享了如何从零开始搭建一个 RPC 框架,并完成了 P2P 版本功能,本章继续完善增加服务注册发现和负载均衡实现集群能力。
传送门:
本文主要内容包括:

  • RPC 接入服务注册中心
  • 服务端实现平滑启停

  • 客户端实现服务发现

  • 客户端实现负载均衡

  • 客户端实现失败策略

服务注册发现

在 P2P 版本 RPC 中,客户端要知道服务端的地址,并发起点对点连接,虽然满足了服务调用的能力,但其弊端也显而易见。为了保障服务高可用,通常会冗余部署多个服务端实例,而客户端如何知道每一个服务实例的调用地址,服务端实例上下线又如何告知客户端,这就需要引入服务自动注册发现的能力。


注册发现是指客户端具备动态发现服务端实例的能力,一般借助服务注册中心来实现,开源注册中心有“Eurake”或“Nacos”等,本人之前专门有文章讲过其实现,对应项目为 “service_discovery”,这里将以它为服务注册中心,完成客户端接入。

具体参阅:

首先定义客户端接口,既要满足服务提供者注册/下线的能力,又要满足服务消费者发现/观察的能力。

type Registry interface {
    Register(context.Context, *Instance) (context.CancelFunc, error)
    Fetch(context.Context, string) ([]*Instance, bool)
    Close() error
}

naming/naming.go

定义 Discovery 继承接口 Registry 实现与 “service_rpc” 接入,如果要使用“Eurake”或“Nacos”作为注册中心,可以自定义扩展。
type Discovery struct {
    once       *sync.Once
    conf       *Config
    ctx        context.Context
    cancelFunc context.CancelFunc
    //local cache
    mutex    sync.RWMutex
    apps     map[string]*FetchData
    registry map[string]struct{}
    //registry center node
    idx  uint64       //node index
    node atomic.Value //node list
}
func New(conf *Config) *Discovery {
    if len(conf.Nodes) == 0 {
        panic("conf nodes empty!")
    }
    ctx, cancel := context.WithCancel(context.Background())
    dis := &Discovery{
        ctx:        ctx,
        cancelFunc: cancel,
        conf:       conf,
        apps:       map[string]*FetchData{},
        registry:   map[string]struct{}{},
    }
    //from conf get node list
    dis.node.Store(conf.Nodes)
    go dis.updateNode()
    return dis
}

naming/discovery.go

初始化 Discovery,默认从配置中获取注册中心节点(地址),并开启单独协程来定期更新维护节点变化。
func (dis *Discovery) updateNode() {
    ticker := time.NewTicker(NodeInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            uri := fmt.Sprintf(_nodesURL, dis.pickNode())
            log.Println("discovery - request and update node, url:" + uri)
            params := make(map[string]interface{})
            params["env"] = dis.conf.Env
            resp, err := HttpPost(uri, params)
            if err != nil {
                log.Println(err)
                continue
            }
            res := ResponseFetch{}
            err = json.Unmarshal([]byte(resp), &res)
            if err != nil {
                log.Println(err)
                continue
            }
            newNodes := []string{}
            for _, ins := range res.Data.Instances {
                for _, addr := range ins.Addrs {
                  newNodes = append(newNodes, strings.TrimPrefix(addr, "http://"))
                }
            }
            if len(newNodes) == 0 {
                continue

            }
            curNodes := dis.node.Load().([]string)
            if !compareNodes(curNodes, newNodes) {
                dis.node.Store(newNodes)
                log.Println("nodes list changed!", newNodes)
                log.Println(newNodes)
            } else {
                log.Println("nodes list not change:", curNodes)
            }
        }
    }
}

naming/discovery.go

这里主要是开启定时器,间隔时间(NodeInterval = 60*time.Second)去请求注册中心接口(/api/nodes)获取所有注册中心服务器节点的地址,如果有变化则变更内存维护的节点列表。
//对比两个数据是否完全相等
func compareNodes(a, b []string) bool {
    if len(a) != len(b) {
        return false
    }
    mapB := make(map[string]struct{}, len(b))
    for _, node := range b {
        mapB[node] = struct{}{}
    }
    for _, node := range a {
        if _, ok := mapB[node]; !ok {
            return false
        }
    }
    return true
}

naming/discovery.go

实现服务注册能力,先检测本地缓存查看是否已注册,没有则请求注册中心并发起注册,异步维护一个定时任务来维持心跳(续约),如果发生终止则会调用取消接口从注册中心注销。

func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
 {
    var err error
    //check local cache
    dis.mutex.Lock()
    if _, ok := dis.registry[instance.AppId]; ok {
        err = errors.New("instance duplicate register")
    } else {
        dis.registry[instance.AppId] = struct{}{} //register local cache
    }
    dis.mutex.Unlock()
    if err != nil {
        return nil, err 
    }
    //http register
    ctx, cancel := context.WithCancel(dis.ctx)
    if err = dis.register(instance); err != nil {
        //fail
        dis.mutex.Lock()
        delete(dis.registry, instance.AppId)
        dis.mutex.Unlock()
        return cancel, err
    }
    ch := make(chan struct{}, 1)
    cancelFunc := context.CancelFunc(func() {
        cancel()
        <-ch
    })
    //renew&cancel
    go func() {
        ticker := time.NewTicker(RenewInterval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                if err := dis.renew(instance); err != nil {
                    dis.register(instance)
                }
            case <-ctx.Done():
                dis.cancel(instance)
                ch <- struct{}{}
            }
        }

    }()
    return cancelFunc, nil
}

naming/discovery.go

根据服务标识(appId)获取服务注册信息,先从本地缓存中获取,如不存在则从远程注册中心拉取并缓存下来。
func (dis *Discovery) Fetch(ctx context.Context, appId string) ([]*Instance, bool) {
    //from local
    dis.mutex.RLock()
    fetchData, ok := dis.apps[appId]
    dis.mutex.RUnlock()
    if ok {
        log.Println("get data from local memory, appid:" + appId)
        return fetchData.Instances, ok
    }
    //from remote
    uri := fmt.Sprintf(_fetchURL, dis.pickNode())
    params := make(map[string]interface{})
    params["env"] = dis.conf.Env
    params["appid"] = appId
    params["status"] = 1 //up
    resp, err := HttpPost(uri, params)
    if err != nil {
        dis.switchNode()
        return nil, false
    }
    res := ResponseFetch{}
    err = json.Unmarshal([]byte(resp), &res)
    if res.Code != 200 {
        return nil, false
    }
    if err != nil {
        log.Println(err)
        return nil, false
    }
    var result []*Instance
    for _, ins := range res.Data.Instances {
        result = append(result, ins)

    }
    if len(result) > 0 {
        ok = true
        dis.mutex.Lock()
        dis.apps[appId] = &res.Data
        dis.mutex.Unlock()
    }
    return result, ok
}

naming/discovery.go

服务端改造

服务端与注册中心的交互包括服务启动时会将自身服务信息(监听地址和端口)写入注册中心,开启定时续约,在服务关闭退出时会注销自身的注册信息。

服务启动注册

首先在 RPCServer 中增加 Registry 用于绑定服务注册中心实例。
type RPCServer struct {
    listener   Listener
++  registry   naming.Registry
}
func NewRPCServer(option Option, registry naming.Registry) *RPCServer {
    return &RPCServer{
        listener: NewRPCListener(option),
++      registry: registry,
        option:   option,
    }
}

provider/server.go

在服务端入口,实例化 RPCServer 时传入注册中心依赖。
func main() {
    //服务注册中心
    conf := &naming.Config{Nodes: config.RegistryAddrs, Env: config.Env}
    discovery := naming.New(conf)
    //注入依赖
    srv := provider.NewRPCServer(option, discovery)
}

demo/server/server.go

服务启动时将信息发布到注册中心。考虑下“服务启动”和"服务注册"是否有先后顺序? 也就是如何保障服务端“优雅启动”的问题。
服务注册到注册中心后,客户端即可刷到该服务的地址信息并发起连接调用,而此时如果服务端并没有 ready ,就会导致服务调用失败产生异常,所以一定要等到服务启动完成后,再去暴露服务地址。像 Java 服务由于涉及到 JVM 预热(将常用类字节码转为机器码提高执行效率),还会有延迟暴露的需求, Golang 服务可以不用考虑。
func (svr *RPCServer) Run() {
    //先启动后暴露服务
    err := svr.listener.Run()
    if err != nil {
        panic(err)
    }
    //register in discovery,注册失败(重试失败)退出服务
    err = svr.registerToNaming()
    if err != nil {
        svr.Close() //注册失败关闭服务
        panic(err)
    }   
}
provider/server.go
服务注册数据包括运行环境(env),服务标识(appId),主机名(hostname),服务地址(addrs)等。向服务注册中心发起注册请求,失败后会进行重试,如果重试失败将会终止退出并关闭服务。
func (svr *RPCServer) registerToNaming() error {
    instance := &naming.Instance{
        Env:      svr.option.Env,
        AppId:    svr.option.AppId,
        Hostname: svr.option.Hostname,
        Addrs:    svr.listener.GetAddrs(),
    }
    retries := maxRegisterRetry
    for retries > 0 {
        retries--
        cancel, err := svr.registry.Register(context.Background(), instance)
        if err == nil {
            svr.cancelFunc = cancel
            return nil
        }
    }
    return errors.New("register to naming server fail")
}

provider/server.go

由于 registry.Register 已实现定时请求 renew,所以服务启动后会自动开启服务续约保持服务状态。 


做个测试,先启动服务注册中心(service_discovery),再运行 demo/server,通过配置不同端口和hostname,启动两个服务,从服务注册中心可以看到其结果。

服务退出注销

服务关闭退出时需要将其从注册中心一并移除,此时还需要考虑顺序问题保障“优雅退出”。和启动顺序相反,启动时先将服务启动再去暴露给注册中心,而退出时先从注册中心注销,再去关闭服务。想想看为什么?

服务端从注册中心注销后,客户端从注册中心感知服务下线,就不再发送新连接和请求到该服务端实例。
这里也可能有些问题,由于客户端缓存机制导致客户端感知服务端变化滞后,仍会有少许时间新连接和请求提交到当前服务端。目前由于还未使用长链接管理,无法知晓有哪些客户端连接。如果此时服务仍存活就正常处理返回,如果失败可以返回“特殊失败码“,告知客户端不要再请求了,服务端关闭了。

func (svr *RPCServer) Close() {
    //从服务注册中心注销 
    if svr.cancelFunc != nil {
        svr.cancelFunc()
    }
    //关闭当前服务
    if svr.listener != nil {
        svr.listener.Close()
    }
}
func (svr *RPCServer) registerToNaming() error {
++  cancel, err := svr.registry.Register(context.Background(), instance)
++  svr.cancelFunc = cancel
}
//注册中心注册 (naming/discovery.go)
func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
 {
    ctx, cancel := context.WithCancel(dis.ctx)
    ch := make(chan struct{}, 1)
    cancelFunc := context.CancelFunc(func() {
        cancel()
        <-ch
    })
    for {
            select {
            case <-ctx.Done():
                dis.cancel(instance) //服务注销
                ch <- struct{}{}
            }
        }
    return cancelFunc, nil
}
provider/server.go
协程间状态同步通过 context.WithCancel 的方式,将服务注销方法提供给外层协程调用。当执行 Close() 时,会执行 cancelFun(),进而 cancel() 触发 ctx.Done(),完成 dis.cancel() ,将服务从注册中心注销。


服务关闭时,除了不再接受新请求外,还需要考虑处理中的请求,不能因为服务关闭而强制中断所有处理中的请求。根据请求所处阶段不同,可以分别设置“挡板”,告知服务调用方当前服务处于关闭流程,不再接受请求了。

func main() {
    //...
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
    <-quit
    srv.Shutdown()
}

demo/server/server.go

在 main() 中捕获服务退出系统信号,调用 Shutdown() 实现优雅关闭,Shutdown 和 Close() 区别在于是否优雅关闭。
func (svr *RPCServer) Shutdown() {
    //从服务注册中心注销 
    if svr.cancelFunc != nil {
        svr.cancelFunc()
    }
    //关闭当前服务
    if svr.listener != nil {
        svr.listener.Shutdown()
    }
}

provider/server.go

上面说到根据请求所处阶段不同,设置挡板,那么都有哪些阶段呢?

(1)首先是服务端接收到客户端连接阶段。如果此时发现服务关闭,设置挡板不再往下执行,直接返回。

func (l *RPCListener) Run() error {
    //... listen ...    
++    go l.acceptConn()  //accept conn
}
func (l *RPCListener) acceptConn() {
    for {
        conn, err := l.nl.Accept()
        if err != nil {
            select {
            case <-l.getDoneChan(): //挡板:server closed done
                return 
            default:
            }
            return
        }
        go l.handleConn(conn) //处理连接
    }
}
type RPCListener struct {
++    doneChan    chan struct{} //控制结束
}
func (l *RPCListener) getDoneChan() <-chan struct{} {
    return l.doneChan
}
//关闭时关闭通道
func (l *RPCListener) Shutdown() {
    l.closeDoneChan()
}
//关闭通道
func (l *RPCListener) closeDoneChan() {
    select {
    case <-l.doneChan:
    default:
        close(l.doneChan)
    }
}

provider/listener.go

在服务关闭时,会关闭 doneChan 通道,这样上游 acceptConn 就会收到  <-l.getDoneChan() 数据,代表服务正在关闭,不继续处理请求,起到挡板的作用。
(2)接着是开始处理请求阶段。优先判断服务是否正在关闭,关闭则退出处理流程。通过设置一个全局标志位(shutdown),关闭服务时原子操作设置其值为 1,并通过判断值是否为 1,来去拦截请求。
func (l *RPCListener) handleConn(conn net.Conn) {
     //关闭挡板
++   if l.isShutdown() {
++       return
++   }
     for {
++      if l.isShutdown() {
++        return
++      }  
        //handle ...     
     }
}
type RPCListener struct {
++   shutdown    int32         //关闭处理中标识位
}
//判断是否关闭
func (l *RPCListener) isShutdown() bool {
    return atomic.LoadInt32(&l.shutdown) == 1
}
//关闭逻辑
func (l *RPCListener) Shutdown() {
    atomic.CompareAndSwapInt32(&l.shutdown, 0, 1)
}

provider/listener.go

(3)最后请求已进入服务实际处理阶段。此时无法简单设置挡板了,因为已经是处理中,就应该将请求处理完成。但我们需要确认有多少处理中的请求,并且确保这些请求全部执行完成,然后就可以安全退出了。这有点像 WaitGroup 计数器,我们也维护一个处理中任务计数来达到目的。

type RPCListener struct {
++    handlingNum int32         //处理中任务数
}
func (l *RPCListener) handleConn(conn net.Conn) {
   //...
   //处理中任务数+1
++ atomic.AddInt32(&l.handlingNum, 1)
   //任意退出都会导致处理中任务数-1
++ defer atomic.AddInt32(&l.handlingNum, -1)
   //read from network
   //decode
   //call local func
   //encode
   //send result
}
func (l *RPCListener) Shutdown() {
    atomic.CompareAndSwapInt32(&l.shutdown, 0, 1)
++  for {
++      if atomic.LoadInt32(&l.handlingNum) == 0 {
++          break
++      }
++  }
    l.closeDoneChan()
}

provider/listener.go

对于请求处理时间过长或者请求挂起的情况,可以加上超时时间控制,当超过指定时间仍未结束,则强制退出应用。
func NewClientProxy(appId string, option Option, registry naming.Registry) ClientProxy {
    cp := &RPCClientProxy{
        option:   option,
        failMode: option.FailMode,
        registry: registry,
    }
    servers, err := cp.discoveryService(context.Background(), appId)
    if err != nil {
        log.Fatal(err)
    }
    cp.servers = servers
    cp.loadBalance = LoadBalanceFactory(option.LoadBalanceMode, cp.servers)
    return cp
}
//获取服务列表
func (cp *RPCClientProxy) discoveryService(ctx context.Context, appId string) ([]string, error) {
    instances, ok := cp.registry.Fetch(ctx, appId)
    if !ok {
        return nil, errors.New("service not found")
    }
    var servers []string
    for _, instance := range instances {
        servers = append(servers, instance.Addrs...)
    }
    return servers, nil
}
type LoadBalanceMode int
const (
    RandomBalance LoadBalanceMode = iota
    RoundRobinBalance
    WeightRoundRobinBalance
)        
type LoadBalance interface {
    Get() string
}    
func LoadBalanceFactory(mode LoadBalanceMode, servers []string) LoadBalance {
    switch mode {
    case RandomBalance:
        return newRandomBalance(servers)
    case RoundRobinBalance:
        return newRoundRobinBalance(servers)
    default:
        return newRandomBalance(servers)
    }
}
type randomBalance struct {
    servers []string
}
func newRandomBalance(servers []string) LoadBalance {
    return &randomBalance{servers: servers}
}
func (b *randomBalance) Get() string {
    rand.Seed(time.Now().Unix())
    return b.servers[rand.Intn(len(b.servers))]
}
type roundRobinBalance struct {
    servers []string
    curIdx  int
}
func newRoundRobinBalance(servers []string) LoadBalance {
    return &roundRobinBalance{servers: servers, curIdx: 0}
}
func (b *roundRobinBalance) Get() string {
    lens := len(b.servers)
    if b.curIdx >= lens {
        b.curIdx = 0
    }
    server := b.servers[b.curIdx]
    b.curIdx = (b.curIdx + 1) % lens
    return server
}
func (cp *RPCClientProxy) getConn() error {
    addr := strings.Replace(cp.loadBalance.Get(), cp.option.NetProtocol+"://", "", -1)
    err := cp.client.Connect(addr) //长连接管理
    if err != nil {
        return err
    }
    return nil
}
type FailMode int 
const (
    Failover FailMode = iota
    Failfast
    Failretry
)
func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ..
.interface{}) (interface{}, error) {
    service, err := NewService(servicePath)
    if err != nil {
        return nil, err 
    }
    err := cp.getConn()
    if err != nil && cp.failMode == Failfast { //快速失败
        return nil, err 
    }
    //失败策略
    switch cp.failMode {
    case Failretry:
    //...
    case Failover:
    //...
    case Failfast:
    //...
    }
    return nil, errors.New("call error")
}
switch cp.failMode {
    case Failretry:
        retries := cp.option.Retries
        for retries > 0 {
            retries--
            if client != nil {
                rs, err := cp.client.Invoke(ctx, service, stub, params...)
                if err == nil {
                    return rs, nil
                }
            }
        }
    case Failover:
        retries := cp.option.Retries
        for retries > 0 {
            retries--
            if client != nil {
                rs, err := cp.client.Invoke(ctx, service, stub, params...)
                if err == nil {
                    return rs, nil
                }
            }
            err = cp.getConn()
        }
    case Failfast:
        if client != nil {
            rs, err := cp.client.Invoke(ctx, service, stub, params...)
            if err == nil {
                return rs, nil
            }
            return nil, err
        }


总结与补充

这一版 RPC 框架具备了集群能力、负载均衡和简单容错能力,当然离一个完善的微服务框架仍有不少距离,所以后续会陆续迭代,希望大家多多支持。


文章完整代码请关注公众号  技术岁月 ,发送关键字 RPC 获取,服务注册中心代码发送 注册发现 获取。