在分布式系统中,如果某个服务节点发生故障或者网络发生异常,都有可能导致调用方被阻塞等待,如果超时时间设置很长,调用方资源很可能被耗尽。这又导致了调用方的上游系统发生资源耗尽的情况,最终导致系统雪崩,如下情况会导致系统雪崩
【服务提供者不可用】:硬件故障;程序bug;缓存击穿;用户大量请求
【重试导致加大请求流量】:重试机制导致过多重试;代码问题重试
【服务调用者不可用】:同步等待造成的资源耗尽
所以我们可以使用如下机制来解决
【扩机器】:增加机器数量;增加机器硬件
【流量控制】:限流;关闭重试
【缓存】:预先加载缓存
【服务降级】:服务器压力剧增,对非核心业务流程进行降级,保证核心功能可用,暂停其他业务保证自身核心业务正常运行
【服务熔断】:下游服务不可用或者响应过慢,切断调用链路直接返回结果,保证自身服务的可用性
1.1 限流
当系统的处理能力不能应对外部请求的突增流量时,为了不让系统奔溃,必须采取限流的措施
1.1.1 限流指标
TPS
系统吞吐量是衡量系统性能的关键指标,按照事务的完成数量来限流是最合理的。
但是对分布式系统来说,按照事务来限流并不现实。在分布式系统中完成一笔事务需要多个系统的配合。比如我们在电商系统购物,需要订单、库存、账户、支付等多个服务配合完成,有的服务需要异步返回,这样完成一笔事务花费的时间可能会很长。如果按照TPS来进行限流,时间粒度可能会很大大,很难准确评估系统的响应性能。
HPS
每秒请求数,指每秒钟服务端收到客户端的请求数量。
如果一个请求完成一笔事务,那TPS和HPS是等同的。但在分布式场景下,完成一笔事务可能需要多次请求,所以TPS和HPS指标不能等同看待
QPS
服务端每秒能够响应的客户端查询请求数量。
如果后台只有一台服务器,那 HPS 和 QPS 是等同的。但是在分布式场景下,每个请求需要多个服务器配合完成响应
目前主流的限流方法多采用 HPS 作为限流指标
1.1.12 限流方法
流量计数器
这是最简单直接的方法,比如限制每秒请求数量 100,超过 100 的请求就拒绝掉。
但是这个方法存在明显的问题:
1 单位时间(比如 1s)很难把控
从下面标注的可以看出HPS 没有超过 100,但是从上面的标注可以看出,HPS超过100了
2 有一段时间流量超了,也不一定真的需要限流
系统 HPS 限制 50,虽然前 3s 流量超了,但是如果读超时时间设置为 5s,并不需要限流
滑动时间窗口
滑动时间窗口算法是目前比较流行的限流算法,主要思想是把时间看做是一个向前滚动的窗口,如下图
开始的时候,我们把 t1~t5 看做一个时间窗口,每个窗口 1s,如果我们定的限流目标是每秒 50 个请求,那 t1~t5 这个窗口的请求总和不能超过 250 个。
这个窗口是滑动的,下一秒的窗口成了 t2~t6,这时把 t1 时间片的统计抛弃,加入 t6 时间片进行统计。这段时间内的请求数量也不能超过 250 个。
滑动时间窗口的优点是解决了流量计数器算法的缺陷,但是也有 2 个问题:
- 流量超过就必须抛弃或者走降级逻辑
- 对流量控制不够精细,不能限制集中在短时间内的流量,也不能削峰填谷
漏桶算法
在客户端的请求发送到服务器之前,先用漏桶缓存起来,这个漏桶可以是一个长度固定的队列,这个队列中的请求均匀的发送到服务端。
如果客户端的请求速率太快,漏桶的队列满了,就会被拒绝掉,或者走降级处理逻辑。这样服务端就不会受到突发流量的冲击。
漏桶算法的优点是实现简单,可以使用消息队列来削峰填谷。
但是也有 3 个问题需要考虑:
- 漏桶的大小,如果太大,可能给服务端带来较大处理压力,太小可能会有大量请求被丢弃。
- 漏桶给服务端的请求发送速率。
- 使用缓存请求的方式,会使请求响应时间变长。
漏桶大小和发送速率这 2 个值在项目上线初期都会根据测试结果选择一个值,但是随着架构的改进和集群的伸缩,这 2 个值也会随之发生改变。
令牌桶算法
令牌桶算法就跟病人去医院看病一样,找医生之前需要先挂号,而医院每天放的号是有限的。当天的号用完了,第二天又会放一批号
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”
令牌桶算法解决了漏桶算法的问题,而且实现并不复杂,使用信号量就可以实现。在实际限流场景中使用最多
1.2 熔断
相信大家对断路器路器并不陌生,它就相当于一个开关,打开后可以阻止流量通过。比如保险丝,当电流过大时,就会熔断,从而避免元器件损坏。
服务熔断是指调用方访问服务时通过断路器做代理进行访问,断路器会持续观察服务返回的成功、失败的状态,当失败超过设置的阈值时断路器打开,请求就不能真正地访问到服务了
1.2.1 断路器的状态
断路器有 3 种状态:
- CLOSED:默认状态。断路器观察到请求失败比例没有达到阈值,断路器认为被代理服务状态良好。
- OPEN:断路器观察到请求失败比例已经达到阈值,断路器认为被代理服务故障,打开开关,请求不再到达被代理的服务,而是快速失败。
- HALF OPEN:断路器打开后,为了能自动恢复对被代理服务的访问,会切换到半开放状态,去尝试请求被代理服务以查看服务是否已经故障恢复。如果成功,会转成 CLOSED 状态,否则转到 OPEN 状态
1.2.2 需要考虑的问题
使用断路器需要考虑一些问题:
- 针对不同的异常,定义不同的熔断后处理逻辑。
- 设置熔断的时长,超过这个时长后切换到 HALF OPEN 进行重试。
- 记录请求失败日志,供监控使用。
- 主动重试,比如对于 connection timeout 造成的熔断,可以用异步线程进行网络检测,比如 telenet,检测到网络畅通时切换到 HALF OPEN 进行重试。
- 补偿接口,断路器可以提供补偿接口让运维人员手工关闭。
- 重试时,可以使用之前失败的请求进行重试,但一定要注意业务上是否允许这样做。
1.2.3 使用场景
- 服务故障或者升级时,让客户端快速失败
- 失败处理逻辑容易定义
- 响应耗时较长,客户端设置的 read timeout 会比较长,防止客户端大量重试请求导致的连接、线程资源不能释放
1.3 降级
降级也就是服务降级,当我们的服务器压力剧增为了保证核心功能的可用性 ,而选择性的降低一些功能的可用性,或者直接关闭该功能。这就是典型的丢车保帅了。
就比如贴吧类型的网站,当服务器吃不消的时候,可以选择把发帖功能关闭,注册功能关闭,改密码,改头像这些都关了,为了确保登录和浏览帖子这种核心的功能。
1.4 总结
拿下棋比喻:
限流: 相当于尽量避免同时和两三个人同时下
熔断:相当于你的一颗卒被围死了,就不要利用其它棋去救它了,弃卒保帅,否则救他的棋也可能被拖死
降级:相当于尽量不要走用处不大的棋了,浪费走棋机会(资源),使已经过河的棋有更多的走棋机会(资源)发挥最大作用
2.1 Hystrix
stars数:22.7k
2.2 sentinel
Sentinel是阿里面向云原生开发的微服务流量控制、熔断降级的开源组件。
stars数:19.3k
2.2.1 Sentinel历史
- 2012 年,Sentinel 诞生,主要功能为入口流量控制。
- 2013-2017 年,Sentinel 在阿里巴巴集团内部迅速发展,成为基础技术模块,覆盖了所有的核心场景。Sentinel 也因此积累了大量的流量归整场景以及生产实践。
- 2018 年,Sentinel 开源,并持续演进。
- 2019 年,Sentinel 朝着多语言扩展的方向不断探索,推出 C++ 原生版本,同时针对 Service Mesh 场景也推出了 Envoy 集群流量控制支持,以解决 Service Mesh 架构下多语言限流的问题。
- 2020 年,推出 Sentinel Go 版本,继续朝着云原生方向演进。
- 2021 年,Sentinel 正在朝着 2.0 云原生高可用决策中心组件进行演进;同时推出了 Sentinel Rust 原生版本。同时我们也在 Rust 社区进行了 Envoy WASM extension 及 eBPF extension 等场景探索。
2.2.2 Sentinel优势
Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
官网是这样介绍的
Sentinel 具有以下特征:
- 丰富的应用场景:
Sentinel承接了阿里巴巴近10年的双十一大促流量的核心场景,例如秒杀,即突发流量控制在系统容量可以承受的范围;消息削峰填谷;实时熔断下游不可用应用,等等。 - 完备的监控功能:
Sentinel同时提供最实时的监控功能,您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。 - 简单易用的扩展点:
Sentinel提供简单易用的扩展点,您可以通过实现扩展点,快速的定制逻辑。例如定制规则管理,适配数据源等。
Sentinel功2.2.3 能和设计理念
流量控制
流量控制在网络传输中是一个常用的概念,它用于调整网络包的发送数据。然而,从系统稳定性角度考虑,在处理请求的速度上,也有非常多的讲究。任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状,如下图所示:
流量控制有以下几个角度:
- 资源的调用关系,例如资源的调用链路,资源和资源之间的关系;
- 运行指标,例如 QPS、线程池、系统负载等;
- 控制的效果,例如直接限流、冷启动、排队等。
Sentinel 的设计理念是让您自由选择控制的角度,并进行灵活组合,从而达到想要的效果。
熔断降级
什么是熔断降级
除了流量控制以外,降低调用链路中的不稳定资源也是 Sentinel 的使命之一。由于调用关系的复杂性,如果调用链路中的某个资源出现了不稳定,最终会导致请求发生堆积。这个问题和 Hystrix 里面描述的问题是一样的。
Sentinel 和 Hystrix 的原则是一致的: 当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。
熔断降级设计理念
在限制的手段上,Sentinel 和 Hystrix 采取了完全不一样的方法。
Hystrix 通过线程池的方式,来对依赖(在我们的概念中对应资源)进行了隔离。这样做的好处是资源和资源之间做到了最彻底的隔离。缺点是除了增加了线程切换的成本,还需要预先给各个资源做线程池大小的分配。
Sentinel 对这个问题采取了两种手段:
- 通过并发线程数进行限制
和资源池隔离的方法不同,Sentinel 通过限制资源并发线程的数量,来减少不稳定资源对其它资源的影响。这样不但没有线程切换的损耗,也不需要您预先分配线程池的大小。当某个资源出现不稳定的情况下,例如响应时间变长,对资源的直接影响就是会造成线程数的逐步堆积。当线程数在特定资源上堆积到一定的数量之后,对该资源的新请求就会被拒绝。堆积的线程完成任务后才开始继续接收请求。
- 通过响应时间对资源进行降级
除了对并发线程数进行控制以外,Sentinel 还可以通过响应时间来快速降级不稳定的资源。当依赖的资源出现响应时间过长后,所有对该资源的访问都会被直接拒绝,直到过了指定的时间窗口之后才重新恢复。
系统负载保护
Sentinel 同时提供系统维度的自适应保护能力。防止雪崩,是系统防护中重要的一环。当系统负载较高的时候,如果还持续让请求进入,可能会导致系统崩溃,无法响应。在集群环境下,网络负载均衡会把本应这台机器承载的流量转发到其它的机器上去。如果这个时候其它的机器也处在一个边缘状态的时候,这个增加的流量就会导致这台机器也崩溃,最后导致整个集群不可用。
针对这个情况,Sentinel 提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
三 sentinel-golang使用3.1 流量控制
3.1.1 按qps控制
package main
import (
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"log"
)
func main() {
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
//方式一: 通过配置文件创建
err := sentinel.InitWithConfig(conf)
//方式二: 通过默认创建
//err :sentinel.InitDefault()
if err != nil {
log.Fatal(err)
}
// TokenCalculateStrategy配置flow.Direct,Threshold配置10,StatIntervalInMs配置100,表示1s钟10个流量
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "lqz-test", //资源名,即规则的作用目标
TokenCalculateStrategy: flow.Direct, //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
ControlBehavior: flow.Reject, //表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。 Threshold: 表示流控
Threshold: 10, //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
StatIntervalInMs: 1000, //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
ch := make(chan struct{})
for i := 0; i < 12; i++ {
go func() {
// base.Inbound表示入口流量控制,Outbound表示出口流量控制
e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
// Blocked. We could get the block reason from the BlockError.
fmt.Println("失败")
} else {
// Passed, wrap the logic here.
fmt.Println("通过")
// Be sure the entry is exited finally.
e.Exit()
}
}()
}
<-ch
}
// 可以看到通过10个,失败2个
ResourceTokenCalculateStrategyControlBehaviorThresholdRelationStrategyRefResourceRefResourceWarmUpPeriodSecWarmUpWarmUpColdFactorWarmUpMaxQueueingTimeMsThrottlingStatIntervalInMs
流量控制器的控制行为Throttling
package main
import (
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"log"
)
func main() {
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
//方式一: 通过配置文件创建
err := sentinel.InitWithConfig(conf)
//方式二: 通过默认创建
//err :sentinel.InitDefault()
if err != nil {
log.Fatal(err)
}
// TokenCalculateStrategy配置flow.Direct,Threshold配置10,StatIntervalInMs配置100,表示1s钟10个流量
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "lqz-test", //资源名,即规则的作用目标
TokenCalculateStrategy: flow.Direct, //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
ControlBehavior: flow.Throttling, //Throttling表示匀速排队
Threshold: 10, //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
StatIntervalInMs: 1000, //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
for {
// base.Inbound表示入口流量控制,Outbound表示出口流量控制
e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
// Blocked. We could get the block reason from the BlockError.
fmt.Println("失败")
} else {
// Passed, wrap the logic here.
fmt.Println("通过")
// Be sure the entry is exited finally.
e.Exit()
}
//time.Sleep(100*time.Millisecond) // 100 毫秒发送一次,都能通过,如果去掉,就只能通过一个
}
}
3.1.2 预热冷启动(warm_up)
WarmUp 方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。这块设计和 Java 类似,可以参考限流-冷启动文档
通常冷启动的过程系统允许通过的 QPS 曲线如下图所示
【设置了预热冷启动,qps会以曲线的形式慢慢增加,而不是一下增加,保证了系统不会瞬间承受很大压力而挂掉】
package main
import (
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"log"
"math/rand"
"time"
)
func main() {
// 定义三个变量,统计总共请求数,通过个数,失败个数
var total, pass, block int
// We should initialize Sentinel first.
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
//方式一: 通过配置文件创建
err := sentinel.InitWithConfig(conf)
//方式二: 通过默认创建
//err :sentinel.InitDefault()
if err != nil {
log.Fatal(err)
}
// TokenCalculateStrategy配置flow.WarmUp,
//WarmUpPeriodSec配置10,
//Threshold配置1000,
//表示这1000个流量,要在10s内缓慢的增加到
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "lqz-test", //资源名,即规则的作用目标
TokenCalculateStrategy: flow.WarmUp, //使用WarmUp形式启动
ControlBehavior: flow.Reject, //表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。 Threshold: 表示流控
WarmUpPeriodSec: 10, //预热的时间长度,该字段仅仅对 WarmUp 的TokenCalculateStrategy生效
Threshold: 1000, //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
//WarmUpColdFactor: 预热的因子,默认是3,该值的设置会影响预热的速度,该字段仅仅对 WarmUp 的TokenCalculateStrategy生效
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
ch := make(chan struct{})
// 启动100个协程,分别:不停的请求
for i:=0;i<100;i++{
go func() {
// 死循环
for {
total++
e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
block++
} else {
pass++
e.Exit()
}
time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) // 每次随机睡10毫秒以内的时间
}
}()
}
//
go func() {
// 统计过去一秒钟,总共多少个,通过多少个,block多少个
var oldTotal, oldPass, oldBlock int
for { //死循环每隔1s打印,上一秒中总共多少,通过多少,block多少
oldSecondTotal := total - oldTotal
oldTotal = total
oldSecondPass := pass - oldPass
oldPass = pass
oldSecondBlock := block - oldBlock
oldBlock = block
time.Sleep(1 * time.Second)
fmt.Printf("总共:%d,通过;%d,拒绝:%d \n", oldSecondTotal, oldSecondPass, oldSecondBlock)
}
}()
<-ch
}
3.2 熔断降级
3.2.1 熔断器模型
3.2.2 熔断策略
Sentinel 熔断器的三种熔断策略都支持静默期 (规则中通过MinRequestAmount字段表示)。静默期是指一个最小的静默请求数,在一个统计周期内,如果对资源的请求数小于设置的静默数,那么熔断器将不会基于其统计值去更改熔断器的状态。静默期的设计理由也很简单,举个例子,假设在一个统计周期刚刚开始时候,第 1 个请求碰巧是个慢请求,这个时候这个时候的慢调用比例就会是 100%,很明显是不合理,所以存在一定的巧合性。所以静默期提高了熔断器的精准性以及降低误判可能性。
Sentinel 支持以下几种熔断策略
- 慢调用比例策略 (SlowRequestRatio):Sentinel 的熔断器不在静默期,并且慢调用的比例大于设置的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断。该策略下需要设置允许的调用 RT 临界值(即最大的响应时间),对该资源访问的响应时间大于该阈值则统计为慢调用。
- 错误比例策略 (ErrorRatio):Sentinel 的熔断器不在静默期,并且在统计周期内资源请求访问异常的比例大于设定的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断。
- 错误计数策略 (ErrorCount):Sentinel 的熔断器不在静默期,并且在统计周期内资源请求访问异常数大于设定的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断
3.2.3 基于错误数量的熔断
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"time"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func main() {
// 定义total,pass,block,totalErr 统计
var total,pass,block ,totalErr int
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}
ch := make(chan struct{})
// Register a state change listener so that we could observer the state change of the internal circuit breaker.
// 注册状态转移监听
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})
_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc",
Strategy: circuitbreaker.ErrorCount,
RetryTimeoutMs: 3000, // 3s后尝试恢复,进入half状态
MinRequestAmount: 10, // 静默数
StatIntervalMs: 5000, // 5s钟错误不超过50g
StatSlidingWindowBucketCount: 10, //滑动时间窗口是10
Threshold: 50, // 5s钟错误不超过50g
},
})
if err != nil {
log.Fatal(err)
}
logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g1 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
if rand.Uint64()%20 > 9 {
// Record current invocation as error.
totalErr++
sentinel.TraceError(e, errors.New("biz error"))
}
// g1 passed
time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g2 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
// g2 passed
pass++
time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
time.Sleep(time.Second)
fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
}
}()
<-ch
}
3.2.4 基于错误率的熔断
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"time"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func main() {
// 定义total,pass,block,totalErr 统计
var total,pass,block ,totalErr int
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}
ch := make(chan struct{})
// Register a state change listener so that we could observer the state change of the internal circuit breaker.
// 注册状态转移监听
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})
_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc",
Strategy: circuitbreaker.ErrorRatio,
RetryTimeoutMs: 3000, // 3s后尝试恢复,进入half状态
MinRequestAmount: 10, // 静默数
StatIntervalMs: 5000, // 5s钟错误比例不超过0.4
StatSlidingWindowBucketCount: 10, //滑动时间窗口是10
Threshold: 0.4, // 5s钟错误比例不超过0.4
},
})
if err != nil {
log.Fatal(err)
}
logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g1 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
if rand.Uint64()%20 > 6 {
// Record current invocation as error.
totalErr++
sentinel.TraceError(e, errors.New("biz error"))
}
// g1 passed
time.Sleep(time.Duration(rand.Uint64()%80+20) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g2 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
// g2 passed
pass++
time.Sleep(time.Duration(rand.Uint64()%80+40) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
time.Sleep(time.Second)
fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
}
}()
<-ch
}
3.2.5 基于慢请求的熔断
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"time"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}
func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}
func main() {
// 定义total,pass,block,totalErr 统计
var total,pass,block ,totalErr int
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
err := sentinel.InitWithConfig(conf)
if err != nil {
log.Fatal(err)
}
ch := make(chan struct{})
// Register a state change listener so that we could observer the state change of the internal circuit breaker.
// 注册状态转移监听
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})
_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
{
Resource: "abc", // 名字
Strategy: circuitbreaker.SlowRequestRatio, // 慢查询的侧脸
RetryTimeoutMs: 3000, // 3s后尝试恢复,进入half状态
MinRequestAmount: 10, // 静默数
StatIntervalMs: 5000, // 5s钟慢查询比例不超过0.4
StatSlidingWindowBucketCount: 10, //滑动时间窗口是10
MaxAllowedRtMs: 50, // 50毫秒以外算慢查询
Threshold: 0.5,// 5s钟慢查询比例不超过0.4
},
})
if err != nil {
log.Fatal(err)
}
logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g1 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
if rand.Uint64()%20 > 6 {
// Record current invocation as error.
totalErr++
sentinel.TraceError(e, errors.New("biz error"))
}
// g1 passed
time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
total++
e, b := sentinel.Entry("abc")
if b != nil {
// g2 blocked
block++
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
// g2 passed
pass++
time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
e.Exit()
}
}
}()
go func() {
for {
time.Sleep(time.Second)
fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
}
}()
<-ch
}
三 gin中集成限流
gin的initGin/sentinel.go
package initGin
import (
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/logging"
"log"
)
func InitSentinel() {
conf := config.NewDefaultConfig()
// for testing, logging output to console
conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
//方式一: 通过配置文件创建
err := sentinel.InitWithConfig(conf)
//方式二: 通过默认创建
//err :sentinel.InitDefault()
if err != nil {
fmt.Println("sss")
log.Fatal(err)
}
// TokenCalculateStrategy配置flow.Direct,Threshold配置1,StatIntervalInMs配置100,表示1s钟1个流量
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: "lqz-test", //资源名,即规则的作用目标
TokenCalculateStrategy: flow.Direct, //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
ControlBehavior: flow.Throttling, //Throttling表示匀速排队
Threshold: 1, //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
StatIntervalInMs: 1000, //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
}
gin的main.go
package main
import (
"context"
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"net/http"
//sentinelPlugin "github.com/alibaba/sentinel-golang/pkg/adapters/gin"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpc_proto_demo/sentinel_demo/gin_demo/initGin"
"grpc_proto_demo/sentinel_demo/proto"
)
func main() {
initGin.InitSentinel()
r := gin.Default()
//r.Use(sentinelPlugin.SentinelMiddleware())
r.GET("/index", func(c *gin.Context) {
// 第一步:连接服务端
conn, err := grpc.Dial("127.0.0.1:50052", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
c.JSON(200, "连接服务异常")
}
defer conn.Close()
// 第二步:创建客户端调用
client := proto.NewGreeterClient(conn)
// ****限流开始****
e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
// 返回给前端,超过了qps
fmt.Println("失败")
c.JSON(http.StatusTooManyRequests, gin.H{
"msg": "请求过快,请稍后再试",
})
return
}
resp, err := client.SayHello(context.Background(), &proto.HelloRequest{
Name: "lqz",
Age: 19,
})
e.Exit() // 不要忘了加它
// ****限流结束****
if err != nil {
c.JSON(200, "服务器错误")
}
c.JSON(200, resp.Reply)
})
r.Run()
}
grpc/server/main.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"grpc_proto_demo/sentinel_demo/proto"
"net"
)
type GreeterServer struct {
}
func (h GreeterServer) SayHello(ctx context.Context, in *proto.HelloRequest) (*proto.HelloResponse, error) {
// 接收客户端发送过来的数据,打印出来
fmt.Println("客户端传入的名字是:", in.Name)
fmt.Println("客户端传入的年龄是:", in.Age)
// 返回给客户端
return &proto.HelloResponse{
Reply: "服务端给你回复",
}, nil
}
// 服务端代码
func main() {
// 第一步:new一个server
g := grpc.NewServer()
// 第二步:生成一个结构体对象
s := GreeterServer{}
// 第三步: 把s注册到g对象中
proto.RegisterGreeterServer(g, &s)
// 第四步:启动服务,监听端口
lis, error := net.Listen("tcp", "0.0.0.0:50052")
if error != nil {
panic("启动服务异常")
}
g.Serve(lis)
}
proto
syntax = "proto3";
option go_package = ".;proto";
service Greeter{
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
// 类似于go的结构体,可以定义属性
message HelloRequest {
string name = 1; // 1 是编号,不是值
int32 age = 2;
}
// 定义一个响应的类型
message HelloResponse {
string reply =1;
}