目录

0. 简介

goroutine

1. 协程调度发生的时机

goroutine
情形说明
go func(){}使用go关键字创建一个新的goroutine,调度器会考虑调度
GC由于GC也需要在系统线程M上执行,且其中需要所有的goroutine都停止运行,所以也会发生调度
系统调用发生系统的调用时,会阻塞M,所以它会被调度走,同时新的goroutine也会被调度上来
同步内存访问mutex、channel等操作会使得goroutine阻塞,因此会被调度走,等条件满足后,还会被调度上来继续运行

2. 创建协程时的调度

go
func newproc(fn *funcval) {
   gp := getg()
   pc := getcallerpc()
   systemstack(func() {
      newg := newproc1(fn, gp, pc)

      _p_ := getg().m.p.ptr()
      runqput(_p_, newg, true)

      if mainStarted {
         wakep()
      }
   })
}
goGoruntime.newprocmain goroutineruntime.mainmainStartedtrueruntime.wakep
func wakep() {
   if atomic.Load(&sched.npidle) == 0 {
      return
   }
   // be conservative about spinning threads
   if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
      return
   }
   startm(nil, true)
}
wakepspinningstartm
func startm(_p_ *p, spinning bool) {
   // Disable preemption.
   //
   // Every owned P must have an owner that will eventually stop it in the
   // event of a GC stop request. startm takes transient ownership of a P
   // (either from argument or pidleget below) and transfers ownership to
   // a started M, which will be responsible for performing the stop.
   //
   // Preemption must be disabled during this transient ownership,
   // otherwise the P this is running on may enter GC stop while still
   // holding the transient P, leaving that P in limbo and deadlocking the
   // STW.
   //
   // Callers passing a non-nil P must already be in non-preemptible
   // context, otherwise such preemption could occur on function entry to
   // startm. Callers passing a nil P may be preemptible, so we must
   // disable preemption before acquiring a P from pidleget below.
   mp := acquirem()  // 保证在此期间不会发生栈扩展
   lock(&sched.lock)
   if _p_ == nil {   // 没有指定p,那么需要从空闲队列中取一个p
      _p_ = pidleget()
      if _p_ == nil {// 如果没有空闲的p,直接返回
         unlock(&sched.lock)
         if spinning {
            // The caller incremented nmspinning, but there are no idle Ps,
            // so it's okay to just undo the increment and give up.
            if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
               throw("startm: negative nmspinning")
            }
         }
         releasem(mp)
         return
      }
   }
   nmp := mget()  // 如果有空闲的p,那么取出一个空闲的m
   if nmp == nil {// 如果没有空闲的m,那么调用newm创建一个,然后返回
      // No M is available, we must drop sched.lock and call newm.
      // However, we already own a P to assign to the M.
      //
      // Once sched.lock is released, another G (e.g., in a syscall),
      // could find no idle P while checkdead finds a runnable G but
      // no running M's because this new M hasn't started yet, thus
      // throwing in an apparent deadlock.
      //
      // Avoid this situation by pre-allocating the ID for the new M,
      // thus marking it as 'running' before we drop sched.lock. This
      // new M will eventually run the scheduler to execute any
      // queued G's.
      id := mReserveID()
      unlock(&sched.lock)

      var fn func()
      if spinning {
         // The caller incremented nmspinning, so set m.spinning in the new M.
         fn = mspinning
      }
      newm(fn, _p_, id)
      // Ownership transfer of _p_ committed by start in newm.
      // Preemption is now safe.
      releasem(mp)
      return
   }
   unlock(&sched.lock)
   if nmp.spinning {
      throw("startm: m is spinning")
   }
   if nmp.nextp != 0 {
      throw("startm: m has p")
   }
   if spinning && !runqempty(_p_) {
      throw("startm: p has runnable gs")
   }
   // The caller incremented nmspinning, so set m.spinning in the new M.
   nmp.spinning = spinning
   nmp.nextp.set(_p_)
   notewakeup(&nmp.park) // 如果有空闲的m,则唤醒这个m
   // Ownership transfer of _p_ committed by wakeup. Preemption is now
   // safe.
   releasem(mp)
}
startmwakep
goroutine
goroutinegoroutinegoroutine
goroutinefindrunnable
func findrunnable() (gp *g, inheritTime bool) {
   _g_ := getg()

top:
   _p_ := _g_.m.p.ptr()
   ...

   // local runq
   // 再从本地队列找找
   if gp, inheritTime := runqget(_p_); gp != nil {
      return gp, inheritTime
   }

   // global runq
   // 再看看全局队列
   if sched.runqsize != 0 {
      lock(&sched.lock)
      gp := globrunqget(_p_, 0)
      unlock(&sched.lock)
      if gp != nil {
         return gp, false
      }
   }

   ...

   // Spinning Ms: steal work from other Ps.
   //
   // Limit the number of spinning Ms to half the number of busy Ps.
   // This is necessary to prevent excessive CPU consumption when
   // GOMAXPROCS>>1 but the program parallelism is low.
   procs := uint32(gomaxprocs)
   if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
      if !_g_.m.spinning {
         _g_.m.spinning = true
         atomic.Xadd(&sched.nmspinning, 1)
      }

      gp, inheritTime, tnow, w, newWork := stealWork(now) // 调用stealWork盗取goroutine
      now = tnow
      if gp != nil {
         // Successfully stole.
         return gp, inheritTime
      }
      if newWork {
         // There may be new timer or GC work; restart to
         // discover.
         goto top
      }
      if w != 0 && (pollUntil == 0 || w < pollUntil) {
         // Earlier timer to wait for.
         pollUntil = w
      }
   }

   ...

   // return P and block
   // 上面的窃取没有成功,那么解除m和p的绑定,摒弃娥江p放到空闲队列,然后去休眠
   lock(&sched.lock)
   if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
      unlock(&sched.lock)
      goto top
   }
   if sched.runqsize != 0 {
      gp := globrunqget(_p_, 0)
      unlock(&sched.lock)
      return gp, false
   }
   if releasep() != _p_ {
      throw("findrunnable: wrong p")
   }
   pidleput(_p_)
   unlock(&sched.lock)

   ...
      _g_.m.spinning = false // m即将睡眠,状态不再是spinning
      if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
         throw("findrunnable: negative nmspinning")
      }

   ...
   stopm() // 休眠
   goto top
}
goroutinewakepstartm
stealWorkstopm
func stopm() {
   _g_ := getg()

   if _g_.m.locks != 0 {
      throw("stopm holding locks")
   }
   if _g_.m.p != 0 {
      throw("stopm holding p")
   }
   if _g_.m.spinning {
      throw("stopm spinning")
   }

   lock(&sched.lock)
   mput(_g_.m)         // 把m放到sched.midle空闲队列
   unlock(&sched.lock)
   mPark()
   acquirep(_g_.m.nextp.ptr()) // 绑定这个m和其下一个p,这里没有看懂为啥这么操作
   _g_.m.nextp = 0
}


func mPark() {
   gp := getg()
   notesleep(&gp.m.park) // 进入睡眠状态
   noteclear(&gp.m.park)
}
stopmnotesleepnotego runtimenotesleepnotewakeup

小结

goroutinewakep非main goroutine
mstartmstartstartmschedulegoroutinegoroutinegoroutine

如果是去唤醒工作协程,唤醒后会在休眠的地方开始,重新进行窃取。

窃取到工作协程后,就会去执行,然后就会因为各种原因重新开始调度循环。

3. 主动挂起

Gogoroutinechannelgoroutine

3.1 协程挂起

channel
v := <- ch
v, ok := <- ch
chanrecv1chanrecv2
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
   chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
   _, received = chanrecv(c, elem, true)
   return
}
chanrecv
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   ...
   
   c.recvq.enqueue(mysg) // 将这个goroutine放到channel的recv的queue中
   
   atomic.Store8(&gp.parkingOnChan, 1)
   // 挂起这个goroutine
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

   ...
}
chanrecvgoroutinechannelrecvqueuegoparkgoroutine
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
   if reason != waitReasonSleep {
      checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
   }
   mp := acquirem()
   gp := mp.curg
   status := readgstatus(gp)
   if status != _Grunning && status != _Gscanrunning {
      throw("gopark: bad g status")
   }
   mp.waitlock = lock
   mp.waitunlockf = unlockf
   gp.waitreason = reason
   mp.waittraceev = traceEv
   mp.waittraceskip = traceskip
   releasem(mp)
   // can't do anything that might move the G between Ms here.
   mcall(park_m)
}
goparkmcallgoroutineg0park_m
// park continuation on g0.
func park_m(gp *g) {
   _g_ := getg()

   if trace.enabled {
      traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
   }

   casgstatus(gp, _Grunning, _Gwaiting)
   dropg()

   if fn := _g_.m.waitunlockf; fn != nil {
      ok := fn(gp, _g_.m.waitlock)
      _g_.m.waitunlockf = nil
      _g_.m.waitlock = nil
      if !ok {
         if trace.enabled {
            traceGoUnpark(gp, 2)
         }
         casgstatus(gp, _Gwaiting, _Grunnable)
         execute(gp, true) // Schedule it back, never returns.
      }
   }
   schedule()
}
park_mgoroutine_Gwaitinggoroutinechanneldropggmschedule
goroutine

3.2 协程唤醒

goroutinechannel
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   ...

   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   ...
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   ...
   goready(gp, skip+1)
}
channelsendgoready
func goready(gp *g, traceskip int) {
   systemstack(func() {
      ready(gp, traceskip, true)
   })
}

func ready(gp *g, traceskip int, next bool) {
   if trace.enabled {
      traceGoUnpark(gp, traceskip)
   }

   status := readgstatus(gp)

   // Mark runnable.
   _g_ := getg()
   mp := acquirem() // disable preemption because it can be holding p in a local var
   if status&^_Gscan != _Gwaiting {
      dumpgstatus(gp)
      throw("bad g->status in ready")
   }

   // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
   casgstatus(gp, _Gwaiting, _Grunnable)
   runqput(_g_.m.p.ptr(), gp, next)
   wakep()
   releasem(mp)
}
readywakepgoroutinegoroutine

到这里,一个被挂起的协程也就被唤醒了。

4. 小结

channel
您可能感兴趣的文章: