1、简介

限流器是后台服务中的非常重要的组件,可以用来限制请求速率,保护服务,以免服务过载。 限流器的实现方法有很多种,例如滑动窗口法、Token Bucket、Leaky Bucket等。

golang标准库中就自带了限流算法的实现,即golang.org/x/time/rate。 该限流器是基于Token Bucket(令牌桶)实现的。

简单来说、令牌桶就是想象一个固定大小的桶、系统会以为恒定速率向桶中存放Token,桶满则暂时不放。

而用户则从桶中获取Token,存在Token就会一直获取,如果没有Token,则需要等待系统中被放置Token才行

2、使用

2.1、NewLimiter 创建限流器对象
limiter := NewLimiter(10,1)

两个参数:

  • 第一参数r Limit:代表每秒可以向Token桶中存放多少Token,Limit类型实际上是float64
  • 第二参数b int:b代表Token桶容量大小。

其构造出的限流器含义是令牌桶大小为1,以每秒10个Token的速率想桶中存放Token

2.2、Every 设置存放间隔时间
limit := Every(100 * time.Millisecond)
limiter := NewLimiter(limit,1)

上述表示以每100ms向桶中存放一个Token,本质上就是1s存放10个Token

2.3、Wait/WaitN
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context,n int) (err error)

Wait实际上就是WaitN(ctx,1)

当Wait方法消费Token时,此时桶内Token不足(小于N),Wait就会阻塞一段时间,直到Token满足条件。如果充足则直接返回。

Wait方法有context参数,可通过context的Deadline或Timeout,设置Wait等待时长。

2.4、Allow/AllowN
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time,n int) bool

Allow实际上就是AllowN(time.Now(),1)

AllowN表示截止到某一时刻,目前桶中数据是否至少为n个,满足返回true,同时消费n个Token。反之不消费Token,false

对应线上场景,请求速率过快,直接丢到某些请求

2.5、Reserve/ReserveN
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

Reserve相当于ReserveN(time.Now(), 1)。

ReserveN的用法相对复杂,当调用完后,无论Token是否充足,都会返回一个Reveration*对象

可以调用该对象的Delay()方法,该方法返回需要等待的时间。如果等待时间为0,无需等待。反之必须等到等待时间后,才能进行接下来的操作。

如果不想等待可以调用Cancel()方法,该方法会将Token归还。

ReserveN 返回一个 Reservation,指示调用者在 n 个事件发生之前必须等待多长时间。 限制器在允许未来事件时会考虑此保留。 如果 n 超过限制器的突发大小,则返回的 Reservation 的 OK() 方法返回 false。

用法示例:

r := lim.ReserveN(time.Now(), 1)
if !r.OK() {
    // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
    return
}
time.Sleep(r.Delay())
Act()
2.6、动态速率

Limiter支持可以调整速率和桶大小:

SetLimit(Limit) //改变放入Token的速率
SetBurst(int)//改变Token桶大小

根据当前需求、环境、条件,动态改变Token桶大小和速率

3、实例代码

package main

import (
	"context"
	"log"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	//TODO limit每秒产生Token数量,burst最多存放Token数量
	limit := rate.Every(time.Second * 1)
	limiter := rate.NewLimiter(limit, 5)
	log.Println(limiter.Limit(), limiter.Burst())
	for i := 0; i < 10; i++ {
		//TODO 阻塞等待,直到获取一个Token
		log.Println("before wait")
		c, _ := context.WithTimeout(context.Background(), time.Second*2)
		if err := limiter.Wait(c); err != nil {
			log.Println("limiter wait err :" + err.Error())
		}
		log.Println("after wait")
		//TODO 返回需要等待多久获取新的Token,等待指定时间执行任务
		r := limiter.Reserve()
		log.Println("reserve Delay:", r.Delay())
		//TODO 判断当前是否可以取到Token
		a := limiter.Allow()
		log.Println("Allow:", a)

	}
}