本文通过 Golang 实现一个 Leaf——美团点评分布式ID生成系统 双buffer 的id获取器,

主流程

主流程

代码实现

Segment

存储了发号的具体游标信息

type Segment struct {
	Cursor   uint64
	MaxId    uint64
	MinId    uint64
	IsInitOk bool 
}

1. Segment.Cursor 是当前发放的游标
2.Segment.MaxId 和 Segment.MinId 是 id号段的范围
3.Segment.IsInitOk 标识了这个号段是否初始化完成

这里号段的实现可以使用DB的形式来实现

Begin
UPDATE table SET max_id=max_id+step WHERE segment_type=xxx
SELECT tag, max_id, step FROM table WHERE segment_type=xxx
Commit

SegmentAlloc

结构体定义

type SegmentAlloc struct {
	SegmentType               int             // 号段业务类型
	CurrentSegmentBufferIndex int             // 当前使用的buffer 下标,会一直使用0的下标
	SegmentBuffers            []*Segment      // 号段的双buffer
	Step                      uint64          // 号段加载的步长
	IsPreloading              bool            // 是否处于预加载号段的状态
	UpdateTime                time.Time       // 更新号段的时间
	mutex                     sync.Mutex      // 互斥锁
	Waiting                   []chan struct{} //等待的客户端,当号段用户正处于初始化时,其他协程处于等待状态,到一定超时时间仍然未完成再返回失败
}

SegmentAlloc.SegmentBuffer 图示:

buffer 使用

1.当 SegmentAlloc.IsPreloading 为 true时,其他进程不能进行初始化,避免了同个业务类型重复去初始化buffer

SegmentAlloc 判断是否需要对 buffer 进行预加载

func (s *SegmentAlloc) IsNeedPreload() bool {
	// 已经在预加载了
	if s.IsPreloading {
		return false
	}
        // 第二个缓冲区已经准备好 ,这里之前遗漏了该判断,会导致只要超过一半就开始去预加载
	if len(s.SegmentBuffers) > 1 {
		return false
	}
	segmentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
	// 当前剩余的号已经小于步长的一半,则进行加载
	restId := segmentBuffer.MaxId - segmentBuffer.Cursor
	if restId <= s.Step/2 {
		return true
	}
	return false
}

2.SegmentAlloc.Waiting 保存了所有在等待的客户端协程,等到初始化buffer 完成后再唤起所有等待的协程

func (s *SegmentAlloc) WakeUpAllWaitingClient() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	for _, waiting := range s.Waiting {
		goasync.SafeClose(waiting)
	}
	return nil
}

这里如果被其他携程关闭的话则不能再进行二次关闭,所以需要通过下面方法来保证发送信号量不出异常

func SafeClose(ch chan struct{}) (ok bool) {
	defer func() {
		if recover() != nil {
			// 已经被其他协程关闭
			ok = false
		}
	}()
	close(ch)
	return true
}

3.SegmentAlloc.UpdateTime 存储了内存数据更新的时间,如果长时间没有更新的数据可以进行监控上报,看是否有业务异常

SegmentAlloc 加载完成后如果上一个号段已经用完则刷新 buffer

func (s *SegmentAlloc) IsHasSegment() bool {
	currentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
	// 这里可能还没有初始化好,
	if currentBuffer.IsInitOk && currentBuffer.Cursor < currentBuffer.MaxId {
		return true
	}
	return false
}

func (s *SegmentAlloc) IsNewBufferReady() bool {
	if len(s.SegmentBuffers) <= 1 {
		return false
	}
	return true
}

func (s *SegmentAlloc) RefreshBuffer() {
	// 当前buffer 仍然有号则不需要刷新,因为可能其他协程已经刷新了buffer区
	if s.IsHasSegment() {
		return
	}
	if !s.IsNewBufferReady() {
		return
	}
	s.SegmentBuffers = append(s.SegmentBuffers[:0], s.SegmentBuffers[1:]...)
}

Segment 获取新的id号

func (s *SegmentAlloc) GetId() uint64 {
	if s.IsHasSegment() {
		currentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
		id := atomic.AddUint64(&currentBuffer.Cursor, 1)
		s.UpdateTime = time.Now()
		return id
	}
	return 0
}

func (s *SegmentAlloc) IsRightId(id uint64) bool {
	return id > 0
}

SegmentCache

存储了多个业务的 SegmentCache,并且通过管理信号量来看是否需要进行预加载

type SegmentCache struct {
	cache            sync.Map
	loadSegmentEvent chan int // 加载号段的信号量
}

1.SegmentCache.loadSegmentEvent :通过信号量的方式来看是否需要触发预加载
2.SegmentCache.cache :存储了已经分配了 SegmentAlloc 数据

存储和获取 SegmentCache中的 SegmentAlloc

func (s *SegmentCache) Add(alloc *SegmentAlloc) int {
	s.cache.Store(alloc.SegmentType, alloc)
	return alloc.SegmentType
}

func (s *SegmentCache) Get(segmentType int) *SegmentAlloc {
	v, ok := s.cache.Load(segmentType)
	if ok {
		return v.(*SegmentAlloc)
	}
	return nil
}

读取和写入 load alloc 的信号量

func (s *SegmentCache) LoadEvent() <-chan int {
	return s.loadSegmentEvent
}

func (s *SegmentCache) WriteLoadEvent(segmentAlloc *SegmentAlloc) {
	segmentAlloc.IsPreloading = true
	s.loadSegmentEvent <- segmentAlloc.SegmentType
}

Service 实现

客户端获取新的业务id 流程图

获取业务id流程图
func (service *IdGenerateService) GetSegmentId(ctx context.Context, segmentType int) (id uint64, err error) {
	segmentAlloc := service.segmentCache.Get(segmentType)
	if segmentAlloc == nil {
		return 0, errors.WithStack(err)
	}

	id, err = service.nextId(ctx, segmentAlloc)
	if err != nil {
		return 0, errors.WithStack(err)
	}
	return id, nil
}

func (service *IdGenerateService) nextId(ctx context.Context, segmentAlloc *internalDefine.SegmentAlloc) (id uint64, err error) {
	if segmentAlloc == nil {
		return 0, define.RespGenerateIdErr
	}

	segmentAlloc.Lock()
	defer segmentAlloc.Unlock()

	id = segmentAlloc.GetId()

	if segmentAlloc.IsNeedPreload() {
		service.segmentCache.WriteLoadEvent(segmentAlloc)
	}

	// 如果已经获取到正确的id则直接返回
	if segmentAlloc.IsRightId(id) {
		return id, nil
	}

	// 如果没拿到号段 ,在这里加入等待队列,前面已经发出事件开始加载,避免多个协程同时进行加载
	waitChan := make(chan struct{}, 1)
	segmentAlloc.Waiting = append(segmentAlloc.Waiting, waitChan)
	// 让其他客户端可以走前面的步骤,进入到等待状态
	segmentAlloc.Unlock()

	// 最多等待500ms,超过等待时间则直接返回错误
	timer := time.NewTimer(500 * time.Millisecond)
	select {
	case <-waitChan:
	case <-timer.C:
	}

	segmentAlloc.Lock()
	segmentAlloc.RefreshBuffer()
	id = segmentAlloc.GetId()
	if segmentAlloc.IsRightId(id) {
		return id, nil
	}
	return 0, define.RespGenerateIdErr
}

监听信号量加载新的号段

func (service *IdGenerateService) watchSegmentLoadEvent(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case segmentType, ok := <-service.segmentCache.LoadEvent():
			if !ok {
				continue
			}
			err := service.loadSegmentAllocBuffer(ctx, segmentType)
			if err != nil {
				logger.CtxLogErrorf(ctx, "loadSegmentAllocBufferErr :%+v", err)
				continue
			}
		default:
		}
	}
}

// loadSegmentAllocBuffer
// @Description: 预先加载id号,使用时直接从内存中获取,避免每次去请求IDCS
func (service *IdGenerateService) loadSegmentAllocBuffer(ctx context.Context, segmentType int) (err error) {
	segmentAlloc := service.segmentCache.Get(segmentType)
	defer func() {
		segmentAlloc.IsPreloading = false
		if err == nil {
			wakeupErr := segmentAlloc.WakeUpAllWaitingClient()
			if wakeupErr != nil {
				logger.CtxLogErrorf(ctx, "wake up all client err : %+v", wakeupErr)
			}
		}
	}()
	var (
		id uint64
	)
	for i := 0; i < 3; i++ {
		id, err = service.dao.GetSeqIdBySegmentType(ctx, segmentType)
		if err != nil {
			logger.CtxLogErrorf(ctx, "idCenterApi.GetSeqIdBySegmentTypeErr :%+v", err)
			continue
		}
		var (
			minId = id - segmentAlloc.Step
			maxId = id
		)
		segment := internalDefine.NewSegment(minId, maxId)
		segmentAlloc.SegmentBuffers = append(segmentAlloc.SegmentBuffers, segment)
		return nil
	}
	if err != nil {
		return errors.WithStack(err)
	}
	return nil
}


关于高可用

chanbufferchanBuffer

2.动态步长调整
通过增加号段消耗时间来对步长进行动态调整,如果在短时间内号段很快被消耗了,那么可以将步长翻倍进行获取,缓存更多的号段再内存内。这样双 Buffer 在数据库获取号段宕机的一段时间内仍然能够获取id号。

3.为什么用内存缓存而不是Redis?

该功能本身的目的用于隔离第三方的不稳定,Redis本身也非可靠的服务,所以虽然本地内存无法实现分布式的,并且有可能会浪费一部分号段,但是对于可用性的提升更为显著

生产场景优化

1.初始步长设置问题

如果业务场景中每一次操作就会刚好把id耗尽,那么初始化的时候需要将对应类型的缓存增大,但是也不能一次性设置的过大,这样会导致重新去准备号段的时候时间过长,虽然使用超过一半就会重新加载,但是如果峰值流量过大的情况下,消耗完了还没加载完的话就会导致客户端陷入等待,走兜底去获取,也会造成接口变慢。

并且在服务第一次启动时,如果强依赖了该组件,则会影响服务重启的速度,所以目前选择的弱依赖的形式,未全部Loading完成则不能进行获取,直接兜底返回,这样会存在的一个问题是服务重启并且没有全部完成加载时,接口RT会短时间的抖动。所以如果不担心服务重启速度的话可以进行强依赖。

2.号段加载速度

加载buffer 的时候可以通过分段优化,对于初始值较大的号段,可以通过多协程来对不同段进行加载,从而提升加载速度,此处没做是担心对下游服务造成过大压力。

分段初始化

3.锁粒度过大导致客户端等待

segment

4.客户端等待时长的设置

此处最开始设置等待500ms,但是存在的一个问题是如果一次性获取100个号并且内存中都未准备好时,就会等待100*500ms ,这对于一个业务接口来说是不可接受的,所以将单个等待的超时时间修改为5ms