原文链接: 高并发零碎的限流策略:漏桶和令牌桶(附源码分析)

前言

PSasong5

文中测试代码已上传:https://github.com/asong2020/… 欢送`star

漏桶算法

HTTP
uberuber-go/ratelimit

样例

Gin
// 定义全局限流器对象
var rateLimit ratelimit.Limiter

// 在 gin.HandlerFunc 退出限流逻辑
func leakyBucket() gin.HandlerFunc {
    prev := time.Now()
    return func(c *gin.Context) {
        now := rateLimit.Take()
        fmt.Println(now.Sub(prev)) // 为了打印工夫距离
        prev = now // 记录上一次的工夫,没有这个打印的会有问题
    }
}

func main() {
    rateLimit = ratelimit.New(10)
    r := gin.Default()
    r.GET("/ping", leakyBucket(), func(c *gin.Context) {
        c.JSON(200, true)
    })
    r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}
abab -n 10 -c 2 http://127.0.0.1:8080/ping
uber

源码实现

咱们首先来看一下其外围构造:

type limiter struct {
    sync.Mutex
    last       time.Time
    sleepFor   time.Duration
    perRequest time.Duration
    maxSlack   time.Duration
    clock      Clock
}
type Limiter interface {
    // Take should block to make sure that the RPS is met.
    Take() time.Time
}
take()take()
sync.MutextlastsleepForperRequestmaxSlackclocknowsleep
NewrateoptionlimitNew
l := &limiter{
        perRequest: time.Second / time.Duration(rate),
        maxSlack:   -10 * time.Second / time.Duration(rate),
    }
1ntake
take
func (t *limiter) Take() time.Time {
    t.Lock()
    defer t.Unlock()
    now := t.clock.Now()
    if t.last.IsZero() {
        t.last = now
        return t.last
    }
    t.sleepFor += t.perRequest - now.Sub(t.last)
    if t.sleepFor < t.maxSlack {
        t.sleepFor = t.maxSlack
    }
    if t.sleepFor > 0 {
        t.clock.Sleep(t.sleepFor)
        t.last = now.Add(t.sleepFor)
        t.sleepFor = 0
    } else {
        t.last = now
    }

    return t.last
}
take()
IsZeronowsleepsleepFor

步骤其实不是很多,次要须要留神一个知识点 —— 最大松弛量。

req1req2>=perRequestreq1req2req3req1req2req2req250msreq3100ms50ms250ms200ms

对于下面这种状况,咱们能够把之前距离比拟长的申请的工夫匀给前面的申请判断限流时应用,缩小申请期待的工夫了,然而当两个申请之间达到的距离比拟大时,就会产生很大的可对消工夫,以至于前面大量申请霎时达到时,也无奈对消这个工夫,那样就曾经失去了限流的意义,所以引入了最大松弛量 (maxSlack) 的概念, 该值为负值,示意容许对消的最长工夫,避免以上状况的呈现。

以上就是漏桶实现的基本思路了,整体还是很简略的,你学会了吗?

令牌桶算法

令牌桶其实和漏桶的原理相似,令牌桶就是设想有一个固定大小的桶,零碎会以恒定速率向桶中放 Token,桶满则临时不放。从网上找了图,表述十分失当:

Githubgithub.com/juju/ratelimitGolangtimer/ratejuju/ratelimittime/rate

样例

gin
import (
    "net/http"
    "time"

    "github.com/gin-gonic/gin"
    "golang.org/x/time/rate"
)

var rateLimit *rate.Limiter

func tokenBucket() gin.HandlerFunc {
    return func(c *gin.Context) {
        if rateLimit.Allow() {
            c.String(http.StatusOK, "rate limit,Drop")
            c.Abort()
            return
        }
        c.Next()
    }
}

func main() {
    limit := rate.Every(100 * time.Millisecond)
    rateLimit = rate.NewLimiter(limit, 10)
    r := gin.Default()
    r.GET("/ping", tokenBucket(), func(c *gin.Context) {
        c.JSON(200, true)
    })
    r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}
NewLimiterr limitTokentokenb intToken100mstoken1s1010tokenAllowAllowN(time.Now(),1)AllowN ntruen tokenToken1

源码分析

Limit类型

time/ratelimitfloat64LimitInf Every Token Token
type Limit float64

// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)

// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
    if interval <= 0 {
        return Inf
    }
    return 1 / Limit(interval.Seconds())
}
Limiter
type Limiter struct {
    mu     sync.Mutex
    limit  Limit
    burst  int
    tokens float64
    // last is the last time the limiter's tokens field was updated
    last time.Time
    // lastEvent is the latest time of a rate-limited event (past or future)
    lastEvent time.Time
}

各个字段含意如下:

mulimitburstbursttokenslastlastEventlastEventReservation
Reservation
type Reservation struct {
    ok        bool
    lim       *Limiter
    tokens    int
    timeToAct time.Time
    // This is the Limit at reservation time, it can change later.
    limit Limit
}

各个字段含意如下:

oklimlimitertokenstimeToActlimit
reservationtimeToAct
Limiter
LimitetokenAllowReserveWait reserveNadvanceTokenreserveNadvance
advance
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
    //last不能在以后工夫now之后,否则计算出来的elapsed为正数,会导致令牌桶数量缩小
  last := lim.last
    if now.Before(last) {
        last = now
    }

    //依据令牌桶的缺数计算出令牌桶未进行更新的最大工夫
    maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
    elapsed := now.Sub(last) //令牌桶未进行更新的时间段
    if elapsed > maxElapsed {
        elapsed = maxElapsed
    }

    //依据未更新的工夫(未向桶中退出令牌的时间段)计算出产生的令牌数
    delta := lim.limit.tokensFromDuration(elapsed)
    tokens := lim.tokens + delta //计算出可用的令牌数
    if burst := float64(lim.burst); tokens > burst {
        tokens = burst
    }

    return now, last, tokens
}
advanceelapseddeltanewTokens
reserveNreserveN AllowN ReserveN WaitNmaxFutureReserve
// @param n 要生产的token数量
// @param maxFutureReserve 违心期待的最长工夫
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
    lim.mu.Lock()
    // 如果没有限度
    if lim.limit == Inf {
        lim.mu.Unlock()
        return Reservation{
            ok:        true, //桶中有足够的令牌
            lim:       lim,
            tokens:    n,
            timeToAct: now,
        }
    }
    //更新令牌桶的状态,tokens为目前可用的令牌数量
    now, last, tokens := lim.advance(now)
  // 计算取完之后桶还能剩能下多少token
    tokens -= float64(n)
    var waitDuration time.Duration
  // 如果token < 0, 阐明目前的token不够,须要期待一段时间
    if tokens < 0 {
        waitDuration = lim.limit.durationFromTokens(-tokens)
    }
    ok := n <= lim.burst && waitDuration <= maxFutureReserve
    r := Reservation{
        ok:    ok,
        lim:   lim,
        limit: lim.limit,
    }
  // timeToAct示意当桶中满足token数目等于n的工夫
    if ok {
        r.tokens = n
        r.timeToAct = now.Add(waitDuration)
    }
  // 更新桶外面的token数目
    // 更新last工夫
    // lastEvent
    if ok {
        lim.last = now
        lim.tokens = tokens
        lim.lastEvent = r.timeToAct
    } else {
        lim.last = last
    }
    lim.mu.Unlock()
    return r
}

下面的代码我曾经进行了正文,这里在总结一下流程:

Token Token Token Token TokentokensFromDurationTokenToken TokenToken TokendurationFromTokens

其实整个过程就是利用了 Token 数能够和工夫互相转化 的原理。而如果 Token 数为负,则须要期待相应工夫即可。

durationFromTokenstokensFromDuration
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
    seconds := tokens / float64(limit)
    return time.Nanosecond * time.Duration(1e9*seconds)
}
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
    // Split the integer and fractional parts ourself to minimize rounding errors.
    // See golang.org/issues/34861.
    sec := float64(d/time.Second) * float64(limit)
    nsec := float64(d%time.Second) * float64(limit)
    return sec + nsec/1e9
}
durationFromTokensN TokentokensFromDuration
tokensFromDuration TokenGolangd.Seconds() * float64(limit)
limiterToken
TokentokenCancel()token
func (r *Reservation) CancelAt(now time.Time) {
    if !r.ok {
        return
    }

    r.lim.mu.Lock()
    defer r.lim.mu.Unlock()
  /*
  1.如果无需限流
    2. tokens为0 (须要获取的令牌数量为0)
    3. 曾经过了截至工夫
    以上三种状况无需解决勾销操作
    */
    if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
        return
    }

    //计算出须要还原的令牌数量
    //这里的r.lim.lastEvent可能是本次Reservation的完结工夫,也可能是起初的Reservation的完结工夫,所以要把本次完结工夫点(r.timeToAct)之后产生的令牌数减去
    restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  // 当小于0,示意曾经都预支完了,不能偿还了
    if restoreTokens <= 0 {
        return
    }
    //从新计算令牌桶的状态
    now, _, tokens := r.lim.advance(now)
    //还原以后令牌桶的令牌数量,以后的令牌数tokens加上须要还原的令牌数restoreTokens
    tokens += restoreTokens
  //如果tokens大于桶的最大容量,则将tokens置为桶的最大容量
    if burst := float64(r.lim.burst); tokens > burst {
        tokens = burst
    }
    // update state
    r.lim.last = now //记录桶的更新工夫
    r.lim.tokens = tokens //更新令牌数量
 // 如果都相等,阐明跟没生产一样。间接还原成上次的状态吧
    if r.timeToAct == r.lim.lastEvent {
        prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
        if !prevEvent.Before(now) {
            r.lim.lastEvent = prevEvent
        }
    }

    return
}
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))r.tokenstokenr.timeToAcrTokenr.lim.lastEventtimeToActr.limit.tokensFromDurationToken
TokenTokentoken

好啦,源码就临时剖析到这了,因为规范库的实现的代码量有点大,还有一部分在这里没有说,留给大家本人去分析吧~。

总结

本文重点介绍了漏桶算法和令牌桶算法,漏桶算法和令牌桶算法的次要区别在于,”漏桶算法”可能强行限度数据的传输速率(或申请频率),而”令牌桶算法”在可能限度数据的均匀传输速率外,还容许某种程度的突发传输。在某些状况下,漏桶算法不可能无效地应用网络资源,因为漏桶的漏出速率是固定的,所以即便网络中没有产生拥塞,漏桶算法也不能使某一个独自的数据流达到端口速率。因而,漏桶算法对于存在突发个性的流量来说不足效率。而令牌桶算法则可能满足这些具备突发个性的流量。通常,漏桶算法与令牌桶算法联合起来为网络流量提供更高效的管制。

star

好啦,这篇文章就到这里啦,素质三连(分享、点赞、在看)都是笔者继续创作更多优质内容的能源!

创立了一个Golang学习交换群,欢送各位大佬们踊跃入群,咱们一起学习交换。入群形式:加我vx拉你入群,或者公众号获取入群二维码

结尾给大家发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,本人也收集了一本PDF,有须要的小伙能够到自行下载。获取形式:关注公众号:[Golang梦工厂],后盾回复:[微服务],即可获取。

我翻译了一份GIN中文文档,会定期进行保护,有须要的小伙伴后盾回复[gin]即可下载。

翻译了一份Machinery中文文档,会定期进行保护,有须要的小伙伴们后盾回复[machinery]即可获取。

我是asong,一名普普通通的程序猿,让咱们一起缓缓变强吧。欢送各位的关注,咱们下期见~~~

举荐往期文章:

  • Go看源码必会常识之unsafe包
  • 源码分析panic与recover,看不懂你打我好了!
  • 详解并发编程根底之原子操作(atomic包)
  • 详解defer实现机制
  • 真的了解interface了嘛
  • Leaf—Segment分布式ID生成零碎(Golang实现版本)
  • 十张动图带你搞懂排序算法(附go实现代码)
  • go参数传递类型
  • 手把手教姐姐写音讯队列
  • 常见面试题之缓存雪崩、缓存穿透、缓存击穿
  • 详解Context包,看这一篇就够了!!!
  • go-ElasticSearch入门看这一篇就够了(一)
  • 面试官:go中for-range应用过吗?这几个问题你能解释一下起因吗