本文通过 Golang 实现一个 Leaf——美团点评分布式ID生成系统 双buffer 的id获取器,
主流程
代码实现
Segment
存储了发号的具体游标信息
type Segment struct {
Cursor uint64
MaxId uint64
MinId uint64
IsInitOk bool
}
1. Segment.Cursor 是当前发放的游标
2.Segment.MaxId 和 Segment.MinId 是 id号段的范围
3.Segment.IsInitOk 标识了这个号段是否初始化完成
这里号段的实现可以使用DB的形式来实现
Begin
UPDATE table SET max_id=max_id+step WHERE segment_type=xxx
SELECT tag, max_id, step FROM table WHERE segment_type=xxx
Commit
SegmentAlloc
结构体定义
type SegmentAlloc struct {
SegmentType int // 号段业务类型
CurrentSegmentBufferIndex int // 当前使用的buffer 下标,会一直使用0的下标
SegmentBuffers []*Segment // 号段的双buffer
Step uint64 // 号段加载的步长
IsPreloading bool // 是否处于预加载号段的状态
UpdateTime time.Time // 更新号段的时间
mutex sync.Mutex // 互斥锁
Waiting []chan struct{} //等待的客户端,当号段用户正处于初始化时,其他协程处于等待状态,到一定超时时间仍然未完成再返回失败
}
SegmentAlloc.SegmentBuffer 图示:
1.当 SegmentAlloc.IsPreloading 为 true时,其他进程不能进行初始化,避免了同个业务类型重复去初始化buffer
SegmentAlloc 判断是否需要对 buffer 进行预加载
func (s *SegmentAlloc) IsNeedPreload() bool {
// 已经在预加载了
if s.IsPreloading {
return false
}
// 第二个缓冲区已经准备好 ,这里之前遗漏了该判断,会导致只要超过一半就开始去预加载
if len(s.SegmentBuffers) > 1 {
return false
}
segmentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
// 当前剩余的号已经小于步长的一半,则进行加载
restId := segmentBuffer.MaxId - segmentBuffer.Cursor
if restId <= s.Step/2 {
return true
}
return false
}
2.SegmentAlloc.Waiting 保存了所有在等待的客户端协程,等到初始化buffer 完成后再唤起所有等待的协程
func (s *SegmentAlloc) WakeUpAllWaitingClient() error {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, waiting := range s.Waiting {
goasync.SafeClose(waiting)
}
return nil
}
这里如果被其他携程关闭的话则不能再进行二次关闭,所以需要通过下面方法来保证发送信号量不出异常
func SafeClose(ch chan struct{}) (ok bool) {
defer func() {
if recover() != nil {
// 已经被其他协程关闭
ok = false
}
}()
close(ch)
return true
}
3.SegmentAlloc.UpdateTime 存储了内存数据更新的时间,如果长时间没有更新的数据可以进行监控上报,看是否有业务异常
SegmentAlloc 加载完成后如果上一个号段已经用完则刷新 buffer
func (s *SegmentAlloc) IsHasSegment() bool {
currentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
// 这里可能还没有初始化好,
if currentBuffer.IsInitOk && currentBuffer.Cursor < currentBuffer.MaxId {
return true
}
return false
}
func (s *SegmentAlloc) IsNewBufferReady() bool {
if len(s.SegmentBuffers) <= 1 {
return false
}
return true
}
func (s *SegmentAlloc) RefreshBuffer() {
// 当前buffer 仍然有号则不需要刷新,因为可能其他协程已经刷新了buffer区
if s.IsHasSegment() {
return
}
if !s.IsNewBufferReady() {
return
}
s.SegmentBuffers = append(s.SegmentBuffers[:0], s.SegmentBuffers[1:]...)
}
Segment 获取新的id号
func (s *SegmentAlloc) GetId() uint64 {
if s.IsHasSegment() {
currentBuffer := s.SegmentBuffers[s.CurrentSegmentBufferIndex]
id := atomic.AddUint64(¤tBuffer.Cursor, 1)
s.UpdateTime = time.Now()
return id
}
return 0
}
func (s *SegmentAlloc) IsRightId(id uint64) bool {
return id > 0
}
SegmentCache
存储了多个业务的 SegmentCache,并且通过管理信号量来看是否需要进行预加载
type SegmentCache struct {
cache sync.Map
loadSegmentEvent chan int // 加载号段的信号量
}
1.SegmentCache.loadSegmentEvent :通过信号量的方式来看是否需要触发预加载
2.SegmentCache.cache :存储了已经分配了 SegmentAlloc 数据
存储和获取 SegmentCache中的 SegmentAlloc
func (s *SegmentCache) Add(alloc *SegmentAlloc) int {
s.cache.Store(alloc.SegmentType, alloc)
return alloc.SegmentType
}
func (s *SegmentCache) Get(segmentType int) *SegmentAlloc {
v, ok := s.cache.Load(segmentType)
if ok {
return v.(*SegmentAlloc)
}
return nil
}
读取和写入 load alloc 的信号量
func (s *SegmentCache) LoadEvent() <-chan int {
return s.loadSegmentEvent
}
func (s *SegmentCache) WriteLoadEvent(segmentAlloc *SegmentAlloc) {
segmentAlloc.IsPreloading = true
s.loadSegmentEvent <- segmentAlloc.SegmentType
}
Service 实现
客户端获取新的业务id 流程图
func (service *IdGenerateService) GetSegmentId(ctx context.Context, segmentType int) (id uint64, err error) {
segmentAlloc := service.segmentCache.Get(segmentType)
if segmentAlloc == nil {
return 0, errors.WithStack(err)
}
id, err = service.nextId(ctx, segmentAlloc)
if err != nil {
return 0, errors.WithStack(err)
}
return id, nil
}
func (service *IdGenerateService) nextId(ctx context.Context, segmentAlloc *internalDefine.SegmentAlloc) (id uint64, err error) {
if segmentAlloc == nil {
return 0, define.RespGenerateIdErr
}
segmentAlloc.Lock()
defer segmentAlloc.Unlock()
id = segmentAlloc.GetId()
if segmentAlloc.IsNeedPreload() {
service.segmentCache.WriteLoadEvent(segmentAlloc)
}
// 如果已经获取到正确的id则直接返回
if segmentAlloc.IsRightId(id) {
return id, nil
}
// 如果没拿到号段 ,在这里加入等待队列,前面已经发出事件开始加载,避免多个协程同时进行加载
waitChan := make(chan struct{}, 1)
segmentAlloc.Waiting = append(segmentAlloc.Waiting, waitChan)
// 让其他客户端可以走前面的步骤,进入到等待状态
segmentAlloc.Unlock()
// 最多等待500ms,超过等待时间则直接返回错误
timer := time.NewTimer(500 * time.Millisecond)
select {
case <-waitChan:
case <-timer.C:
}
segmentAlloc.Lock()
segmentAlloc.RefreshBuffer()
id = segmentAlloc.GetId()
if segmentAlloc.IsRightId(id) {
return id, nil
}
return 0, define.RespGenerateIdErr
}
监听信号量加载新的号段
func (service *IdGenerateService) watchSegmentLoadEvent(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case segmentType, ok := <-service.segmentCache.LoadEvent():
if !ok {
continue
}
err := service.loadSegmentAllocBuffer(ctx, segmentType)
if err != nil {
logger.CtxLogErrorf(ctx, "loadSegmentAllocBufferErr :%+v", err)
continue
}
default:
}
}
}
// loadSegmentAllocBuffer
// @Description: 预先加载id号,使用时直接从内存中获取,避免每次去请求IDCS
func (service *IdGenerateService) loadSegmentAllocBuffer(ctx context.Context, segmentType int) (err error) {
segmentAlloc := service.segmentCache.Get(segmentType)
defer func() {
segmentAlloc.IsPreloading = false
if err == nil {
wakeupErr := segmentAlloc.WakeUpAllWaitingClient()
if wakeupErr != nil {
logger.CtxLogErrorf(ctx, "wake up all client err : %+v", wakeupErr)
}
}
}()
var (
id uint64
)
for i := 0; i < 3; i++ {
id, err = service.dao.GetSeqIdBySegmentType(ctx, segmentType)
if err != nil {
logger.CtxLogErrorf(ctx, "idCenterApi.GetSeqIdBySegmentTypeErr :%+v", err)
continue
}
var (
minId = id - segmentAlloc.Step
maxId = id
)
segment := internalDefine.NewSegment(minId, maxId)
segmentAlloc.SegmentBuffers = append(segmentAlloc.SegmentBuffers, segment)
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
关于高可用
chanbufferchanBuffer
2.动态步长调整
通过增加号段消耗时间来对步长进行动态调整,如果在短时间内号段很快被消耗了,那么可以将步长翻倍进行获取,缓存更多的号段再内存内。这样双 Buffer 在数据库获取号段宕机的一段时间内仍然能够获取id号。
3.为什么用内存缓存而不是Redis?
该功能本身的目的用于隔离第三方的不稳定,Redis本身也非可靠的服务,所以虽然本地内存无法实现分布式的,并且有可能会浪费一部分号段,但是对于可用性的提升更为显著
生产场景优化
1.初始步长设置问题
如果业务场景中每一次操作就会刚好把id耗尽,那么初始化的时候需要将对应类型的缓存增大,但是也不能一次性设置的过大,这样会导致重新去准备号段的时候时间过长,虽然使用超过一半就会重新加载,但是如果峰值流量过大的情况下,消耗完了还没加载完的话就会导致客户端陷入等待,走兜底去获取,也会造成接口变慢。
并且在服务第一次启动时,如果强依赖了该组件,则会影响服务重启的速度,所以目前选择的弱依赖的形式,未全部Loading完成则不能进行获取,直接兜底返回,这样会存在的一个问题是服务重启并且没有全部完成加载时,接口RT会短时间的抖动。所以如果不担心服务重启速度的话可以进行强依赖。
2.号段加载速度
加载buffer 的时候可以通过分段优化,对于初始值较大的号段,可以通过多协程来对不同段进行加载,从而提升加载速度,此处没做是担心对下游服务造成过大压力。
3.锁粒度过大导致客户端等待
segment
4.客户端等待时长的设置
此处最开始设置等待500ms,但是存在的一个问题是如果一次性获取100个号并且内存中都未准备好时,就会等待100*500ms ,这对于一个业务接口来说是不可接受的,所以将单个等待的超时时间修改为5ms