限流

日常开发中,一般会遇到几种场景需要限流,比如有个api-server, 需要限制单个用户的调用频率,避免用户恶意刷接口或者突发大流量导致服务不可用等,这边记录几个常用的限流方法。

并发控制

简单的并发控制,用户的所有请求丢到一个channel里,再指定一个带缓冲区的channel为并发池,缓冲池的长度就是可以同时存在的协程数量,然后将执行完的任务根据需要直接返回或者丢到另外一个channel 里, 这样做的问题是如果任务太多,后面的任务会慢慢等待(因为channel阻塞机制),用户体验不是太好。

golang 限流控制_golang

Code:

package main

import (
	"fmt"
	"sync"
)


func setJob() <-chan int {
    var wg sync.WaitGroup
    jobChan := make(chan int, 50)
    wg.Add(1)
    go func(){
        for i := 0; i < 50; i++ {
            jobChan <- i
        }
        close(jobChan)
        wg.Done()
    }()
    wg.Wait()
    return jobChan
}

func main() {
	var wg sync.WaitGroup
    // 创建一个需要处理的数据来源, 假设是个channel
    jobChan := setJob()

    // 将结果存入一个channel
    resChan := make(chan int , 50)

    // 设置一个并发池
	buckets := make(chan bool, 10)
	for job := range jobChan {
		buckets <- true
		wg.Add(1)
		go func(job int) {
            res := 10 * job
            resChan <- res
			<-buckets
			wg.Done()
		}(job)
	}

    wg.Wait()
    close(resChan)
    tmpA := []int{}
	for r := range(resChan) {
		tmpA = append(tmpA, r)
	}
    fmt.Println(tmpA)
    
}
漏桶

简单的限流,指定一个大小固定的桶(bucket),以指定速率流入流出,如果桶满了,则拒绝后续请求,好处是简单,但是由于rate是固定的,导致了在无法在业务突发高峰时候(比如活动期间)有比较好的适配性。

golang 限流控制_golang_02

Code:

package main

import (
    "fmt"
    "math"
    "time"
)

// 漏桶限流器
type BucketLimit struct {
    rate       float64 //漏桶中水的漏出速率, 即每秒流多少水
    bucketSize float64 //漏桶最多能装的水大小
    lastAccessTime   time.Time //上次访问时间
    curWater   float64 //当前桶里面的水
}

func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit {
    return &BucketLimit{
        bucketSize: float64(bucketSize),
        rate:       rate,
        curWater:   0,
    }
}

func (b *BucketLimit) AllowControl() bool {
    now := time.Now()
    pastTime := now.Sub(b.lastAccessTime)

    // 当前剩余水量,当前水量减去距离上次访问的流出水量,如果流完了,即剩余水量为0
    b.curWater = math.Max(0, b.curWater - float64(pastTime) * b.rate)

    b.lastAccessTime = now

    // 当前水量必须小于桶的总量,不然则流出了
    if b.curWater < b.bucketSize {
        b.curWater = b.curWater + 1
        return true
    }
    return false
}

func main() {
    // 创建一个流出速率为1qps,桶的总量为2的限流器
    limit := NewBucketLimit(1, 2)

    // 在桶里放入1000滴水
    for i := 0; i < 1000; i++ {
        allow := limit.AllowControl()
        if allow {
            fmt.Printf("第%d滴水, 顺利流出\n", i)
            continue
        } else {
            fmt.Printf("第%d滴水, 溢出丢弃\n", i)
            time.Sleep(time.Millisecond * 100)
        }
    }
}
令牌桶

令牌桶相比漏桶可以对突发的流量做一定程度的处理,该方法意思是一定速率往一个桶里放令牌(token),同时往桶里注水,每消耗一滴水,需要一定数量的令牌,这里代码假设需要2块令牌才能消耗1滴水,所以处理水滴的速度主要取决于令牌的数量。假设放入令牌的速度是4块/s,每秒消耗的水滴数量是1滴,即每秒消耗2块令牌,那么每秒多出来的2块令牌就会积累在桶里,直到桶满位置,然后因为某次活动大放水,每秒消耗的水滴数量突然变成了3滴,即每秒需要消耗6块另外,大于每秒放入的令牌数量,由于之前桶里有积累的令牌,所以即使放水量加大,依然可以在令牌桶消耗完之前快速处理。

golang 限流控制_golang_03

Code:

package main
 
import (
    "fmt"
    "time"
    "math"
)
 
// 令牌桶限流器
type BucketLimit struct {
    rate       float64 //令牌桶放令牌的速率
    bucketSize float64 //令牌桶最多能装的令牌数量
    lastAccessTime   time.Time //上次访问时间
    curTokenNum  float64 //桶里当前的令牌数量
}

func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit {
    return &BucketLimit{
        bucketSize: float64(bucketSize),
        rate:       rate,
        curTokenNum:   0,
    }
}

func (b *BucketLimit) AllowControl(tokenNeed float64) bool {
    now := time.Now()
    pastTime := now.Sub(b.lastAccessTime)
 
    // 在距离上次访问期间一共可发放了多少令牌
    newTokenNum := float64(pastTime) / float64(b.rate)

    // 剩余令牌数量不能超过桶的总空间
    b.curTokenNum = math.Min(b.bucketSize, b.curTokenNum + newTokenNum)

    b.lastAccessTime = now
    // tokenNeed 指处理一个请求需要的令牌数量
    if tokenNeed > b.curTokenNum {
        return false
    } else {
        b.curTokenNum = b.curTokenNum - tokenNeed
        return true
    }
}
 
func main() {
    // 创建一个放令牌速率为10qps,桶的总量为20的限流器
    limit := NewBucketLimit(10, 20)

    // 在桶里放入100滴水
    for i := 0; i < 100; i++ {
        // 处理1滴水需要消耗19块令牌
        if i == 50 {
            time.Sleep(3 * time.Second)
        }
        allow := limit.AllowControl(19)
        if allow {
            fmt.Printf("第%d滴水, 顺利流出\n", i)
            continue
        } else {
            fmt.Printf("第%d滴水, 溢出丢弃\n", i)
            time.Sleep(time.Millisecond * 100)
        }
    }
}
他山之石

如果不自己造轮子,其实已经有很多别人造好的轮子,简单记录下,按需求选用:

  • “https://golang.org/x/net/netutil” 官方自带的http限流工具

伪代码

import(
    "fmt"
    "net"
    "golang.org/x/net/netutil"
)
 
func main() {
    l, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        fmt.Fatalf("Listen: %v", err)
    }
    defer l.Close()
    // 开启限流, 其实就算是记录器,max 就算最大的并发数量
    l = LimitListener(l, max)
    http.Serve(l, http.HandlerFunc())
    ......
}
  • “https://github.com/didip/tollbooth” 1.8K star的http限流中间件
  • “https://golang.org/x/time/rate” 官方的令牌桶限流代码包
  • “https://github.com/uber-go/ratelimit” uber的开源库,基于漏斗算法实现了一个限制器,也值得学习一下



后续代码新增和更新就放这里了:

  • “https://github.com/zyxpaomian/rate_limiter”