基于信号的协程抢占调度
package main

import (
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(1)
    go func() {
        for {
        }
    }()

    time.Sleep(time.Millisecond)
    println("OK")
}
time.Sleep()main goroutine
基于信号的抢占模式runtime.sighandlerSIGURGruntime.doSigPreempt

本篇从发送与接收信息并处理两方面来看一下它是如何实现的。

发送信号

在上篇文章(认识sysmon监控线程)介绍 sysmon 的时候,我们知道监控线程会在无P的情况下一直运行,定期扫描所有的P,将长时间运行的G 进行解除。

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
    ......

    for {
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)

        ......

        // retake P's blocked in syscalls
        // and preempt long running G's
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
    }

    ......

}
retake()P
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false

        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            // 如果 P 运行得太久, 则抢占 G
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {

                // 如果超过了10ms就需要进行抢占了
                preemptone(_p_)

                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true
            }
        }

        ......
    }

}
Ppd.schedwhen+forcePreemptNS <= now
// src/runtime/proc.go

// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can send inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }

    // 被抢占的 goroutine
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    // 设置g的抢占标识
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    // 设置栈抢占 stackPreempt,这是一个很大的值比任何栈都大,
    // 在 goroutine 内部的每次调用都会比较栈顶指针和 g.stackguard0,用以判断是否发生了栈溢出。
    gp.stackguard0 = stackPreempt

    // Request an async preemption of this P.
    // 对P发一个异步抢占请示
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }

    return true
}

这里主要是设备两个抢占标识位,对于信号调用了 preemptM() 函数发送一个抢占请求到m。

// src/runtime/signal_unix.go

const preemptMSupported = true

// preemptM sends a preemption request to mp. This request may be
// handled asynchronously and may be coalesced with other requests to
// the M. When the request is received, if the running G or P are
// marked for preemption and the goroutine is at an asynchronous
// safe-point, it will preempt the goroutine. It always atomically
// increments mp.preemptGen after handling a preemption request.
func preemptM(mp *m) {
    ......
    if atomic.Cas(&mp.signalPending, 0, 1) {
        if GOOS == "darwin" || GOOS == "ios" {
            atomic.Xadd(&pendingPreemptSignals, 1)
        }

        // If multiple threads are preempting the same M, it may send many
        // signals to the same M such that it hardly make progress, causing
        // live-lock problem. Apparently this could happen on darwin. See
        // issue #37741.
        // Only send a signal if there isn't already one pending.
        signalM(mp, sigPreempt)
    }
    ......
}
signalM()
// src/runtime/os_darwin.go

func signalM(mp *m, sig int) {
    pthread_kill(pthread(mp.procid), uint32(sig))
}
pthread_kill()

以上就是发送抢占信号的基本流程,相应有也就应该有处理抢占信号的逻辑。

处理信息

给m发送的信息是 sigPreempt ,它是一个常量

const sigPreempt = _SIGURG

对于它的详细说明,可以参考官方注释文档。

程序在开始运行的时候,

// Initialize signals.
// Called by libpreinit so runtime may not be initialized.
//go:nosplit
//go:nowritebarrierrec
func initsig(preinit bool) {
    if !preinit {
        // It's now OK for signal handlers to run.
        signalsOK = true
    }

    // For c-archive/c-shared this is called by libpreinit with
    // preinit == true.
    if (isarchive || islibrary) && !preinit {
        return
    }

    for i := uint32(0); i < _NSIG; i++ {
        t := &sigtable[i]
        if t.flags == 0 || t.flags&_SigDefault != 0 {
            continue
        }

        // We don't need to use atomic operations here because
        // there shouldn't be any other goroutines running yet.
        fwdSig[i] = getsig(i)

        if !sigInstallGoHandler(i) {
            // Even if we are not installing a signal handler,
            // set SA_ONSTACK if necessary.
            if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
                setsigstack(i)
            } else if fwdSig[i] == _SIG_IGN {
                sigInitIgnored(i)
            }
            continue
        }

        handlingSig[i] = 1
        setsig(i, funcPC(sighandler)) // 注册信号对应的回调方法
    }
}

go 在启动的时候会把所有的信息都注册一次。

再通过 sighandler() 函数进行注册。

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    ......
    // 如果是抢占信号
    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        // Might be a preemption signal.
        doSigPreempt(gp, c)
        // Even if this was definitely a preemption signal, it
        // may have been coalesced with another signal, so we
        // still let it through to the application.
    }
    ......
}

然后是执行抢占信号事件

// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
    // Check if this G wants to be preempted and is safe to
    // preempt.
    if wantAsyncPreempt(gp) {
        if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
            // Adjust the PC and inject a call to asyncPreempt.

            // 执行抢占
            ctxt.pushCall(funcPC(asyncPreempt), newpc)
        }
    }

    // Acknowledge the preemption.
    atomic.Xadd(&gp.m.preemptGen, 1)
    atomic.Store(&gp.m.signalPending, 0)

    if GOOS == "darwin" || GOOS == "ios" {
        atomic.Xadd(&pendingPreemptSignals, -1)
    }
}
isAsyncSafePoint()asyncPreempt()

本文基于go version 1.16

参考资料