go-zerocore/syncx/singleflight.go
go-zeroSingleFlight

利用场景

  1. 查问缓存时,合并申请,晋升服务性能。
    假如有一个 IP 查问的服务,每次用户申请先在缓存中查问一个 IP 的归属地,如果缓存中有后果则间接返回,不存在则进行 IP 解析操作。
SingleFlight
  1. 避免缓存击穿。

缓存击穿问题是指:在高并发的场景中,大量的申请同时查问一个 key ,如果这个 key 正好过期生效了,就会导致大量的申请都打到数据库,导致数据库的连贯增多,负载回升。

SingleFlight

利用形式

间接上代码:

func main() {
  round := 10
  var wg sync.WaitGroup
  barrier := syncx.NewSingleFlight()
  wg.Add(round)
  for i := 0; i < round; i++ {
    go func() {
      defer wg.Done()
      // 启用10个协程模仿获取缓存操作
      val, err := barrier.Do("get_rand_int", func() (interface{}, error) {
        time.Sleep(time.Second)
        return rand.Int(), nil
      })
      if err != nil {
        fmt.Println(err)
      } else {
        fmt.Println(val)
      }
    }()
  }
  wg.Wait()
}
Do()
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
rand.Int()

源码解析

先看代码构造:

type (
  // 定义接口,有2个办法 Do 和 DoEx,其实逻辑是一样的,DoEx 多了一个标识,次要看Do的逻辑就够了
  SingleFlight interface {
    Do(key string, fn func() (interface{}, error)) (interface{}, error)
    DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
  }
  // 定义 call 的构造
  call struct {
    wg  sync.WaitGroup // 用于实现通过1个 call,其余 call 阻塞
    val interface{}    // 示意 call 操作的返回后果
    err error          // 示意 call 操作产生的谬误
  }
  // 总控构造,实现 SingleFlight 接口
  flightGroup struct {
    calls map[string]*call // 不同的 call 对应不同的 key
    lock  sync.Mutex       // 利用锁管制申请
  }
)
Do办法
func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  c, done := g.createCall(key)
  if done {
    return c.val, c.err
  }

  g.makeCall(c, key, fn)
  return c.val, c.err
}
g.createCall(key)g.makeCall(c, key, fn)g.createCall(key)

从上图可知,其实要害就两步:

  1. 判断是第一个申请的协程(利用map)
  2. 阻塞住其余所有协程(利用 sync.WaitGroup)
g.createCall(key)
func (g *flightGroup) createCall(key string) (c *call, done bool) {
  g.lock.Lock()
  if c, ok := g.calls[key]; ok {
    g.lock.Unlock()
    c.wg.Wait()
    return c, true
  }

  c = new(call)
  c.wg.Add(1)
  g.calls[key] = c
  g.lock.Unlock()

  return c, false
}

先看第一步:判断是第一个申请的协程(利用map)

g.lock.Lock()
if c, ok := g.calls[key]; ok {
  g.lock.Unlock()
  c.wg.Wait()
  return c, true
}
sync.WaitGroupWait()

再看第二步:阻塞住其余所有协程(利用 sync.WaitGroup)

c = new(call)
c.wg.Add(1)
g.calls[key] = c
wg.Add(1)wg.Wait()g.makeCall(c, key, fn)
func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
  defer func() {
    g.lock.Lock()
    delete(g.calls, key)
    g.lock.Unlock()
    c.wg.Done()
  }()

  c.val, c.err = fn()
}
fn()
wg.Done()
SingleFlight

总结

须要阻塞管制协程

我的项目地址

https://github.com/zeromicro/go-zero

go-zero

微信交换群

关注『微服务实际』公众号并点击 交换群 获取社区群二维码。

go-zero