在微服务架构里面一个很常见的问题就是服务之间的延迟和通信失败问题,极端的情况下,甚至会因为某个服务的性能下降或者故障宕机,导致访问超时,层层传递,引发雪崩,最终导致整个系统崩溃,而限流器和熔断器(这两个组件都是客户端的)能很好的解决这个问题,提高系统的可靠性和稳定性
限流器限流器,从字面上理解就是用来限制流量,有时候流量突增(可预期的比如“双11”,不可预期的微博的热门话题等),会将后端服务压垮,甚至直接宕机,使用限流器能限制访问后端的流量,起到一个保护作用,被限制的流量,可以根据具体的业务逻辑去处理,直接返回错误或者返回默认值等等
golang.org/x/time/rate
// 每 800ms 产生 1 个 token,最多缓存 1 个 token,如果缓存满了,新的 token 会被丢弃
limiter := rate.NewLimiter(rate.Every(time.Duration(800)*time.Millisecond), 1)
Allow, Wait, Reserve
- Allow: 返回是否有 token,没有 token 返回 false,或者消耗 1 个 token 返回 true
- Wait: 阻塞等待,知道取到 1 个 token
- Reserve: 返回 token 信息,Allow 其实相当于 Reserve().OK,此外还会返回需要等待多久才有新的 token
一般使用 Wait 的场景会比较多一些
if err := limiter.Wait(context.Background()); err != nil {
panic(err)
}
// do you business logic
熔断器
和限流器对依赖服务的保护机制不一样,熔断器是当依赖的服务已经出现故障时,为了保证自身服务的正常运行不再访问依赖的服务,防止雪崩效应
熔断器有三种状态:
github.com/afex/hystrix-go
hystrix.ConfigureCommand(
"addservice", // 熔断器名字,可以用服务名称命名,一个名字对应一个熔断器,对应一份熔断策略
hystrix.CommandConfig{
Timeout: 100, // 超时时间 100ms
MaxConcurrentRequests: 2, // 最大并发数,超过并发返回错误
RequestVolumeThreshold: 4, // 请求数量的阀值,用这些数量的请求来计算阀值
ErrorPercentThreshold: 25, // 错误率阀值,达到阀值,启动熔断,25%
SleepWindow: 1000, // 熔断尝试恢复时间,1000ms
},
)
提供了阻塞和非阻塞两种使用方式,完整代码可以参考如下链接: 代码
阻塞使用 Do 方法,返回一个 err
err := hystrix.Do("addservice", func() error {
// 正常业务逻辑,一般是访问其他资源
var err error
// 设置总体超时时间 10 ms 超时
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(50*time.Millisecond))
defer cancel()
res, err = client.Add(
ctx, req,
// 这里可以再次设置重试次数,重试时间,重试返回码
grpc_retry.WithMax(3),
grpc_retry.WithPerRetryTimeout(time.Duration(5)*time.Millisecond),
grpc_retry.WithCodes(codes.DeadlineExceeded),
)
return err
}, func(err error) error {
// 失败处理逻辑,访问其他资源失败时,或者处于熔断开启状态时,会调用这段逻辑
// 可以简单构造一个response返回,也可以有一定的策略,比如访问备份资源
// 也可以直接返回 err,这样不用和远端失败的资源通信,防止雪崩
// 这里因为我们的场景太简单,所以我们可以在本地在作一个加法就可以了
fmt.Println(err)
res = &addservice.AddResponse{V: req.A + req.B}
return nil
})
非阻塞方法使用 Go 方法,返回一个 error 的 channel,建议在有多个资源需要并发访问的场景下是使用
errc1 := hystrix.Go("addservice", func() error {
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(50*time.Millisecond))
defer cancel()
res1, err = client.Add(ctx, req)
if err == nil {
success <- struct{}{}
}
return err
}, nil)
// 有 fallback 处理
errc2 := hystrix.Go("addservice", func() error {
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(50*time.Millisecond))
defer cancel()
res2, err = client.Add(ctx, req)
if err == nil {
success <- struct{}{}
}
return err
}, func(err error) error {
fmt.Println(err)
res2 = &addservice.AddResponse{V: req.A + req.B}
success <- struct{}{}
return nil
})
for i := 0; i < 2; i++ {
select {
case <-success:
fmt.Println("success", i)
case err := <-errc1:
fmt.Println("err1:", err)
case err := <-errc2:
// 这个分支永远不会走到,因为熔断机制里面永远不会返回错误
fmt.Println("err2:", err)
}
}
参考链接