1、简介
限流器是后台服务中的非常重要的组件,可以用来限制请求速率,保护服务,以免服务过载。 限流器的实现方法有很多种,例如滑动窗口法、Token Bucket、Leaky Bucket等。
golang标准库中就自带了限流算法的实现,即golang.org/x/time/rate。 该限流器是基于Token Bucket(令牌桶)实现的。
简单来说、令牌桶就是想象一个固定大小的桶、系统会以为恒定速率向桶中存放Token,桶满则暂时不放。
而用户则从桶中获取Token,存在Token就会一直获取,如果没有Token,则需要等待系统中被放置Token才行
2、使用
2.1、NewLimiter 创建限流器对象
limiter := NewLimiter(10,1)
两个参数:
- 第一参数r Limit:代表每秒可以向Token桶中存放多少Token,Limit类型实际上是float64
- 第二参数b int:b代表Token桶容量大小。
其构造出的限流器含义是令牌桶大小为1,以每秒10个Token的速率想桶中存放Token
2.2、Every 设置存放间隔时间
limit := Every(100 * time.Millisecond)
limiter := NewLimiter(limit,1)
上述表示以每100ms向桶中存放一个Token,本质上就是1s存放10个Token
2.3、Wait/WaitN
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context,n int) (err error)
Wait实际上就是WaitN(ctx,1)
当Wait方法消费Token时,此时桶内Token不足(小于N),Wait就会阻塞一段时间,直到Token满足条件。如果充足则直接返回。
Wait方法有context参数,可通过context的Deadline或Timeout,设置Wait等待时长。
2.4、Allow/AllowN
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time,n int) bool
Allow实际上就是AllowN(time.Now(),1)
AllowN表示截止到某一时刻,目前桶中数据是否至少为n个,满足返回true,同时消费n个Token。反之不消费Token,false
对应线上场景,请求速率过快,直接丢到某些请求
2.5、Reserve/ReserveN
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
Reserve相当于ReserveN(time.Now(), 1)。
ReserveN的用法相对复杂,当调用完后,无论Token是否充足,都会返回一个Reveration*对象
可以调用该对象的Delay()方法,该方法返回需要等待的时间。如果等待时间为0,无需等待。反之必须等到等待时间后,才能进行接下来的操作。
如果不想等待可以调用Cancel()方法,该方法会将Token归还。
ReserveN 返回一个 Reservation,指示调用者在 n 个事件发生之前必须等待多长时间。 限制器在允许未来事件时会考虑此保留。 如果 n 超过限制器的突发大小,则返回的 Reservation 的 OK() 方法返回 false。
用法示例:
r := lim.ReserveN(time.Now(), 1)
if !r.OK() {
// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
return
}
time.Sleep(r.Delay())
Act()
2.6、动态速率
Limiter支持可以调整速率和桶大小:
SetLimit(Limit) //改变放入Token的速率
SetBurst(int)//改变Token桶大小
根据当前需求、环境、条件,动态改变Token桶大小和速率
3、实例代码
package main
import (
"context"
"log"
"time"
"golang.org/x/time/rate"
)
func main() {
//TODO limit每秒产生Token数量,burst最多存放Token数量
limit := rate.Every(time.Second * 1)
limiter := rate.NewLimiter(limit, 5)
log.Println(limiter.Limit(), limiter.Burst())
for i := 0; i < 10; i++ {
//TODO 阻塞等待,直到获取一个Token
log.Println("before wait")
c, _ := context.WithTimeout(context.Background(), time.Second*2)
if err := limiter.Wait(c); err != nil {
log.Println("limiter wait err :" + err.Error())
}
log.Println("after wait")
//TODO 返回需要等待多久获取新的Token,等待指定时间执行任务
r := limiter.Reserve()
log.Println("reserve Delay:", r.Delay())
//TODO 判断当前是否可以取到Token
a := limiter.Allow()
log.Println("Allow:", a)
}
}