Go introduced a work-stealing scheduler in version 1.1, which is considerably more efficient than the previous design. This article walks through Go's task-scheduling code to draw out ideas for scheduler design.

Data structures

  1. G — a goroutine, i.e. a task waiting to be executed;
  2. M — an operating-system thread, scheduled and managed by the OS scheduler;
  3. P — a processor, which can be thought of as a local scheduler running on a thread (a simplified sketch of how the three relate follows below);
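
To make the relationship concrete, here is a deliberately simplified sketch of the three structures. The field names and types are illustrative only and do not match the real definitions in the runtime; the real p.runq, for example, is a ring buffer of guintptr values.

// Simplified sketch, not the runtime's definitions.
type g struct {
  // stack, program counter, status, schedlink, ... (omitted)
}

type p struct {
  runq    [256]*g // local queue of runnable goroutines
  runnext *g      // one-slot cache for the goroutine to run next
  m       *m      // the M this P is currently attached to, if any
}

type m struct {
  curg *g // goroutine currently running on this OS thread
  p    *p // the P this thread must hold in order to run Go code
}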

Task scheduling

The scheduling code is in runtime/proc.go:

func schedule() {
  _g_ := getg()

  if _g_.m.locks != 0 {
    throw("schedule: holding locks")
  }

  ...

top:
  pp := _g_.m.p.ptr()
  pp.preempt = false

  if sched.gcwaiting != 0 {
    gcstopm()
    goto top
  }

  ...

  var gp *g
  var inheritTime bool

  ...

  if gp == nil {
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    // The fairness check is the schedtick%61 == 0 condition: roughly one out of
    // every 61 calls to schedule() looks at the global run queue first, so local
    // goroutines that keep re-queuing each other cannot starve global work.
    if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
      lock(&sched.lock)
      // Take at most one G from the global run queue
      gp = globrunqget(_g_.m.p.ptr(), 1)
      unlock(&sched.lock)
    }
  }
  if gp == nil {
    // Fall back to the P's local run queue
    gp, inheritTime = runqget(_g_.m.p.ptr())
    // We can see gp != nil here even if the M is spinning,
    // if checkTimers added a local goroutine via goready.
  }
  if gp == nil {
    // Block until some work is found, checking the global queue, the network
    // poller and other Ps' queues (work stealing), among other sources
    gp, inheritTime = findrunnable() // blocks until work is available
  }

  // This thread is going to run a goroutine and is not spinning anymore,
  // so if it was marked as spinning we need to reset it now and potentially
  // start a new spinning M.
  if _g_.m.spinning {
    resetspinning()
  }

  ...

  // Run the chosen goroutine
  execute(gp, inheritTime)
}

Getting work from the global queue

// As called from schedule() above:
gp = globrunqget(_g_.m.p.ptr(), 1)


// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
  assertLockHeld(&sched.lock)

  if sched.runqsize == 0 {
    return nil
  }

  n := sched.runqsize/gomaxprocs + 1
  if n > sched.runqsize {
    n = sched.runqsize
  }
  if max > 0 && n > max {
    n = max
  }
  // Never take more than half the capacity of the local run queue
  if n > int32(len(_p_.runq))/2 {
    n = int32(len(_p_.runq)) / 2
  }
  // Update the count of Gs remaining in the global queue
  sched.runqsize -= n
  // Pop the G that will be returned and run right away
  gp := sched.runq.pop()
  n--
  for ; n > 0; n-- {
    // Put the rest of the batch on the current P's local run queue
    gp1 := sched.runq.pop()
    runqput(_p_, gp1, false)
  }
  return gp
}
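
The batch-size arithmetic above is easy to follow with concrete numbers. Below is a small, self-contained example; the values are made up and batchSize is a hypothetical helper mirroring the capping logic, not a runtime function. Dividing by gomaxprocs spreads the global queue roughly evenly across the Ps instead of letting one P drain it.

package main

import "fmt"

// batchSize mirrors the capping logic in globrunqget:
// n = runqsize/gomaxprocs + 1, then capped by runqsize, by max,
// and by half the local run queue's capacity.
func batchSize(runqsize, gomaxprocs, max, localCap int32) int32 {
  n := runqsize/gomaxprocs + 1
  if n > runqsize {
    n = runqsize
  }
  if max > 0 && n > max {
    n = max
  }
  if n > localCap/2 {
    n = localCap / 2
  }
  return n
}

func main() {
  // 100 Gs in the global queue, GOMAXPROCS=4, 256-slot local queue:
  fmt.Println(batchSize(100, 4, 0, 256)) // 26 when no max is imposed
  fmt.Println(batchSize(100, 4, 1, 256)) // 1, which is what schedule() asks for
}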

Adding work to the local queue

// runqput is the helper globrunqget above uses to park the extra Gs on the current P's local queue.


// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
// _p_.runnext acts as a one-slot cache for the G that should run next.
func runqput(_p_ *p, gp *g, next bool) {
  // randomizeScheduler randomizes some scheduling decisions (it is enabled in
  // race builds); when it is on, even next == true is demoted half the time
  if randomizeScheduler && next && fastrandn(2) == 0 {
    next = false
  }

  if next {
  retryNext:
    oldnext := _p_.runnext
    if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
      goto retryNext
    }
    if oldnext == 0 {
      return
    }
    // Kick the old runnext out to the regular run queue.
    gp = oldnext.ptr()
  }

retry:
  // Queue head
  h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
  // Queue tail (written only by the owner P)
  t := _p_.runqtail
  // The local queue still has room
  if t-h < uint32(len(_p_.runq)) {
    // Append gp at the tail
    _p_.runq[t%uint32(len(_p_.runq))].set(gp)
    // Advance the tail
    atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
    return
  }
  // The local queue is full: move half of it, plus gp, to the global queue
  if runqputslow(_p_, gp, h, t) {
    return
  }
  // the queue is not full, now the put above must succeed
  goto retry
}

Adding work to the global queue

// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
// Takes half of the local queue, links it into a batch and pushes it onto the global queue
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
  var batch [len(_p_.runq)/2 + 1]*g

  // First, grab a batch from local queue.
  // i.e. half of the local queue, packed into the batch array
  n := t - h
  n = n / 2
  if n != uint32(len(_p_.runq)/2) {
    throw("runqputslow: queue is not full")
  }
  for i := uint32(0); i < n; i++ {
    batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
  }
  if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
    return false
  }
  batch[n] = gp

  // With randomizeScheduler on, shuffle the batch so the order is not deterministic
  if randomizeScheduler {
    for i := uint32(1); i <= n; i++ {
      j := fastrandn(i + 1)
      batch[i], batch[j] = batch[j], batch[i]
    }
  }

  // Link the goroutines.
  // Chain the Gs together through their schedlink fields, like a linked list
  for i := uint32(0); i < n; i++ {
    batch[i].schedlink.set(batch[i+1])
  }
  var q gQueue
  q.head.set(batch[0])
  q.tail.set(batch[n])

  // Now put the batch on global queue.
  lock(&sched.lock)
  // Push the whole batch onto the global run queue
  globrunqputbatch(&q, int32(n+1))
  unlock(&sched.lock)
  return true
}
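
The randomizeScheduler loop above is a standard Fisher-Yates shuffle over the n+1 entries of batch: element i is swapped with a random element j in [0, i], which yields a uniformly random permutation. A standalone sketch of the same loop, using math/rand in place of the runtime's fastrandn:

package main

import (
  "fmt"
  "math/rand"
)

// shuffle permutes batch in place, mirroring the loop in runqputslow.
func shuffle(batch []int) {
  for i := 1; i < len(batch); i++ {
    j := rand.Intn(i + 1) // j is uniform in [0, i]
    batch[i], batch[j] = batch[j], batch[i]
  }
}

func main() {
  b := []int{0, 1, 2, 3, 4}
  shuffle(b)
  fmt.Println(b) // e.g. [2 0 4 1 3]
}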


Getting work from the local queue

// As called from schedule() above:
gp, inheritTime = runqget(_g_.m.p.ptr())


// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
  // _p_.runnext caches the G that should run next
  next := _p_.runnext
  // If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
  // because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
  // Hence, there's no need to retry this CAS if it fails.
  if next != 0 && _p_.runnext.cas(next, 0) {
    return next.ptr(), true
  }

  for {
    // Queue head
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
    // Queue tail
    t := _p_.runqtail
    // The queue is empty
    if t == h {
      return nil, false
    }
    // Take the G at the head of the local queue
    gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
    // Advance the head; if the CAS fails another consumer won the race, so retry
    if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
      return gp, false
    }
  }
}
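
Taken together, runqput and runqget operate a fixed-size ring buffer without locks: only the owning P ever advances runqtail, while the head can be advanced by the owner or by a stealing P, so it is moved with a CAS. The following is a minimal, self-contained sketch of that pattern (ints stand in for goroutine pointers; it is not the runtime code and it ignores runnext):

package main

import (
  "fmt"
  "sync/atomic"
)

type ring struct {
  head uint32 // advanced by the owner and by stealers, always via CAS
  tail uint32 // advanced only by the owner
  buf  [8]int // stands in for _p_.runq's [256]guintptr
}

// put is called only by the owner, so the tail needs no CAS.
func (r *ring) put(v int) bool {
  h := atomic.LoadUint32(&r.head)
  t := r.tail
  if t-h >= uint32(len(r.buf)) {
    return false // full; the runtime would fall back to runqputslow
  }
  r.buf[t%uint32(len(r.buf))] = v
  atomic.StoreUint32(&r.tail, t+1) // publish the new item to consumers
  return true
}

// get may race with stealers, so the head is advanced with a CAS and retried.
func (r *ring) get() (int, bool) {
  for {
    h := atomic.LoadUint32(&r.head)
    t := atomic.LoadUint32(&r.tail)
    if t == h {
      return 0, false // empty
    }
    v := r.buf[h%uint32(len(r.buf))]
    if atomic.CompareAndSwapUint32(&r.head, h, h+1) {
      return v, true
    }
  }
}

func main() {
  var r ring
  r.put(1)
  r.put(2)
  fmt.Println(r.get()) // 1 true
  fmt.Println(r.get()) // 2 true
}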


Getting work from multiple sources

// As called from schedule() above; it may find work in the global queue,
// the network poller, or by stealing from other Ps
gp, inheritTime = findrunnable() // blocks until work is available

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
  ...
  // local runq
  // Local run queue first
  if gp, inheritTime := runqget(_p_); gp != nil {
    return gp, inheritTime
  }

  // Then the global run queue
  if sched.runqsize != 0 {
    lock(&sched.lock)
    gp := globrunqget(_p_, 0)
    unlock(&sched.lock)
    if gp != nil {
      return gp, false
    }
  }
  // Then goroutines made runnable by network I/O
  // Poll network.
  // This netpoll is only an optimization before we resort to stealing.
  // We can safely skip it if there are no waiters or a thread is blocked
  // in netpoll already. If there is any kind of logical race with that
  // blocked thread (e.g. it has already returned from netpoll, but does
  // not set lastpoll yet), this thread will do blocking netpoll below
  // anyway.
  if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
    if list := netpoll(0); !list.empty() { // non-blocking
      gp := list.pop()
      // injectglist is described below
      injectglist(&list)
      ...
      return gp, false
    }
  }
  // Finally, try to steal work from other Ps
  // Spinning Ms: steal work from other Ps.
  //
  // Limit the number of spinning Ms to half the number of busy Ps.
  // This is necessary to prevent excessive CPU consumption when
  // GOMAXPROCS>>1 but the program parallelism is low.
  procs := uint32(gomaxprocs)
  if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
    if !_g_.m.spinning {
      _g_.m.spinning = true
      atomic.Xadd(&sched.nmspinning, 1)
    }

    gp, inheritTime, tnow, w, newWork := stealWork(now)
    now = tnow
    if gp != nil {
      // Successfully stole.
      return gp, inheritTime
    }
    if newWork {
      // There may be new timer or GC work; restart to
      // discover.
      goto top
    }
    if w != 0 && (pollUntil == 0 || w < pollUntil) {
      // Earlier timer to wait for.
      pollUntil = w
    }
  }
}

// injectglist adds each runnable G on the list to some run queue,
// and clears glist. If there is no current P, they are added to the
// global queue, and up to npidle M's are started to run them.
// Otherwise, for each idle P, this adds a G to the global queue
// and starts an M. Any remaining G's are added to the current P's
// local run queue.
// In short: with no current P, every G goes to the global queue and idle Ms are
// woken to run them; otherwise one G per idle P is pushed to the global queue
// (starting an M for each), and the remaining Gs go onto the current P's local
// run queue.
func injectglist(glist *gList) {}
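
The distribution policy the comment describes can be sketched on its own. The helper below is hypothetical (not runtime code): it hands the global queue one G per idle P, so each woken M finds work immediately, and keeps the remainder local.

package main

import "fmt"

// distribute is a hypothetical sketch of injectglist's policy.
func distribute(gs []string, idlePs int, haveCurrentP bool) (global, local []string) {
  if !haveCurrentP {
    return gs, nil // no current P: everything goes to the global queue
  }
  n := idlePs
  if n > len(gs) {
    n = len(gs)
  }
  return gs[:n], gs[n:]
}

func main() {
  gs := []string{"g1", "g2", "g3", "g4", "g5"}
  global, local := distribute(gs, 2, true)
  fmt.Println(global) // [g1 g2]    -> global queue, one per idle P
  fmt.Println(local)  // [g3 g4 g5] -> current P's local queue
}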


Getting work by stealing

// stealWork attempts to steal a runnable goroutine or timer from any P.
//
// If newWork is true, new work may have been readied.
//
// If now is not 0 it is the current time. stealWork returns the passed time or
// the current time if now was passed as 0.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
  // The current P
  pp := getg().m.p.ptr()

  ranTimer := false

  const stealTries = 4
  for i := 0; i < stealTries; i++ {
    stealTimersOrRunNextG := i == stealTries-1

    // Visit the other Ps in a randomized order and try to steal from each
    for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
      if sched.gcwaiting != 0 {
        // GC work may be available.
        return nil, false, now, pollUntil, true
      }
      // Candidate victim P
      p2 := allp[enum.position()]
      // Skip ourselves
      if pp == p2 {
        continue
      }

      // Stealing timers; not the focus here, so skipped
      if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
        ...
      }

      // Don't bother to attempt to steal if p2 is idle.
      // Skip idle Ps: they have nothing to steal
      if !idlepMask.read(enum.position()) {
        // Try to steal half of p2's local run queue
        if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
          return gp, false, now, pollUntil, ranTimer
        }
      }
    }
  }

  // No goroutines found to steal. Regardless, running a timer may have
  // made some goroutine ready that we missed. Indicate the next timer to
  // wait for.
  return nil, false, now, pollUntil, ranTimer
}
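
stealOrder.start(fastrand()) above yields a cheap randomized walk over all Ps. One way to build such a walk, which is roughly how the runtime's randomOrder works, is to start at a random offset and step with a stride coprime to the number of Ps, so every index is visited exactly once. A self-contained sketch (not the runtime implementation):

package main

import (
  "fmt"
  "math/rand"
)

func gcd(a, b uint32) uint32 {
  for b != 0 {
    a, b = b, a%b
  }
  return a
}

// enumOrder returns the indices 0..n-1 in a randomized order by stepping
// from a random start with a stride coprime to n.
func enumOrder(n uint32) []uint32 {
  if n == 0 {
    return nil
  }
  start := rand.Uint32() % n
  stride := uint32(1)
  for {
    s := rand.Uint32()%n + 1
    if gcd(s, n) == 1 {
      stride = s
      break
    }
  }
  order := make([]uint32, 0, n)
  pos := start
  for i := uint32(0); i < n; i++ {
    order = append(order, pos)
    pos = (pos + stride) % n
  }
  return order
}

func main() {
  // With GOMAXPROCS=8 a thief might visit the Ps in an order such as:
  fmt.Println(enumOrder(8)) // e.g. [3 6 1 4 7 2 5 0]
}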

// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
// i.e. move half of p2's local run queue into _p_'s local queue and
// return one of the stolen Gs
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
  t := _p_.runqtail
  n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
  if n == 0 {
    return nil
  }
  n--
  gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
  if n == 0 {
    return gp
  }
  h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
  if t-h+n >= uint32(len(_p_.runq)) {
    throw("runqsteal: runq overflow")
  }
  atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
  return gp
}

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
// _p_ is the victim P being stolen from,
// batch is the thief's local run queue (a ring buffer), and
// batchHead is the thief's current tail index, where the stolen Gs are written
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
  for {
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
    t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
    n := t - h
    // Steal half, rounded up: n - n/2
    n = n - n/2
    // The victim's queue is empty; optionally try its cached runnext G
    // (runnext was filled in by runqput() above)
    if n == 0 {
      if stealRunNextG {
        // Try to steal from _p_.runnext.
        if next := _p_.runnext; next != 0 {
          if _p_.status == _Prunning {
            // Sleep to ensure that _p_ isn't about to run the g
            // we are about to steal.
            // The important use case here is when the g running
            // on _p_ ready()s another g and then almost
            // immediately blocks. Instead of stealing runnext
            // in this window, back off to give _p_ a chance to
            // schedule runnext. This will avoid thrashing gs
            // between different Ps.
            // A sync chan send/recv takes ~50ns as of time of
            // writing, so 3us gives ~50x overshoot.
            if GOOS != "windows" {
              usleep(3)
            } else {
              // On windows system timer granularity is
              // 1-15ms, which is way too much for this
              // optimization. So just yield.
              osyield()
            }
          }
          if !_p_.runnext.cas(next, 0) {
            continue
          }
          // Store the stolen G at the thief's tail
          batch[batchHead%uint32(len(batch))] = next
          return 1
        }
      }
      return 0
    }
    if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
      continue
    }
    for i := uint32(0); i < n; i++ {
      // Copy a G out of the victim's queue ...
      g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
      // ... into the thief's ring buffer at its tail
      batch[(batchHead+i)%uint32(len(batch))] = g
    }
    // Commit the steal by advancing the victim's head; retry if another thief raced us
    if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
      return n
    }
  }
}
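
The n = n - n/2 above is the "steal half" rule with rounding up: a victim with 5 runnable Gs loses 3 of them, and a victim with exactly 1 still loses that one. A tiny self-contained example:

package main

import "fmt"

// stealCount mirrors the n = n - n/2 computation in runqgrab.
func stealCount(queued uint32) uint32 {
  return queued - queued/2
}

func main() {
  for _, q := range []uint32{0, 1, 2, 5, 256} {
    fmt.Println(q, "->", stealCount(q)) // 0 -> 0, 1 -> 1, 2 -> 1, 5 -> 3, 256 -> 128
  }
}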