内容提要

服务注册发现作为微服务的基础组件,它的稳定性和可用性备受考验。在之前的文章中,我们介绍了服务注册中心的基本原理和实现,具体参阅:

今天我们来讨论实现注册中心集群版,本文主要内容包括:
  • 分布式集群架构原理
  • 分布式数据复制技术

  • 状态一致性协同算法

  • Golang 代码实现集群服务

集群待解决问题

要实现注册中心从单机版到分布式集群,有几个关键问题要解决:

  1. 集群成员间的关系与成员发现问题
  2. 集群成员间数据复制与一致性问题
  3. 数据副本机制和数据分区策略

针对上述问题会有不同解决方案,而不同方案会对集群的可用性、容错能力和数据一致性造成不同结果,著名的 CAP 理论就是对分布式问题的最好诠释。架构就是在不同的方案和结果中进行的折中,没有最好的方案,只有适合场景的最佳实践,权衡取舍也是架构之魅力所在。

节点关系与成员发现

架构模型

集群中节点关系可以分为两种:平等公平关系和非公平关系。

P2P (pear to pear)点对点架构就是平等公平关系,这种关系中各节点没有领导分工,大家分摊工作,共同努力完成目标。

与之对立的非公平关系,我们熟知的 Master/Slave 主从架构(主备架构),由于主从这个名字带有歧视色彩,最新的叫法是 Leader/Follower 领导者跟随者架构,在这个架构中节点的地位是不一样的,会有不同的角色分工。

技术选型

针对注册中心场景选择哪种架构呢?可以从以下几点分析。

1.读写性能
点对点架构每个节点都可以承担读和写,读写性能最佳;

主从架构一般是做读写分离,写主读从(当然也有同步写,后面会分析到),相对来说写性能有限,但可以通过多个从来提升读性能。

注册中心场景一般读多写少,这点上倒也没有绝对的优劣。


2.可用性

点对点架构中某节点挂了,读写不受影响,但可能会丢数据造成数据不一致,数据一致性会差一些;

主从架构中主挂了会影响写,比如 MySQL 的 MHA,Redis 的 Sentinel 都是用来监控并实现切主,来保障高可用。而像 Zookeeper 支持半数以内的节点挂掉,超过半数就要触发重新选主了,此时不能写入。相比于点对点架构,整体可用性会差一点。


CAP 理论告诉我们,分布式系统在一致性(Consistency)、可用性(Availability) 和分区容错性 (Partition tolerance)三者只能选其二。在集群正常情况下,一致性和可用性都没问题(也就是 CA,网上大多数文章说 CA 模型不存在,其实说法并不准确,在正常情况下,一致性和可用性还是可以同时保障的)。但当集群出现异常,分区容错性必须保障(想想为什么?),那么一致性和可用性就要二选一,选 AP 还是 CP?

(CAP 理论 图片来自网络)

注册中心场景中,服务寻址都要依赖注册中心,可用性显得更加重要,而短暂不一致可忽略,毕竟服务上下线变动并不频繁,就算偶尔没拿到最新服务实例也不影响其他服务。著名的注册中心 Euraka 就使用了 AP 模型,并阐明 Zookeeper 这种基于 CP 模型的注册中心不可取,可参阅文章:

Why You Shouldn’t Use ZooKeeper for Service Discovery

3.架构实现

点对点架构实现相对更简单,不用考虑选主或主从切换的问题,节点状态也只要考虑上线状态和下线状态即可;

而领导者协调者架构在实现实现选主时要应对复杂的一致性协同算法,维护更复杂的状态机。


综合分析,注册中心技术选型使用点对点架构更合适,我们会以此架构展开讨论。

集群架构设计

我们来看点对点集群架构图:

(注册中心集群点对点架构图)

每个节点 Node 互相独立,并通过数据复制同步数据,每个节点都可接受服务注册、续约和发现操作。针对注册中心各节点相互发现问题,既然注册中心本身就是解决服务注册发现的,那么使用自己来管理自己不就好了?所以可以将节点作为服务实例,实现自发现。

(注册中心集群节点自发)

代码实现

下面我们通过具体代码来展开讲解实现原理。首先我们定义节点的概念和结构体,一个节点就是一个独立的注册中心服务,集群由多个节点组成。结构体 Node 存储节点地址和节点状态,节点状态有两种:上线状态(可对外提供服务),下线状态(不对外服务)。

type Node struct {
    config      *configs.Config
    addr        string
    status      int 
}
func NewNode(config *configs.GlobalConfig, addr string) *Node {
    return &Node{
        addr:        addr,
        status:      configs.NodeStatusDown, //初始化设为下线状态
    }   
}

(代码 model/node.go)

结构体 Nodes 用于存放所有节点列表和当前节点地址,方便节点初始化和节点感知。

type Nodes struct {
    nodes    []*Node
    selfAddr string
}
//初始化默认从配置文件中加载节点信息
func NewNodes(c *configs.GlobalConfig) *Nodes {
    nodes := make([]*Node, 0, len(c.Nodes))
    for _, addr := range c.Nodes {
        n := NewNode(c, addr)
        nodes = append(nodes, n)
    }
    return &Nodes{
        nodes:    nodes,
        selfAddr: c.HttpServer,
    }   
}

(代码 model/nodes.go)

最后将 Nodes 维护到 Discovery 结构体中,当服务启动首次加载全局 Discovery 时,开始创建并维护 Nodes 列表。
type Discovery struct {
    config    *configs.GlobalConfig
    protected bool
    Registry  *Registry
+   Nodes     atomic.Value
}
func NewDiscovery(config *configs.GlobalConfig) *Discovery {
//...
+ dis.Nodes.Store(NewNodes(config))
}

(代码 model/discovery.go)

注册中心节点实现自发现,节点之间可以感知到状态变化,注册中心集群当做服务 Application,将AppId  统一命名为 Kavin.discovery (写到配置文件configs.DiscoveryAppId),每个节点对应服务实例 Instance。对这块概念还不清楚,可以先参阅上一篇文章:
在启动服务初始化 Discovery 时,将自己注册到注册中心。
func (dis *Discovery) regSelf() {
    now := time.Now().UnixNano()
    instance := &Instance{
        Env:             dis.config.Env,
        Hostname:        dis.config.Hostname,
        AppId:           configs.DiscoveryAppId, //Kavin.discovery
        Addrs:           []string{"http://" + dis.config.HttpServer},
        Status:          configs.NodeStatusUp,
        RegTimestamp:    now,
        UpTimestamp:     now,
        LatestTimestamp: now,
        RenewTimestamp:  now,
        DirtyTimestamp:  now,
    }
    dis.Registry.Register(instance, now)
    //注册后同步到其他集群,下面部分会展开讲解
    dis.Nodes.Load().(*Nodes).Replicate(configs.Register, instance)
}

(代码 model/discovery.go)

注册成功后同步给集群其他节点,数据复制后面会具体讲解。注册成功后还要实现节点的定期续约,每 30s 发送一次续约请求,如果续约返回了 NotFound 未找到实例,做了一次重新注册的操作,保障了系统的健壮性。
func (dis *Discovery) renewTask(instance *Instance) {
    now := time.Now().UnixNano()
    ticker := time.NewTicker(configs.RenewInterval) //30 second
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:            
            _, err := dis.Registry.Renew(instance.Env, instance.AppId, instance.Hostname)
            if err == errcode.NotFound {
                dis.Registry.Register(instance, now)
                dis.Nodes.Load().(*Nodes).Replicate(configs.Register, instance)
            } else {
                dis.Nodes.Load().(*Nodes).Replicate(configs.Renew, instance)
            } 
        }   
    }   
}

(代码 model/discovery.go)

节点如果要进行下线操作,会先进行节点注销操作,在项目 main() 中增加注销自己的代码,实现比较简单,可直接参考代码:Discovery.CancelSelf(),代码可通过本公众号“技术岁月”发送“注册发现”获取。

func main() {
    //graceful restart
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
    <-quit
    log.Println("shutdown discovery server...")
    //cancel
++  global.Discovery.CancelSelf()
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
}
(代码 model/discovery.go)

节点的状态变更感知,用于维护集群节点的上下线,从节点注册表中拉取 AppId 为 Kavin.discovery 的数据,然后通过该数据中的实例信息来维护节点列表。

func (dis *Discovery) nodesPerception() {
    var lastTimestamp int64
    ticker := time.NewTicker(configs.NodePerceptionInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fetchData, err := dis.Registry.Fetch(dis.config.Env, configs.DiscoveryAppId, configs.NodeStatusUp, lastTimest
amp)
            if err != nil || fetchData == nil {
                continue
            }   
            var nodes []string
            for _, instance := range fetchData.Instances {
                for _, addr := range instance.Addrs {
                    u, err := url.Parse(addr)
                    if err == nil {
                        nodes = append(nodes, u.Host)
                    }   
                }   
            } 
            lastTimestamp = fetchData.LatestTimestamp
            config := new(configs.GlobalConfig)
            *config = *dis.config
            config.Nodes = nodes
            ns := NewNodes(config)
            ns.SetUp()
            dis.Nodes.Store(ns)
        }
    }
}

(代码 model/discovery.go)

数据副本与数据一致性

数据模型一般会有副本和分区两种形式,分区我们等会讨论,先说说副本机制。

所谓副本机制 Replication,是指分布式系统在各节点上保存相同的数据拷贝,来达到备份的目的。

副本提供了几个好处:数据冗余;可伸缩性;改善数据局部性。在点对点架构中,每个节点都是一个独立的数据副本,这样某个节点出事不会影响别人,还可通过扩充节点提升可用性,抗住更大并发。

多副本最大的困扰,就是数据的一致性了,上面我们分析了 CAP,明确了使用 AP 模型,成员间数据虽然不能做到强一致性,但怎么保障最终一致性呢?这里考虑如下几点:

  • 服务启动时当前节点数据为空,需要同步其他节点数据

  • 某节点接收到服务最新数据变更,需要同步给其他节点

  • 节点间数据不一致性如何“反熵”

节点启动时注册表初始化

节点首次启动时,其注册表是空的,那么就要想办法从其他节点同步数据。其逻辑就是遍历所有节点,获取注册表数据,依次注册到本地。这里注意只有当所有数据同步完毕后,该注册中心才可对外提供服务,切换为上线状态。

func (dis *Discovery) initSync() {
    nodes := dis.Nodes.Load().(*Nodes)
    for _, node := range nodes.AllNodes() {
        if node.addr == nodes.selfAddr {
            continue
        }
        uri := fmt.Sprintf("http://%s%s", node.addr, configs.FetchAllURL)
        resp, err := httputil.HttpPost(uri, nil)
        if err != nil {
            log.Println(err)
            continue
        }
        var res struct {
            Code    int                    `json:"code"`
            Message string                 `json:"message"`
            Data    map[string][]*Instance `json:"data"`
        }
        err = json.Unmarshal([]byte(resp), &res)
        if err != nil {
            log.Printf("get from %v error : %v", uri, err)
            continue
        }
        if res.Code != configs.StatusOK {
            log.Printf("get from %v error : %v", uri, res.Message)
            continue
        }
        dis.protected = false
        for _, v := range res.Data {
            for _, instance := range v {
                dis.Registry.Register(instance, instance.LatestTimestamp)
            }
        }
    }
    nodes.SetUp()
}

(代码 model/discovery.go)

这里考虑到节点数据可能不一致,循环同步了所有节点数据来提高一致性,相应的会有网络 io 开销与浪费,在一致性和资源开销上做了取舍选择。

注册表数据变更时同步

当实例向某节点发起注册、续约、取消操作,该节点在完成本地注册表数据更新后,还需要将其同步给其他节点。同步采用当前节点依次广播的形式,扩散传播(Gossip 算法)虽然能实现更快的扩散,但实现复杂且容易发生多轮同步的问题。

Gossip 过程是由种子节点发起,当一个种子节点有状态需要更新到其他节点时,它会随机的选择周围几个节点散播消息,收到消息的节点也会重复该过程,直至最终网络中所有的节点都收到消息。

(当前节点发起广播同步数据)

关于数据更新还要多做一些说明,这里我们是更新完当前节点,即代表写入完成,此时可以通过该节点获取最新数据,而同步其他节点并没做检查,也就是说其他节点在同步完成前,获取的数据可能不一致。如果在同步前当前节点挂了,可能这次操作会丢失,我们并没有采用同步写模式,采用了弱一致性策略。

如果我们要强一致性怎么做呢?在数据写入当前节点并完成同步之前,所有节点数据不可读或仍读取之前版本数据(快照/多版本控制)。

在数据复制技术中,有同步复制、异步复制、半同步复制技术,对应的响应延迟时间(可用性)和一致性也会有差别。

来看具体代码实现,以服务注册为例,在 RegisterHandler 中,增加数据同步的逻辑,将数据变动同步给其他节点。

func RegisterHandler(c *gin.Context) {
   //...
+   if req.Replication {
+       global.Discovery.Nodes.Load().(*model.Nodes).Replicate(c, configs.Register, instance)
+   } 
}

(代码 api/handler/register.go)

在 Replicate 方法中,遍历所有的节点,依次执行注册操作。

func (nodes *Nodes) Replicate(c *gin.Context, action configs.Action, instance *Instance) error {
    if len(nodes.nodes) == 0 {
        return nil
    }
    for _, node := range nodes.nodes {
        if node.addr != nodes.selfAddr {
            go nodes.action(c, node, action, instance)
        }
    }
    return nil
}
func (nodes *Nodes) action(c *gin.Context, node *Node, action configs.Action, instance *Instance) {
    switch action {
    case configs.Register:
        go node.Register(c, instance)
    case configs.Renew:
        go node.Renew(c, instance)
    case configs.Cancel:
        go node.Cancel(c, instance)
    }
}

(代码 model/nodes.go)

Nodes 通过调用 Node 里方法实现节点操作逻辑。
func (node *Node) Register(c *gin.Context, instance *Instance) error {
    return node.call(c, node.registerURL, configs.Register, instance, nil)
}
func (node *Node) call(c *gin.Context, uri string, action configs.Action, instance *Instance, data interface{}) error {
    params := make(map[string]interface{})
    params["env"] = instance.Env
    params["appid"] = instance.AppId
    params["hostname"] = instance.Hostname
    params["replication"] = true //broadcast stop here
    switch action {
    case configs.Register:
        params["addrs"] = instance.Addrs
        params["status"] = instance.Status
        params["version"] = instance.Version
        params["reg_timestamp"] = strconv.FormatInt(instance.RegTimestamp, 10) 
        params["dirty_timestamp"] = strconv.FormatInt(instance.DirtyTimestamp, 10) 
        params["latest_timestamp"] = strconv.FormatInt(instance.LatestTimestamp, 10) 
    case configs.Renew:
        params["dirty_timestamp"] = strconv.FormatInt(instance.DirtyTimestamp, 10) 
    case configs.Cancel:
        params["latest_timestamp"] = strconv.FormatInt(instance.LatestTimestamp, 10) 
    }   
    resp, err := httputil.HttpPost(uri, params)
    if err != nil {
        return err 
    }   
    res := Response{}
    err = json.Unmarshal([]byte(resp), &res)
    if err != nil {
        return err 
    }   
    if res.Code != configs.StatusOK {
        json.Unmarshal([]byte(res.Data), data)
        return errcode.Conflict
    }   
    return nil 
}

(代码 model/node.go)

这里同步失败并不影响正常响应,也就是说本地执行成功即会返回成功,那么有可能会因为同步失败,造成节点间的数据不一致。分析有如下不一致的 case:
  • 如果是续约事件丢失,可以在下一次续约时补上;
  • 如果是注册事件丢失,也可以在下次续约时发现并修复(NotFound 逻辑);

  • 如果是取消事件丢失,长时间不续约会有剔除。

所以不处理,也可以经过一段时间后达成最终一致,但达成的最终一致的时间会久一些,所以也可以通过记录失败队列,补发失败请求来快速修复。

最终一致性保障

做了上述工作一般情况下,出现不一致的概率会非常低,但如果确实存在特殊 case,导致了各节点数据不一致,那么就需要有一个反熵兜底的方案来实现最终一致性了。
可以考虑开启定时任务,定期与其他节点进行数据比对,并根据 lastTimestamp 来决策哪条数据准确进行修复,如果数据不存在需要进行补录。当所有节点都和其他节点进行比对并修复后,理论上数据可达成一致性(实际过程中可能又发生了变化),这个过程叫反熵。所以又回到一开始技术选型时,我们就选择了较弱的一致性。

自测方案

我们准备 3 个配置,搭建 3 个节点,通过静态配置集群节点列表。

nodes: ["localhost:8881", "localhost:8882", "localhost:8883"]http_server: "localhost:8881" //其他节点配置8882和8883hostname: "sd1"  //其他节点配置sd2和sd3
(代码 cmd/configs.yaml)
启动节点 1,节点 1 先从配置中拿集群列表,但节点感知后发现其他两个节点不能访问,节点 1 会变更为单节点。
启动节点 2,节点 2 先同步节点 1 的注册表,并注册自己同步给节点 1,节点 1 和 节点 2 组成双节点集群,数据互相复制并保持一致。启动节点 3 过程也一样。
kill 节点 3,会发现其在节点 1 和节点 2 已注销了。

数据分区策略

副本解决了数据冗余的事,但本质上还是单机存放了全量数据,当注册表数据过多,单机出现瓶颈,数据分区就出现了。另外如果不同机房,满足机房内服务调用内敛,提供就近访问能力,也需要进行分区。

分区有不同的描述,如 Kafka 叫分区 Partitioning,而 MySQL 中叫分表,在 MongoDB、Elasticsearch 又叫分片 Shard,HBase、Tidb 中叫 Region,虽然实现原理可能不尽相同,但底层数据分区的思想却是一致性的。

注册中心可以实现划分区域 zone 的机制,zone1 保存服务 A、B、C,zone2 保存服务 D、E、F,来实现数据分区治理。

(多注册中心数据分区)

跨区域同步数据时,单向同步,并且只在该区域内部广播,不能再次广播到其他区域,每个区域各节点可使用负载均衡做反向代理。这里的实现部分就不展开描述了,主要就是在注册表中嵌套一层 zone 来隔离数据。

总结

本文实现了注册中心的集群版,在集群实现过程中,先明确了点对点的平等架构方式,并通过复制技术实现各副本间一致性问题,也说明了在可用性和一致性问题上做的取舍。

注册中心早期的 Zookeeper 使用 ZAB 协议(Paxos算法)实现了线性一致性,到 etcd 的 Raft 算法,分布式一致性算法是做分布式集群绕不开的话题,后续有机会会展开了讲。
注册中心功能上还有一些待完善的地方,比如多机房流量调度,与 Service Mesh 结合,还有客户端方案等,欢迎大家持续关注公众号“技术岁月”,来获取后续更新,本文项目参考 bilibili 的 discovery 开源项目。
文章完整代码请关注公众号  技术岁月 ,发送关键字 注册发现 获取。

感谢您的阅读,欢迎点赞、转发