欢迎Star
代码较简单,分析源码仅分析重点
项目介绍本项目刚开始实现了基于HTTP/REST的内存缓存服务,受限于HTTP协议解析,性能不高。而后实现了一个基于TCP的缓存服务提升性能,对于TCP来说,需要自己定义一套序列化规范来解析缓存的get,set和del三个操作,这里使用的是ABNF协议描述范式。
项目实现的分布式缓存集群是同构集群,所有节点的功能完全相同,节点之间通过gossip协议来进行节点间通信,节点失效会在有限时间内扩散到整个集群。同时使用一致性哈希来计算负载均衡,当节点总数发生变化是,一致性哈希需要重新映射的key比传统哈希需要映射的key少的多。
分布式系统的CAP理论中,这里无法满足C,无法保证一致性,前一秒set了键值对,可能下一秒存储该kv的节点发生故障,此时再去访问该key就无法保证一致性。所以在发生节点扩缩容时,我们需要进行节点再平衡,将需要被迁移的key赋值到对应的节点上。
使用介绍注意我们使用HTTP/REST的时候,增删改查的操作并没有走一致性哈希。保留HTTP服务的定位是管理使用
#----------------基于HTTP----------------#
# 查看状态
curl 127.0.0.1:12345/status
# 插入一个kv
curl -v 127.0.0.1:12345/cache/testkey -XPUT -d testvalue
# 查看key的val
curl 127.0.0.1:12345/cache/testkey
# 查看状态
curl 127.0.0.1:12345/status
# 删除key
curl 127.0.0.1:12345/cache/testkey -XDELETE
# 查看状态
curl 127.0.0.1:12345/status
#----------------基于TCP----------------#
# 插入一个kv
../client/client.exe -c set -k testkey -v testvalue
# 查看key的val
../client/client.exe -c get -k testkey
# 查看状态
curl 127.0.0.1:12345/status
# 删除key
../client/client.exe -c del -k testkey
# 查看状态
curl 127.0.0.1:12345/status
#----------------集群----------------#
# 运行三台
go run main.go -n 10.29.1.1
go run main.go -n 10.29.1.2 -c 10.29.1.1
go run main.go -n 10.29.1.3 -c 10.29.1.2
# 查看集群节点列表
curl 10.29.1.1:12345/cluster
# 任选一台插入key,有些超过插入有些插入失败并返回重定向的地址
../client/client.exe -c set -k keya -v a -h 10.29.1.3
../client/client.exe -c set -k keyb -v b -h 10.29.1.3
../client/client.exe -c set -k keyc -v c -h 10.29.1.3
../client/client.exe -c set -k keyd -v d -h 10.29.1.3
../client/client.exe -c set -k keye -v e -h 10.29.1.3
../client/client.exe -c set -k keyf -v f -h 10.29.1.3
# 如果发生扩缩容,进行节点再平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 只启动一台节点
go run main.go -n 10.29.1.1
# 插入100000条数据,那么这时100000条数据都是给10.29.1.1的
../cache-benchmark/cache-benchmark.exe -type tcp -n 100000 -d 1 --h 10.29.1.1
# 查看状态 count=100000
curl 10.29.1.1:12345/status
# 又启动一台
go run main.go -n 10.29.1.2 -c 10.29.1.1
# 发送扩缩容,进行数据平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 查看状态 count=50000左右
curl 10.29.1.1:12345/status
# 再启动一台
go run main.go -n 10.29.1.3 -c 10.29.1.2
# 发送扩缩容,进行数据平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 查看状态 count=33000左右
curl 10.29.1.1:12345/status
分析源码
REST接口
func (s *Server) Listen() {
//这里就是对一个map进行增删改查,并维护一个stat
http.Handle("/cache/", s.cacheHandler())
//返回上面维护的sata结构体
http.Handle("/status", s.statusHandler())
//consistent实现Members,以切片形式返回所有活跃节点的地址
//m := h.Members()
//bytes, err := json.Marshal(m)
//w.Write(bytes)
http.Handle("/cluster", s.clusterHandler())
//最后介绍
http.Handle("/rebalance", s.rebalanceHandler())
http.ListenAndServe(s.Addr()+":12345", nil)
}
TCP字节流
for {
conn, err := listener.Accept()
if err != nil {
log.Println(err)
}
//对每一个连接都开一个process的协程
go s.process(conn)
}
//代码中忽略了细节,可能会与源码有出路
func (s *Server) process(conn net.Conn) {
r := bufio.NewReader(conn)
for {
op, err := r.ReadByte()
switch op {
case 'S':
err = s.set(conn, r)
case 'G':
err = s.get(conn, r)
case 'D':
err = s.del(conn, r)
}
}
//下面以get为例,set和del原理一样
func (s *Server) get(conn net.Conn, r *bufio.Reader) error {
key, err := s.readKey(r)
if err != nil {
return sendResponse(nil, err, conn)
}
val, err := s.Get(key)
return sendResponse(val, err, conn)
}
//通过readKey对TCP字节流进行解析
func (s *Server) readKey(r *bufio.Reader) (string, error) {
keyLen, err := readLen(r)
key := make([]byte, keyLen)
_, err = io.ReadFull(r, key)
//这里进行一致性哈希运算,判断该key是否映射到该节点上
//如果是则继续运行,如果不是则返回重定向到哪个节点
jumpAddr, ok := s.ShouldProcess(string(key))
if !ok {
return "", errors.New("redirect " + jumpAddr)
}
return string(key), nil
}
//这里进行一致性哈希运算,判断该key是否映射到该节点上
func (n *node) ShouldProcess(key string) (string, bool) {
addr, _ := n.Get(key)
return addr, addr == n.addr
}
创建新节点加入到集群gossip
这里使用的是第三方库gossip
//创建新节点
node, err := cluster.New(*nodeAddr, *cls)
func New(addr, cluster string) (Node, error) {
//创建gossip新节点的config
config := memberlist.DefaultLANConfig()
config.Name = addr
config.BindAddr = addr
config.LogOutput = ioutil.Discard
//创建新节点
mbl, err := memberlist.Create(config)
if err != nil {
return nil, err
}
if cluster == "" {
cluster = addr
}
existing := []string{cluster}
//连接到集群
_, err = mbl.Join(existing)
//创建一致性哈希的节点实例
circle := consistent.New()
//设置虚拟节点数量
circle.NumberOfReplicas = 256
go func() {
for {
//获取集群成员
m := mbl.Members()
nodes := make([]string, len(m))
for i, n := range m {
nodes[i] = n.Name
}
//每隔1s将集群节点列表m更新到circle中
circle.Set(nodes)
time.Sleep(time.Second)
}
}()
return &node{circle, addr}, nil
}
一致性哈希consistent
这里使用的是第三方库consistent
如果想要学习一致性哈希算法,可以参考一致性哈希算法
//创建一致性哈希的节点实例
circle := consistent.New()
//设置虚拟节点数量
circle.NumberOfReplicas = 256
//每隔1s将集群节点列表m更新到circle中
circle.Set(nodes)
//这里进行一致性哈希运算,判断该key是否映射到该节点上
addr, _ := n.Get(key)
节点再平衡rebalance
func (h *rebalanceHandler) rebalance() {
s := h.NewScanner()
defer s.Close()
client := &http.Client{}
//遍历本节点所有的key
for s.Scan() {
k := s.Key()
redirectAddr, ok := h.ShouldProcess(k)
if !ok {//如果因为扩缩容,该key不再映射到本节点
//则将key写入到对应的节点去
r, _ := http.NewRequest(http.MethodPut, "http://"+redirectAddr+":12345/cache/"+k, bytes.NewReader(s.Value()))
client.Do(r)
h.Del(k)
}
}
}
func (c *inMemoryCache) NewScanner() Scanner {
pairCh := make(chan *pair)
closeCh := make(chan struct{})
go func() {
defer close(pairCh)
c.mutex.RLock()
for k, v := range c.cacheMap {
c.mutex.RUnlock()
select {
case <-closeCh:
return
//从map中读取一个kv,并写入channel中
case pairCh <- &pair{k, v}:
}
c.mutex.RLock()
}
c.mutex.RUnlock()
}()
return &inMemoryScanner{pair{}, pairCh, closeCh}
}
func (s *inMemoryScanner) Scan() bool {
//从channel中读取一个kv
p, ok := <-s.pairCh
if ok {
s.k, s.v = p.k, p.v
}
return ok
}