缓存击穿
缓存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
为什么我们需要 SingleFlight(使用场景)?
一般情况下我们在写一写对外的服务的时候都会有一层 cache 作为缓存,用来减少底层数据库的压力,但是在遇到例如 redis 抖动或者其他情况可能会导致大量的 cache miss 出现。
如下图所示,可能存在来自桌面端和移动端的用户有 1000 的并发请求,他们都访问的获取文章列表的接口,获取前 20 条信息,如果这个时候我们服务直接去访问 redis 出现 cache miss 那么我们就会去请求 1000 次数据库,这时可能会给数据库带来较大的压力(这里的 1000 只是一个例子,实际上可能远大于这个值)导致我们的服务异常或者超时。
这时候就可以使用 singleflight 库了,直译过来就是单飞,这个库的主要作用就是将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。
如下图所示,使用 singleflight 之后,我们在一个请求的时间周期内实际上只会向底层的数据库发起一次请求大大减少对数据库的压力。
我们可以来模拟一下这种场景:
package main
import (
"errors"
"log"
"sync"
"golang.org/x/sync/singleflight"
)
var errorNotExist = errors.New("not exist")
func main() {
var wg sync.WaitGroup
wg.Add(10)
//模拟10个并发
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
data, err := getData("key")
if err != nil {
log.Print(err)
return
}
log.Println(data)
}()
}
wg.Wait()
}
//获取数据
func getData(key string) (string, error) {
data, err := getDataFromCache(key)
if err == errorNotExist {
//模拟从db中获取数据
data, err = getDataFromDB(key)
if err != nil {
log.Println(err)
return "", err
}
//TOOD: set cache
} else if err != nil {
return "", err
}
return data, nil
}
//模拟从cache中获取值,cache中无该值
func getDataFromCache(key string) (string, error) {
return "", errorNotExist
}
//模拟从数据库中获取值
func getDataFromDB(key string) (string, error) {
log.Printf("get %s from database", key)
return "data", nil
}
其中通过 getData(key) 获取数据, 逻辑是
- 先尝试从cache中获取
- 如果cache中不存在就从db中获取
我们模拟了10个并发请求,来同时调用 getData 函数,执行结果如下:
可以看得到10个请求都是走的db,因为cache中不存在该值,当我们利用上 singlefligth 包, getData 改动一下:
import "golang.org/x/sync/singleflight"
var gsf singleflight.Group
//获取数据
func getData(key string) (string, error) {
data, err := getDataFromCache(key)
if err == errorNotExist {
//模拟从db中获取数据
v, err, _ := gsf.Do(key, func() (interface{}, error) {
return getDataFromDB(key)
//set cache
})
if err != nil {
log.Println(err)
return "", err
}
//TOOD: set cache
data = v.(string)
} else if err != nil {
return "", err
}
return data, nil
}
执行结果如下,可以看得到只有一个请求进入的db,其他的请求也正常返回了值,从而保护了后端DB。
它是如何实现的(源码分析)?
Group
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
Do
type call struct {
wg sync.WaitGroup
// 函数的返回值,在 wg 返回前只会写入一次
val interface{}
err error
// 使用调用了 Forgot 方法
forgotten bool
// 统计调用次数以及返回的 channel
dups int
chans []chan<- Result
}
Do
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
// 前面提到的懒加载
if g.m == nil {
g.m = make(map[string]*call)
}
// 会先去看 key 是否已经存在
if c, ok := g.m[key]; ok {
// 如果存在就会解锁
c.dups++
g.mu.Unlock()
// 然后等待 WaitGroup 执行完毕,只要一执行完,所有的 wait 都会被唤醒
c.wg.Wait()
// 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
// 如果我们没有找到这个 key 就 new call
c := new(call)
// 然后调用 waitgroup 这里只有第一次调用会 add 1,其他的都会调用 wait 阻塞掉
// 所以这要这次调用返回,所有阻塞的调用都会被唤醒
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 然后我们调用 doCall 去执行
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
doCall
这个方法的实现有点意思,使用了两个 defer 巧妙的将 runtime 的错误和我们传入 function 的 panic 区别开来避免了由于传入的 function panic 导致的死锁
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// 第一个 defer 检查 runtime 错误
defer func() {
}()
// 使用一个匿名函数来执行
func() {
defer func() {
if !normalReturn {
// 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误
// 后面在上层重新 panic
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
// 如果 fn 没有 panic 就会执行到这一步,如果 panic 了就不会执行到这一步
// 所以可以通过这个变量来判断是否 panic 了
normalReturn = true
}()
// 如果 normalReturn 为 false 就表示,我们的 fn panic 了
// 如果执行到了这一步,也说明我们的 fn recover 住了,不是直接 runtime exit
if !normalReturn {
recovered = true
}
}
DoChan
Do chan 和 Do 类似,其实就是一个是同步等待,一个是异步返回,主要实现上就是,如果调用 DoChan 会给 call.chans 添加一个 channel 这样等第一次调用执行完毕之后就会循环向这些 channel 写入数据
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
Forget
forget 用于手动释放某个 key 下次调用就不会阻塞等待了
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
有哪些注意事项(避坑指南)?
单飞虽好但也不要滥用哦,还是存在一些坑的
1. 一个阻塞,全员等待
使用 singleflight 我们比较常见的是直接使用 Do 方法,但是这个极端情况下会导致整个程序 hang 住,如果我们的代码出点问题,有一个调用 hang 住了,那么会导致所有的请求都 hang 住
还是之前的例子,我们加一个 select 模拟阻塞
func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
// 模拟出现问题,hang 住
select {}
return getArticle(id)
})
return v.(string), err
}
执行就会发现死锁了
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
这时候我们可以使用 DoChan 结合 select 做超时控制
func singleflightGetArticle(ctx context.Context, sg *singleflight.Group, id int) (string, error) {
result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) {
// 模拟出现问题,hang 住
select {}
return getArticle(id)
})
select {
case r := <-result:
return r.Val.(string), r.Err
case <-ctx.Done():
return "", ctx.Err()
}
}
调用的时候传入一个含 超时的 context 即可,执行时就会返回超时错误
❯ go run ./1.go
panic: context deadline exceeded
2. 一个出错,全部出错
这个本身不是什么问题,因为 singleflight 就是这么设计的,但是实际使用的时候 如果我们一次调用要 1s,我们的数据库请求或者是 下游服务可以支撑 10rps 的请求的时候这会导致我们的错误阈提高,因为实际上我们可以一秒内尝试 10 次,但是用了 singleflight 之后只能尝试一次,只要出错这段时间内的所有请求都会受影响
这种情况我们可以启动一个 Goroutine 定时 forget 一下,相当于将 rps 从 1rps 提高到了 10rps
go func() {
time.Sleep(100 * time.Millisecond)
// logging
g.Forget(key)
}()