本文是使用 golang 实现 redis 系列的第七篇, 将介绍如何将单点的缓存服务器扩展为分布式缓存。godis 集群的源码在Github:Godis/cluster

单台服务器的CPU和内存等资源总是有限的,随着数据量和访问量的增加单台服务器很容易遇到瓶颈。利用多台机器建立分布式系统,分工处理是提高系统容量和吞吐量的常用方法。

使用更多机器来提高系统容量的方式称为系统横向扩容。与之相对的,提高单台机器性能被称为纵向扩容。由于无法在单台机器上无限提高硬件配置且硬件价格与性能的关系并非线性的,所以建立分布式系统进行横向扩容是更为经济实用的选择。

我们采用一致性 hash 算法 key 分散到不同的服务器,客户端可以连接到服务集群中任意一个节点。当节点需要访问的数据不在自己本地时,需要通过一致性 hash 算法计算出数据所在的节点并将指令转发给它。

与分布式系统理论中的分区容错性不同,我们仅将数据存在一个节点没有保存副本。这种设计提高了系统吞吐量和容量,但是并没有提高系统可用性,当有一个节点崩溃时它保存的数据将无法访问。

生产环境实用的 redis 集群通常也采取类似的分片存储策略,并为每个节点配置从节点作为热备节点,并使用 sentinel 机制监控 master 节点状态。在 master 节点崩溃后,sentinel 将备份节点提升为 master 节点以保证可用性。

一致性 hash 算法

为什么需要一致性 hash

node = hashCode(key) % n
node = hashCode(key) % n

算法原理

一致性 hash 算法的目的是在节点数量 n 变化时, 使尽可能少的 key 需要进行节点间重新分布。一致性 hash 算法将数据 key 和服务器地址 addr 散列到 2^32 的空间中。

我们将 2^32 个整数首尾相连形成一个环,首先计算服务器地址 addr 的 hash 值放置在环上。然后计算 key 的 hash 值放置在环上,顺时针查找,将数据放在找到的的第一个节点上。

key1, key2 和 key5 在 node2 上,key 3 在 node4 上,key4 在 node6 上

在增加或删除节点时只有该节点附近的数据需要重新分布,从而解决了上述问题。

新增 node8 后,key 5 从 node2 转移到 node8。其它 key 不变

如果服务器节点较少则比较容易出现数据分布不均匀的问题,一般来说环上的节点越多数据分布越均匀。我们不需要真的增加一台服务器,只需要将实际的服务器节点映射为几个虚拟节点放在环上即可。

Golang 实现一致性 Hash

我们使用 Golang 实现一致性 hash 算法, 源码在 Github: HDT3213/Godis, 大约 80 行代码。

type HashFunc func(data []byte) uint32

type Map struct {
    hashFunc HashFunc
    replicas int
    keys     []int // sorted
    hashMap  map[int]string
}

func New(replicas int, fn HashFunc) *Map {
    m := &Map{
        replicas: replicas, // 每个物理节点会产生 replicas 个虚拟节点
        hashFunc: fn,
        hashMap:  make(map[int]string), // 虚拟节点 hash 值到物理节点地址的映射
    }
    if m.hashFunc == nil {
        m.hashFunc = crc32.ChecksumIEEE
    }
    return m
}

func (m *Map) IsEmpty() bool {
    return len(m.keys) == 0
}

接下来实现添加物理节点的 Add 方法:

func (m *Map) Add(keys ...string) {
    for _, key := range keys {
        if key == "" {
            continue
        }
        for i := 0; i < m.replicas; i++ {
            // 使用 i + key 作为一个虚拟节点,计算虚拟节点的 hash 值
            hash := int(m.hashFunc([]byte(strconv.Itoa(i) + key))) 
            // 将虚拟节点添加到环上
            m.keys = append(m.keys, hash) 
            // 注册虚拟节点到物理节点的映射
            m.hashMap[hash] = key
        }
    }
    sort.Ints(m.keys)
}

接下来实现查找算法:

func (m *Map) Get(key string) string {
    if m.IsEmpty() {
        return ""
    }

    // 支持根据 key 的 hashtag 来确定分布 
    partitionKey := getPartitionKey(key)
    hash := int(m.hashFunc([]byte(partitionKey)))

    // sort.Search 会使用二分查找法搜索 keys 中满足 m.keys[i] >= hash 的最小 i 值
    idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })

    // 若 key 的 hash 值大于最后一个虚拟节点的 hash 值,则 sort.Search 找不到目标
    // 这种情况下选择第一个虚拟节点
    if idx == len(m.keys) {
        idx = 0
    }

    // 将虚拟节点映射为实际地址
    return m.hashMap[m.keys[idx]]
}
实现集群

实现了一致性 hash 算法后我们可以着手实现集群模式了,Godis 集群的代码在 Github:Godis/cluster。

集群最核心的逻辑是找到 key 所在节点并将指令转发过去:

// 集群模式下,除了 MSet、DEL 等特殊指令外,其它指令会交由 defaultFunc 处理
func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
    key := string(args[1])
    peer := cluster.peerPicker.Get(key) // 通过一致性 hash 找到节点
    return cluster.Relay(peer, c, args)
}

func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) redis.Reply {
    if peer == cluster.self { // 若数据在本地则直接调用数据库引擎
        // to self db
        return cluster.db.Exec(c, args)
    } else {
        // 从连接池取一个与目标节点的连接
        // 连接池使用 github.com/jolestar/go-commons-pool/v2 实现
        peerClient, err := cluster.getPeerClient(peer) 
        if err != nil {
            return reply.MakeErrReply(err.Error())
        }
        defer func() {
            _ = cluster.returnPeerClient(peer, peerClient) // 处理完成后将连接放回连接池
        }()
        // 将指令发送到目标节点
        return peerClient.Send(args) 
    }
}

func (cluster *Cluster) getPeerClient(peer string) (*client.Client, error) {
    connectionFactory, ok := cluster.peerConnection[peer]
    if !ok {
        return nil, errors.New("connection factory not found")
    }
    raw, err := connectionFactory.BorrowObject(context.Background())
    if err != nil {
        return nil, err
    }
    conn, ok := raw.(*client.Client)
    if !ok {
        return nil, errors.New("connection factory make wrong type")
    }
    return conn, nil
}

func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client) error {
    connectionFactory, ok := cluster.peerConnection[peer]
    if !ok {
        return errors.New("connection factory not found")
    }
    return connectionFactory.ReturnObject(context.Background(), peerClient)
}