本文深入分析Go调度原理和实现,全文包含的主要内容有:Go程序是怎么运行起来的,经历了哪些流程,调度G的策略和时机,程序是如何在执行runtime代码与用户代码之间来回切换的。文章内容很长,感兴趣的同学可以收藏慢慢看。本文中分析的代码是Go1.14版本,涉及到的文件都在runtime包下。

对于编译性语言,需要将源代码文件编译成一个可执行程序,然后该执行程序被操作系统加载到内存开始运行。对于Go语言来说,就是我们编写的.go文件,通过编译、汇编和链接产生一个可执行的二进制文件。这个二进制文件也称为程序,当它被操作系统加载到内存后,会产生一个进程,进程就是可执行程序的一个实例。程序从加载到被执行这个过程概括起来有以下几个阶段

  1. 操作系统把磁盘上的可执行程序读入到内存中
  2. 创建可执行程序的实例进程,并为进程创建一个主线程,同时为主线程分配栈空间
  3. 将用户的设置的环境列表(environment list)和输入的命令行参数拷贝到主线程的栈中
  4. 把进程(或线程)加入到操作系统的运行队列等待被调度执行

linux系统中典型的进程内存结构如下所示,当程序刚启动执行时,会从arg/environ下面的主线程栈开始执行。随着函数调用,栈向下生长,当函数调用完毕,又主动释放占用的内存空间。

Go程序入口

我们先通过一个例子看如何找到一个Go可执行程序的入口,下面的代码保存在main.go文件中,运行 go build main.go默认生成可执行文件名称为main。然后使用gdb调试可以执行文件main.

package main

import "fmt"

func main() {
 fmt.Println("hello world")
}

执行gdb main,进入程序调试交互窗口,输入info files可以看到程序的入口Entry point: 0x45d990,这里是程序的入口地址,然后在这个地址这里下一个断点,b *0x45d990, 最后执行r,定位到源代码的入口地方在_rt0_adm64_linux()。在go源码目录下grep -rn "_rt0_adm64_linux" 可以定位到该函数在runtime包中rt0_linux_amd64.s中,该文件中的代码是汇编语言。_rt0_amd64_linux执行的指令是跳转到_rt0_amd64符合的位置。

TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
 JMP _rt0_amd64(SB)
    
TEXT _rt0_amd64(SB),NOSPLIT,$-8
    // 将程序的参数argv地址拷贝到DI寄存器中
 MOVQ 0(SP), DI // argc
 // 将程序参数argv的地址拷贝到SI寄存器中
 LEAQ 8(SP), SI // argv
 // 跳转到runtime·rt0_go符合的位置
 JMP runtime·rt0_go(SB)

调度器的初始化

前面我们通过一个例子找到Go程序的入口,下面我们看Go调度器是如何初始化的。Go程序调度模型也称为GMP模型,调度器初始化也是就核心数据结构G、M、P是怎么初始化的,以及它们之间的关系。对应到代码中也就是G0、M0和P的初始化。G0是一种特殊的G,它运行的是runtime中的代码,M0是进程启动的第一个线程,也称为主线程。下面通过源码分析调度器初始化的细节。

_rt0_amd64将程序参数argc和argv参数地址拷贝到寄存器之后,跳转到runtime·rt0_go位置执行。下面是runtime·rt0_go函数的源码,它也是汇编语言写的,runtime.rt0_g0函数完成了Go程序启动时的所有初始化工作,函数很长,这里提取了与调度相关的代码,这部分内容是我们需要关心的。

rt0_go函数开始的工作是将寄存器DI和SI的值分别赋值给AX和BX,因为DI和SI中的值是分别是函数参数argc和argv的地址,经过赋值之后,参数的信息也就存储在了AX和BX中。然后栈顶寄存器SP向下移动39字节,并调整SP的位置使它按16字节对齐。最后将argc和argv分别放到SP+16和SP+24字节处的内存中。

TEXT runtime·rt0_go(SB),NOSPLIT,$0
 // 将寄存器DI的值赋值给AX
 MOVQ DI, AX  // argc
 // 将寄存器SI的值赋值给BX
 MOVQ SI, BX  // argv
 SUBQ $(4*8+7), SP  // 2args 2auto
 // 调整栈顶寄存器SP使得它按16字节对齐
 ANDQ $~15, SP
 // 将AX中的内容拷贝都SP+16字节处的内存中,AX中存储的是argc的值
 // 也就是将argc放到SP+16字节处的内存中
 MOVQ AX, 16(SP)
 // 同理,将argv放到SP+24字节处的内存中
 MOVQ BX, 24(SP)

下面是G0的初始化,G0是一个全局的变量,它为runtime代码的运行提供一个栈环境。程序先将G0的地址保存在DI寄存器中,然后栈顶寄存器SP向下移动64*1024-104个字节,即向下大约移动64KB,构造G0的栈空间。最后给G0的stackguard0、stackguard1、stack.lo和stack.hi字段设置初始值,这几个字段与栈的位置,以及栈扩容相关。读到这里,有同学可能有疑问这里直接对G0的字段进行赋值,G0本身是怎么分配出来的?因为G0是一个全局变量,在执行rt0_go函数时已经分配过内存空间了。其他的G的是通过newg函数分配的,在后面的源码分析中我们会看到。这也是G0与其他G不同的一个地方,还有不同是这里G0字段初始化用的是汇编语言,其他的G中的字段的初始化是Go语言完成的。

TEXT runtime·rt0_go(SB),NOSPLIT,$0
    ...
    // 将g0的地址放到DI寄存器中
 MOVQ $runtime·g0(SB), DI
 // 将SP指向的位置向下(低地址空间)移动64*1024-104个字节,然后将该位置的地址赋值给BX
 // 这里是在为g0构造栈空间,g0栈的空间大小约为64KB
 LEAQ (-64*1024+104)(SP), BX
 // 将BX的值拷贝给g0.stackguard0,即 g0.stackguard0=*SP-64*1024+104
 MOVQ BX, g_stackguard0(DI)
 // g0.stackguard1=*SP-64*1024+104
 MOVQ BX, g_stackguard1(DI)
 // g0.stack.lo=*SP-64*1024+104
 MOVQ BX, (g_stack+stack_lo)(DI)
 // g0.stack.hi=SP
 MOVQ SP, (g_stack+stack_hi)(DI)
    ...

G0是一个全局变量

var (
    // 全局m0
 m0           m
    // 全局g0
 g0           g
 raceprocctx0 uintptr
)

执行完成上面的语句之后,G0和栈空间的关系如下图所示

完成了G0的初始化之后,rt0_go开始设置线程的本地存储。本地存储简称TLS(Thread Local Storage),它其实是线程的私有的全局变量。普通的全局变量,如果其中一个线程对其进行了修改,其他线程看到这个变量的值是修改后的值。而线程的私有全局变量则不是这样,某线程对私有全局变量的修改,不会影响到其他线程看到的值,可以理解成每个线程对私有全局变量都有自己的一个副本,某个线程修改的只是自己的副本,并不会对其他线程造成影响。

对应到这里线程本地存储中保存的是g0的值,为啥要保存g0的值呢?因为runtime会启动很多个工作线程执行我们的go协程。每个工作线程m都会绑定一个g0,g0是一种特殊的g,它的功能是对我们用户创建的g进行调度,也就是它运行的代码是runtime中的代码,并不是我们编写的用户逻辑。在m的结构体中有一个g0字段,保存的就是这里的g0.利用本地存储保存g0,可以做到各个m之间不会影响各自的g0,又方便代码编写。

TEXT runtime·rt0_go(SB),NOSPLIT,$0
    ...
    // 下面初始化线程本地存储
    // 将m0的tls字段的地址给到DI寄存器,即 DI=&m0.tls
 LEAQ runtime·m0+m_tls(SB), DI
 // 调用settls设置线程本地存储,settls函数的参数已在DI寄存器中
 CALL runtime·settls(SB)

 // 验证刚才设置的本地线程存储是否可以正常工作
 // 获取段基地址(FS)放入BX寄存器,也就是把m0.tls[1]的地址放入到BX寄存器
 get_tls(BX)
 // 将常数0x123赋值给BX寄存器指向的内存地址
 MOVQ $0x123, g(BX)
 // 将AX设置为m0.tls[0]
 MOVQ runtime·m0+m_tls(SB), AX
 // 在将AX指向的内存中的值与0x123进行比较,通过set-get-compare的形式检查tls是否工作正常
 CMPQ AX, $0x123
 JEQ 2(PC)
 CALL runtime·abort(SB)
ok:
 get_tls(BX)
 // 寄存器CX存储g0的地址
 LEAQ runtime·g0(SB), CX
 // 将g0的地址保存到本地线程存储 m0.tls[0]中
 MOVQ CX, g(BX)
 // 将m0的地址保存到寄存器AX中
 LEAQ runtime·m0(SB), AX

 // save m->g0 = g0
 // 将m0和g0互相绑定
 // m0.g0=g0
 MOVQ CX, m_g0(AX)
 // g0.m=m0
 MOVQ AX, g_m(CX)

 CLD    
 CALL runtime·check(SB)

 
TEXT runtime·settls(SB),NOSPLIT,$32
#ifdef GOOS_android
 SUBQ runtime·tls_g(SB), DI
#else
    // DI寄存器中保存的是m.tls[0]的地址,执行下面的ADDQ之后,将DI寄存器中的值+8
    // 此时DI中的值保存的是m.tls[1]的地址
 ADDQ $8, DI 
    // 将DI寄存器的值拷贝到SI寄存器中,这是arch_prctl系统调用的第二个参数
 MOVQ DI, SI
 // 将值0x1002拷贝到DI寄存器中,这时arch_prctl系统调用的第一个参数
 MOVQ $0x1002, DI // ARCH_SET_FS
 // 将SYS_arch_prctl系统调用编号的值拷贝到AX寄存器中
 MOVQ $SYS_arch_prctl, AX
 // 执行系统调用
 SYSCALL
 CMPQ AX, $0xfffffffffffff001
 JLS 2(PC)
 MOVL $0xf1, 0xf1  // crash
 RET

通过arch_prctl系统调用将m0.tls[1]的地址设为了FS段的段基地址,CPU中有个FS寄存器与之对应,这样以后,工作线程代码可以通过FS寄存器找到m.tls.

前面执行已完成G0的初始化,M0与G0的相互绑定,主线程中可以通过get_tls获取到G0,通过G0的成员字段m可以找到M0,实现了M0和G0与主线程之间的关联。此时主线程、m0、g0以及栈之间的关系如下图所示。

下面看Go进程启动的核心处理流程。整个流程主要分为5个步骤:

  1. 处理操作系统传递过来的参数,即把argc和argv放在当前栈顶SP的位置
  2. 调用runtime.osinit函数,获取CPU的核数,存放在全局变量ncpu中,供后面的调度时使用
  3. 调用runtime.schedinit进行调度的初始化,会对p进行初始化,把m0和allp[0]绑定
  4. 调用runtime.newproc函数,创建出main goroutine,也就是主协程,该g将运行的任务函数是runtime.main函数,并把main goroutine放入与m0绑定的p的本地队列
  5. 调用runtime.mstart启动工作线程m,进入调度系统
TEXT runtime·rt0_go(SB),NOSPLIT,$0
    ...
    // 下面语句处理操作系统传递过来的参数
    // 将argc从内存搬到AX存储器中
 MOVL 16(SP), AX  // copy argc
 // 将argc搬到SP+0的位置,即栈顶位置
 MOVL AX, 0(SP)
 // 将argv从内存搬到AX寄存器中
 MOVQ 24(SP), AX  // copy argv
 // 将argv搬到SP+8的位置
 MOVQ AX, 8(SP)
 CALL runtime·args(SB)
 // 调用osinit函数,获取CPU的核数,存放在全局变量ncpu中,供后面的调度时使用
 CALL runtime·osinit(SB)
 // 调用schedinit进行调度的初始化
 CALL runtime·schedinit(SB)

 // runtime.mainPC是runtime.main函数,将runtine.main的地址拷贝到AX寄存器
 MOVQ $runtime·mainPC(SB), AX  // entry
 // 将AX入栈,AX中存储的是runtime.main函数的地址,也就是下面将要开启新goroutine需要执行的函数
 // newproc的第二个参数
 PUSHQ AX
 // newproc的第一个参数入栈,该值表示runtime.main函数占用的大小,因为runtime.main函数没有入参
 // 所以这里是0
 PUSHQ $0   // arg size
 // 创建main goroutine
 CALL runtime·newproc(SB)
 POPQ AX
 POPQ AX

 // 调用mstart函数,主线程进入调度循环,运行刚刚创建的main goroutine
 CALL runtime·mstart(SB)

    // 上面的mstart函数是不会返回的,如果返回了,说明代码有问题,直接abort
 CALL runtime·abort(SB)
 RET
    
 MOVQ $runtime·debugCallV1(SB), AX
 RET

下面开始分析上面每个函数具体做了哪些事情:

osinit

osinit主要是获取CPU的核数并保持在全局变量ncpu中,linux系统下该函数在 runtime/os_linux.go文件中,该函数的实现与具体的系统架构有关,mac系统该函数在runtime/os_darwin.go文件中

func osinit() {
 // 获得CPU的数量
 ncpu = getproccount()
 physHugePageSize = getHugePageSize()
 osArchInit()
}
schedinit

schedinit是真正的调度函数,完成调度的初始化操作。主要完成以下功能:

  1. 设置最大线程数m的数量为10000,m0结构的初始化
  2. 根据cpu的核数或者设置的GOMAXPROCS完成对应数量p对象的创建和初始化
  3. 完成m0和p(allp[0])的互相绑定,至此,g0与m0, m0与p的互相绑定
func schedinit() {
 // _g_为g0
 // 当前的goroutine为g0,获得g0的地址
 _g_ := getg()
 if raceenabled {
  _g_.racectx, raceprocctx0 = raceinit()
 }
 // 初始化最大默认线程为1万个
 sched.maxmcount = 10000

 tracebackinit()
 moduledataverify()
 // 初始化栈空间
 stackinit()
 mallocinit()
 fastrandinit() 
 // 初始化m0,_g_为g0,所以_g_.m即为g0.m,g0.m也就是m0
 mcommoninit(_g_.m)
 ...
 gcinit()

 sched.lastpoll = uint64(nanotime())
 // 初始化p的数量,初始值为cpu的核数,如果程序设置了GOMAXPROCS,则取
 // GOMAXPROCS环境变量的值
 procs := ncpu
 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
  procs = n
 }
 // 调整p的数量,创建并初始化所有的p,所有的p保存在全局变量allp中
 if procresize(procs) != nil {
  throw("unknown runnable goroutine during bootstrap")
 }
    ...
}

getg函数功能是从本地存储中获取当前正在运行的g,在源码中没有找到它的实现,查阅资料说它是在编译器中实现的。前面已经g0保存到了本地线程存储中,所以这里getg获取到的g就是g0.

schedinit函数中调用了mcommoninit函数,实现对m0的初始化,mcommoninit函数实现如下,它会设置m0的id字段,每个m有唯一的id标识,并将当前的m插入到全局保存m的链表对象allm中。这个链表是一个单链表,allm始终指向链表头。

func mcommoninit(mp *m) {
 // _g_还是g0
 _g_ := getg()

 if _g_ != _g_.m.g0 {
  callers(1, mp.createstack[:])
 }

 lock(&sched.lock)
 // sched.mnext溢出了
 if sched.mnext+1 < sched.mnext {
  throw("runtime: thread ID overflow")
 }
 // 设置m的id字段,每个m有唯一的id标识
 mp.id = sched.mnext
 // 全局计时器mnext自增1
 sched.mnext++
 // 检查当前存活的m数量是否在不超过10000
 checkmcount()
    
    ...
    
 // allm保存了所有的m对象,它是以链表的形式存储的,通过头插法的方式将当前的
 // m(mp)插入到全局链表对象allm中
 mp.alllink = allm

 // 更新allm的头节点为当前的m对象mp
 atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
 unlock(&sched.lock)
    ...
}

schedinit会调用procresize函数完成所有p的创建和初始化,整个procresize处理比较复杂,因为程序启动后我们可用通过runtime.GOMAXPROCS()函数动态修改p的数量。该函数主要有以下步骤:

  1. 计算当前真正p的数量nprocs,初始化保存所有p的全局变量allp,allp为一个切片,它里面保存的对象为*p类型,利用make初始化allp.
  2. 循环创建nprocs个p对象并初始化它,并把*p对象保存到全局变量allp切片中.
  3. 将m0和allp[0]互相绑定,并将allp[0]状态设置为_Prunning
  4. 将allp[1:nprocs]中的p放入到全局变量sched的pidle空闲队列中

上述步骤描述的是最基本的情况,即没有通过runtime.GOMAXPROCS()函数对p的数量进行调整。如果对p的数量进行了调整,会做一些其他处理逻辑,例如将多余的p的进行销毁,如果p的本地队列中已有等待运行的g,从全局空闲m队列中获取一个m将它与p进行绑定等。将所有可运行的p通过p.link串联起来,构成了一个链表结构,函数返回可运行p链表头节点。

func procresize(nprocs int32) *p {
 // 初始时gomaxprocs的值为0
 old := gomaxprocs
 if old < 0 || nprocs <= 0 {
  throw("procresize: invalid arg")
 }
 if trace.enabled {
  traceGomaxprocs(nprocs)
 }

 // 更新统计信息
 now := nanotime()
 if sched.procresizetime != 0 {
  sched.totaltime += int64(old) * (now - sched.procresizetime)
 }
 sched.procresizetime = now

 // 初始时allp中没有p,即len(allp)=0, nprocs显然大于len(allp)
 // 会进入这里的if逻辑
 if nprocs > int32(len(allp)) {
  lock(&allpLock)
  if nprocs <= int32(cap(allp)) {
   // 调整allp的len为nprocs, allp的cap不变
   allp = allp[:nprocs]
  } else {
   // 初始化时会创建新的切片保存p
   nallp := make([]*p, nprocs)
            
   // 将旧切片allp中的p拷贝到新的切片nallp中
   copy(nallp, allp[:cap(allp)])
   // 将allp替换为新创建的切片nallp
   allp = nallp
  }
  unlock(&allpLock)
 }

 // 开始时,old为0,即创建初始化所有的p,保存到allp中
 for i := old; i < nprocs; i++ {
  pp := allp[i]
  // 如果pp非空,复用已有的pp,否则通过new新创建一个p对象
  if pp == nil {
   pp = new(p)
  }
  // 对pp字段进行初始化设置
  pp.init(i)
  atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
 }
 // _g_是g0
 _g_ := getg()
 // 初始时m都还未绑定p,不会进入到这个分支中,程序启动之后,在设置GOMAXPROCS
 // 有可能进入下面的分支
 if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
  // 继续保持之前的p为_Prunning状态
  _g_.m.p.ptr().status = _Prunning
  _g_.m.p.ptr().mcache.prepareForSweep()
 } else { // 初始化的时候会进入到这个分支
  // 但初始化的时候_g_.m(g0.m)也就是m0还是未绑定p 所以不会走到这个if里面
  if _g_.m.p != 0 {
   if trace.enabled {
    traceGoSched()
    traceProcStop(_g_.m.p.ptr())
   }
   _g_.m.p.ptr().m = 0
  }
  // 清理m0.p和mcache
  _g_.m.p = 0
  _g_.m.mcache = nil
  // 从全局p队列allp中取第一个p与m0进行绑定
  p := allp[0]
  p.m = 0
  // 设置p的状态为_Pidle
  p.status = _Pidle
  // 将p就是allp[0]与m0互相绑定,并将p的状态修改为_Prunning
  acquirep(p)
  if trace.enabled {
   traceGoStart()
  }
 }

 // 将多余的p进行销毁
 for i := nprocs; i < old; i++ {
  p := allp[i]
  p.destroy()
 }

 // Trim allp.
 if int32(len(allp)) != nprocs {
  lock(&allpLock)
  allp = allp[:nprocs]
  unlock(&allpLock)
 }

 var runnablePs *p
 // 将除allp[0](与m0绑定的p)之外的所有的p放入到空闲链表中
 for i := nprocs - 1; i >= 0; i-- {
  p := allp[i]
  // 如果当前的p是allp[0],跳过,因为它在前面已经绑定到m0上了
  if _g_.m.p.ptr() == p {
   continue
  }
  // 设置p的状态为_Pidle
  p.status = _Pidle
  // 判断当前的p的本地队列中是不是没有g,初始时p中是没有g的
  if runqempty(p) {
   // p的本地队列没有g,将其放入到全局空闲p队列(sched.pidle)中
   pidleput(p)
  } else {
   // 从全局的空闲m队列(sched.midle)中拿出一个m,将这个m绑定到p上
   p.m.set(mget())
   // 将所有可运行的p通过p.link串联起来,构成了一个链表结构
   // 例如 allp[1].link --> allp[2]  allp[2].link-->allp[3]
   p.link.set(runnablePs)
   runnablePs = p
  }
 }
 stealOrder.reset(uint32(nprocs))
 var int32p *int32 = &gomaxprocs 
 // gomaxprocs设置为nprocs
 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
 // 返回可运行p链表头节点
 return runnablePs
}

至此,已完成g0和m0的互相绑定,m0和allp[0]的互相绑定,nprocs个p的创建与初始化。得到调度器各个组件的关系如下图所示。

main goroutine的创建与运行

前面说了Go进程启动的核心处理流程有5个步骤,main goroutine的创建对应的是里面的第4个步骤,这里为了看起来方便,将rt0_go中main goroutine创建逻辑重新摘录出来。下面的汇编语句做的一件事就是调用newproc函数创建一个新的goroutine用来执行mainPC所对应的runtime.main函数。而runtime.main函数又会调用我们用户编写的package main包中main函数,这样我们的用户代码就可以被执行到了。

TEXT runtime·rt0_go(SB),NOSPLIT,$0
    ...
 // runtime.mainPC是runtime.main函数,将runtine.main的地址拷贝到AX寄存器
 MOVQ $runtime·mainPC(SB), AX  // entry
 // 将AX入栈,AX中存储的是runtime.main函数的地址,也就是下面将要开启新goroutine需要执行的函数
 // newproc的第二个参数
 PUSHQ AX
 // newproc的第一个参数入栈,该值表示runtime.main函数占用的大小,因为runtime.main函数没有入参
 // 所以这里是0
 PUSHQ $0   // arg size
 // 创建main goroutine
 CALL runtime·newproc(SB)
    ...

下面是newproc函数的实现,可以看到它有两个入参,第二个参数fn表示新创建出来的goroutine将从fn这个函数开始执行,第一个参数siz表示函数fn的参数占用的内存字节数。这里举个例子,方便我们理解,下面go add(1,2)会调用newproc创建一个协程,参数fn=&funcval{fn:add},siz=16. fn是一个结构体,它里面的fn字段就是新建协程调用的函数名称,对应到这里就是add函数。siz为add函数的参数占用的内存,这里add函数的参数是两个int64,所以占用的内存大小为16个字节,即siz为16.

需要传递函数参数大小siz给newproc函数的原因是,newproc函数将创建一个新的goroutine来执行fn函数,这个新创建的goroutine与当前程序正在执行的goroutine会使用不同的栈,它在运行的时候,需要知道fn函数参数的大小,而newproc函数自己不知道需要拷贝多少个字节的参数数据到新创建的goroutine栈上,所以用参数siz指明要拷贝的字节数。

func add(a, b int64) int64 {
 return a + b
}

func main() {
 go add(1, 2)
}

newproc函数在获取到fn函数的第一个参数地址(argp)后,调用systemstack切换到g0栈执行newproc1函数,并将argp参数传递给newproc1。

func newproc(siz int32, fn *funcval) {
 // argp指向第一个fn的第一个参数,siz说明fn参数的个数
 // 函数调用参数入栈顺序是从右向左
 argp := add(unsafe.Pointer(&fn), sys.PtrSize)
 // 获取正在运行的g,初始运行是main goroutine,此时的gp就是m0中的g0
 gp := getg()
 // 将调用newproc时由call指令压栈的函数返回地址,初始时,pc的值是
 // CALL runtime.newproc(SB)指令后面的指令的地址
 pc := getcallerpc()
 // systemstack表示切换到g0栈运行,初始时执行到这里的时候已经在g0栈,
 // 所以直接调用newproc1
 systemstack(func() {
  newproc1(fn, argp, siz, gp, pc)
 })
}

newproc1函数的主要功能是从堆上分配一个g对象newg,并为该对象分配一个大小为2048字节的栈空间,其次初始化它的stack成员变量,newg.stack.hi和newg.stack.lo分别指向栈的栈底(高地址)和栈顶(低地址)位置, 然后将newg需要执行函数的参数从newproc函数栈拷贝到newg栈中,最后将newg的状态修改为_Grunnable,并把它加入到等待运行的g队列中。

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
 // _g_为g0,初始时g0也是m0.go
 _g_ := getg()

 if fn == nil {
  _g_.m.throwing = -1 
  throw("go of nil func value")
 }
 acquirem() 
 siz := narg
 siz = (siz + 7) &^ 7

 // _StackMin=2048, sys.RegSize=8
 // siz>=2048-4*8-8=2048-40=2008

 // 检查参数的大小不能超过2048-4*8-8=2048-40=2008 Bytes
 if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
  throw("newproc: function arguments too large for new goroutine")
 }
 // 获取与当前g0(_g_)关联的m的绑定的p,初始时,_g_.m也就是m0
 // m0绑定的p为allp[0]
 _p_ := _g_.m.p.ptr()
 // 从本地队列_p_中获取一个g,如果本地队列中没有g,从全局队列中获取
 newg := gfget(_p_)
 // 如果从_p_的本地队列和全局队列中都没有获取到g,则新创建一个g
 // 初始时,是没有g的,所以会进入下面的if逻辑,新建一个main goroutine对象
 if newg == nil {
  // 申请创建一个新的g,栈的大小为2KB
  newg = malg(_StackMin)
  casgstatus(newg, _Gidle, _Gdead)
  // 将当前新创建的newg加入到全局g列表allgs中
  allgadd(newg) 
 }
 if newg.stack.hi == 0 {
  throw("newproc1: newg missing stack")
 }

 if readgstatus(newg) != _Gdead {
  throw("newproc1: new g is not Gdead")
 }

 // 设置一定的保留空间
 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize 
 // 保留空间大小按系统对齐
 totalSize += -totalSize & (sys.SpAlign - 1)
 // 确定sp的位置
 sp := newg.stack.hi - totalSize
 spArg := sp
 if usesLR {
  *(*uintptr)(unsafe.Pointer(sp)) = 0
  prepGoExitFrame(sp)
  spArg += sys.MinFrameSize
 }
 if narg > 0 {
  // 将参数argp从运行newproc函数的栈拷贝到要运行g的栈中
  // 初始时就是从m0.g0栈拷贝参数到新运行g的栈
  memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
  if writeBarrier.needed && !_g_.m.curg.gcscandone {
   f := findfunc(fn.fn)
   stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
   if stkmap.nbit > 0 {
    bv := stackmapdata(stkmap, 0)
    bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
   }
  }
 }
 // 清理掉newg中sched中内容,下面会对sched重新赋值
 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
 // 设置newg的sched各字段的值,调度器根据这些字段把newg调度到CPU上运行
 newg.sched.sp = sp
 newg.stktopsp = sp
 // 设置程序计数器pc,当newg被调度起来运行时,从这个地址开始执行指令
 // pc的值为goexit函数偏移1(sys.PCQuantum)的位置
 newg.sched.pc = funcPC(goexit) + sys.PCQuantum 
 // 设置sched g的地址为当前g的地址
 newg.sched.g = guintptr(unsafe.Pointer(newg))
 // 调整sched成员和newg的栈
 gostartcallfn(&newg.sched, fn)
 newg.gopc = callerpc
 newg.ancestors = saveAncestors(callergp)
 // 设置newg的startpc为fn.fn,该字段用于函数调用的traceback和栈收缩
 newg.startpc = fn.fn
 if _g_.m.curg != nil {
  newg.labels = _g_.m.curg.labels
 }
 if isSystemGoroutine(newg, false) {
  atomic.Xadd(&sched.ngsys, +1)
 }
 // 设置g的状态为_Grunnable,意味着该g可以运行了
 casgstatus(newg, _Gdead, _Grunnable)

 if _p_.goidcache == _p_.goidcacheend {
  _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
  _p_.goidcache -= _GoidCacheBatch - 1
  _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
 }
 // 设置g的唯一标识goid
 newg.goid = int64(_p_.goidcache)
 _p_.goidcache++
 if raceenabled {
  newg.racectx = racegostart(callerpc)
 }
 if trace.enabled {
  traceGoCreate(newg, newg.startpc)
 }
 // 将newg加入到队列中,具体尝试放入队列的顺序为_p_.runnext、本地队列、全局队列
 // 如果_p_.runnext为空直接将newg放在_p_.runnext位置,如果_p_.runnext不为空
 // 将newg与_p_.runnext交换,原来_p_.runnext的g尝试放入p的本地队列runq中
 // 如果本地队列满了,会将runq中一半的元素转移到全局队列中
 // 初始时,_p_.runnext是空的,所以直接放在这里
 runqput(_p_, newg, true)

 // 唤醒一个m来运行g,初始时不会执行,因为mainStarted为false,即runtime包中的main函数还未执行
 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
  wakep()
 }
 releasem(_g_.m)
}

newproc1函数中有两条非常关键的语句,这里拿出来进行单独说明. 我们先来看newg.sched字段的功能,查看结构体g的定义,g的sched字段是gobuf类型,它保存的是goroutine的调度信息,重点就是保存几个关键寄存器的值。Go程序goroutine调度的本质是一组CPU寄存器和执行流的切换,当前我们执行某个g的时候,将BP,SP等寄存器设置为合适的值,将程序计数器pc指向g中的函数地址,这样g就被调度运行起来了,当要切换出当前g换其他g运行的时候,需要将当前g的CPU寄存器等信息保存到内存中,以供下次运行该g的时候,直接将保存到内存中的信息恢复到寄存器中又可以运行了。

可以看到这里将newg.sched.pc的值设置为了goexit函数的第二条指令,sys.PCQuantum的值为1,funcPC(goexit) + sys.PCQuantum指向的是goexit的第二条指令。为什么要这样设置呢?理论上不应该是设置为fn.fn吗,下面通过分析gostartcallfn函数一探究竟。

    ...
 newg.sched.pc = funcPC(goexit) + sys.PCQuantum 
    ...
 gostartcallfn(&newg.sched, fn)

gostartcallfn调整sched成员和newg的栈,核心处理调用的是gostartcall函数。gostartcall函数将栈顶寄存器SP向下移动一个指针的位置,然后将goexit+1即goexit的第二条指令。然后将buf.pc即newg.sched.pc重新设为fn(runtime.main函数)。相当于将goexit放到newg的栈顶,伪造成newg是被goeixt函数调用的,当newg中的fn函数执行完成之后,返回到goexit继续执行,做一些清理的操作。

func gostartcallfn(gobuf *gobuf, fv *funcval) {
 var fn unsafe.Pointer
 // 初始化时fn为runtime.main
 if fv != nil {
  fn = unsafe.Pointer(fv.fn)
 } else {
  fn = unsafe.Pointer(funcPC(nilfunc))
 }
 gostartcall(gobuf, fn, unsafe.Pointer(fv))
}

func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
 // newg的栈顶,当前newg栈上只有fn函数的参数,sp指向的是fn的第一个参数
 sp := buf.sp
 if sys.RegSize > sys.PtrSize {
  sp -= sys.PtrSize
  *(*uintptr)(unsafe.Pointer(sp)) = 0
 }
 // 为返回地址预留8字节空间
 sp -= sys.PtrSize
 // buf.pc保存的是goexit+1的地址,将buf.pc赋值给sp指向的内存,下面真正赋值给buf.pc的是fn函数,
 // 初始时是runtime包中的main函数,这样使得fn执行完后返回到goexit继续执行,从而完成清理工作
 // 将fn伪装成是被goexit函数调用的
 *(*uintptr)(unsafe.Pointer(sp)) = buf.pc
 // 重新设置newg的栈顶寄存器
 buf.sp = sp
 // 真正将newg的pc值设置为fn函数,等到newg被调度起来运行时,调度器会将buf.pc放入cpu的ip寄存器
 // 即开始执行fn函数逻辑
 buf.pc = uintptr(fn)
 buf.ctxt = ctxt
}

结构体gobuf用于保存goroutine的调度信息,主要包含几个关键CPU寄存器的值

type gobuf struct {
 // 栈指针,保存rsp寄存器的值
 sp uintptr
 // 程序计数器,保存rip寄存器的值
 pc uintptr
 // 与gobuf关联的goroutine地址
 g    guintptr
 ctxt unsafe.Pointer
 // 系统调用的返回值
 ret sys.Uintreg
 lr  uintptr
 bp  uintptr 
}

分析完了newproc1整理出来流程,现在再后头来看将newg加入到等待运行队列函数runqput实现细节。

runqput将g放入到p的本地队列中,传入参数有3个,_p_为g将放入哪个p的本地队列,gp为待放入的g, next为true表示将gp放入到_p_的runnext中,如果_p_的runnext中已经有g,会进行替换,之前的g会放入到_p_的本地队尾,如果next为false,直接将gp放入到本地队列_p_的队尾.如果本地队列满了,将当前P中前len(p)/2个G批量放入到全局队列中。

func runqput(_p_ *p, gp *g, next bool) {
 if randomizeScheduler && next && fastrand()%2 == 0 {
  next = false
 }

 // 如果next为true,将gp放入到_p_.runnext中,这个过程通过cas保证原子性
 if next {
 retryNext:
  // 对_p_.runnext进行备份
  oldnext := _p_.runnext
  // 通过cas操作,将gp和oldnext进行交换
  if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
   goto retryNext
  }
  // 如果oldnext为0,说明_p_.runnext之前没有g,现在已放入完毕,直接返回
  if oldnext == 0 {
   return
  }
 
  // 将之前的g赋值给gp,下面会将gp放入_p_的本地队列
  gp = oldnext.ptr()
 }

retry:
 // h为本地队列的队头
 h := atomic.LoadAcq(&_p_.runqhead)
 // t为本地队列的队尾
 t := _p_.runqtail
 // t-h小于队列的长度,即本地队列还没有满,放到本地队列的尾部
 if t-h < uint32(len(_p_.runq)) {
  _p_.runq[t%uint32(len(_p_.runq))].set(gp)
  atomic.StoreRel(&_p_.runqtail, t+1) 
  return
 }
 // 走到这里说明本地队列满了,放到全局队列, 放入到全局队列并不是一个,而是将当前P中前len(p)/2个G批量放入到global queue中
 if runqputslow(_p_, gp, h, t) {
  return
 }
 
 // 走到这里说明往全局队列中没有放成功,没有成功的原因是,本地队列没有满,所以进一步重试,
 // 尝试放入本地队列
 goto retry
}

runqputslow将_p_本地队列中一半数量的g(另外加上gp)移动到全局队列中,在上面的runqput函数处理逻辑中,如果本地队列满了,会执行runqputslow.

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
 // 存储要移动的g,移动的数量为本地队列的一半+1个,这里的1是为传入的gp分配一个位置
 var batch [len(_p_.runq)/2 + 1]*g

 n := t - h
 n = n / 2
 // 确保n为本地队列的长度的一半
 if n != uint32(len(_p_.runq)/2) {
  throw("runqputslow: queue is not full")
 }

 // 将队头的n个g存储到batch中
 for i := uint32(0); i < n; i++ {
  batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
 }
 // 原子更新_p_队列队头的位置,队头向后移动n个位置
 if !atomic.CasRel(&_p_.runqhead, h, h+n) { 
  return false
 }
 // 将传入的gp放入到batch的末尾
 batch[n] = gp

 // 如果要随机化调度,打乱batch中元素的顺序
 if randomizeScheduler {
  for i := uint32(1); i <= n; i++ {
   j := fastrandn(i + 1)
   batch[i], batch[j] = batch[j], batch[i]
  }
 }

 // 将batch中的g串起来,构成一个链表,因为batch中有n+1元素
 // 所以这里循环n次,就将n+1个组成了链表结构
 for i := uint32(0); i < n; i++ {
  batch[i].schedlink.set(batch[i+1])
 }
 var q gQueue
 // 将链表的头节点和尾节点加入到q中,方便一次性将batch中的g加入到全局队列
 q.head.set(batch[0])
 q.tail.set(batch[n])

 lock(&sched.lock)
 // 将g一次性批量放入全局队列
 globrunqputbatch(&q, int32(n+1))
 unlock(&sched.lock)
 return true
}

func globrunqputbatch(batch *gQueue, n int32) {
 // 将batch一次性放入sched.runq中
 sched.runq.pushBackAll(*batch)
 // 更新sched.runq中g的数量
 sched.runqsize += n
 *batch = gQueue{}
}

// 将q2链表中的g加入到全局队列中
func (q *gQueue) pushBackAll(q2 gQueue) {
 if q2.tail == 0 {
  return
 }
 q2.tail.ptr().schedlink = 0
 // 直接将全局队列的队尾连接到q2的头节点,这样q2就加入了全局g链表中
 if q.tail != 0 {
  q.tail.ptr().schedlink = q2.head
 } else {
  q.head = q2.head
 }
 // 更新全局链表尾节点的位置,指向q2的尾部
 q.tail = q2.tail
}

通过流程图的形式将runqput处理逻辑描述出来,得到如下流程图。

经过上面的处理逻辑,main goroutine(newg)就被创建了出来,此时主线程、g0栈和main goroutine栈之间的关系如下图所示。

mstart

目前已分析完Go进程启动的核心处理流程的前四个步骤,现在来看最后一个步骤:调用runtime.mstart启动工作线程m,进入调度系统.

mstart函数设置了g的堆栈保护字段stackguard0和stackguard1之后,直接调用了函数mstart1。

func mstart() {
 // _g_为g0
 _g_ := getg()

 // 初始时,g0的stack.lo已完成初始化,它不等于0
 osStack := _g_.stack.lo == 0
 if osStack {
  size := _g_.stack.hi
  if size == 0 {
   size = 8192 * sys.StackGuardMultiplier
  }
  _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
  _g_.stack.lo = _g_.stack.hi - size + 1024
 }

 // 初始化堆栈保护
 _g_.stackguard0 = _g_.stack.lo + _StackGuard
 _g_.stackguard1 = _g_.stackguard0
 mstart1()
    ...
 mexit(osStack)
}

mstart1会检测当前的g是否是g0,设置工作线程m的备用信号堆栈和信号掩码,如果当前的m是m0,做一些特殊处理,设置系统信号量的处理函数,检测m是否有起始任务函数,如果有就执行它,例如sysmon函数,获取一个p并将它与m进行绑定,最后执行调度程序函数schedule。可以看到mstart1还没有进行真正的调度,而是为调度做一些处理工作。

func mstart1() {
 // 初始时,_g_为m0中的g0,其他情况下_g_也是各个m的g0
 _g_ := getg()

 // 确保g是系统栈上的g0,调度器只在g0上直执行
 if _g_ != _g_.m.g0 {
  throw("bad runtime·mstart")
 }

 // getcallerpc()获取mstart1执行完的返回地址
 // getcallersp()获取调用mstart1时的栈顶地址
 // 保存g0调度信息
 save(getcallerpc(), getcallersp())
 asminit()
 // 初始化m,主要设置线程的备用信号堆栈和掩码
 minit()

 // 如果当前的m是m0,执行mstartm0操作
 if _g_.m == &m0 {
  // 对于初始m,需要设置系统信号量的处理函数
  mstartm0()
 }
 // 对于m0是没有mstartfn函数,对其他m如果有起始任务函数,则需要执行,比如sysmon函数
 if fn := _g_.m.mstartfn; fn != nil {
  fn()
 }

 // 如果当前g的m不是m0,它现在还没有p,需要获取一个p, m0已经绑定了allp[0],所以不用关心m0
 if _g_.m != &m0 {
  // 完成_g_.m和p的互相绑定
  acquirep(_g_.m.nextp.ptr())
  _g_.m.nextp = 0
 }
 // 调用调度函数schedule,该函数不会返回
 schedule()
}

调度策略

schedule函数的核心功能就是竭尽所能找到一个可运行的g,然后调用execute执行g中的函数。查找g的算法流程如下:

  1. 如果当前真正进行垃圾回收(GC)操作,需要暂停所有程序的运行(STW),调用gcstopm休眠当前的工作线程m
  2. 保证公平性,每进行61次调度时优先从全局队列中获取待运行的g
  3. 从p的本地队列中获取,优先从p.runnext中获取,没有再从p.runq中获取
  4. 调用findrunnable从其他p中偷取g,如果也没有偷取到的话,将当前的m休眠,等待被唤醒
func schedule() {
 // _g_为每个工作线程m对应的g0,初始时_g_为m0的g0
 _g_ := getg()
    ...
 var gp *g
 var inheritTime bool
    ...
 if gp == nil && gcBlackenEnabled != 0 {
  gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
  tryWakeP = tryWakeP || gp != nil
 }
 if gp == nil {
  // 每进行61次调度时优先从全局队列中获取待运行的g, 这样做是为了保证调度的公平性
  // 防止全局队列中的g得不到调度饿死
  if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
   // 先对全局队列加锁然后从中获取g
   lock(&sched.lock)
   gp = globrunqget(_g_.m.p.ptr(), 1)
   unlock(&sched.lock)
  }
 }
 if gp == nil {
  // 从与m绑定的p的本地队列中获取g
  gp, inheritTime = runqget(_g_.m.p.ptr())
 }
 if gp == nil {
  // 走到这里说明,从全局队列或p的本地队列中都没有获取到g,则调用findrunnable函数
  // 从其他工作线程绑定的p的本地队列中偷取g,如果没有偷取到,也就是当前都没有待运行的g
  // 了,当前的工作线程进入睡眠模式,直到获取到运行的g之后findrunnable函数才会返回
  gp, inheritTime = findrunnable() 
 }
    ...
 // 当前运行的是runtime的代码,函数调用栈使用的是g0的栈空间,调用execute会切换到
 // 用户程序创建的g(gp)的代码和栈空间去运行
 execute(gp, inheritTime)
}

上面介绍了schedule函数整体是怎么找g的流程算法,其实就是GMP调度模型中的调度策略(找G的策略),有3大策略,对应到上面的处理函数分别是globrunqget、runqget和findrunnable。下面分别详细介绍每个策略的实现细节。

从全局队列中获取G

globrunqget从全局队列最多获取max个g,并将获取的max-1个g放入_p_的本地队列,并将剩下的1个作为返回值给调用方进行调度。该函数是一个通用函数,也就是一次可以从全局队列中搬多个g到本地队列,进行批量搬运。考虑到公平性,搬运的数量有一定限制,将全局队列中g的数量根据p的数量均分,每个p能从全局队列中最多获取到sched.runqsize/gomaxprocs + 1,证公平性每个p都有机会从全局队列中拿到一部分g,同时搬运的数量不能超过本地队列长度的一半(128)。如果取回来的g数量超过本地队列_p_可以容纳的怎么解决呢?可以看到下的runqput函数中如果本地队列满了又放回全局队列,所以g不会丢失。

// globrunqget从全局队列最多获取max个g,并将获取的max-1个g放入_p_的本地队列
// 并将剩下的1个作为返回值给调用方进行调度
func globrunqget(_p_ *p, max int32) *g {
 // 如果全局队列中没有g,直接返回nil
 if sched.runqsize == 0 {
  return nil
 }

 // 将全局队列中g的数量根据p的数量均分,每个p能从全局队列中最多获取到sched.runqsize/gomaxprocs + 1
 // 个g,这样保证公平性,每个p都有机会从全局队列中拿到一部分g
 n := sched.runqsize/gomaxprocs + 1
 // 进一步判断n不能大于全局队列中g的数量,这种情况在gomaxprocs为1的时候会出现n>全局队列中g的数量
 if n > sched.runqsize {
  n = sched.runqsize
 }

 // 调整n的值不超过max,max作为入参的含义是一次性从全局队列取走的g的最大数量
 if max > 0 && n > max {
  n = max
 }
 // 继续调整n的值,n的值不能超过本地队列长度的一半,本地队列_p_.runq为一个256的数组
 // 它的一半为128,即n不能超过128,将_p_.runq作为一个环形数组循环使用的,这里有一种担心
 // 如果取回来的g数量超过本地队列_p_可以容纳的怎么解决呢?可以看到下的runqput函数中如果本地
 // 队列满了又放回全局队列,所以g不会丢失
 if n > int32(len(_p_.runq))/2 {
  n = int32(len(_p_.runq)) / 2
 }
 // 全局队列中的g的数量减少n个
 sched.runqsize -= n

 // 将全局队列的对头的g出队,直接通过函数返回gp,其他的g通过runqput放入_p_的本地队列
 gp := sched.runq.pop()
 n--
 // 循环n次从全局队列中取出n个g并放入本地队列
 for ; n > 0; n-- {
  gp1 := sched.runq.pop()
  // 将从全局队列获取的gp1放入到_p_的本地队列中
  runqput(_p_, gp1, false)
 }
 return gp
}
从P的本地队列中获取G

runqget从_p_的本地队列中取走一个g返回。优先从p的runnext中获取,runnext只保存一个g,当runnext没有g时才会从runq中获取。当循环队列的队头与队尾相同时,表明队列中没有可运行的g。否则,从队头获取一个g返回。因为存在其他P偷取g造成同时访问的情况,所以写了两个for{}死循环,通过cas手段保证并发安全。

// runqget从_p_的本地队列中取走一个g返回,注意它有两个返回值,第一个返回值就是获取到的
// g,第二参数是bool类型,表示返回的gp的是否应该继承当前时间片中的剩余时间,如果为true
// 表示继承当前时间片的剩余时间,返回false表示开启一个新的时间片
func runqget(_p_ *p) (gp *g, inheritTime bool) {
 // If there's a runnext, it's the next G to run.
 // 上来就是两个for{}循环
 for {
  // 优先从p的runnext中获取,runnext只保存一个g,当runnext没有g时
  // 才会从runq中获取
  next := _p_.runnext
  if next == 0 {
   break
  }
  // 获取的过程是cas操作,防止并发问题
  if _p_.runnext.cas(next, 0) {
   return next.ptr(), true
  }
 }

 for {
  // 从循环队列中获取一个g
  h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
  t := _p_.runqtail
  // h为一个下标,表示小于h下标 runq中的g已取走
  // t表示从[h,t)范围 runq中的g还未取走
  // t与h相等表示本地队列已经空了
  if t == h {
   return nil, false
  }
  // 将runq中h对应下标的g取出
  gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
  // 原子性自增_p_.runqhead
  if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
   return gp, false
  }
 }
}
从其他P的本地队列中偷取G

findrunnable从其他p中偷取可以运行的g,在从其他p偷取g前会当前是否有已经准备好运行的网络协程,如果有返回一个可运行的协程。并通过injectglist函数将其余协程放入全局运行队列等待被调度。如果没有可以运行的网络协程,全局运行队列中也没有可运行的g,当前也没有垃圾回收中进行标记的g需要运行,这时会调用runqsteal从其他p中的本地队列中偷取它里面一半数量的g到当前的本地队列。

func findrunnable() (gp *g, inheritTime bool) {
 _g_ := getg()
top:
 _p_ := _g_.m.p.ptr()
 // 检查是否在处理gc
 if sched.gcwaiting != 0 {
  gcstopm()
  goto top
 }
 if _p_.runSafePointFn != 0 {
  runSafePointFn()
 }

 ...
 // 对本地队列再进行一次检查,查看是否有待运行的g
 if gp, inheritTime := runqget(_p_); gp != nil {
  return gp, inheritTime
 }

 // 查看全局队列是否有待运行的g
 if sched.runqsize != 0 {
  lock(&sched.lock)
  gp := globrunqget(_p_, 0)
  unlock(&sched.lock)
  if gp != nil {
   return gp, false
  }
 }

 // 从网络io轮询器中找到准备就绪的g,把这个g变为可运行的g
 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
  if list := netpoll(0); !list.empty() { 
   gp := list.pop()
   // 把找到的可运行的网络io的g列表插入到全局队列
   injectglist(&list)
   // 将gp的状态从_Gwaiting修改为_Grunnable
   casgstatus(gp, _Gwaiting, _Grunnable)
   if trace.enabled {
    traceGoUnpark(gp, 0)
   }
   return gp, false
  }
 }

 procs := uint32(gomaxprocs)
 ranTimer := false
 // 如果当前的m没有自旋,并且工作中的p的数量小于正在自旋的m数的2倍,就不让它也在进行自旋了,
 // 主要是为了防止有过多的m在寻找可以运行的g而消耗太多的cpu
 if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
  goto stop
 }
 // 将当前的m自旋
 if !_g_.m.spinning {
  // 设置m的状态为自旋状态
  _g_.m.spinning = true
  // 处于自旋状态的m的数量+1
  atomic.Xadd(&sched.nmspinning, 1)
 }

 // 从其他的p的本地运行队列中偷取g,p的选择是随机的,尝试4次
 for i := 0; i < 4; i++ {
  for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
   if sched.gcwaiting != 0 {
    goto top
   }
   stealRunNextG := i > 2 
   p2 := allp[enum.position()]
   if _p_ == p2 {
    continue
   }
   // 从p2偷取它本地队列一半数量的g到_p_中
   if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
    return gp, false
   }
           ...
  }
 }
    ...
 // 垃圾回收中有进行标记工作的g需要运行,调度运行标记工作的g
 if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
  _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
  gp := _p_.gcBgMarkWorker.ptr()
  casgstatus(gp, _Gwaiting, _Grunnable)
  if trace.enabled {
   traceGoUnpark(gp, 0)
  }
  return gp, false
 }
 ...
 // 走到这里,说明前面都没有找到可运行的g,将当前的工作线程与p之间的绑定解除
 if releasep() != _p_ {
  throw("findrunnable: wrong p")
 }
 // 把与当前m绑定的p放入空闲队列中,m接下来要准备休眠了
 pidleput(_p_)
 unlock(&sched.lock)
 wasSpinning := _g_.m.spinning
 if _g_.m.spinning {
  // 取消m的自旋状态标记
  _g_.m.spinning = false
  // 处于自旋状态的m数量减1
  if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
   throw("findrunnable: negative nmspinning")
  }
 }
 // 再一次检查所有的p中是否有有可用运行的g
 for _, _p_ := range allpSnapshot {
  ...
 }

 // 再次检查是有垃圾回收中进行标记的g需要运行
 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
    ...
    }
    ...
 // 当前的m进入休眠
 stopm()
 goto top
}

// runqsteal从p2的本地队列中偷取它里面一半数量的g到_p_的本地队列
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
 t := _p_.runqtail
 // 从p2中偷取一批g放入到p中
 n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
 if n == 0 {
  // 没有偷取到,直接返回
  return nil
 }
 // 这里减1个是因为函数会返回一个g
 n--
 gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
 // 只偷取到了1个g,直接返回它
 if n == 0 {
  return gp
 }
 // 检查_p_的本地队列数量是否越界
 h := atomic.LoadAcq(&_p_.runqhead) 
 if t-h+n >= uint32(len(_p_.runq)) {
  throw("runqsteal: runq overflow")
 }
 // 更新_p_本地队列队尾的位置
 atomic.StoreRel(&_p_.runqtail, t+n) 
 return gp
}

// runqgrab从_p_的本地运行队列中选择一批g放入到batch中
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
 for {
  // 获取_p_本地队列队头的位置,原子操作
  h := atomic.LoadAcq(&_p_.runqhead) 
  // 获取_p_本地队列队尾的位置,原子操作
  t := atomic.LoadAcq(&_p_.runqtail) 
  // n为_p_的本地队列中有多少个可待运行的g
  n := t - h
  // n为队列中g数量的一半
  n = n - n/2
  if n == 0 {
   // 走进这里,说明n为0,什么时候n为0呢,就是h=t的时候,也就是_p_的本地队列中没有g的时候
   // 如果设置了从_p_.runnext(stealRunNextG为true)获取,尝试从runnext获取g
   if stealRunNextG {
    // 将_p_.runnext中的g放入到batch中
    if next := _p_.runnext; next != 0 {
     if _p_.status == _Prunning {
      if GOOS != "windows" {
       usleep(3)
      } else {
       osyield()
      }
     }
     if !_p_.runnext.cas(next, 0) {
      continue
     }
     batch[batchHead%uint32(len(batch))] = next
     return 1
    }
   }
   // 说明_p_.runnext中也没有g,直接返回
   return 0
  }
  // 进一步判断n不能超过_p_本地队列中g数量的一半,为什么要做这个判断呢?
  // 因为在前面读取runqhead和runqtail的两个操作整体看不是原子性的,虽然它们单独是原子性的
  // 也就是说在这两个操作的中间有可能有其他的线程也在偷取g或向里面放g,导致计算不一致,所以这里
  // 进一步检查
  if n > uint32(len(_p_.runq)/2) { 
   continue
  }
  // 从_p_本地队列搬n个g到batch中
  for i := uint32(0); i < n; i++ {
   g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
   batch[(batchHead+i)%uint32(len(batch))] = g
  }
  // 更新_p_本地队列队头的位置,向后移动n个位置
  if atomic.CasRel(&_p_.runqhead, h, h+n) { 
   return n
  }
 }
}

分析完了怎么选取运行g的问题,现在在回到schedule调度函数,分析它是如何执行要运行g的,也就是对应到schedule中的execute处理逻辑。

execute

execute切换到选择的要运行的g的栈执行,设置g为运行状态和是否被抢占等信息之后,调用gogo函数从当前g0栈切换到要运行g的栈,真正开始执行用户程序。

func execute(gp *g, inheritTime bool) {
 // _g_为g0
 _g_ := getg()

 // 设置当前m运行用户程序的g位gp,即将执行用户程序的g和m绑定
 _g_.m.curg = gp
 // 设置执行用户程序的g(gp)的m为_g_.m,这样gp和当前的m相互绑定
 gp.m = _g_.m
 // 将gp的状态从_Grunnable修改为_Grunning,意味着gp马上会得到执行
 casgstatus(gp, _Grunnable, _Grunning)
 gp.waitsince = 0
 // 设置gp的抢占标志为false
 gp.preempt = false
 gp.stackguard0 = gp.stack.lo + _StackGuard
 if !inheritTime {
  _g_.m.p.ptr().schedtick++
 }

 hz := sched.profilehz
 if _g_.m.profilehz != hz {
  setThreadCPUProfiler(hz)
 }

 if trace.enabled {
  if gp.syscallsp != 0 && gp.sysblocktraced {
   traceGoSysExit(gp.sysexitticks)
  }
  traceGoStart()
 }
 // gogo将从g0栈切换到执行用户程序的gp,真正开始执行用户程序
 gogo(&gp.sched)
}

gogo函数是用汇编语言实现的,具体源码解析如下,切换原理就是将要运行g的调度信息g.sched从内存中恢复到CPU寄存器,设置SP和IP等寄存器的值,跳转到要运行的位置开始执行指令。总之一句话,gogo函数完成了从g0到用户g的切换,即CPU执行权的转让以及栈的切换。

TEXT runtime·gogo(SB), NOSPLIT, $16-8
    // 0(FP)表示第一个参数,即buf=&gp.sched
    // BX=buf
 MOVQ buf+0(FP), BX  // gobuf
 // DX=buf.g=&gp.sched.g
 MOVQ gobuf_g(BX), DX
 MOVQ 0(DX), CX  // make sure g != nil
 get_tls(CX)
 // 把g放入到tls[0],即把要运行的g的指针放进线程本地存储,后面的代码可以通过本地线程存储
 // 获取到当前正在执行的goroutine的地址,在这之前,本地线程存储中存放的是g0的地址
 MOVQ DX, g(CX)
 // SP=buf.sp=&gp.sched.sp,即把CPU的栈顶寄存器SP设置为gp.sched.sp,成功完成从
 // g0栈切换到gp栈
 MOVQ gobuf_sp(BX), SP // restore SP
 // 恢复调度上下文到CPU对应的寄存器
 // 将系统调用的返回值放入到AX寄存器中
 MOVQ gobuf_ret(BX), AX
 MOVQ gobuf_ctxt(BX), DX
 MOVQ gobuf_bp(BX), BP
 // 前面已经将调度相关的值都放入到CPU的寄存器中了,将gp.sched中的值清空,这样可以减轻gc的工作量
 MOVQ $0, gobuf_sp(BX) 
 MOVQ $0, gobuf_ret(BX)
 MOVQ $0, gobuf_ctxt(BX)
 MOVQ $0, gobuf_bp(BX)
 // 把gp.sched.pc的值放入到BX寄存器,对于main goroutine,sched.pc中存储的是runtime包
 // 中main()函数的地址
 MOVQ gobuf_pc(BX), BX
 // JMP把BX寄存器中的地址值放入到CPU的IP寄存器中,然后CPU跳转到该地址的位置开始执行指令
 // 即跳转到main()函数执行代码
 JMP BX

execute调用gogo从g0栈切换到用户g栈,在刚开始时,队列中只有一个main goroutine,所以第一个被运行的用户g就是main goroutine.执行完gogo后会跳转到runtime.main(main goroutine的function)运行。

runtime.main函数会调用我们main包中的main函数,在调用main包中main函数前,会做一些其他工作,主要有:

  1. 新建一个线程执行sysmon函数,sysmon的工作是系统后台监控,通过轮询的方式判断是否需要进行垃圾回收和调度抢占
  2. 启动gc清扫的goroutine
  3. 执行runtime包的初始化,main包以及main包import的所有包的初始化

执行完上述准备逻辑之后,才开始真正执行main.main函数,从main.main函数返回后调用exit(0)系统调用退出进程.如果exit(0)进程没退出,通过for循环一直访问非法地址,正常情况下,一但出现非法地址访问,系统就会把该进程杀死,用这样的方法来确保进程退出。

可以看到runtime.main执行完main.main函数之后,此时函数调用链是schedule()-->execute()-->gogo()-->runtime.main()-->main.main(),直接调用exit系统调用结束进程了。读到这里有同学可能疑问?前面不是说在创建goroutine的时候伪造成g是被goexit函数调用的,按理说执行完g逻辑之后,要返回goexit函数继续执行。这是对main goroutine以外的其他goroutine准备的,对于非main goroutine执行完之后会返回到goexit函数继续执行,而main goroutine执行完直接结束整个进程了,因为main goroutine执行完意味着程序结束了,所以不用再做其他处理了,也就无需再回到goexit函数继续执行。

func main() {
 // 这里的g位于main goroutine
 g := getg()
 g.m.g0.racectx = 0
 // 设置栈大小的最大值,对于32系统,栈的大小不能超过250M,对于64位系统,栈的大小不超过1G
 if sys.PtrSize == 8 {
  maxstacksize = 1000000000
 } else {
  maxstacksize = 250000000
 }
 // mainStarted为全局变量,表示main goroutine现在已经开始运行了
 // 现在可以允许其他创建的goroutine启动新的m来运行创建的g了
 mainStarted = true

 // wasm架构不支持线程,所以也就没有监控线程sysmon
 if GOARCH != "wasm" { 
  // 切换到g0栈,启动一个监控线程
  systemstack(func() {
   // 传递给newm的第二参数p为nil,说明该线程不需要与p绑定,即sysmon工作线程
   // m不需要调度器的调度,独立于调度器,因为该m只干一件事就是执行sysmon函数,不会
   // 执行用户程序的g
   newm(sysmon, nil)
  })
 }
 ...
 runtimeInitTime = nanotime()
 //启动垃圾清理的goroutine
 gcenable()
    ...
 // 执行package main 中的init函数, main包中引用的依赖包中的init函数也会执行
 doInit(&main_inittask)

 close(main_init_done)

 needUnlock = false
 unlockOSThread()

 //使用 -buildmode=c-archive 或 c-shared 编译的程序有一个 main,但它不会被执行。
 if isarchive || islibrary {
  return
 }
 // 进行间接调用,因为链接器在放置运行时不知道主包的地址
 fn := main_main 
 // 执行main包中的main函数
 fn()
    ...
 // 通过exit系统调用退出进程,可以看到main goroutine并没有返回,直接进入系统调用退出程序
 exit(0)

 for {
  var x *int32
  // x是一个空指针,是一个非法的地址,给他赋值会导致程序崩溃,操作系统就会把该进程杀死,确保程序一定会退出
  *x = 0
 }
}

前面分析了main goroutine的退出流程,它执行完main.main()函数后,直接调用exit系统调用结束进程了。而对于非main goroutine来说,执行完用户逻辑之后会回到goexit函数,因为在创建非main goroutine的时候伪造成g是被goexit函数调用的.下面开始分析回到goexit函数做了哪些处理。

执行完用户逻辑回到goexit+PCQuantum,也就是goexit的第二条指令,就是CALL runtime·goexit1(SB). 它直接调用goexit1函数。

TEXT runtime·goexit(SB),NOSPLIT,$0-0
 BYTE $0x90 // NOP
 // 用户程序执行完返回goexit的第二条指令就是这里
 CALL runtime·goexit1(SB) 
 BYTE $0x90 // NOP

goexit1函数处理一些data race等检查逻辑之后,调用了mcall函数,mcall是汇编实现的,具体实现细节见下面的注解。mcall函数的功能是从当前用户程序g切换到g0上运行,然后在g0栈上执行goexit0函数。概括起来,mcall完成两个主要逻辑:

  1. 保存当前的g的调度信息到内存中,通过当前的g,找到与它绑定的m,在通过m找到m中保存的g0,然后将g0保存到tls中,修改CPU寄存器的值为g0栈的内容
  2. 切换到g0栈,执行goexit0函数

可以看到mcall完成的功能与前面介绍的gogo函数功能完全相反。gogo函数实现从g0栈到用户程序g的切换,而这里的mcall恰好实现从用户程序g到g0的切换,所以通过gogo和mcall函数,我们可以在runtime代码和用户程序代码之间来回切换。

func goexit1() {
    // 这里是检查data race逻辑
    ...
 mcall(goexit0)
}

// 传给mcall函数的参数是一个指向funcval对象的指针
TEXT runtime·mcall(SB), NOSPLIT, $0-8
    // 取出参数的值也就是goexit0的地址,放入到DI寄存器中,
 MOVQ fn+0(FP), DI

    // 获取当前的g,这里的g还是用户程序的g
 get_tls(CX)
 // 设置AX寄存器的值为g
 MOVQ g(CX), AX 
 // mcall返回地址放入到BX寄存器中
 MOVQ 0(SP), BX 
 // 将当前的调度信息也就是CPU寄存器的值保存到g的sched字段中
 // 因为接下来将发送goroutine的切换,从用户程序g切换到g0

 // 保存PC
 MOVQ BX, (g_sched+gobuf_pc)(AX)
 LEAQ fn+0(FP), BX 
 // 保存SP
 MOVQ BX, (g_sched+gobuf_sp)(AX)
 // 保存g
 MOVQ AX, (g_sched+gobuf_g)(AX)
 // 保存BP
 MOVQ BP, (g_sched+gobuf_bp)(AX)

 // 将g保存到BX寄存器中
 MOVQ g(CX), BX
 // 将g.m保存到BX寄存器中
 MOVQ g_m(BX), BX
 // 将g.m.g0保存到寄存器SI中
 MOVQ m_g0(BX), SI
 // 经过上面3步操作,成功拿到了g0的地址,非常完美
 // 比较g0是否与g相等,理论上不应该相等,如果出现相等,一定是有问题
 CMPQ SI, AX 
 JNE 3(PC)
 MOVQ $runtime·badmcall(SB), AX
 JMP AX
 // SI中保存的是g0的地址,经过这步操作将g0的地址放到了线程本地存储中
 MOVQ SI, g(CX) // g = m->g0
 // 恢复g0的栈顶指针到CPU的SP寄存器中,成功将g0栈放入到CPU上,完成了从
 // 用户程序g栈到g0的栈切换
 MOVQ (g_sched+gobuf_sp)(SI), SP 
 // 将goexit0的参数g入栈
 PUSHQ AX
 // 将funcval对象的指针保存到DX
 MOVQ DI, DX
 // 获取funcval对象的第一个成员也就是goexit0的地址
 MOVQ 0(DI), DI
 // 调用goexit0(g)函数
 CALL DI
 POPQ AX
 MOVQ $runtime·badmcall2(SB), AX
 JMP AX
 RET

goexit0函数把gp(用户程序g)状态从_Grunning修改为_Gdead,然后清理gp对象中保存内容,其次通过函数dropg解除gp和m之间的绑定关系,然后将gp放入到P的freeg队列中缓存起来,以便后续复用,最后调用schedule,进行新一轮调度.

// goexit0函数是在g0上执行的,入参gp是用户程序g
func goexit0(gp *g) {
 // _g_为g0
 _g_ := getg()
 // 将gp的状态从_Grunning修改为_Gdead
 casgstatus(gp, _Grunning, _Gdead)
 if isSystemGoroutine(gp, false) {
  atomic.Xadd(&sched.ngsys, -1)
 }
 // 清理gp对象中保存内容
 gp.m = nil
    ...

 // 如果当前在进行垃圾回收,将gp赋值标记的信息刷新到全局信用池中
 if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
  scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
  atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
  gp.gcAssistBytes = 0
 }
 // 解除gp和m之间的绑定关系
 dropg()
    ...
 // 将gp放入到p的freeg队列中,以便下次可以服用,不用new一个g对象,避免重新申请内存
 gfput(_g_.m.p.ptr(), gp)
 if locked {
  if GOOS != "plan9" { 
   gogo(&_g_.m.g0.sched)
  } else {
   _g_.m.lockedExt = 0
  }
 }
 // 再次调用schedule,进行新一轮调度
 schedule()
}

至此,我们已经分析完了main goroutine和非main goroutine的整个调度流程,下图概括了调度流中函数调度栈。对于非main goroutine来说,从shedule开始,经过一些列函数调用执行用户的代码,最后又会回到shedule进行新一轮调度,这个过程会进行无数轮这样的循环,而函数的每一次调用会占用一定的内存空间,这样一直循环调度下去那不是会耗尽g0栈的所有空间,太危险了?这里是不会耗尽的,因为每次执行完mcall切换到g0栈时都是从一个固定的g0.sched.sp即固定的g0栈顶位置开始的,所以会重复使用上一轮调度时使用过的栈内存空间,这时没有问题的,因为前一轮调度已经结束了,覆盖掉之前空间的内容是没有问题的。

调度时机

前面分析了调度器的初始化,main goroutine的创建与运行,调度选g的策略等,下面看在什么时候什么情况下会发生调度,也就是调度的时机。根据调度方式的不同,调度时机分为3种,分别是主动调度、被动调度和抢占调度。

主动调度

在用户代码中通过调用runtime.Gosched()函数发生主动调度。Gosched函数会调用mcall从当前g的栈切换到m的g0栈执行gosched_m函数,mcall细节前面已经分析过了。下面直接看gosched_m函数的实现,gosched_m调用goschedImpl挂起用户协程g. goschedImpl主要完成以下4个功能:

  1. 将用户协程g状态从_Grunning修改为_Grunnable
  2. 调用dropg解除m和gp的互相绑定,分别将m.curg和gp.m设置为nil
  3. 将当前的用户协程g放入到全局运行队列中
  4. 调用schedule进入新一轮调度

主动调度的逻辑还是很好理解的,就是当前的g放弃CPU执行权,将其放入到全局运行队列中,因为它还是可以继续运行的,只是我们主动放弃了,等待下次被调度程序执行。

// Gosched提供给用户程序主动让出调度的接口
func Gosched() {
 // checkTimeouts为空实现
 checkTimeouts()
 // 切换到当前m的g0栈执行gosched_m函数
 mcall(gosched_m)
}

func gosched_m(gp *g) {
 if trace.enabled {
  traceGoSched()
 }
 // gp为准备挂起的用户协程g,即调用runtime.Gosched()函数所在的协程g
 goschedImpl(gp)
}

func goschedImpl(gp *g) {
 // 获取准备挂起g(gp)的状态,它的状态为目前处于_Grunning,因为它正在运行
 status := readgstatus(gp)
 // 检查gp的状态是否合法,如果不为_Grunning状态说明状态出现了异常
 if status&^_Gscan != _Grunning {
  dumpgstatus(gp)
  throw("bad g status")
 }
 // 将gp的状态从_Grunning修改为_Grunnable
 casgstatus(gp, _Grunning, _Grunnable)
 // 解除m和gp的互相绑定,分别将m.curg和gp.m设置为nil
 dropg()
 lock(&sched.lock)
 // 把gp放入全局的运行队列中
 globrunqput(gp)
 unlock(&sched.lock)

 // 进行新一轮调度
 schedule()
}
被动调度

被动调度就是一个g被迫让出CPU执行权,有哪些情况会让出CPU使用权,我们常见的有channel发生阻塞、网络IO阻塞、执行垃圾回收而暂停用户程序执行等。那为什么要让出CPU的使用权,为了提供CPU的使用率,与其发生阻塞导致CPU干等不如让出CPU给其他可以执行的程序使用。下面以channel阻塞为例分析被动调度的处理流程。通道的接收操作<-c底层调用的是chanrecv1函数,本文主要分析channnel操作时是如果让出调度又是如何恢复调度的,下面只截取了与之相关的关键的代码,详细channel源码分析见说说channel哪些事-上篇说说channel哪些事-下篇. chanrecv1只是一个包装函数,真正调用的是chanrecv函数。chanrecv会先判断channel上是否有数据可读,如果有数据直接读取并返回,如果没有数据,则把当前的g放到当前channel对象的读取队列(recvq)上,然后调用goparkunlock函数阻塞当前g的运行。

// 接收操作入口<-c, 有2个入参,c表示通道结构体指针
// elem是接收通道元素的变量地址,即<-c左边的接收者
func chanrecv1(c *hchan, elem unsafe.Pointer) {
 chanrecv(c, elem, true)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
 // 把协程g挂在channel c的读取队列上,调用goparkunlock函数阻塞当前的g
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    ...
}

gopark函数最后会调用mcall(park_m)函数,切换到g0栈上执行park_m函数。park_m函数先将用户程序协程gp的状态从_Grunning修改为_Gwaiting,然后调用dropg函数解除gp和m的互相绑定关系,最后调用schedule()进入新一轮调度循环。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
 ...
 // 切换到g0栈上执行park_m函数
 mcall(park_m)
}

func park_m(gp *g) {
 // _g_为g0
 _g_ := getg()

 if trace.enabled {
  traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
 }
 // 将用户程序协程gp的状态从_Grunning修改为_Gwaiting
 casgstatus(gp, _Grunning, _Gwaiting)
 // 解除gp与m的绑定
 dropg()
    ...
 // 进行新一轮调度
 schedule()
}

分析完了channel读取操作中调度相关的操作,下面接着看channel发送操作中调度相关的代码。发送操作c<-x底层调用的是chansend1函数,chansend1直接调用了chansend函数,chansend函数会先检查是否有因channel没有数据可读而挂起的g,如果有直接发数据给sg。如果发送阻塞,出现阻塞原因是要么是非缓冲channel的还没有读取的g,要么是缓存channel buffer满了,这时与接收操作类似调用gopark挂起当前的发送协程g.

// 发送操作c<-x调用入口
func chansend1(c *hchan, elem unsafe.Pointer) {
 chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 ...
 // 查看是否有因channel没有数据可读而挂起的g,如果有直接发数据给sg
 if sg := c.recvq.dequeue(); sg != nil {
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
 }
 ...
 c.sendq.enqueue(mysg)
 // 走到这里,说明要么是非缓冲channel的还没有读取的g,要么是缓存channel buffer满了
 // 都是直接发送不了数据,需要挂起当前的g
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
 ...
}

如果不阻塞,调用send函数直接发送数据,send函数最后会调用goready函数,goready会切换到g0栈上执行ready函数。ready函数把要唤醒的g的状态设置为_Grunnable并将它加入运行队列,如果有空闲的p并且没有工作线程m处于自旋状态,也就是没有别的m正在进行偷取g的工作,则需要唤醒或新建一个m来运行可运行的g.

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 ...
 goready(gp, skip+1)
}

// goready完成的功能是将一个g状态修改为_Grunnable,并将它加入待运行队列,
// 等待被调度程序运行
func goready(gp *g, traceskip int) {
 // 切换到g0栈执行ready
 systemstack(func() {
  ready(gp, traceskip, true)
 })
}

// ready函数把要唤醒的g的状态设置为_Grunnable并将它加入运行队列
func ready(gp *g, traceskip int, next bool) {
 if trace.enabled {
  traceGoUnpark(gp, traceskip)
 }
 // status为即将唤醒的g(gp)的状态
 status := readgstatus(gp)

 // Mark runnable.
 _g_ := getg()
 mp := acquirem() 
 // 检查gp的状态是不是_Gwaiting,如果不是说明gp的状态出现了异常
 if status&^_Gscan != _Gwaiting {
  dumpgstatus(gp)
  throw("bad g->status in ready")
 }

 // 设置gp的状态为可以运行状态(_Grunnable)
 casgstatus(gp, _Gwaiting, _Grunnable)
 // 将gp放入运行队列
 runqput(_g_.m.p.ptr(), gp, next)
 // 有空闲的p并且没有工作线程m处于自旋状态,也就是没有别的m正在进行偷取g的工作,则需要唤醒或新建一个m来运行可
 // 运行的g
 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
  wakep()
 }
 releasem(mp)
}

// 尝试获取一个m来运行可以运行的g,在新创建一个g(newproc函数)和唤醒一个g(ready)函数中
// 会调用wakep
func wakep() {
 // 原子性检查当前是否已经有线程m在进行自旋了,因为很可能也有别的线程执行了goready,已唤醒了m
 // 有一个自旋的m会努力找g运行,就不用再找一个m来工作了
 if !atomic.Cas(&sched.nmspinning, 0, 1) {
  return
 }
 startm(nil, true)
}

// startm函数功能是获取一个m,在获取m之前先获取一个空闲的p,如果获取不到p则返回
// 获取p后,在从全局空闲m队列中获取一个m,如果没有获取到则新创建一个m
func startm(_p_ *p, spinning bool) {
 lock(&sched.lock)
 if _p_ == nil {
  // 如果p为nil,则从全局空闲p队列中获取一个空闲的p
  _p_ = pidleget()
  if _p_ == nil {
   unlock(&sched.lock)
   if spinning {
    // 没有获取到p,也不会获取m了,因为在调用函数中spinning为true,已对sched.nmspinning
    // 进行了+1操作,这里需要减一进行还原
    if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
     throw("startm: negative nmspinning")
    }
   }
   // 没有拿到空闲的p,不会获取m,直接返回
   return
  }
 }
 // 从空闲的m队列中获取一个空闲(休眠)的工作线程
 mp := mget()
 unlock(&sched.lock)
 if mp == nil {
  // 空闲队列中没有休眠的m
  var fn func()
  if spinning {
   fn = mspinning
  }
  // 创建一个新的工作线程m
  newm(fn, _p_)
  return
 }
 // 走到这里,表明mp是从空闲m队列获取的,这里的m都是处于休眠状态,它的状态不可能是spinning
 if mp.spinning {
  throw("startm: m is spinning")
 }
 // 空闲m队列中获取的m的nextp不可能为非0,mp.nextp将会和前面获取的p关联绑定
 if mp.nextp != 0 {
  throw("startm: m has p")
 }
 // spinning为true,需要进行自旋,就是找不到g了,但是本地队列中还存在等待运行的g,
 // 是相互矛盾的
 if spinning && !runqempty(_p_) {
  throw("startm: p has runnable gs")
 }
 // 设置mp的自旋状态,是自旋还是非自旋
 mp.spinning = spinning
 // 将p保存到m的nextp中
 mp.nextp.set(_p_)
 // 唤醒处于休眠状态的工作线程
 notewakeup(&mp.park)
}
抢占调度

为了确保每个g都有机会被调度执行,保证调度的公平性,在初始化的时候会启动一个特殊的线程来执行监控任务(sysmon函数),sysmon在runtime.main函数被启动,详情看前面runtime.main代码分析。下面看sysmon具体处理逻辑。sysmon函数是一个死循环函数,在第一轮循环的时候休眠20微妙,之后每轮循环中休眠时间加倍,直到最大休眠时间达到10毫秒。在循环中会检查是否有准备就绪的网络,并将其放入到全局队列中,也会进行抢占处理,按时间强制执行gc等操作。这里主要关注抢占处理retake函数,具体怎么抢占都是由该函数实现的。

// 监控线程工作函数
func sysmon() {
 lock(&sched.lock)
 sched.nmsys++
 // 检查程序是否已经死锁
 checkdead()
 unlock(&sched.lock)

 lasttrace := int64(0)
 idle := 0 
 delay := uint32(0)
 ...
  lastpoll := int64(atomic.Load64(&sched.lastpoll))
  // 如果超过10ms没有进行netpoll,强制进行一次netpoll,如果获取到了可运行的g,插入g的全局列表
  if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
   atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
   list := netpoll(0) 
   if !list.empty() {
    incidlelocked(-1)
    injectglist(&list)
    incidlelocked(1)
   }
  }
  if next < now {
   startm(nil, false)
  }
  
  // 重新获取在系统调用中阻塞的p并抢占长时间运行的g
  if retake(now) != 0 {
   idle = 0
  } else {
   idle++
  }
  
  // 检查是否需要启动垃圾回收程序
  if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
   lock(&forcegc.lock)
   forcegc.idle = 0
   var list gList
   list.push(forcegc.g)
   injectglist(&list)
   unlock(&forcegc.lock)
  }
    ...
 }
}

retake函数重新获取在系统调用中阻塞的p并抢占长时间运行的g。retake是一个大循环,检查所有的p,所有的p保存在全局变量allp中。对于p只对它的两种情况进行处理,分别是_Prunning和_Psyscall,因为只有这两种状态的p与之关联的g正在执行,需要判断是否进行抢占。下面对两种情况分别进行细致的介绍。

func retake(now int64) uint32 {
 n := 0
 lock(&allpLock)
 // 我们不能在 allp 上使用范围循环,因为我们可能会暂时删除 allpLock。因此,
 // 我们每次都需要在循环中重新获取allp

 // 遍历所有的p,因为m运行g需要绑定一个p,检查每个与p绑定的m中运行的g是否需要被抢占
 for i := 0; i < len(allp); i++ {
  _p_ := allp[i]
  if _p_ == nil {
   // 如果procresize已经增长了allp但还没有创建新的Ps,就会发生这种情况。
   continue
  }

  // _p_.sysmontick用于监控线程(sysmon)记录被监控的p的运行时间和系统调用时间
  // 以及运行g时的计数器值和进行系统调用时计数器值
  pd := &_p_.sysmontick
  // 获取p的状态保存到s中
  s := _p_.status
  sysretake := false
  // 如果当前的p正在运行或者处于系统调用中,需要检查是否需要进行抢占
  if s == _Prunning || s == _Psyscall {
   // 每进行一次调度,_p_.schedtick会+1,也就是每切换一个g运行,schedtick会+1
   t := int64(_p_.schedtick)
   // 如果pd.schedtick和t不等,也就是pd.schedtick和_p_.schedtick不等
   // 说明p上有进行过调度操作,即切换过g运行,重新更新pd的值schedtick和schedwhen
   // 为下一次判断做准备
   if int64(pd.schedtick) != t {
    pd.schedtick = uint32(t)
    pd.schedwhen = now
   } else if pd.schedwhen+forcePreemptNS <= now {
    // 走到这里说明pd.schedtick与t相等,说明从pd.schedwhen到now这段时间
    // 没有发生过调度,也就是在这段时间,同一个g一直在运行,检查这个g运行的时间
    // 是否超过了10毫秒,就是schedwhen+forcePreemptNS比当前时间小,说明已经超过了

    // 对与_p_绑定的m中运行的g进行抢占
    preemptone(_p_)
    // 在系统调用的情况下,preemptone()不起作用, 因为没有m绑定到p
    sysretake = true
   }
  }

  // p处在系统调用中,需要检查是否进行抢占
  if s == _Psyscall {
   // _p_.syscalltick 用于记录系统调用的次数,在完成系统调用之后加 1
   t := int64(_p_.syscalltick)
   // 如果sysretake为false并且p的pd中记录的系统调用tick和当前p的系统调用tick不等
   // 说明进行过调度,不用进行抢占,直接更新pd的值
   if !sysretake && int64(pd.syscalltick) != t {
    pd.syscalltick = uint32(t)
    pd.syscallwhen = now
    continue
   }
   
   // 同时满足下面3个条件,并进行抢占:
   // 1. _p_的本地运行队列和runnext都没有g
   // 2. 当前进行自旋的m数量+当前空闲的p的数量之和大于0
   // 3. 进入系统调用的时间到现在还不够10毫秒
   if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
    continue
   }
   unlock(&allpLock)
   incidlelocked(-1)
   // 将_p_的状态从_Psyscall修改为_Pidle
   if atomic.Cas(&_p_.status, s, _Pidle) {
    if trace.enabled {
     traceGoSysBlock(_p_)
     traceProcStop(_p_)
    }
    // n记录处于在系统调用中的p并且需要被抢占的总数
    n++
    // 系统调用tick+1
    _p_.syscalltick++
    // 将与进入系统调用的m绑定的p分离
    handoffp(_p_)
   }
   incidlelocked(1)
   lock(&allpLock)
  }
 }
 unlock(&allpLock)
 return uint32(n)
}

p在_Prunning状态,说明与p关联的m上的g正在被运行,判断g运行时间是否超过10毫秒,如果超过了调用preemptone进行抢占。哪是怎么判断当前的g运行时间是否超过了10毫秒呢?p中有一个schedtick字段,每进行一次调度,schedtick的值会+1,也就是每切换一个g运行,schedtick的值加1. p中还有一个sysmontick字段,该字段是一个复合结构,它有四个成员,见下面的定义。这里我们先只关心前两个,schedtick和schedwhen分别保存上一次p调度的时候计数器值和调度的时间。p的schedtick保存的是当前的计数器值,如果两者相等,说明这段时间没有进行g的切换,那么就比较当前的时间与sysmontick中schedwhen时间差是否大于10毫秒,如果大于10毫秒,需要进行抢占。如果p的schedtick值与sysmontick与schedtick值不等,说明这段时间进行了g的切换,直接更新sysmontick计数器的值为当前p的schedtick值,更新sysmontick调度时间为当前时间。

type sysmontick struct {
 // 调度的计数器
 schedtick   uint32
 // 保存调度的时间
 schedwhen   int64
 // 进行系统调用的计数器
 syscalltick uint32
 // 进入系统调用时的时间
 syscallwhen int64
}

下面看进行真正抢占的处理逻辑函数preemptone,此函数处理比较简单,设置当前被抢占的g的preempt字段为true,并将stackguard0字段设置为0xfffffade之后很快就返回了,并没有看到将当前g暂停执行的逻辑。猜测很可能这是一个异步处理,即被抢占的g的执行函数时可能会检查这里设置的标志位,判断如果是设置了抢占,执行某些处理。接下来进行对猜测进行验证。我们看哪些处理逻辑涉及到了这里的stackguard0和preempt关键词,在runtime文件夹中执行grep查找,嗯,很快找到了newstack()函数用到了这里的关键词。继续追溯newstack函数在哪里被使用了,找到了一条这样的函数调用关系。morestack_noctxt()调用了morestack(),morestack中调用了newstack()。

func preemptone(_p_ *p) bool {
 mp := _p_.m.ptr()
 if mp == nil || mp == getg().m {
  return false
 }
 // gp为当前被抢占的g
 gp := mp.curg
 if gp == nil || gp == mp.g0 {
  return false
 }
 // 设置gp的是否可被抢占标志为true,表示可以被抢占
 gp.preempt = true

 // 设置gp.stackguard0值为0xfffffade,这是一个很大的数,用于区分判断
 gp.stackguard0 = stackPreempt
 if preemptMSupported && debug.asyncpreemptoff == 0 {
  _p_.preempt = true
  preemptM(mp)
 }

 return true
}

为了验证上面的程序是否真的会执行上面的函数调用关系,这里通过举个例子进行验证。下面是一个简单的输出hello world简单程序(main.go),然后查看它的汇编代码。

package main

import "fmt"

func main() {
 fmt.Println("hello world")
}

对上面的程序执行go tool compile -S main.go得到汇编代码。

Go在函数调用的时候会设置安全点,这个是编译器为我们插入的代码。在函数调用的时候会检查stackguard0的大小,而决定是否调用runtime.morestack_noctxt函数。morestack_noctxt是汇编编写的,实现如下,它直接调用morestack函数,morestack会将当前调度信息保存起来,然后切换到g0栈执行newstack函数。

TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
 MOVL $0, DX
 JMP runtime·morestack(SB)
    
TEXT runtime·morestack(SB),NOSPLIT,$0-0
    ...

 // 将当前的调度信息保存到g.sched字段中,具体是将PC、SP、
 // BP寄存器的值和当前g的地址值保存到内存中
 MOVQ 0(SP), AX // f's PC
 MOVQ AX, (g_sched+gobuf_pc)(SI)
 MOVQ SI, (g_sched+gobuf_g)(SI)
 LEAQ 8(SP), AX // f's SP
 MOVQ AX, (g_sched+gobuf_sp)(SI)
 MOVQ BP, (g_sched+gobuf_bp)(SI)
 MOVQ DX, (g_sched+gobuf_ctxt)(SI)

 // 下面开始从当前的g切换到g0
 // 将m.g0的地址拷贝到BX寄存器中
 MOVQ m_g0(BX), BX
 // 将寄存器BX中的值拷贝到tls中,即设置tls中的g为g0
 MOVQ BX, g(CX)
 // 把g0栈的栈顶寄存器的值恢复到CPU的寄存器SP中,顺利的切换到g0栈
 MOVQ (g_sched+gobuf_sp)(BX), SP
 // 调用newstack函数
 CALL runtime·newstack(SB)
 CALL runtime·abort(SB) // crash if newstack returns
 RET

newstack函数主要完成两项工作:一是检查是否响应sysmon发起的抢占请求,二是检查栈是否需要进行扩容,下面只抽取了与响应抢占相关的代码。newstack检查被抢占g的状态,如果处于抢占状态,调用gopreempt_m将被抢占的gp切换出去放入到全局g运行队列。gopreempt_m是goschedImpl的简单包装,真正处理逻辑在goschedImpl函数,此函数在前面的主动调度中已分析过了,详情见前面的分析。

func newstack() {
 // thisg为g0
 thisg := getg()
 ...
 // gp为用户程序g,就是需要进行抢占或者需要扩容的g
 gp := thisg.m.curg
    ...
 // 设置了抢占
 if preempt {
  // 检查被抢占g的状态,如果不是真正处于抢占状态
  if !canPreemptM(thisg.m) {
   // 将stackguard0的值恢复为原来的正常值,表示已经处理过该抢占请求了
   gp.stackguard0 = gp.stack.lo + _StackGuard
   // 实际并不需要进行抢占,调用gogo函数继续运行当前的用户程序g
   gogo(&gp.sched) 
  }
 }
   ...
 if preempt {
     ...
  // 调用gopreempt_m将被抢占的gp切换出去放入到全局g运行队列
  gopreempt_m(gp) // never return
 }
    
 // 栈扩容的逻辑
 ...
}

func gopreempt_m(gp *g) {
 if trace.enabled {
  traceGoPreempt()
 }
 // 调用goschedImpl将gp的状态从_Grunning修改为_Grunnable,并把gp放入到全局g运行队列
 goschedImpl(gp)
}

p在_Psyscall状态,说明与p关联的m上的g正在进行系统调用。只要下面的三个条件有任何一个不满足,就会对处于_Psyscall状态的p进行抢占。

  1. p的运行队列中还有等待运行的g,需要及时的让p的本地队列中的g得到调度,而当前的g又处在系统调用中,无法执行其他的g,因此将当前的p抢占过来运行其他的g.
  2. 当前没有空闲的p(sched.npidle为0),也没有自旋的m(sched.nmspinning为0),说明大家都在忙,这时抢占当前正处于系统调用中而实际上用不上的p,分配给其他线程用于调度执行其他的g.
  3. 从上一次监控线程留意到p对应的m处于系统调用中到现在已经超过了10毫秒,说明系统调用已花费了比较长的时间,需要进行抢占。使得retake函数的返回值不等于0,让sysmon线程保持20微妙的轮询检查周期,提高监控的实时性。

当上述三个条件有任何一个不满足时,会将p的状态从_Psyscall修改为_Pidle,然后调用handoffp函数,将与进入系统调用的m绑定的p分离。handoff会对当前的条件进行检查,如果满足下面的条件则会调用startm函数启动新的工作线程来与当前的p进行关联,执行可运行的g.具体条件有以下几种:

  1. _p_的本地队列或全局队列中有运行的g,立即启动工作线程对g进行调度
  2. 当前处于在垃圾回收标记阶段,启动线程运行gc work
  3. 当前没有进行自旋的m也没有空闲的p,启动一个线程m进行自旋
  4. 这是最后一个运行的p并且没有人在轮询网络,则需要唤醒另一个m来轮询网络

最后,如果上述三个条件都满足,说明当前比较空闲,将p放入到P的全局空闲队列中即可。

func handoffp(_p_ *p) {
 // 如果_p_的本地队列或全局队列中有运行的g,立即启动工作线程对g进行调度
 if !runqempty(_p_) || sched.runqsize != 0 {
  startm(_p_, false)
  return
 }
 
 // 如果处于在垃圾回收标记阶段,启动线程运行gc work goroutine
 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
  startm(_p_, false)
  return
 }

 // 走到这里,说明_p_的本地队列和全局队列都没有可运行的g,也没有赋值垃圾回收标记的g需要运行
 // 如果当前没有进行自旋的m也没有空闲的p,启动一个线程m进行自旋
 if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { 
  startm(_p_, true)
  return
 }
 lock(&sched.lock)
 // 当前在gc阶段,需要STW,将_p_的状态切换到__Pgcstop
 // 暂停_p_提供g资源
 if sched.gcwaiting != 0 {
  _p_.status = _Pgcstop
  sched.stopwait--
  if sched.stopwait == 0 {
   notewakeup(&sched.stopnote)
  }
  unlock(&sched.lock)
  return
 }
 // _p_.runSafePointFn被设置为1,需要调用安全点函数safePointFn处理一些gc相关的内容
 if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
  sched.safePointFn(_p_)
  sched.safePointWait--
  if sched.safePointWait == 0 {
   notewakeup(&sched.safePointNote)
  }
 }
 // 如果全局队列中有g,启动一个线程执行g
 if sched.runqsize != 0 {
  unlock(&sched.lock)
  startm(_p_, false)
  return
 }
 
 // 如果这是最后一个运行的p并且没有人在轮询网络,则需要唤醒另一个m来轮询网络。
 if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
  unlock(&sched.lock)
  startm(_p_, false)
  return
 }
 if when := nobarrierWakeTime(_p_); when != 0 {
  wakeNetPoller(when)
 }
 // 将p放入全局空闲队列中
 pidleput(_p_)
 unlock(&sched.lock)
}

总结

协程是Go语言中非常精华的部分,我们知道Go语言可以轻松在单机上运行成千上万个协程,得益于调度器很好的设计.例如通过GMP模型实现了M的复用,引入P让各个处理器可以并行,充分发挥多核的优势,通过优先级队列保证调度的公平性等。小编在这次深入细致的学习调度器原理和实现中收获多多。

golang-scheduler-1-history[1]runtime-bootstrap[2]Go语言goroutine调度器初始化[3]Go语言调度器之调度main goroutine[4]深入golang runtime的调度[5]锲而不舍 —— M 是怎样找工作的[6]

Reference

[1]

golang-scheduler-1-history: http://lessisbetter.site/2019/03/10/golang-scheduler-1-history/

[2]

runtime-bootstrap: https://blog.altoros.com/golang-internals-part-5-runtime-bootstrap-process.html

[3]

Go语言goroutine调度器初始化: https://mp.weixin.qq.com/s?__biz=MzU1OTg5NDkzOA==&mid=2247483769&idx=1&sn=3d77609a293d87e64639afc8d2219e1c&scene=19#wechat_redirect

[4]

Go语言调度器之调度main goroutine: https://mp.weixin.qq.com/s?__biz=MzU1OTg5NDkzOA==&mid=2247483783&idx=1&sn=1128dbd7794d7d53c37abab94771a7d7&scene=19#wechat_redirect

[5]

深入golang runtime的调度: https://zboya.github.io/post/go_scheduler/

[6]

锲而不舍 —— M 是怎样找工作的: https://mp.weixin.qq.com/s/6sNtrdlKtwfJIvBA8UPnKg