一致性Hash 算法

1,介绍一致性hash算法(Consistent Hashing)及其在分布式缓存中的应用,以及对一致性hash算法原理的介绍。
2,简单的代码实现
3,发现有什么问题方便请指出下。。谢谢。。

一,背景

我们通过一个问题思考引入。。

假设我们有个网站,随着用户量的不断增长,日活量的不断提升,导致 mysql 的数据读写压力飞速增长,对我们的服务造成极大的压力。。

为了解决上面的困境我们引入了 redis 作为缓存机制,但是你会发现其实随着业务扩张,单台 redis 很快就无法支撑起你系统服务,那么为了解决这个问题,我们会开始引入多台 redis 构成服务集群为网站提供缓存服务;

由于我们不可能所有的 redis 服务都缓存所有数据,因为这样意义不大,我们理想的是 每个 redis 服务提供一部分用户的缓存数据,然后根据不同的客户的请求读取不同 redis 的缓存数据;

在这个时候:很多人会想起 hash取模 法;

redis分布式缓存(hash取模)

如上图:根据 Customer_id 取模得到值,根据值流入指定的redis服务中。如果 customer_id 是自增的,我们每个 redis 服务里面的值的数量都是绝对平均的;

然而短期问题是解决了。。但是你有没有想过,当3个redis 服务无法正常支撑,需要加入 第 4 个 redis 服务 或者资源过剩了,你希望回收一个 redis 使用两个 redis 支撑服务了。。然而这个时候你会发现。。 取模的值一旦改变。。之前缓存的数据就会发生错乱。。这样会有大量的请求穿过你的 redis 缓存服务,直接到达你的 mysql。。而且 redis 缓存服务也缓存着大量无用数据,后果有点可怕。。

为了解决上述问题。。。我们引入 一致性Hash算法

二,算法简述

简单来说我们把所有数据做一致性hash计算后的所有结果看作一个 hash 闭环,而闭环的大小为0~2^32,也就说所有的数据都会坐落在这个 hash 闭环;如下图:

redis分布式缓存(一致性hash算法

0 和 2^32 在 hash 闭环中重合,这里为什么要这样做 大家可以思考下。。

下一步我们把 redis 服务 的IP 也使用 相同的 hash("redis服务IP") 方法进行求出 hash 值,这样我们 redis服务 也会坐落在我们的 hash闭环 中,他的作用 可以看做个桶,承接顺指针流入的数据;如下图:

redis分布式缓存(一致性hash算法)图2

接下来我们把所有的数据key (即:Customer_id)使用相同的 hash("key") 函数计算出对应的 hash 值;那么我们的数据也会坐落于我们 hash闭环 中,从此位置开始沿着 hash闭环 顺时针行走,遇到第一个 Redis服务 就是其定位所在的缓存服务器。。如下图:

redis分布式缓存(一致性hash算法)图3

比如:我们有 A,B,C,D 4个数据,通过 hash() 函数计算后,坐落在 hash闭环上,顺时针运转后,A 数据会流入 Redis服务1中,B 和 C 数据会流入Redis服务2中,D数据则会流入 Redis服务3中;

也许你会好奇,hash 取模 不也可以实现上述的吗?而且还能保证数据的绝对均匀。。

OK!那么我看他特有的优势:

当我们需要把 Redis服务3 从集群中剥离出来会发生怎样的情况,如下图:

redis分布式缓存(一致性hash算法)图4

我们看出,当 Redis服务3 从集群中剥离出来后,原来要流入服务器 Redis服务3 的数据 D ,只能流入 Redis服务1 中,而 Redis服务2的数据将没有任何影响,也就是环境话说 当缓存服务器从集群中剥离出来或者添加缓存服务器到集群中都只会影响到顺流而下的下一个缓存服务器;

换句话说:一致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。

三,虚拟节点

如果你细心阅读,你会发现如果按照上面的方案实施,我们没有办法保证缓存数据能够均匀分布到每一个缓存服务器中。既然问题发现了,就必然会存在解决的方案。。

不妨我们做一个大胆的假设,如果我们 redis服务 非常多,数量极大,无限趋向于 2^32,那么我们数据流入服务器的数量是不是就一定会趋向于均匀呢?(这里我们不考虑hash碰撞的问题哟)

答案是肯定的,当然我们缓存服务器的数量也不可能有这么多,这个说明只是为了说明当 缓存节点坐落在 hash闭环 上的数量越多,那么我们数据分布就会趋向于均匀。。但是我们又没办法用于这么多真实的缓存服务节点,所以我们引入了一个 虚拟节点 的概念。。如下图:

redis分布式缓存(一致性hash算法)图5

我们把 真实的Redis服务1节点 转换成2个虚拟节点 分布在环上,当数据流入虚拟节点时实际上也是流入Redis服务1

其实 虚拟节点 除了保证分布的均匀,还有一个突出的作用:每一台缓存服务的性能不同 会导致提供服务的能力不同。通过虚拟节点 我们可以有效控制数据缓存的量。(nginx的一致性hash算法负载均衡就是这样实现的。。有兴趣可以了解下)

四,代码实现

下面我提供下我实现的 一个简单的 一致性hash实现

package main

import (
    "errors"
    "hash/crc32"
    "sort"
    "strconv"
    "sync"
)

type Hash func(data []byte) uint32

type UInt32Slice []uint32

func (s UInt32Slice) Len() int {
    return len(s)
}

func (s UInt32Slice) Less(i, j int) bool {
    return s[i] < s[j]
}

func (s UInt32Slice) Swap(i, j int)  {
    s[i], s[j] = s[j], s[i]
}

type ConsistentHashBanlance struct {
    mux sync.RWMutex
    hash Hash
    replicas int // 虚拟节点数量,虚拟节点越多 节点分布更趋向均匀
    keys UInt32Slice // 节点排序
    hashMap map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key
}


func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
    m := &ConsistentHashBanlance{
        replicas: replicas,
        hash:     fn,
        hashMap:  make(map[uint32]string),
    }
    if m.hash == nil {
        //最多32位,保证是一个2^32-1环
        m.hash = crc32.ChecksumIEEE
    }
    return m
}

// 验证是否为空
func (c *ConsistentHashBanlance) IsEmpty() bool {
    return len(c.keys) == 0
}

// Add 方法用来添加缓存节点,参数为节点key,比如使用IP
func (c *ConsistentHashBanlance) Add(params ...string) error {
    if len(params) == 0 {
        return errors.New("param len 1 at least")
    }
    addr := params[0]
    c.mux.Lock()
    defer c.mux.Unlock()
    // 结合 虚拟节点数量 计算所有虚拟节点的hash值,并存入m.keys中,同时在m.hashMap中保存哈希值和key的映射
    for i := 0; i < c.replicas; i++ {
        hash := c.hash([]byte(strconv.Itoa(i) + addr))
        c.keys = append(c.keys, hash)
        c.hashMap[hash] = addr
    }
    // 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找
    sort.Sort(c.keys)
    return nil
}

// Get 方法根据给定的对象获取最靠近它的那个节点
func (c *ConsistentHashBanlance) Get(key string) (string, error) {
    if c.IsEmpty() {
        return "", errors.New("node is empty")
    }
    hash := c.hash([]byte(key))

    // 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点"
    idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })

    // 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中
    if idx == len(c.keys) {
        idx = 0
    }
    c.mux.RLock()
    defer c.mux.RUnlock()
    return c.hashMap[c.keys[idx]], nil
}

可以参考下。

参考链接: