🚀 优质资源分享 🚀

学习路线指引(点击解锁)知识定位人群定位
进阶级本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
入门级手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

一、Go time/rate 限流器

1.1 简介

Go 在 x 标准库,即 golang.org/x/time/rate 里自带了一个限流器,这个限流器是基于令牌桶算法(token bucket)实现的。

在上一篇文章讲了几种限流算法,里面就有令牌桶算法,具体可以看上篇文章介绍。

1.2 rate/time 限流构造器

这个限流构造器就是生成 token,供后面使用。

Copy// https://github.com/golang/time/blob/master/rate/rate.go#L55

// The methods AllowN, ReserveN, and WaitN consume n tokens.
type Limiter struct {
    mu sync.Mutex
    limit Limit   // 放入 token 的速率
    burst int     // 令牌桶限制最大值
    tokens float64 // 桶中令牌数
    // last is the last time the limiter's tokens field was updated
    last time.Time
    // lastEvent is the latest time of a rate-limited event (past or future)
    lastEvent time.Time
}

  • r :产生 token 的速率。默认是每秒中可以向桶中生产多少 token。也可以设置这个值,用方法 Every 设置 token 速率时间粒度。
  • b :桶的容量,桶容纳 token 的最大数量。 b == 0,允许声明容量为 0 的值,这时拒绝所有请求;与 b== 0 情况相反,如果 r 为 inf 时,将允许所有请求,即使是 b == 0。
Copy// Inf is the infinite rate limit; it allows all events (even if burst is zero). 
const Inf = Limit(math.MaxFloat64)

It implements a “token bucket” of size b, initially full and refilled at rate r tokens per second.
构造器一开始会为桶注入 b 个 token,然后每秒补充 r 个 token。

  • 每秒生成 20 个 token,桶的容量为 5,代码为:
Copylimiter := NewLimiter(20, 5)

  • 200ms 生成 1 个 token

这时候不是秒为单位生成 token ,就可以使用 Every 方法设置生成 token 的速率:

Copylimit := Every(200 * time.Millisecond)
limiter := NewLimiter(limit, 5)

1秒 = 200ms * 5,也就是每秒生成 5 个 token。

生成了 token 之后,请求获取 token,然后使用 token。

1.3 time/rate 有3种限流用法

time/rate 源码里注释,消费 n 个 tokens 的方法

// The methods AllowN, ReserveN, and WaitN consume n tokens.

  • AllowN
  • ReserveN
  • WaitN

A. WaitN、Wait

WaitN / Wait 方法:

Copy// https://pkg.go.dev/golang.org/x/time/rate#Limiter.WaitN
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

func (lim *Limiter) Wait(ctx context.Context) (err error)

WaitN(ctx, 1)

方法里还有 Contex 参数,所以也可以设置 Deadline 或 Timeout,来决定 Wait 最长时间。比如下面代码片段:

Copy ctx, cancel := context.WithTimeout(context.Background(), time.Second * 5)
 defer cancel()
 err := limiter.WaitN(ctx, 2)

例子1:

Copypackage main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	limit := rate.NewLimiter(3, 5) // 每秒产生 3 个token,桶容量 5

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel() // 超时取消

	for i := 0; ; i++ { // 有多少令牌直接消耗掉
		fmt.Printf("%03d %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
		err := limit.Wait(ctx)
		if err != nil { // 超时取消 err != nil
			fmt.Println("err: ", err.Error())
			return // 超时取消,退出 for
		}
	}
}

分析:这里指定令牌桶大小为 5,每秒生成 3 个令牌。for 循环消耗令牌,产生多少令牌都会消耗掉。
从开始一直到 5 秒超时,计算令牌数,一开始初始化 NewLimiter 的 5 个 + 每秒 3 个令牌 * 5秒 ,总计 20 个令牌。运行程序输出看看:

Copy$ go run .\waitdemo.go
000 2022-05-17 21:35:38.400
001 2022-05-17 21:35:38.425
002 2022-05-17 21:35:38.425
003 2022-05-17 21:35:38.425
004 2022-05-17 21:35:38.425
005 2022-05-17 21:35:38.425
006 2022-05-17 21:35:38.773
007 2022-05-17 21:35:39.096
008 2022-05-17 21:35:39.436
009 2022-05-17 21:35:39.764
010 2022-05-17 21:35:40.106
011 2022-05-17 21:35:40.434
012 2022-05-17 21:35:40.762
013 2022-05-17 21:35:41.104
014 2022-05-17 21:35:41.430
015 2022-05-17 21:35:41.759
016 2022-05-17 21:35:42.104
017 2022-05-17 21:35:42.429
018 2022-05-17 21:35:42.773
019 2022-05-17 21:35:43.101
err:  rate: Wait(n=1) would exceed context deadline

B: AllowN、Allow

AllowN / Allow 方法

Copy// https://pkg.go.dev/golang.org/x/time/rate#Limiter.AllowN
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool

AllowN(time.Now(), 1)

例子:

然后你可以找一个 http 测试工具模拟用户压测下,比如 https://github.com/rakyll/hey 这个工具。测试命令:

Copyhey -n 100 http://localhost:8080/

就可以看到输出的内容

例子2:

Copypackage main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	limit := rate.NewLimiter(1, 3)
	for {
		if limit.AllowN(time.Now(), 2) {
			fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
		} else {
			time.Sleep(time.Second * 3)
		}
	}
}

C:ReserveN、Reserve

ReserveN / Reserve 方法

Copy// https://pkg.go.dev/golang.org/x/time/rate#Limiter.ReserveN
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. The Limiter takes this Reservation into account when allowing future events. The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

func (lim *Limiter) Reserve() *Reservation

func (r *Reservation) DelayFrom(now time.Time) time.Duration
func (r *Reservation) Delay() time.Duration
func (r *Reservation) OK() bool

其实上面的 WaitN 和 AllowN 都是基于 ReserveN 方法。具体可以去看看这 3 个方法的源码。

AllowN(time.Now(), 1)

usage example:

Copy// https://pkg.go.dev/golang.org/x/time/rate#Limiter.ReserveN

r := lim.ReserveN(time.Now(), 1)
if !r.OK() {
  // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
  return
}
time.Sleep(r.Delay())
Act() // 执行相关逻辑

1.4 动态设置桶token容量和速率

SetBurstAt / SetBurst:

Copyfunc (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
func (lim *Limiter) SetBurst(newBurst int)

SetBurstAt :设置到某时刻桶中 token 的容量
SetBurst:SetBurstAt(time.Now())

SetLimitAt / SetLimit

Copyfunc (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
func (lim *Limiter) SetLimit(newLimit Limit)

SetLimitAt :设置某刻 token 的速率
SetLimit :设置 token 的速率

二、uber 的 rate limiter

2.1 简介

漏桶算法(leaky bucket)

与令牌桶算法的区别:

  1. 漏桶算法流出的速率可以控制,流进桶中请求不能控制
  2. 令牌桶算法对于流入和流出的速度都是可以控制的,因为令牌可以自己生成。所以它还可以应对突发流量。突发流量生成 token 就快些。
  3. 令牌桶算法只要桶中有 token 就可以一直消费,漏桶是按照预定的间隔顺序进行消费的。

2.2 使用

官方的例子:

Copylimit := ratelimit.New(100) // 每秒钟允许100个请求

prev := time.Now()

for i := 0; i < 10; i++ {
    now := limit.Take()
    fmt.Println(i, now.Sub(prev))
    prev = now
}

限流器每秒可以通过 100 个请求,平均每个间隔 10ms。

2.3 uber 对漏桶算法的改进

在传统的漏桶算法,每个请求间隔是固定的,然而在实际应用中,流量不是这么平均的,时而小时而大,对于这种情况,uber 对 leaky bucket 做了一点改进,引入 maxSlack 最大松弛量的概念。

举例子:比如 3 个请求,请求 1 完成,15ms后,请求 2 才到来,可以对 2 立即处理。请求 2 完成后,5ms后,请求 3 到来,这个请求距离上次请求不足 10ms,因此要等 5ms。
但是,对于这种情况,实际三个请求一共耗时 25ms 才完成,并不是预期的 20ms。

uber 的改进是:可以把之情请求间隔比较长的时间,匀给后面的请求使用,只要保证每秒请求数即可。

uber ratelimit 改进代码实现:

Copyt.sleepFor += t.perRequest - now.Sub(t.last)
if t.sleepFor > 0 {
  t.clock.Sleep(t.sleepFor)
  t.last = now.Add(t.sleepFor)
  t.sleepFor = 0
} else {
  t.last = now
}

把每个请求多余出来的等待时间累加起来,以给后面的抵消使用。

其他参数用法:

  • WithoutSlack:

ratelimit 中引入最大松弛量,默认的最大松弛量为 10 个请求的间隔时间。
但是我不想用这个最大松弛量呢,就要限制请求的固定间隔时间,用 WithoutSlack 这个参数限制:

Copylimit := ratelimit.New(100, ratelimit.WithoutSlack)

  • WithClock(clock Clock):

ratelimit 中时间相关计算是用 go 的标准时间库 time,如果想要更高进度或特殊需求计算,可以用 WithClock 参数替换,实现 Clock 的 interface 就可以了

Copytype Clock interface {
        Now() time.Time
        Sleep(time.Duration)
}

clock &= MyClock{}
limiter := ratelimit.New(100, ratelimit.WithClock(clock))

更多 ratelimit

三、其他限流器,算法库包和软件

LimitByKeys()X-Rate-Limit-Limit

四、参考