本文摘自网络,作者,侵删。

有群友说面试的时候被问到:单核CPU,开两个goroutine,其中一个死循环,会怎么样?相信很多小伙伴乍一看一脸懵,我就在群里回了一下go1.14版本实现了基于信号的抢占式调度,可以在goroutine执行时间过长的时候强制让出调度,执行其他的goroutine。接下来看看具体怎么实现的,话不多说直接上代码。基于go1.15 linux amd64。
先看一个常规的栗子

func f1() {
    fmt.Println("This is f1")
}

func f2() {
    fmt.Println("This is f2")
}

func main() {
    runtime.GOMAXPROCS(1)

    go f1()
    go f2()

    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}
// always print f2 f1 

以上代码就是模拟单核CPU下go语言的执行情况,无论运行多少次,输出的结果总是 f2 f1,解释如下

func main() {
    // 只有一个 P 了,目前运行主 goroutine
    runtime.GOMAXPROCS(1)

    // 创建一个 G1 , call $runtime.newproc -> runqput(_p_, newg, true)放入到本地队列,注意这里的 next 参数为 true,代表放在队列头部
    go f1()

    // 因为只有一个 P 主 goroutine继续运行

    // 创建一个 G2 , 同上面,也是加入到队列头部,这时候本地队列的顺序就是 G2 G1
    go f2()

    // 等待 f1 f2的执行 gopark 主 goroutine GMP调度可运行的 G
    // 按顺序调用 G2 G1
    // 所以不管执行多少次,结果都是
    // This is f2
    // This is f1
    // success
    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}

如果将runtime.GOMAXPROCS(1)改成runtime.GOMAXPROCS(4)即多核CPU,你就会发现 f1 和 f2 交替执行,没有明确的先后,这种事件A和B完全无序执行,即为并发。利用多核CPU同时执行A和B的情况,即为并行。为什么会这样涉及到go语言的GMP调度模型,这里不赘述,有兴趣的小伙伴可以自行学习。

既然每次都是 f2 先执行,那在 f2 中加入一个死循环会怎么样呢?

func f1() {
    fmt.Println("This is f1")
}

func f2() {
    // 死循环
    for {

    }
    fmt.Println("This is f2")
}

func main() {
    runtime.GOMAXPROCS(1)

    go f1()
    go f2()

    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}

// This is f1
// success

你会发现虽然 f2 block住了没有输出,但是完全没影响f1和主goroutine的运行,这其实就是抢占式调度。golang在之前的版本中已经实现了抢占调度,但有些场景是无法抢占成功的。比如轮询计算 for { i++ } 等,这类操作无法进行newstack、morestack、syscall,无法检测stackguard0 = stackpreempt的场景。通俗点说之前版本实现的抢占调度发生在goroutine虽然执行了很长时间,但还得继续调用函数等操作,才能检查是不是需要抢占。
以下是src/runtime/stack.go/newstack()中抢占调度的内容:

func newstack() {
    ...
    // NOTE: stackguard0 may change underfoot, if another thread
    // is about to try to preempt gp. Read it just once and use that same
    // value now and below. 
    // 如果是发起的抢占请求而非真正的栈分段
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

    // 如果正持有锁、分配内存或抢占被禁用,则不发生抢占
    if preempt {
        if !canPreemptM(thisg.m) {
            // Let the goroutine keep running for now. 不发生抢占,继续调度
            // gp->preempt is set, so it will be preempted next time.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return 重新进入调度循环
        }
    }
        ...
    // 如果需要抢占
    if preempt {
        if gp == thisg.m.g0 {
            throw("runtime: preempt g0")
        }
        if thisg.m.p == 0 && thisg.m.locks == 0 {
            throw("runtime: g is running but p is not")
        }

        if gp.preemptShrink {
            // We're at a synchronous safe point now, so
            // do the pending stack shrink. 
            gp.preemptShrink = false
            shrinkstack(gp)
        }

        if gp.preemptStop {
            preemptPark(gp)    // never returns 进入循环调度
        }

        // Act like goroutine called runtime.Gosched. 表现得像是调用了 runtime.Gosched,主动让权,进入循环调度
        gopreempt_m(gp) // never return
    }
        ...
}

显然上面的抢占调度还是存在一些问题的,GO团队在go1.14版本中实现了基于信号协程调度抢占。下面看下如何实现的。
首先是信号的发送方:Go Runtime 在启动程序的时候,会创建一个独立的 M 作为监控线程,称为 sysmon,它是一个系统级的 daemon 线程。这个sysmon 独立于 GPM 之外,也就是说不需要P就可以运行,也是作为抢占信号的发送方一直运行。
src/runtime/proc.go/main()

// The main goroutine.
func main() {
    ...
    // 启动系统后台监控(定期垃圾回收、并发任务调度)
    if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
        systemstack(func() {
            // 系统监控在一个独立的 m 上运行
            newm(sysmon, nil, -1)
        })
    }
...
}

src/runtime/proc.go/sysmon(),重点是里面的retake()

func sysmon() {
    lock(&sched.lock)
    // 不计入死锁的系统 m 的数量
    sched.nmsys++
    // 死锁检查
    checkdead()
    unlock(&sched.lock)

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody 没有 wokeup 的周期数
    delay := uint32(0)
    //死循环一直执行
    for {
        if idle == 0 { // start with 20us sleep... 每次启动先休眠 20us
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms... 1ms 后就翻倍休眠时间
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms 增加到 10ms
            delay = 10 * 1000
        }
        // 休眠
        usleep(delay)
        now := nanotime()
        // timer定时器检查
        next, _ := timeSleepUntil()
        ...
        // retake P's blocked in syscalls
        // and preempt long running G's 抢夺在 syscall 中阻塞的 P、运行时间过长的 G
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
    ...
    }
}

src/runtime/proc.go/retake()

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world. 防止 allp 数组发生变化,除非我们已经 STW,此锁将完全没有人竞争
    lock(&allpLock)
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long. 如果 G 运行时时间太长则进行抢占
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
               // 运行超过10ms,就在这里了
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't 对于 syscall 的情况,因为 M 没有与 P 绑定,
                // work, because there is no M wired to P. preemptone() 不工作
                sysretake = true
            }
        }

        // 对阻塞在系统调用上的 P 进行抢占
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            // 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P
            t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,一方面,在没有其他 work 的情况下,我们不希望抢夺 P
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.另一方面,因为它可能阻止 sysmon 线程从深度睡眠中唤醒,所以最终我们仍希望抢夺 P
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock. 解除 allpLock,从而可以获取 sched.lock
            unlock(&allpLock)
            // Need to decrement number of idle locked M's 在 CAS 之前需要减少空闲 M 的数量(假装某个还在运行)
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock. 否则发生抢夺的 M 可能退出 syscall 然后再增加 nmidle ,进而发生死锁
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
               // 转移 P 有其他任务就创建 M 去执行 一圈找下来都没有就放入空闲的 P 列表
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

preemptone -> preemptM(mp)->signalM(mp, sigPreempt)直至发起系统调用,完成抢占信号的发送。

信号的接收方:
Go 运行时初始化是会调用runtime.mstart完成信号处理的初始化行为,调用mstart1()
src/runtime/proc.go/mstart1()

func mstart1() {
    ...

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    // 设置信号 handler;在 minit 之后,以便 minit 可以准备处理信号的的线程
    if _g_.m == &m0 {
        // 只在当前 m 是 m0 的时候执行, mstartm0主要就是初始化信号处理 initsig
        mstartm0()
    }
     ...
}

src/runtime/proc.go/mstartm0()

func mstartm0() {
    ...
    // 信号处理初始化
    initsig(false)
}

src/runtime/signal_unix.go/initsig()

func initsig(preinit bool) {
    ...
        // 对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的动作(action):
        setsig(i, funcPC(sighandler))
     ...
}

src/runtime/signal_unix.go/sighandler()

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    ...

    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        // Might be a preemption signal. 可能是一个抢占信号
        doSigPreempt(gp, c)
        // Even if this was definitely a preemption signal, it
        // may have been coalesced with another signal, so we 即便这是一个抢占信号,它也可能与其他信号进行混合,因此我们
        // still let it through to the application. 继续进行处理。
    }
}

doSigPreempt->asyncPreempt->asyncPreempt2

func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    if gp.preemptStop {
        mcall(preemptPark)
    } else {
        mcall(gopreempt_m)
    }
    // 异步抢占过程结束
    gp.asyncSafePoint = false
}

不管是preemptPark还是gopreempt_m,最终都是进入调度循环schedule(),去执行其他的 G。
终于写完了,撒花,菜鸟一枚,有什么不对的欢迎评论留言指正,谢谢。


本文来自:简书

查看原文:单核CPU下Golang调度及抢占式调度的实现

相关阅读 >>

更多相关阅读请进入《Go》频道 >>


Go语言101

老貘

一个与时俱进的Go编程知识库。