本文主要分析 Sentinel Go 流量指标统计底层滑动窗口的实现。

在高可用流量防护场景中,比如流控、熔断等,不管基于什么策略,底层最核心的一部分是系统指标(如请求数、rt、异常数、异常比例等等)的统计结构。本文主要分析 Sentinel Go 中统计结构滑动窗口的实现。

什么是滑动时间窗口

应用指标统计很重要一点是要与时间对齐,比如流控可能希望的是拿到前一秒的总请求数,所以在我们统计的指标都是需要与时间对齐。

滑动窗口基本运行模式

滑动时间窗口有两个很重要设置:
(1)滑动窗口的统计周期:表示滑动窗口的统计周期,一个滑动窗口有一个或多个窗口。
(2)滑动窗口中每个窗口长度:每个窗口(也叫格子,后文格子都是指一个窗口)的统计周期。

[开始时间,开始时间+格子长度)
intervalbucketLengthnow
index = (now/bucketLength)%interval

也就是说我们知道当前时间就能知道当前时间对应在滑动窗口的第几个格子。举一些例子来说明:
(1)假设当前时间是1455ms,那么经过计算,index 就是 2,也就是第三个格子。
(2)假设当前时间是1455000000000 ms,那么经过计算,index 就是 0,也就是第一个格子。

随着时间的滑动,滑动窗口类似于一个固定长度的环形队列。

前面图示的滑动窗口的时间范围是[1000, 2000),假设当前时间滑动到了2001ms时候会是怎么样呢?看下图:
在这里插入图片描述

根据前面计算方式,now=2001,算出来index是0,也就是当前时间打到了第一个格子。然后计算当前时间对应的格子的起始时间,很明显就是2000,这个时候发现 2000 > 1000(格子原始统计起始时间),说明当前格子进入了新的统计周期,所以需要把当前格子重置,重置包括两部分:(1)格子起始时间到2000;(2)统计结构清空。

滑动窗口的周期和格子长度怎么设置?

滑动窗口的设置主要是两个参数:
(1)滑动窗口的长度;
(2)滑动窗口每个格子的长度。
那么这两个设置应该怎么设置呢?

这里主要考虑点是:抗脉冲流量的能力和精确度之间的平衡。
(1)如果格子长度设置的小那么统计就会更加精确,但是格子太多,会增加竞争的可能性,因为窗口滑动必须是并发安全的,这里会有竞争。
(2)如果滑动窗口长度越长,对脉冲的平滑能力就会越强。

滑动窗口长度一致,格子长度不一致

这里首先来个对比:
在这里插入图片描述
很直观,假设这两种case下的统计数据如下:
在这里插入图片描述
在[1000,1500) 区间统计都是600,[1500, 2000) 之间统计都是500。我们获取滑动窗口的统计时候,两者的统计总和都是1100。

但是,当前时间如果滑动到了2001ms的时候,按照前面滑动窗口的逻辑,我们需要将滑动窗口的第一个格子覆盖成新的统计格子。如下图:
在这里插入图片描述
从上图可以看出来,到覆盖第一个格子时候,两个滑动窗口的统计结果就完全不一样了:
(1)第一个滑动窗口第一个格子(500ms长度)清零了,整个统计总计数变成了 501;
(2)第二个滑动窗口第一个格子(100ms长度)清零了,整个统计总计数变成了 981;
但是随着时间的继续往后滑动,在[2000, 2500) 之间,时间越往后,两者之间精度差会越来越小。

在滑动窗口统计周期一样情况下,格子划分的越多,那么统计的精度就越高。

格子长度一致,滑动窗口长度不一致

滑动窗口整体的统计长度怎么设置呢?有哪些需要考虑的点呢?这里一个非常重要的因素是对流量脉冲的抵抗能力。

什么是流量脉冲?什么是非脉冲流量?

(1)下图就是一个典型的类似脉冲流量,窗口统计长度是2000ms,格子长度是200ms。在[1400ms,2200ms)之间来了一波峰值。
在这里插入图片描述

(2)下图类似于非脉冲流量,窗口统计长度是2000ms,格子长度是200ms。每个格子内请求数都在160左右。
在这里插入图片描述
我们统计滑动窗口内的请求数总数时候,也就是把所有格子的统计值求和。
(1)对于上图(1)中脉冲流量,在整个滑动窗口的总和是1600个请求数左右。平均下来每200ms大概是160个请求,但是会在1600ms这个格子来一个峰值,达到了640个请求。 这里明显是一个很大的脉冲。
(2)对于上图(2)中非脉冲流量,在整个滑动窗口的总和是1600个请求数左右。而且以200ms粒度来看,整体流量非常平滑。

这里带来问题是:从整个滑动窗口统计周期(2000ms)来看,两者最后的效果是一样的。因为统计周期长,导致脉冲流量被平均掉了(也可以理解成被平滑了)。

那么假设我的滑动窗口统计周期只有400ms呢?对于[1400ms, 1600ms)区间,如下图:
在这里插入图片描述
窗口长度是400ms,两个格子,每个格子200ms。 整个窗口统计总请求数:650。 按照前面2000ms窗口来平均,400ms窗口应该是400个请求数。明显650 > 400。 所以窗口长度越小,抗脉冲能力越差。

滑动窗口的长度设置的越长,整体统计的结果抗脉冲能力会越强;滑动窗口的长度设置的越短,整体统计结果对脉冲的抵抗能力越弱。

具体怎么平衡还是要依据系统对脉冲流量的处理能力。

总结

(1)滑动窗口长度设置主要影响对脉冲流量的平滑效果,窗口越长,抗脉冲能力越强;
(2)滑动窗口的格子数量(固定滑动窗口长度下的格子长度),主要影响统计的精度,格子数越多,精度越高,但是也不是越多越好,太多了会影响并发性能。

Sentinel Go时间滑动窗口实现

Sentinel Go 滑动窗口 的实现基本和前一个章节描述的一样。

整个实现分为两部分:
(1)使用可设置的定长数组来表示滑动窗口(无锁),数组每个元素就是一个格子,格子是一个抽象对象,可以存储任意统计实体。
(2)一套基于时间滑动的算法,保证滑动窗口的滑动符合预期。

长度可设置的原子数组

滑动窗口最基础部分是一个无锁的原子数组。 Sentinel Go 是基于 slice 来实现原子数组的。原子数组可以无锁实现并发下的读写。

先看定义:

// AtomicBucketWrapArray forbit appending or shrinking after initializing
type AtomicBucketWrapArray struct {
	// The base address for real data array
	base unsafe.Pointer
	// The length of slice(array), it can not be modified.
	length int
	data   []*BucketWrap
}

// BucketWrap represent a slot to record metrics
type BucketWrap struct {
	// The start timestamp of this statistic bucket wrapper.
	BucketStart uint64
	// The actual data structure to record the metrics (e.g. MetricBucket).
	Value atomic.Value
}
AtomicBucketWrapArray[]*BucketWraplengthbase原子数组AtomicBucketWrapArray创建时候必须制定长度,一旦创建了之后不允许再增加元素或则缩容
BucketWrapBucketStartValuevoid*
AtomicBucketWrapArrayAtomicBucketWrapArray
// SliceHeader is a safe version of SliceHeader used within this project.
type SliceHeader struct {
	Data unsafe.Pointer
	Len  int
	Cap  int
}

func NewAtomicBucketWrapArrayWithTime(len int, bucketLengthInMs uint32, now uint64, generator BucketGenerator) *AtomicBucketWrapArray {
	// Step1: new
	ret := &AtomicBucketWrapArray{
		length: len,
		data:   make([]*BucketWrap, len),
	}
	// Step2: ......
	// initilize sliding windows start time (BucketWrap)
	
	// Step3: calculate base address for real data array
	sliHeader := (*SliceHeader)(unsafe.Pointer(&ret.data))
	ret.base = unsafe.Pointer((**BucketWrap)(sliHeader.Data))
	return ret
}
NewAtomicBucketWrapArrayWithTimeAtomicBucketWrapArraydata: make([]*BucketWrap, len)lenlen*BucketWrapAtomicBucketWrapArray.base**BuckerWarp**BuckerWarp

这里基于 slice 的内存转换拿到底层首元素内存地址的指针转换原理,可以参考另一篇文章 Go unsafe.Pointer 使用基本原则

AtomicBucketWrapArrayNewAtomicBucketWrapArrayWithTime
timeId := now / uint64(bucketLengthInMs)
idx := int(timeId) % len
startTime := calculateStartTime(now, bucketLengthInMs)

for i := idx; i <= len-1; i++ {
	ww := &BucketWrap{
		BucketStart: startTime,
		Value:       atomic.Value{},
	}
	ww.Value.Store(generator.NewEmptyBucket())
	ret.data[i] = ww
	startTime += uint64(bucketLengthInMs)
}
for i := 0; i < idx; i++ {
	ww := &BucketWrap{
		BucketStart: startTime,
		Value:       atomic.Value{},
	}
	ww.Value.Store(generator.NewEmptyBucket())
	ret.data[i] = ww
	startTime += uint64(bucketLengthInMs)
}
AtomicBucketWrapArraynowidxstartTime*BucketWrapstartTime*BucketWrap
*BucketWrapAtomicBucketWrapArrayAtomicBucketWrapArray
func (aa *AtomicBucketWrapArray) elementOffset(idx int) (unsafe.Pointer, bool) {
	if idx >= aa.length || idx < 0 {
		logging.Error(errors.New("array index out of bounds"),
			"array index out of bounds in AtomicBucketWrapArray.elementOffset()",
			"idx", idx, "arrayLength", aa.length)
		return nil, false
	}
	basePtr := aa.base
	return unsafe.Pointer(uintptr(basePtr) + uintptr(idx*PtrSize)), true
}

func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap {
	// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
	// then convert to (*unsafe.Pointer)
	if offset, ok := aa.elementOffset(idx); ok {
		return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(offset)))
	}
	return nil
}

func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool {
	// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
	// then convert to (*unsafe.Pointer)
	// update secondary pointer
	if offset, ok := aa.elementOffset(idx); ok {
		return atomic.CompareAndSwapPointer((*unsafe.Pointer)(offset), unsafe.Pointer(except), unsafe.Pointer(update))
	}
	return false
}
nownowidxidx
elementOffsetAtomicBucketWrapArray.baseidxunsafe.Pointer(uintptr(basePtr) + uintptr(idx*PtrSize))
elementOffset(idx)atomic.LoadPointeridx*BucketWrap(*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(elementOffset(idx))))
compareAndSetget
*BucketWrapbase*base*(base+8)

基于时间的滑动窗口实现

AtomicBucketWrapArrayAtomicBucketWrapArray
type LeapArray struct {
	bucketLengthInMs uint32
	sampleCount      uint32
	intervalInMs     uint32
	array            *AtomicBucketWrapArray
	// update lock
	updateLock mutex
}

// Generic interface to generate bucket
type BucketGenerator interface {
	// called when timestamp entry a new slot interval
	NewEmptyBucket() interface{}

	// reset the BucketWrap, clear all data of BucketWrap
	ResetBucketTo(bw *BucketWrap, startTime uint64) *BucketWrap
}

func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
	if now <= 0 {
		return nil, errors.New("Current time is less than 0.")
	}

	idx := la.calculateTimeIdx(now)
	bucketStart := calculateStartTime(now, la.bucketLengthInMs)

	for { //spin to get the current BucketWrap
		old := la.array.get(idx)
		if old == nil {
			// because la.array.data had initiated when new la.array
			// theoretically, here is not reachable
			newWrap := &BucketWrap{
				BucketStart: bucketStart,
				Value:       atomic.Value{},
			}
			newWrap.Value.Store(bg.NewEmptyBucket())
			if la.array.compareAndSet(idx, nil, newWrap) {
				return newWrap, nil
			} else {
				runtime.Gosched()
			}
		} else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
			return old, nil
		} else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
			// current time has been next cycle of LeapArray and LeapArray dont't count in last cycle.
			// reset BucketWrap
			if la.updateLock.TryLock() {
				old = bg.ResetBucketTo(old, bucketStart)
				la.updateLock.Unlock()
				return old, nil
			} else {
				runtime.Gosched()
			}
		} else if bucketStart < atomic.LoadUint64(&old.BucketStart) {
			if la.sampleCount == 1 {
				// if sampleCount==1 in leap array, in concurrency scenario, this case is possible
				return old, nil
			}
			// TODO: reserve for some special case (e.g. when occupying "future" buckets).
			return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
		}
	}
}
AtomicBucketWrapArray
BucketGeneratorNewEmptyBucketResetBucketToBucketGenerator
currentBucketOfTime
nowbucketStart*BucketWrapAtomicBucketWrapArray*BucketWrap*BucketWrapold.BucketStartnownowold.BucketStartnownowTryLockold.BucketStartnow

下图展示滑动窗口随着时间滑动过程中变化:
滑动窗口滑动过程中示意图

参考文档:
Sentinel Go源码
golang slice实践以及底层实现
golang unsafe.Pointer使用原则以及 uintptr 隐藏的坑