A Brief Look at Coroutine Design in Rust and Golang

What is a stackful coroutine? The goroutine as an example

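According to Wikipedia, coroutines are computer program components that generalize subroutines for non-preemptive multitasking by allowing execution to be suspended and resumed.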

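Logically, the main difference between coroutines and threads is that coroutines multitask cooperatively, whereas threads multitask preemptively. Switching between coroutines generally involves no system call and can be done entirely in user space.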

In Go, a goroutine is created by putting the go keyword in front of a function call:
package main

func add(a, b int64) (int64, int64) {
	var tmp int64 = 1
	tmp = tmp + a
	return a + b, a - b
}

func main() {
	var c int64 = 10
	var d int64 = 12
	go add(c, d)
}
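Compile it to assembly with the go tool (-N disables optimizations, -l disables inlining, -S prints the assembly):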
go tool compile -N -l -S main.go
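In the listing, SP is the stack pointer and SB is the static-base pseudo-register used to refer to global symbols such as functions. main's prologue (omitted here) allocates its stack frame by subtracting from SP with SUBQ, and the epilogue releases it by adding to SP with ADDQ. The relevant part of main is: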
        0x001d 00029 (main.go:10)       MOVQ    $10, "".c+56(SP)
        0x0026 00038 (main.go:11)       MOVQ    $12, "".d+48(SP)
        0x002f 00047 (main.go:12)       MOVL    $32, (SP);siz: add's arguments and results take 32 bytes
        0x0036 00054 (main.go:12)       LEAQ    "".add·f(SB), AX;load the function value of add into AX
        0x003d 00061 (main.go:12)       MOVQ    AX, 8(SP);store AX (fn) into the next 8 bytes
        0x0042 00066 (main.go:12)       MOVQ    "".c+56(SP), AX;load argument c into AX
        0x0047 00071 (main.go:12)       MOVQ    AX, 16(SP);store AX into the next 8 bytes
        0x004c 00076 (main.go:12)       MOVQ    $12, 24(SP);store argument d (the constant 12) into the next 8 bytes
        0x0055 00085 (main.go:12)       PCDATA  $1, $0;GC-related metadata, can be ignored
        0x0055 00085 (main.go:12)       CALL    runtime.newproc(SB);!! the goroutine is created here
        0x005a 00090 (main.go:13)       MOVQ    64(SP), BP;restore the caller's frame pointer
        0x005f 00095 (main.go:13)       ADDQ    $72, SP;release main's stack frame
        0x0063 00099 (main.go:13)       RET
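So the go statement compiles into a call to runtime.newproc. newproc locates the new goroutine's arguments on the caller's stack (just after the fn argument), then creates the goroutine and puts it on a run queue. That work is done on the system stack.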

The runtime documents systemstack (func systemstack(fn func())) as follows: when "systemstack is being called from the limited stack of an ordinary goroutine", it "switches to the per-OS-thread stack, calls fn, and switches back." The part of newproc that runs there is:

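systemstack(func() {
	// newproc1 creates the new g (goroutine) that will run fn with its arguments
	newg := newproc1(fn, argp, siz, gp, pc)

	// put it on the current P's local run queue; next=true uses the runnext slot
	_p_ := getg().m.p.ptr()
	runqput(_p_, newg, true)

	// once main has started, try to wake an idle P to run goroutines
	if mainStarted {
		wakep()
	}
})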
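The call runqput(_p_, newg, true) above places the new goroutine in the current P's runnext slot, so it is picked before anything already queued. Any goroutine that previously held runnext is moved to the tail of the local run queue, and if that queue (256 entries) is full, half of it is moved to the global queue. The Rust sketch below is only a toy model of that policy for illustration. P, Sched, and the method names are hypothetical, goroutines are plain u64 IDs, and the real runtime uses a lock-free ring buffer rather than a VecDeque.

```rust
use std::collections::VecDeque;

// Toy model of Go's per-P run queue policy (hypothetical types; not runtime code).
const LOCAL_CAP: usize = 256; // capacity of a P's local run queue in Go

#[derive(Default)]
struct P {
    runnext: Option<u64>, // goroutine that should run next
    runq: VecDeque<u64>,  // local FIFO run queue
}

#[derive(Default)]
struct Sched {
    global: VecDeque<u64>, // global run queue shared by all Ps
}

impl P {
    // Modeled on runqput(p, g, next).
    fn runqput(&mut self, g: u64, next: bool, sched: &mut Sched) {
        let g = if next {
            // g takes the runnext slot; a previous occupant is demoted to the local queue.
            match self.runnext.replace(g) {
                Some(old) => old,
                None => return,
            }
        } else {
            g
        };
        if self.runq.len() < LOCAL_CAP {
            self.runq.push_back(g);
        } else {
            // Local queue full: move the older half plus g to the global queue (like runqputslow).
            let half = self.runq.len() / 2;
            sched.global.extend(self.runq.drain(..half));
            sched.global.push_back(g);
        }
    }

    // Modeled on runqget: runnext first, then the local queue.
    fn runqget(&mut self) -> Option<u64> {
        self.runnext.take().or_else(|| self.runq.pop_front())
    }
}

// Order in which goroutines 1, 2 (both next=true) and 3 (next=false) would run.
fn run_order() -> Vec<u64> {
    let (mut p, mut sched) = (P::default(), Sched::default());
    p.runqput(1, true, &mut sched);
    p.runqput(2, true, &mut sched); // 2 takes runnext, 1 moves to the local queue
    p.runqput(3, false, &mut sched);
    std::iter::from_fn(|| p.runqget()).collect()
}

// Sizes of the local and global queues after pushing 257 goroutines into one P.
fn overflow() -> (usize, usize) {
    let (mut p, mut sched) = (P::default(), Sched::default());
    for g in 0..=256 {
        p.runqput(g, false, &mut sched);
    }
    (p.runq.len(), sched.global.len())
}

fn main() {
    println!("run order: {:?}", run_order());
    println!("(local, global) after overflow: {:?}", overflow());
}
```

In the model, run_order yields 2, 1, 3: the newest goroutine created with next=true runs first.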
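If no dead goroutine can be reused, newproc1 allocates a new g with malg, giving it a small initial stack of _StackMin (2 KB) bytes: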
// Allocate a new g, with a stack big enough for stacksize bytes.
func malg(stacksize int32) *g
To check that add really runs on a goroutine of its own, print the current goroutine's stack inside add:
var (
	buf [256]byte
	n   = runtime.Stack(buf[:], false) // write the current goroutine's stack trace into buf
	stk = string(buf[:n])
)
println(stk)
The output shows that add is running on goroutine 5, not on the main goroutine:
goroutine 5 [running]:
main.add(0xa, 0xc, 0x1068800, 0xc00001c0b8)
        main.go:11 +0x69

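The advantage of a stackful coroutine is that its stack frames, together with the saved registers, hold the complete execution context. The coroutine can therefore be suspended at any point, which makes a preemptive scheduler easy to support. A stackless coroutine instead usually keeps its context in memory in a struct-like object and relies on the user to switch explicitly; otherwise it never gives up control on its own.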
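To make the difference concrete, the sketch below hand-writes a stackless coroutine in Rust for a routine that would conceptually be "for i in 1..=3 { sum += i; yield sum; }". The variables that must survive a suspension (i and sum) become fields of a struct. The whole saved context is that struct, 8 bytes here, and the coroutine advances only when the caller invokes resume. SumSteps, Step, and resume are hypothetical names chosen for this illustration.

```rust
// Hand-written stackless coroutine (hypothetical example) for the routine
//     let mut sum = 0; for i in 1..=3 { sum += i; yield sum; } return sum;
// Everything that must survive a suspension point lives in this struct.
struct SumSteps {
    i: u32,
    sum: u32,
}

enum Step {
    Yielded(u32),  // suspended with an intermediate value
    Complete(u32), // finished with the final value
}

impl SumSteps {
    fn new() -> Self {
        SumSteps { i: 1, sum: 0 }
    }

    // Runs until the next suspension point and returns. Nothing happens between
    // calls: the coroutine only makes progress when the caller resumes it.
    fn resume(&mut self) -> Step {
        if self.i <= 3 {
            self.sum += self.i;
            self.i += 1;
            Step::Yielded(self.sum)
        } else {
            Step::Complete(self.sum)
        }
    }
}

// Drive the coroutine to completion, collecting the intermediate values.
fn run_to_end() -> (Vec<u32>, u32) {
    let mut co = SumSteps::new();
    let mut yielded = Vec::new();
    loop {
        match co.resume() {
            Step::Yielded(v) => yielded.push(v),
            Step::Complete(v) => return (yielded, v),
        }
    }
}

fn main() {
    println!("{:?}", run_to_end()); // intermediate sums and the final sum
    println!("saved context: {} bytes", std::mem::size_of::<SumSteps>());
}
```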


Stackless coroutines in Rust

Given that stackful coroutines already exist, do stackless coroutines still have any advantage? The answer is yes.

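In general, stackless coroutines are more efficient in memory usage and in context switching. Note that a stackless coroutine still uses stack space at run time; it simply shares the runtime stack of the code that creates and resumes it instead of owning a separate one.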

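To sum up stackless coroutines in one sentence: a stackless coroutine can be viewed as a stateful function (a generator). Each time it runs, it produces (generates) an output from its current state and its input, but that output is not necessarily the final result.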
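The generator view can be sketched as follows. Each resume call takes an input, updates the saved state, and returns either an intermediate value or the final result. The enum mirrors the shape of Rust's unstable CoroutineState (Yielded/Complete), but RunningAvg and GenState are hypothetical types written for this example. A running average was chosen purely for illustration.

```rust
// Shape of one resume: an intermediate value or the final result
// (mirrors the unstable std::ops::CoroutineState; this enum is our own).
#[derive(Debug, PartialEq)]
enum GenState<Y, R> {
    Yielded(Y),
    Complete(R),
}

// Hypothetical generator: a running average over `limit` inputs.
struct RunningAvg {
    count: u32,
    total: f64,
    limit: u32,
}

impl RunningAvg {
    fn new(limit: u32) -> Self {
        RunningAvg { count: 0, total: 0.0, limit }
    }

    // The output depends on the saved state (count, total) and on this call's input.
    fn resume(&mut self, input: f64) -> GenState<f64, f64> {
        self.count += 1;
        self.total += input;
        let avg = self.total / f64::from(self.count);
        if self.count < self.limit {
            GenState::Yielded(avg) // not the final result yet
        } else {
            GenState::Complete(avg)
        }
    }
}

fn main() {
    let mut g = RunningAvg::new(3);
    for x in [2.0, 4.0, 9.0] {
        println!("{:?}", g.resume(x));
    }
}
```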

Async-await

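Calling an async fn does not run its body; it returns a Future. Inside an async fn, you can call .await on such a Future to wait for another async fn to complete without blocking the thread.
async and await are therefore used together: async turns code into a Future, and await drives a Future to completion.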
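The following is a minimal std-only sketch of both points. Calling an async fn merely builds a Future, and the body (including the .await of another async fn) runs only when that Future is polled. Instead of a real executor, the Future is polled once by hand with a do-nothing Waker. NoopWaker is a hypothetical helper, and polling once suffices here only because nothing in the body ever returns Pending. Roughly speaking, the compiler treats async fn add(...) -> u64 as a function returning impl Future<Output = u64>.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: a Waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Roughly equivalent to: fn add(..) -> impl Future<Output = u64> + '_
async fn add(a: u64, b: u64, ran: &AtomicBool) -> u64 {
    ran.store(true, Ordering::SeqCst); // records that the body actually executed
    a + b
}

async fn outer(ran: &AtomicBool) -> u64 {
    // .await inside an async fn drives another async fn's Future
    add(1, 2, ran).await * 10
}

// Returns (did the body run before polling?, result of the first poll).
fn demo() -> (bool, Option<u64>) {
    let ran = AtomicBool::new(false);
    let fut = outer(&ran); // only builds the Future; no code from the body runs
    let ran_before_poll = ran.load(Ordering::SeqCst);

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut); // poll requires a pinned Future
    let first = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    };
    (ran_before_poll, first)
}

fn main() {
    println!("{:?}", demo()); // (false, Some(30))
}
```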

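To understand how async APIs are implemented and to keep improving program performance, it is essential to understand how async works under the hood in Rust.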

Preparation

Before running the test code, add the following dependencies:

[dependencies]
futures = "0.3"
tokio = { version = "0.3", features = ["full"] }

and import the required modules:

use futures::Future;
use std::pin::Pin;
use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering::SeqCst},
    Arc,
};
use tokio::runtime::Runtime;
use futures::task::AtomicWaker;
use std::task::Context;
use std::task::Poll;
use std::{thread, time::Duration};
Define a Test struct:
struct Test {
    waker: Arc<AtomicWaker>, // used to notify the Executor to call poll again
    result: AtomicU64,       // stores the intermediate result of each run
    signal: Arc<AtomicBool>, // simulates an event notification
}

Future

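Future is a trait whose core is the poll method: the Executor calls a Future's poll to drive it toward completion, and may call it more than once.
poll receives self as Pin<&mut Self>, which guarantees that the Future is not moved in memory between calls to poll.
The Context argument carries a Waker; the Future keeps this Waker so that it can later ask to be polled again.
poll returns a Poll: Poll::Ready(output) once the Future has finished, or Poll::Pending if it has not finished yet.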
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
Implement the Future trait for Test:
impl Future for Test {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let result = self.result.load(SeqCst);
        self.waker.register(cx.waker());
        println!("poll called {}", result);
        self.result.store(result + 1, SeqCst);
        if self.signal.load(SeqCst) {
            println!("poll ready");
            Poll::Ready(result)
        } else {
            println!("poll not ready");
            Poll::Pending
        }
    }
}
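Each call to poll increments result and returns Pending until signal has been set. The Executor does not call poll in a busy loop: after Pending, the Future is polled again only once its Waker has been woken.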

Waker

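When a Future returns Pending, it has stored the Waker from its Context. When the event it is waiting for happens, someone calls wake on that Waker, and the Executor polls the Future again. The following program shows this interaction between Future, Waker, and Executor: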
fn main() {
    let a = AtomicU64::new(0);
    let b = Arc::new(AtomicBool::new(false));
    let waker = Arc::new(AtomicWaker::new());
    let waker_clone = waker.clone();
    let c = b.clone();
    thread::spawn(move || {
        let t = Test {
            waker: waker,
            result: a,
            signal: b,
        };
        let rt = Runtime::new().unwrap();
        rt.spawn(async {
            let result = t.await;
            println!("finally result is {}", result);
        });
        thread::sleep(Duration::from_secs(10));
        println!("thread runtime exited");
    });
    thread::sleep(Duration::from_secs(2));
    c.store(true, SeqCst);
    println!("notify to poll");
    waker_clone.wake();
    thread::sleep(Duration::from_secs(2));
}
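For readers who want to reproduce the same flow without tokio or futures, here is a std-only sketch. It rests on three assumptions. WakerSlot is a hypothetical Mutex-based stand-in for futures::task::AtomicWaker. block_on is a minimal single-future executor that sleeps on a Condvar until the Waker fires. The notifier thread waits until a Waker has been registered instead of sleeping for a fixed time.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Hypothetical stand-in for futures::task::AtomicWaker: a Mutex-guarded Waker slot.
#[derive(Default)]
struct WakerSlot(Mutex<Option<Waker>>);

impl WakerSlot {
    fn register(&self, w: &Waker) {
        *self.0.lock().unwrap() = Some(w.clone());
    }
    fn wake(&self) {
        let w = self.0.lock().unwrap().take(); // release the lock before waking
        if let Some(w) = w {
            w.wake();
        }
    }
    fn is_registered(&self) -> bool {
        self.0.lock().unwrap().is_some()
    }
}

// Same fields and poll logic as Test above, minus the println! calls.
struct Test {
    waker: Arc<WakerSlot>,
    result: AtomicU64,
    signal: Arc<AtomicBool>,
}

impl Future for Test {
    type Output = u64;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let result = self.result.load(SeqCst);
        self.waker.register(cx.waker());
        self.result.store(result + 1, SeqCst);
        if self.signal.load(SeqCst) { Poll::Ready(result) } else { Poll::Pending }
    }
}

// Waker used by block_on: sets a flag and signals the Condvar.
struct Signal {
    woken: Mutex<bool>,
    cv: Condvar,
}

impl Wake for Signal {
    fn wake(self: Arc<Self>) {
        *self.woken.lock().unwrap() = true;
        self.cv.notify_one();
    }
}

// Minimal executor for one Future: poll, and if Pending, sleep until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let signal = Arc::new(Signal { woken: Mutex::new(false), cv: Condvar::new() });
    let waker = Waker::from(signal.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
        let mut woken = signal.woken.lock().unwrap();
        while !*woken {
            woken = signal.cv.wait(woken).unwrap();
        }
        *woken = false;
    }
}

fn demo() -> u64 {
    let slot = Arc::new(WakerSlot::default());
    let signal = Arc::new(AtomicBool::new(false));
    let (slot2, signal2) = (slot.clone(), signal.clone());
    let notifier = thread::spawn(move || {
        // Wait until poll has registered a Waker (instead of sleeping 2 seconds).
        while !slot2.is_registered() {
            thread::yield_now();
        }
        signal2.store(true, SeqCst); // the "event" happens
        slot2.wake(); // ask the executor to poll again
    });
    let result = block_on(Test { waker: slot, result: AtomicU64::new(0), signal });
    notifier.join().unwrap();
    result
}

fn main() {
    println!("finally result is {}", demo());
}
```

demo normally returns 1 (one Pending poll, then Ready after the wake). It returns 0 if the signal happens to be set before the first poll checks it.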
Test holds an AtomicWaker, whose register method stores a Waker:
waker: Arc<AtomicWaker>
Inside poll, register saves the current task's Waker into waker:
self.waker.register(cx.waker());
Another thread then calls wake through a clone of waker to notify the Executor:
let waker_clone = waker.clone();
waker_clone.wake();
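wake is ultimately dispatched through a RawWaker provided by the Executor. A RawWaker is a data pointer plus a RawWakerVTable, a table of function pointers implemented by the Executor: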
// The fields of RawWakerVTable, a table of function pointers
clone: unsafe fn(*const ()) -> RawWaker
wake: unsafe fn(*const ())
wake_by_ref: unsafe fn(*const ())
drop: unsafe fn(*const ())
In tokio, the vtable is built like this:
RawWakerVTable::new(
    clone_arc_raw::<W>,
    wake_arc_raw::<W>,
    wake_by_ref_arc_raw::<W>,
    drop_arc_raw::<W>,
)
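To see what such a vtable does, the sketch below builds a Waker by hand. Its RawWaker's data pointer is an Arc<AtomicUsize> counter, and its four functions manage the Arc's reference count and increment the counter on wake. It only illustrates the RawWaker contract; in real code, implementing std::task::Wake for an Arc is the safer route. The function names (vt_clone and so on) and counting_waker are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable, Waker};

// The data pointer is an Arc<AtomicUsize> converted with Arc::into_raw.
unsafe fn vt_clone(data: *const ()) -> RawWaker {
    // A new Waker shares the same counter: bump the Arc's strong count.
    unsafe { Arc::increment_strong_count(data as *const AtomicUsize) };
    RawWaker::new(data, &VTABLE)
}

unsafe fn vt_wake(data: *const ()) {
    // wake consumes the Waker: take back its Arc, which is dropped at the end.
    let counter = unsafe { Arc::from_raw(data as *const AtomicUsize) };
    counter.fetch_add(1, Ordering::SeqCst);
}

unsafe fn vt_wake_by_ref(data: *const ()) {
    // wake_by_ref only borrows the Waker: the reference count is unchanged.
    let counter = unsafe { &*(data as *const AtomicUsize) };
    counter.fetch_add(1, Ordering::SeqCst);
}

unsafe fn vt_drop(data: *const ()) {
    // Dropping a Waker releases its Arc reference.
    drop(unsafe { Arc::from_raw(data as *const AtomicUsize) });
}

static VTABLE: RawWakerVTable = RawWakerVTable::new(vt_clone, vt_wake, vt_wake_by_ref, vt_drop);

// Hypothetical helper: a Waker that counts how many times it has been woken.
fn counting_waker(counter: Arc<AtomicUsize>) -> Waker {
    let data = Arc::into_raw(counter) as *const ();
    // Safety: the vtable functions above uphold the RawWaker contract for this pointer.
    unsafe { Waker::from_raw(RawWaker::new(data, &VTABLE)) }
}

// Returns (number of wakes, remaining strong references to the counter).
fn demo() -> (usize, usize) {
    let counter = Arc::new(AtomicUsize::new(0));
    let waker = counting_waker(counter.clone());
    let second = waker.clone(); // calls vt_clone
    waker.wake_by_ref(); // calls vt_wake_by_ref
    second.wake(); // calls vt_wake, consuming `second`
    drop(waker); // calls vt_drop
    (counter.load(Ordering::SeqCst), Arc::strong_count(&counter))
}

fn main() {
    println!("{:?}", demo()); // (2, 1)
}
```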
Calling wake therefore runs the Executor's own wake function, which tells the Executor to schedule the Future again.
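The Waker is thus the bridge between a Future and its Executor. The Executor creates a Waker for each task it polls, records in it which task it belongs to (for example a Task ID or a pointer to the task), and passes it into poll. When wake is called, the Executor uses that information to find the task and put it back on its run queue.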
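Here is a single-threaded, std-only sketch of this mapping. The executor stores futures in a HashMap keyed by task ID. Each Waker carries only its task's ID, and waking pushes that ID onto a ready queue that the executor drains. Executor, TaskWaker, and YieldOnce are hypothetical names for this illustration, not tokio types.

```rust
use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type ReadyQueue = Arc<Mutex<VecDeque<usize>>>;

// A Waker that only knows its task's ID; waking re-queues that ID.
struct TaskWaker {
    id: usize,
    ready: ReadyQueue,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.ready.lock().unwrap().push_back(self.id);
    }
}

// Pending on the first poll (after asking to be polled again), Ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Task ID -> Future, plus a queue of IDs that are ready to be polled.
struct Executor {
    tasks: HashMap<usize, Pin<Box<dyn Future<Output = ()>>>>,
    ready: ReadyQueue,
    next_id: usize,
}

impl Executor {
    fn new() -> Self {
        Executor { tasks: HashMap::new(), ready: Arc::new(Mutex::new(VecDeque::new())), next_id: 0 }
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + 'static) {
        let id = self.next_id;
        self.next_id += 1;
        self.tasks.insert(id, Box::pin(fut));
        self.ready.lock().unwrap().push_back(id);
    }

    // Poll ready tasks until the queue is empty (no external events in this demo).
    fn run(&mut self) {
        loop {
            let next = self.ready.lock().unwrap().pop_front();
            let Some(id) = next else { break };
            let waker = Waker::from(Arc::new(TaskWaker { id, ready: self.ready.clone() }));
            let mut cx = Context::from_waker(&waker);
            let done = match self.tasks.get_mut(&id) {
                Some(task) => task.as_mut().poll(&mut cx).is_ready(),
                None => false,
            };
            if done {
                self.tasks.remove(&id);
            }
        }
    }
}

fn demo() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut exec = Executor::new();
    for i in 0..2 {
        let log = log.clone();
        exec.spawn(async move {
            log.borrow_mut().push(format!("task {i} start"));
            YieldOnce(false).await;
            log.borrow_mut().push(format!("task {i} end"));
        });
    }
    exec.run();
    let out = log.borrow().clone();
    out
}

fn main() {
    println!("{:?}", demo());
}
```

The log interleaves the two tasks: each one suspends after its first step and is re-queued by its own Waker.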
In tokio, waking a task by reference looks like this:
//harness.rs
pub(super) fn wake_by_ref(&self) {
    if self.header().state.transition_to_notified() {
        self.core().schedule(Notified(self.to_task()));
    }
}
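wake_by_ref marks the task as notified and hands it to the Scheduler, the part of the Executor that puts tasks back on a run queue.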

Executor

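A Future has to be run by an Executor. Taking tokio's Runtime as the Executor: when a Future is submitted to the Runtime, the Runtime wraps it into a Task and pushes the Task onto a global queue: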
// Submit a new Future to the Runtime
let rt = Runtime::new().unwrap();
rt.spawn(async {
    let result = t.await;
    println!("finally result is {}", result);
});
// The Runtime's global queue field
inject: queue::Inject<Arc<Worker>>

Then it looks for an idle worker thread and wakes it up to run the task:

// Find an idle worker and unpark it
fn notify_parked(&self) {
    if let Some(index) = self.idle.worker_to_notify() {
        self.remotes[index].unpark.unpark();
    }
}
unpark wakes the parked thread through a Mutex and a Condvar (provided by parking_lot in this configuration):
fn unpark_condvar(&self) {
    drop(self.mutex.lock());
    self.condvar.notify_one()
}
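The park/unpark handshake can be sketched with std's Mutex and Condvar. This Parker is a simplified, hypothetical version; tokio's also tracks a parked/notified state in an atomic. unpark sets a flag under the lock before notifying, so a notification that arrives before the worker starts waiting is not lost. The same concern is why unpark_condvar briefly takes the lock before notify_one.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical, simplified parker: a "notified" flag guarded by a Mutex, plus a Condvar.
struct Parker {
    notified: Mutex<bool>,
    condvar: Condvar,
}

impl Parker {
    fn new() -> Self {
        Parker { notified: Mutex::new(false), condvar: Condvar::new() }
    }

    // Block until unpark() has been called, then consume the notification.
    fn park(&self) {
        let mut notified = self.notified.lock().unwrap();
        while !*notified {
            // wait() releases the lock while sleeping; the loop handles spurious wakeups.
            notified = self.condvar.wait(notified).unwrap();
        }
        *notified = false;
    }

    // Record the notification under the lock, then wake one waiter. Because the flag
    // is set while holding the lock, an unpark that happens before park() is not lost.
    fn unpark(&self) {
        *self.notified.lock().unwrap() = true;
        self.condvar.notify_one();
    }
}

// An idle worker parks; the "scheduler" hands it a task and unparks it.
fn demo() -> u32 {
    let parker = Arc::new(Parker::new());
    let task = Arc::new(Mutex::new(None::<u32>));
    let (p2, t2) = (parker.clone(), task.clone());
    let worker = thread::spawn(move || {
        p2.park(); // sleep until notified
        let input = t2.lock().unwrap().take().expect("task was queued before unpark");
        input * 2 // "run" the task
    });
    *task.lock().unwrap() = Some(21); // queue a task
    parker.unpark(); // wake the idle worker
    worker.join().unwrap()
}

fn main() {
    println!("worker returned {}", demo()); // 42
}
```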

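Once a worker thread is running, it takes tasks from its local work queue and runs them. If the local queue is empty, it tries to steal tasks from other workers' queues: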

fn run(&self, mut core: Box<Core>) -> RunResult {
    while !core.is_shutdown {
        // Increment the tick
        core.tick();

        // Run maintenance, if needed
        core = self.maintenance(core);

        // First, check work available to the current worker.
        if let Some(task) = core.next_task(&self.worker) {
            core = self.run_task(task, core)?;
            continue;
        }

        // There is no more **local** work to process, try to steal work
        // from other workers.
        if let Some(task) = core.steal_work(&self.worker) {
            core = self.run_task(task, core)?;
        } else {
            // Wait for work
            core = self.park(core);
        }
    }

    // Signal shutdown
    self.worker.shared.shutdown(core, self.worker.clone());
    Err(())
}
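The selection order in run (local queue, then stealing, then parking) can be modeled in a few lines. This is a hypothetical single-threaded simulation, not tokio's lock-free queues. Pool and its methods are invented for illustration, and tasks are just string labels. next_task falls back to the global inject queue, and a thief takes the older half (rounded up) of a victim's queue; tokio's steal_into likewise moves about half. tokio also checks the global queue first every so often for fairness, which this model omits.

```rust
use std::collections::VecDeque;

// Hypothetical single-threaded model of worker task selection (not tokio's code).
struct Pool {
    locals: Vec<VecDeque<&'static str>>, // one local run queue per worker
    inject: VecDeque<&'static str>,      // shared global queue
}

impl Pool {
    // Like core.next_task: the worker's own queue first, then the global queue.
    fn next_task(&mut self, w: usize) -> Option<&'static str> {
        self.locals[w].pop_front().or_else(|| self.inject.pop_front())
    }

    // Like core.steal_work: move the older half (rounded up) of another worker's
    // queue into our own, then run the first stolen task.
    fn steal_work(&mut self, w: usize) -> Option<&'static str> {
        for victim in 0..self.locals.len() {
            if victim == w || self.locals[victim].is_empty() {
                continue;
            }
            let len = self.locals[victim].len();
            let stolen: Vec<_> = self.locals[victim].drain(..len - len / 2).collect();
            self.locals[w].extend(stolen);
            return self.locals[w].pop_front();
        }
        None
    }

    // One iteration of the run loop; None means the worker would park.
    fn step(&mut self, w: usize) -> Option<&'static str> {
        self.next_task(w).or_else(|| self.steal_work(w))
    }
}

fn demo() -> Vec<(usize, Option<&'static str>)> {
    let mut pool = Pool {
        locals: vec![VecDeque::from(vec!["a", "b", "c", "d"]), VecDeque::new()],
        inject: VecDeque::from(vec!["g"]),
    };
    let mut trace = Vec::new();
    for w in [1, 1, 0, 1, 1, 0] {
        trace.push((w, pool.step(w)));
    }
    trace
}

fn main() {
    for (w, task) in demo() {
        println!("worker {w}: {task:?}");
    }
}
```

Worker 1 starts with an empty local queue, so it first drains the global queue, then steals a and b from worker 0. Worker 0 finally parks once every queue is empty.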
This loop is how the Executor actually picks up and runs Tasks.

Summary


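Like Golang, Rust is a very young language, but it follows the principle of zero-cost abstraction, with the goal of producing safe and efficient programs. To achieve this, Rust combines strict compiler checks, a design with no built-in runtime and no GC, and a deliberately narrow standard library. Rust's approach to concurrent programming reflects these traits: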

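the standard library provides only the Future abstraction (together with Waker and Context), while the Executor, Reactor, and Scheduler are left to libraries such as tokio.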

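Golang and Rust are both developing rapidly. Although their design philosophies differ greatly, their shared aim of being easier to use, safer, and faster will not change.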
