用处

保护下游,针对下游的同一批请求,只有一个负责去请求,其他等待结果;
例如:缓存更新能够做到对同一个失效key的多个请求,只有一个请求执行对key的更新操作。

示例

func TestDoDupSuppress(t *testing.T) {
    var g Group
    c := make(chan string)
    var calls int32
    fn := func() (interface{}, error) {
        atomic.AddInt32(&calls, 1)
        return <-c, nil
    }

    const n = 10
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() { // n个协程同时调用了g.Do,fn中的逻辑只会被一个协程执行
            v, err := g.Do("key", fn)
            if err != nil {
                t.Errorf("Do error: %v", err)
            }
            if v.(string) != "bar" {
                t.Errorf("got %q; want %q", v, "bar")
            }
            wg.Done()
        }()
    }
    time.Sleep(100 * time.Millisecond) // let goroutines above block
    c <- "bar"
    wg.Wait()
    if got := atomic.LoadInt32(&calls); got != 1 {
        t.Errorf("number of calls = %d; want 1", got)
    }
}
  1. fn只被执行了一次 -> calls的值为1;
  2. 其他的携程都能拿到fn执行的结果;

原理

map存储每个key对应的call,每个call会被多个携程同时调用。
一个call里边有个waitgroup,第一个携程去执行调用,其他携程阻塞在wg上边。 (关键就是这个wg)

  1. call的结构
  type call struct {
    wg  sync.WaitGroup
    val interface{}  //最终返回的结果
    err error
  }
  1. map的结构
type Group struct {
  mu sync.Mutex       // protects m
  m  map[string]*call // lazily initialized
}
  1. 调用
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        g.mu.Unlock()
        c.wg.Wait() //其他的请求阻塞
        return c.val, c.err
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    c.val, c.err = fn() //第一个去执行调用
    c.wg.Done() //同一批都返回

    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()

    return c.val, c.err
}
  1. 实际使用的例子
ProductSku = singleflight.Group{}
skuList, err, shared := ProductSku.Do(strconv.FormatInt(productId, 10), func() (i interface{}, e error) {
        return rpc.GetProductSku(ctx, productId, nil)
    })