什么是缓存击穿?
一个已经缓存的Key,在过期的那一时刻,突然被大量请求,给数据库带来巨大压力。
如何解决?
假设一个节点要向Peer节点通过http请求一个Key,它连续发送了N次这样的请求,我们是否可以只处理第一次请求(即只对数据库进行一次查询操作),其他的请求被block,一旦第一次请求拿到结果,其他请求直接使用这个结果,这样就避免了多次请求多次查询数据库的困境,缓解了数据库的压力。
如何实现?
源码的实现只有短短的64行代码,下面进行详细介绍:
结构体Group
Group represents a class of work and forms a namespace in which units of work can be executed with duplicate suppression.
内部实现:
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
字段mu是一个互斥锁,字段m是字典,键是key,值是封装的结构体call;call的实现如下:
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
call实际查询数据库请求,封装了wg,保存请求结果的val,和err;
方法Do
Do执行函数调用,返回结果,确保同一时刻只执行一次函数调用,重复调用将等待第一个调用完成并接收结果;
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
观察代码,我们发现m是一个懒惰初始化的map,为了避免重复初始化,加了互斥锁,对于第一次请求,我们实例化call,然后wg+1使其他协程等待这次调用的结果,然后我们给map赋值后就可以解锁了,其他协程又进入if里面开始等待call的调用结果;我们把结果保存在val里,调用完成后其他协程就可以直接取到结果了;最后我们删除key,这里解决的使同一时刻多次请求的问题,所以,不同时刻的请求每次都会进行函数的调用,我们必须在并发环境下进行测试;
测试
首先我们要测试单次请求能否正确进行函数调用返回值value;其次我们测试单次请求能否返回正确的错误信息err;最后我们测试在并发请求下函数是否只被调用了一次;
下面我们着重介绍最后一个测试:
func TestDoDupSuppress(t *testing.T) {
var g Group
c := make(chan string)
var calls int32
fn := func() (interface{}, error) {
atomic.AddInt32(&calls, 1)
return <-c, nil
}
const n = 10
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
v, err := g.Do("key", fn)
if err != nil {
t.Errorf("Do error: %v", err)
}
if v.(string) != "bar" {
t.Errorf("got %q; want %q", v, "bar")
}
wg.Done()
}()
}
time.Sleep(100 * time.Millisecond) // let goroutines above block
c <- "bar"
wg.Wait()
if got := atomic.LoadInt32(&calls); got != 1 {
t.Errorf("number of calls = %d; want 1", got)
}
}
atomic.AddInt32(&calls, 1)bar