本文基于go1.17.6
1.什么是协程?
协程,英文Coroutines,是一种基于线程之上,但又比线程更加轻量级的存在,这种由程序员自己写程序来管理的轻量级线程叫做『用户空间线程』,
具有对内核来说不可见的特性。
1.1 协程的特点
- 线程的切换由操作系统负责调度,协程由用户自己进行调度
- 线程的默认Stack大小是MB级别,而协程更轻量,接近KB级别。
- 在同一个线程上的多个协程,访问某些资源可以不需要锁
- 适用于被阻塞的,且需要大量并发的场景。
1.2 Golang的GMP模型
local runnable queueglobal runnable queuelocal runnable queue
注意:
-
- M和P是绑定关系
-
- M和G是绑定关系
-
- P只是暂存G,他们之间不是绑定关系
E-R图
简化后的E-R图
注意: 后面为了书写简单直接将
local runnable queueglobal runnable queue
延伸
timer的四叉堆和内存分配器使用的mcache也是每个P一个
Q: 为什么默认情况下P的数量与CPU数量一致?
A:
这样可以避免把CPU时间浪费在上线文切换上
1.3 协程和线程的资源消耗对比
类别 | 栈内存 | 上下文切换 | 备注 |
---|---|---|---|
Thread | 1MB | 1us | 内存占用使用的是Java线程的默认栈大小 |
Goroutine | 4KB | 0.2us | 内存占用使用的是Linux+x86下的栈大小 |
2. 常见的Goroutine 让出/调度/抢占场景
2.1 协程被创建
- 1)P的本地队列有剩余空间时,放入P的本地队列
- 2)P的本地队列没有剩余空间时,将本地队列的一部分Goroutine以及待加入的Goroutine添加到全局队列
2.2 主动让出
2.2.1 使用runtime.Gosched
package main
import (
"fmt"
"runtime"
)
func main() {
runtime.GOMAXPROCS(1)
for i := 0; i < 100; i++ {
fmt.Println("Goroutine1:", i)
//time.Sleep(500 * time.Millisecond)
if i == 5 {
go func() {
for i := 0; i < 100; i++ {
fmt.Println("Goroutine2:", i)
//time.Sleep(500 * time.Millisecond)
}
}()
runtime.Gosched()
}
}
}
具体执行流程
1.Gosched() -> 2.mcall(fn func(*g)) -> 3.gosched_m(gp *g) ->
4.goschedImpl(gp *g) -> dropg()
globrunqput()
schedule()
step2 mcall(fn func(*g))
mcall函数是通过汇编实现的,
在 asm_amd64.s 中
- 1)保存当前goroutine的状态(PC/SP)到g->sched中,方便下次调度;
- 2)切换到m->g0的栈;
- 3)然后g0的堆栈上调用fn;
注意:每个m上都有一个自己g0,仅用于调度,不指向任何可执行的函数
mcall returns to the original goroutine g later, when g has been rescheduled.
fn must not return at all; typically it ends by calling schedule, to let the m
run other goroutines.
step4 goschedImpl(gp *g)
- 1)修改Goroutine的状态 _Grunning -> _Grunnable
- 2)dropg() 将G和M解绑
- 3)globrunqput(gp) 将G放入全局runnable队列
- 4)schedule()
进行一轮调度,寻找一个runnable的G,并执行它,函数不会返回
step4-4 schedule()
schedule() –> 1.findrunnable()
2.execute() –>gogo()
step4-4 schedule() –> findrunnable()
- 1)从同一个P的本地runnable队列中
- 2)从全局的runnable队列中
- 3)从网络轮训器(netpoll)中,是否有事件就绪的G
- 4)通过runtime.runqsteal从其它P的本地runnable队列偷取一半G放入本地runnable队列,并取出一个用来执行
gogo()
Q:为什么一定要单独设置一个g0来执行goschedImpl(gp *g)
A: schedule()会将G的stack搞乱
另外有些文章说g0的栈会使用操作系统分配的线程栈,但是根据萌叔的研究,这个与操作系统有关的。
macOS是使用的线程栈(8MB),但是linux下g0的栈初始大小只有8KB。
runtime/proc.go
// In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
// Windows and Plan 9 will layout sched stack on OS stack.
if iscgo || mStackIsSystemAllocated() {
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
}
// mStackIsSystemAllocated indicates whether this runtime starts on a
// system-allocated stack.
func mStackIsSystemAllocated() bool {
switch GOOS {
case "aix", "darwin", "plan9", "illumos", "ios", "solaris", "windows":
return true
case "openbsd":
switch GOARCH {
case "386", "amd64", "arm", "arm64":
return true
}
}
return false
}
2.2.2 任务执行完毕
goexit1() -> mcall(fn func(*g)) -> goexit0(gp *g)
// goexit continuation on g0.
func goexit0(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gdead)
gp.m = nil
dropg()
gfput(_g_.m.p.ptr(), gp)
schedule()
}
- 1)修改G的状态 _Grunning -> _Gdead
- 2)解除G和M的绑定关系
- 3)将G放入P的空闲G的链表(gfree list)
- 4)触发一轮调度
2.3 抢占式调度
抢占式调度是由守护进程 sysmon() 触发的
sysmon()是一个特殊的m,它不需要和P进行绑定。
func sysmon() {
for{
// 1. 运行计时器
// 2. 检查网络轮询器(netpoll)
// 3. 触发抢占式调度
// 4. 触发GC
}
}
最大时间片是10ms
推荐观看 幼麟实验室的视频
2.3.1 基于协作的抢占式调度
依赖栈增长监测代码
sysmon() -> retake() -> preemptone()
type g struct {
stackguard0 uintptr // offset known to liblink
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
}
2.3.2 基于信号的抢占式调度
发出信号
sysmon() -> retake() -> preemptone() -> signalM(mp, sigPreempt)
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// Every call in a goroutine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp.stackguard0 = stackPreempt
// Request an async preemption of this P.
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
return true
}
接收到信号
sighandler() -> doSigPreempt() -> asyncPreempt()-> globalrunqput()
asyncPreempt由汇编实现 preempt_amd64.s
2.4 hand off p
延伸阅读
场景读写磁盘文件
涉及syscall.Syscall()
注意:
- 由于handeroff机制的存在,读写磁盘文件的过程中,如果IO的压力过大可能会导致大量的系统线程被创建
- 在Golang中(Linux平台),读写网络连接是非阻塞式系统调用,并且是边缘触发
- 在Golang中(Linux平台),读写磁盘文件是阻塞式系统调用
File.Read() -> poll.FD.Read() -> syscall.Read() -> syscall.Syscall()
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
...
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) // 执行syscall
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil { // gopark
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
func read(fd int, p []byte) (n int, err error) {
var _p0 unsafe.Pointer
if len(p) > 0 {
_p0 = unsafe.Pointer(&p[0])
} else {
_p0 = unsafe.Pointer(&_zero)
}
r0, _, e1 := Syscall(SYS_READ, uintptr(fd), uintptr(_p0), uintptr(len(p)))
n = int(r0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}
Syscall和RawSyscall的源码
2.4.1 entersyscall()
1.设置_g_.m.locks++,禁止g被强占
2.设置_g_.stackguard0 = stackPreempt,禁止调用任何会导致栈增长/分裂的函数
3.保存现场,在 syscall 之后会依据这些数据恢复现场
4.更新G的状态为_Gsyscall
5.释放局部调度器P:解绑P与M的关系;
6.更新P状态为_Psyscall
7.g.m.locks–解除禁止强占。
进入系统调用的goroutine会阻塞,导致内核M会阻塞。此时P会被剥离掉, 所以P可以继续去获取其余的空闲M执行其余的goroutine。
2.4.2 阻塞式系统调用长期运行将会导致的流程
sysmon() -> retake() -> handoffp()
midle
2.4.3 exitsyscall()
1.设置 g.m.locks++ 禁止强占
2.调用 exitsyscallfast() 快速退出系统调用
2.1. Try to re-acquire the last P,如果成功就直接接return;
2.2. Try to get any other idle P from allIdleP list;
2.3. 没有获取到空闲的P
3.如果快速获取到了P:
3.1. 更新G 的状态是_Grunning
3.2. 与G绑定的M会在退出系统调用之后继续执行
4. 没有获取到空闲的P:
4.1. 调用mcall()函数切换到g0的栈空间;
4.2. 调用exitsyscall0函数:
4.2.1. 更新G 的状态是_Gwaiting
4.2.2. 调用dropg():解除当前g与M的绑定关系;
4.2.3. 调用globrunqput将G插入global queue的队尾,
4.2.4. 调用stopm()释放M,将M加入全局的idel M列表,这个调用会阻塞,知道获取到可用的P。
4.2.5. 如果4.2.4中阻塞结束,M获取到了可用的P,会调用schedule()函数,执行一次新的调度。
2.5 系统调用
以netpoll为例,linux操作系统下,netpoll基于epoll实现的
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
2.5.1 让出流程
pollDesc.waitRead() -> runtime.poll_runtime_pollWait() -> runtime.gopark()
-> mcall(fn func(*g)) -> park_m(gp *g)
gopark()主要流程
- 1)g切换到g0
- 2)修改G的状态 _Grunning -> _Gwaiting
- 3)dropg() 解除G和M的绑定关系
- 4)schedule() 触发一轮调度
2.5.2 放回 主动触发netpoll()
findrunnable()->netpoll()->injectglist()->globrunqputbatch()/runqputbatch()
pollWork()
startTheWorldWithSema
sysmon()
注意: netpoll函数是非阻塞的
2.6 定时器
场景
time.Sleep(1 * time.Second)
2.6.1 让出
time.Sleep() -> runtime.timeSleep() -> mcall() -> park_m()
-> resetForSleep() -> resettimer() -> doaddtimer()
2.6.2 唤醒
checkTimers() -> runtimer() -> runOneTimer() -> goready() -> systemstack() -> ready()
goready()主要流程
- 1)切换到g0的栈空间(不严谨)
- 2)修改Goroutine的状态 _Gwaiting -> _Grunnable
- 3)runqput() 把Goroutine放入P的本地队列的头部
- 4)如果M的数量不足,尝试创建一些M
注意: mcall()和systemstack()是对应的
2.7 Channel
场景
ch := make(chan int)
ch <- 15
2.7.1 写入channel并阻塞
chansend1() -> chansend() -> hchan.sendq.enqueue() -> gopark()
2.7.2 就绪
chanrecv1() -> chanrecv() -> hchan.sendq.dequeue() -> recv() -> goready()
2.8 同步原语
互斥锁与2.6、2.7的情况非常类似,只是G会存储在Treap中
- Treap是一种平衡二叉查找树
- semaRoot 是全局唯一的
3.参考资料
1.g0-特殊的goroutine
2.golang syscall原理
3.Linux中的EAGAIN含义
4.Golang-gopark函数和goready函数原理分析
5.幼麟实验室-协程让出、抢占、监控和调度
6.Golang 调度器 GMP 原理与调度全分析
7.time.Sleep(1) 后发生了什么
8.mcall systemstack等汇编函数