前言

欢迎Star

代码较简单,分析源码仅分析重点

项目介绍

  本项目刚开始实现了基于HTTP/REST的内存缓存服务,受限于HTTP协议解析,性能不高。而后实现了一个基于TCP的缓存服务提升性能,对于TCP来说,需要自己定义一套序列化规范来解析缓存的get,set和del三个操作,这里使用的是ABNF协议描述范式。

  项目实现的分布式缓存集群是同构集群,所有节点的功能完全相同,节点之间通过gossip协议来进行节点间通信,节点失效会在有限时间内扩散到整个集群。同时使用一致性哈希来计算负载均衡,当节点总数发生变化是,一致性哈希需要重新映射的key比传统哈希需要映射的key少的多。

  分布式系统的CAP理论中,这里无法满足C,无法保证一致性,前一秒set了键值对,可能下一秒存储该kv的节点发生故障,此时再去访问该key就无法保证一致性。所以在发生节点扩缩容时,我们需要进行节点再平衡,将需要被迁移的key赋值到对应的节点上。

使用介绍

注意我们使用HTTP/REST的时候,增删改查的操作并没有走一致性哈希。保留HTTP服务的定位是管理使用

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

#----------------基于HTTP----------------#
# 查看状态
curl 127.0.0.1:12345/status
# 插入一个kv
curl -v  127.0.0.1:12345/cache/testkey -XPUT -d testvalue
# 查看key的val
curl 127.0.0.1:12345/cache/testkey
# 查看状态
curl 127.0.0.1:12345/status
# 删除key
curl 127.0.0.1:12345/cache/testkey -XDELETE
# 查看状态
curl 127.0.0.1:12345/status
#----------------基于TCP----------------#
# 插入一个kv
../client/client.exe -c set -k testkey -v testvalue
# 查看key的val
../client/client.exe -c get -k testkey
# 查看状态
curl 127.0.0.1:12345/status
# 删除key
../client/client.exe -c del -k testkey
# 查看状态
curl 127.0.0.1:12345/status
#----------------集群----------------#
# 运行三台
go run main.go -n 10.29.1.1
go run main.go -n 10.29.1.2 -c 10.29.1.1
go run main.go -n 10.29.1.3 -c 10.29.1.2
# 查看集群节点列表
curl 10.29.1.1:12345/cluster
# 任选一台插入key,有些超过插入有些插入失败并返回重定向的地址
../client/client.exe -c set -k keya -v a -h 10.29.1.3
../client/client.exe -c set -k keyb -v b -h 10.29.1.3
../client/client.exe -c set -k keyc -v c -h 10.29.1.3
../client/client.exe -c set -k keyd -v d -h 10.29.1.3
../client/client.exe -c set -k keye -v e -h 10.29.1.3
../client/client.exe -c set -k keyf -v f -h 10.29.1.3
# 如果发生扩缩容,进行节点再平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 只启动一台节点
go run main.go -n 10.29.1.1
# 插入100000条数据,那么这时100000条数据都是给10.29.1.1的
../cache-benchmark/cache-benchmark.exe -type tcp -n 100000 -d 1 --h 10.29.1.1
# 查看状态 count=100000
curl 10.29.1.1:12345/status
# 又启动一台 
go run main.go -n 10.29.1.2 -c 10.29.1.1
# 发送扩缩容,进行数据平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 查看状态 count=50000左右
curl 10.29.1.1:12345/status
# 再启动一台
go run main.go -n 10.29.1.3 -c 10.29.1.2
# 发送扩缩容,进行数据平衡
curl 10.29.1.1:12345/rebalance -XPOST
# 查看状态 count=33000左右
curl 10.29.1.1:12345/status
分析源码

REST接口

func (s *Server) Listen() {
	//这里就是对一个map进行增删改查,并维护一个stat
	http.Handle("/cache/", s.cacheHandler())
	//返回上面维护的sata结构体
	http.Handle("/status", s.statusHandler())
	//consistent实现Members,以切片形式返回所有活跃节点的地址
	//m := h.Members()
	//bytes, err := json.Marshal(m)
	//w.Write(bytes)
	http.Handle("/cluster", s.clusterHandler())
	//最后介绍
	http.Handle("/rebalance", s.rebalanceHandler())
	http.ListenAndServe(s.Addr()+":12345", nil)
}

TCP字节流

for {
	conn, err := listener.Accept()
	if err != nil {
		log.Println(err)
	}
	//对每一个连接都开一个process的协程
	go s.process(conn)
}
//代码中忽略了细节,可能会与源码有出路
func (s *Server) process(conn net.Conn) {
	r := bufio.NewReader(conn)
	for {
		op, err := r.ReadByte()
		switch op {
		case 'S':
			err = s.set(conn, r)
		case 'G':
			err = s.get(conn, r)
		case 'D':
			err = s.del(conn, r)
	}
}
//下面以get为例,set和del原理一样
func (s *Server) get(conn net.Conn, r *bufio.Reader) error {
	key, err := s.readKey(r)
	if err != nil {
		return sendResponse(nil, err, conn)
	}
	val, err := s.Get(key)
	return sendResponse(val, err, conn)
}
//通过readKey对TCP字节流进行解析
func (s *Server) readKey(r *bufio.Reader) (string, error) {
	keyLen, err := readLen(r)

	key := make([]byte, keyLen)
	_, err = io.ReadFull(r, key)
	//这里进行一致性哈希运算,判断该key是否映射到该节点上
	//如果是则继续运行,如果不是则返回重定向到哪个节点
	jumpAddr, ok := s.ShouldProcess(string(key))
	if !ok {
		return "", errors.New("redirect " + jumpAddr)
	}
	return string(key), nil
}
//这里进行一致性哈希运算,判断该key是否映射到该节点上
func (n *node) ShouldProcess(key string) (string, bool) {
	addr, _ := n.Get(key)
	return addr, addr == n.addr
}

创建新节点加入到集群gossip

这里使用的是第三方库gossip

//创建新节点
node, err := cluster.New(*nodeAddr, *cls)

func New(addr, cluster string) (Node, error) {
	//创建gossip新节点的config
	config := memberlist.DefaultLANConfig()
	config.Name = addr
	config.BindAddr = addr
	config.LogOutput = ioutil.Discard
	//创建新节点
	mbl, err := memberlist.Create(config)
	if err != nil {
		return nil, err
	}
	if cluster == "" {
		cluster = addr
	}
	existing := []string{cluster}
	//连接到集群
	_, err = mbl.Join(existing)

	//创建一致性哈希的节点实例
	circle := consistent.New()
	//设置虚拟节点数量
	circle.NumberOfReplicas = 256
	go func() {
		for {
			//获取集群成员
			m := mbl.Members()
			nodes := make([]string, len(m))
			for i, n := range m {
				nodes[i] = n.Name
			}
			//每隔1s将集群节点列表m更新到circle中
			circle.Set(nodes)
			time.Sleep(time.Second)
		}
	}()
	return &node{circle, addr}, nil
}

一致性哈希consistent

这里使用的是第三方库consistent

如果想要学习一致性哈希算法,可以参考一致性哈希算法

//创建一致性哈希的节点实例
circle := consistent.New()
//设置虚拟节点数量
circle.NumberOfReplicas = 256
//每隔1s将集群节点列表m更新到circle中
circle.Set(nodes)
//这里进行一致性哈希运算,判断该key是否映射到该节点上
addr, _ := n.Get(key)

节点再平衡rebalance

func (h *rebalanceHandler) rebalance() {
	s := h.NewScanner()
	defer s.Close()
	client := &http.Client{}
	//遍历本节点所有的key
	for s.Scan() {
		k := s.Key()
		redirectAddr, ok := h.ShouldProcess(k)
		if !ok {//如果因为扩缩容,该key不再映射到本节点
			//则将key写入到对应的节点去
			r, _ := http.NewRequest(http.MethodPut, "http://"+redirectAddr+":12345/cache/"+k, bytes.NewReader(s.Value()))
			client.Do(r)
			h.Del(k)
		}
	}
}
func (c *inMemoryCache) NewScanner() Scanner {
	pairCh := make(chan *pair)
	closeCh := make(chan struct{})
	go func() {
		defer close(pairCh)
		c.mutex.RLock()
		for k, v := range c.cacheMap {
			c.mutex.RUnlock()
			select {
			case <-closeCh:
				return
			//从map中读取一个kv,并写入channel中
			case pairCh <- &pair{k, v}:
			}
			c.mutex.RLock()
		}
		c.mutex.RUnlock()
	}()
	return &inMemoryScanner{pair{}, pairCh, closeCh}
}
func (s *inMemoryScanner) Scan() bool {
	//从channel中读取一个kv
	p, ok := <-s.pairCh
	if ok {
		s.k, s.v = p.k, p.v
	}
	return ok
}