SingleFlight模块

什么是缓存击穿?

一个已经缓存的Key,在过期的那一时刻,突然被大量请求,给数据库带来巨大压力。

如何解决?

假设一个节点要向Peer节点通过http请求一个Key,它连续发送了N次这样的请求,我们是否可以只处理第一次请求(即只对数据库进行一次查询操作),其他的请求被block,一旦第一次请求拿到结果,其他请求直接使用这个结果,这样就避免了多次请求多次查询数据库的困境,缓解了数据库的压力。

如何实现?

源码的实现只有短短的64行代码,下面进行详细介绍:

结构体Group

Group represents a class of work and forms a namespace in which units of work can be executed with duplicate suppression.

内部实现:

mu sync.Mutex       // protects m
m  map[string]*call // lazily initialized

字段mu是一个互斥锁,字段m是字典,键是key,值是封装的结构体call;call的实现如下:

// call is an in-flight or completed Do call
type call struct {
    wg  sync.WaitGroup
    val interface{}
    err error
}

call实际查询数据库请求,封装了wg,保存请求结果的val,和err;

方法Do

Do执行函数调用,返回结果,确保同一时刻只执行一次函数调用,重复调用将等待第一个调用完成并接收结果;

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    c.val, c.err = fn()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()

    return c.val, c.err
}

观察代码,我们发现m是一个懒惰初始化的map,为了避免重复初始化,加了互斥锁,对于第一次请求,我们实例化call,然后wg+1使其他协程等待这次调用的结果,然后我们给map赋值后就可以解锁了,其他协程又进入if里面开始等待call的调用结果;我们把结果保存在val里,调用完成后其他协程就可以直接取到结果了;最后我们删除key,这里解决的使同一时刻多次请求的问题,所以,不同时刻的请求每次都会进行函数的调用,我们必须在并发环境下进行测试;

测试

首先我们要测试单次请求能否正确进行函数调用返回值value;其次我们测试单次请求能否返回正确的错误信息err;最后我们测试在并发请求下函数是否只被调用了一次;
下面我们着重介绍最后一个测试:

func TestDoDupSuppress(t *testing.T) {
    var g Group
    c := make(chan string)
    var calls int32
    fn := func() (interface{}, error) {
        atomic.AddInt32(&calls, 1)
        return <-c, nil
    }

    const n = 10
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            v, err := g.Do("key", fn)
            if err != nil {
                t.Errorf("Do error: %v", err)
            }
            if v.(string) != "bar" {
                t.Errorf("got %q; want %q", v, "bar")
            }
            wg.Done()
        }()
    }
    time.Sleep(100 * time.Millisecond) // let goroutines above block
    c <- "bar"
    wg.Wait()
    if got := atomic.LoadInt32(&calls); got != 1 {
        t.Errorf("number of calls = %d; want 1", got)
    }
}
atomic.AddInt32(&calls, 1)bar