限流
日常开发中,一般会遇到几种场景需要限流,比如有个api-server, 需要限制单个用户的调用频率,避免用户恶意刷接口或者突发大流量导致服务不可用等,这边记录几个常用的限流方法。
并发控制
简单的并发控制,用户的所有请求丢到一个channel里,再指定一个带缓冲区的channel为并发池,缓冲池的长度就是可以同时存在的协程数量,然后将执行完的任务根据需要直接返回或者丢到另外一个channel 里, 这样做的问题是如果任务太多,后面的任务会慢慢等待(因为channel阻塞机制),用户体验不是太好。
Code:
package main import ( "fmt" "sync" ) func setJob() <-chan int { var wg sync.WaitGroup jobChan := make(chan int, 50) wg.Add(1) go func(){ for i := 0; i < 50; i++ { jobChan <- i } close(jobChan) wg.Done() }() wg.Wait() return jobChan } func main() { var wg sync.WaitGroup // 创建一个需要处理的数据来源, 假设是个channel jobChan := setJob() // 将结果存入一个channel resChan := make(chan int , 50) // 设置一个并发池 buckets := make(chan bool, 10) for job := range jobChan { buckets <- true wg.Add(1) go func(job int) { res := 10 * job resChan <- res <-buckets wg.Done() }(job) } wg.Wait() close(resChan) tmpA := []int{} for r := range(resChan) { tmpA = append(tmpA, r) } fmt.Println(tmpA) }
漏桶
简单的限流,指定一个大小固定的桶(bucket),以指定速率流入流出,如果桶满了,则拒绝后续请求,好处是简单,但是由于rate是固定的,导致了在无法在业务突发高峰时候(比如活动期间)有比较好的适配性。
Code:
package main import ( "fmt" "math" "time" ) // 漏桶限流器 type BucketLimit struct { rate float64 //漏桶中水的漏出速率, 即每秒流多少水 bucketSize float64 //漏桶最多能装的水大小 lastAccessTime time.Time //上次访问时间 curWater float64 //当前桶里面的水 } func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit { return &BucketLimit{ bucketSize: float64(bucketSize), rate: rate, curWater: 0, } } func (b *BucketLimit) AllowControl() bool { now := time.Now() pastTime := now.Sub(b.lastAccessTime) // 当前剩余水量,当前水量减去距离上次访问的流出水量,如果流完了,即剩余水量为0 b.curWater = math.Max(0, b.curWater - float64(pastTime) * b.rate) b.lastAccessTime = now // 当前水量必须小于桶的总量,不然则流出了 if b.curWater < b.bucketSize { b.curWater = b.curWater + 1 return true } return false } func main() { // 创建一个流出速率为1qps,桶的总量为2的限流器 limit := NewBucketLimit(1, 2) // 在桶里放入1000滴水 for i := 0; i < 1000; i++ { allow := limit.AllowControl() if allow { fmt.Printf("第%d滴水, 顺利流出\n", i) continue } else { fmt.Printf("第%d滴水, 溢出丢弃\n", i) time.Sleep(time.Millisecond * 100) } } }
令牌桶
令牌桶相比漏桶可以对突发的流量做一定程度的处理,该方法意思是一定速率往一个桶里放令牌(token),同时往桶里注水,每消耗一滴水,需要一定数量的令牌,这里代码假设需要2块令牌才能消耗1滴水,所以处理水滴的速度主要取决于令牌的数量。假设放入令牌的速度是4块/s,每秒消耗的水滴数量是1滴,即每秒消耗2块令牌,那么每秒多出来的2块令牌就会积累在桶里,直到桶满位置,然后因为某次活动大放水,每秒消耗的水滴数量突然变成了3滴,即每秒需要消耗6块另外,大于每秒放入的令牌数量,由于之前桶里有积累的令牌,所以即使放水量加大,依然可以在令牌桶消耗完之前快速处理。
Code:
package main import ( "fmt" "time" "math" ) // 令牌桶限流器 type BucketLimit struct { rate float64 //令牌桶放令牌的速率 bucketSize float64 //令牌桶最多能装的令牌数量 lastAccessTime time.Time //上次访问时间 curTokenNum float64 //桶里当前的令牌数量 } func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit { return &BucketLimit{ bucketSize: float64(bucketSize), rate: rate, curTokenNum: 0, } } func (b *BucketLimit) AllowControl(tokenNeed float64) bool { now := time.Now() pastTime := now.Sub(b.lastAccessTime) // 在距离上次访问期间一共可发放了多少令牌 newTokenNum := float64(pastTime) / float64(b.rate) // 剩余令牌数量不能超过桶的总空间 b.curTokenNum = math.Min(b.bucketSize, b.curTokenNum + newTokenNum) b.lastAccessTime = now // tokenNeed 指处理一个请求需要的令牌数量 if tokenNeed > b.curTokenNum { return false } else { b.curTokenNum = b.curTokenNum - tokenNeed return true } } func main() { // 创建一个放令牌速率为10qps,桶的总量为20的限流器 limit := NewBucketLimit(10, 20) // 在桶里放入100滴水 for i := 0; i < 100; i++ { // 处理1滴水需要消耗19块令牌 if i == 50 { time.Sleep(3 * time.Second) } allow := limit.AllowControl(19) if allow { fmt.Printf("第%d滴水, 顺利流出\n", i) continue } else { fmt.Printf("第%d滴水, 溢出丢弃\n", i) time.Sleep(time.Millisecond * 100) } } }
他山之石
如果不自己造轮子,其实已经有很多别人造好的轮子,简单记录下,按需求选用:
- “https://golang.org/x/net/netutil” 官方自带的http限流工具
伪代码
import( "fmt" "net" "golang.org/x/net/netutil" ) func main() { l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { fmt.Fatalf("Listen: %v", err) } defer l.Close() // 开启限流, 其实就算是记录器,max 就算最大的并发数量 l = LimitListener(l, max) http.Serve(l, http.HandlerFunc()) ...... }
- “https://github.com/didip/tollbooth” 1.8K star的http限流中间件
- “https://golang.org/x/time/rate” 官方的令牌桶限流代码包
- “https://github.com/uber-go/ratelimit” uber的开源库,基于漏斗算法实现了一个限制器,也值得学习一下
后续代码新增和更新就放这里了:
- “https://github.com/zyxpaomian/rate_limiter”