上一篇文章介绍了 Go 和 Erlang 在调度器的实现,本文将简要介绍这两种并发语言的消息传递机制

简要对比

Erlang和Go虽然在实现及功能上差异较大,但是都支持高并发的轻量级用户任务(Erlang的轻量进程,Go的Goroutine), 并且都采用了消息传递的方式作为任务间交互的方式。

在Erlang中,采用了一种比较纯粹的消息传递机制,进程间几乎没有任何形式的数据共享,只能通过彼此间发送消息进行通信; 而Go虽然是基于共享内存的,但是也必须通过消息传递来进行共享数据的同步。 可以说消息传递机制是两种语言任务间交互的首要方式。

但是在具体实现中,鉴于两种语言的差异,也表现为不同的形式:

  • 在Erlang中,进程之间以彼此的Pid作为标识进行消息的发送,一切数据都仅可以消息的形式在进程间复制
  • 在Go中,不同的Goroutine间通过共享的channel进行通信,由于Go本质上是建立在共享存储模型上的,因此全局变量、参数甚至是一部分栈变量都是可以共享的,通信一般控制在较小的规模,仅用来保证共享的同步语义

下面将分别就Erlang和Go各自实现分别进行分析介绍。

Erlang中的消息传递机制

语法

消息传递是Erlang语言并发机制的基础。在Erlang中,主要包含以下并发原语:

  • 创建一个新的并发轻量进程,用于执行函数Fun,并返回其进程标识符Pid:
Pid = spawn(Fun)
Pid ! MPid1 ! Pid2 ! Pid3 ! … M
Pid  ! Message
receive ... endPattern1Guard1Pattern2Guard2Expressions
receive
  Pattern1 [when Guard1] ->
      Expressions1;
  Pattern2 p[when Guard2] ->
      Expressions2;
  ... ...
end
  • 对于接收操作,我们还可以为其设置一个超时控制,一旦超过某个预设的时长仍没有消息到达,则执行相应的超时操作,语法如下:
receive
  Pattern1 [when Guard1] ->
      Expressions1;
  Pattern2 p[when Guard2] ->
      Expressions2;
        ... ...
after Time ->
  ExpressionTimeout
end
  • 可以建立一个只包含超时的接收操作,事实上就相当于一个延时操作:
sleep(T) ->
  receive
      after T ->
          true
  end

以上就是Erlang中基本的消息传递的接口。

内部实现

相对于Go而言,Erlang的发展历史更加悠久,因此代码复杂程度要大大高于Go这门新型语言。 因此在分析过程中,主要还是以介绍实现机制为主,具体的数据结构及源代码实现就不作过分细致的剖析了。

otp/erts/emulator/beam/otp/erts/emulator/beam/erl_message.cotp/erts/emulator/beam/beam_emu.c

之所以不在一处实现,是因为Erlang把接收操作作为一种BEAM基本指令来实现,而发送操作则以内部函数的方式实现。

在具体实现中,Erlang采用了消息Copy的方法实现消息传递——消息从发送进程的堆拷贝到接收进程的堆:

  • 在发送时,如果接收进程正在另一个调度器上执行,或者有其他并发的进程也在向其发送消息时,本次发送就存在竞争风险,不能直接完成
  • 发送者会在堆上为接收进程分配一个临时区域并将消息拷贝到该区(该内存区将在后续进行垃圾收集时合并到接收进程的堆空间)
  • 拷贝完成后,会将指向这块新区域的指针链入接收进程的消息队列
  • 如果接收进程正处于阻塞态,则将其唤醒并添加到就绪队列中

在SMP版Erlang虚拟机中,进程的消息队列由两部分组成—— 一个公共队列和一个私有队列。 公共队列用于接收其他进程发送的消息,通过互斥锁加以保护;私有队列用来减少对锁的争用: 接收进程首先在接收队列中查找符合匹配的消息,如果没有,再从公共队列中复制消息到私有队列中。

(在非SMP的Erlang虚拟机中,进程只有一个消息队列。)

接收进程发现当前任务队列上没有匹配的消息后,会跳转执行wait_timeout: 设置一个定时器并阻塞在该定时器上—— 当该定时器触发或者有新消息到来时,都会唤醒接收进程。

由于进程的消息缓冲是以队列形式维护的,因此从发送进程角度来看,可以认为消息缓冲的大小是无限的, 因此Send操作一般不会阻塞,这点注意与后面要将的Go消息传递机制相区别。 这也就是为什么Erlang中仅对Receive操作提供超时响应机制的原因了!

高级特性

与Go主要面向SMP服务器应用不同,Erlang是一种面向分布式集群环境的编程语言, 因此消息传递除了支持本地进程间的通信外,还支持分布式环境的进程间通信。

基本流程是:

  • 首先通过Pid(或在分布式环境上的等价概念)查询“名字服务”
  • 找到该远程进程所在的远程主机地址信息,进而通过套接字进行后续发送操作
  • 远程Erlang虚拟机进程接收到消息,再进一步分析,将其派发到指定的进程消息队列中

进一步的实现细节会涉及Erlang分布式处理的机制,这里就不展开分析了,待后续单开主题讨论。

Go中channel机制介绍

语法

在Go中,channel结构是Goroutine间消息传递的基础,属于基本类型,在runtime库中以C语言实现。 Go中针对channel的操作主要包括以下几种:

ch = make(chan int, N)ch <- ll <- chclose(ch)

另外,还可以通过select语句同时在多个channel上进行收发操作,语法如下:

select {
  case ch01 <- x:
      ... ... /* do something ... */
  case y <- ch02:
      ... ... /* do something ... */
  default:
      ... ... /* no event ... */
}

此外,基于select的操作tai还支持超时控制,具体的语法示例如下:

select {
  case v := <- ch:
      ... ...
  case <- time.After(5 * time.Second):
      ... ...
}

尽管Go以消息传递作为Goroutine间交互的主要方式,但是基于channel的通信又必须依赖channel引用的共享才能得以实现, 因此Go语言绝不是一种纯粹的消息传递语言。 一般而言,channel的引用可以通过以下几种方式在不同Goroutine间共享:

  • “父Goroutine” 的栈变量,通过用Go语句创建Goroutine时的参数进行传递
ch = make (chan int)
go fn (ch)
l <- ch
  • “父Goroutine” 的栈变量,Go创建的Goroutine以闭包作为执行函数,栈变量自动共享
ch = make (chan int)
go func () { i = 1; chan <- i; } ()
x <- chan
  • 由于Go的垃圾收集器认为channel的引用只能在栈上,因此一般不用全局的引用进行共享

另外,在创建channel时,还可以指定是否采用buffer及buffer的大小,默认buffer为0 。 当buffer大于0时,可以进行异步的消息传递:接收方只有在当前buffer为空时才阻塞,而发送方则只有在buffer满时才阻塞。 详细情形将在后面介绍。

内部实现

核心数据结构

src/pkg/runtime/chan.c
struct Hchan
{
  uintgo   count;        // total data in q
  uintgo   dataqsize;    // size of the circular q
  uint16   elemsize;
  uint16   pad;      // ensures proper alignment of the buffer that follows Hchan in memory
  bool    closed;
  Alg*  elemalg;  // interface for element type
  uintgo   sends;        // send index
  uintgo   recvx;        // receive index
  WaitQ    recvq;        // list of recv waiters
  WaitQ    sendq;        // list of send waiters
  Lock;
}

我们可以按照表示属性还是表示状态将Hchan的内部成员进行分类并逐一分析。

表示channel属性的成员如下,这些成员在用make进行初始化后确定,并且在后续操作中不会变化:

struct Hchan
{
  ... ...
  uintgo   dataqsize;
  uint16   elemsize;
  uint16   pad;
  Alg*  elemalg;
  ... ...
}

其中,

dataqsizemake(chan T, N)elemsizepadelemalg

另外一类成员则用来表示当前channel的状态,是随着程序运行而发生变化的:

struct Hchan
{
  ... ...
  bool    closed;

  uintgo   count;
  uintgo   sendx;
  uintgo   recvx;

  WaitQ    recvq;
  WaitQ    sendq;
  Lock;
  ... ...
}

我们按功能将状态信息分为三类:

closedcountsendxrecvxcountsendxrecvxcountdataqsizechanbuf(c, i)

Send/Recv实现分析

我们以Send操作为例,介绍在单个channel上的具体流程,基于select语句的多channel操作将在下一节进行介绍。

首先,我们来看Send操作在Go runtime中的接口定义:

void runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc);
ChanType *tHchan * cbyte *epbool *pres* pres = falsevoid * pc
SudoGWaitQ
struct SudoG
{
  G*        g;            // g and selgen constitute
  uint32   selgen;       // a weak pointer to g  
  SudoG*    link;
  int64    releasetime;
  byte* elem;     // data elment
}
gelemlinkreleasetimeselgen

下面具体分析该函数流程:

c->recvqSudoGSudoGelemSudoGreleasetimeruntime·readyNILpresboolfalseSudoGgepSudoGc->sendqruntime·parkgreleasetime

对于带buffer的channel,将进入async模式进行处理:

  1. 首先判断当前缓冲区是否已满,若满,则阻塞,过程与上面不带buffer的情况类似
  2. 若缓冲区不满,则将ep指针指向的区域按c元素大小复制到环形缓冲区,并修改缓冲区内部状态
dequeue(WaitQ *)enqueue(WaitQ *)

对于Recv操作,其步骤基本上与Send类似,仅仅是操作的队列和copy的方向有别,这里不再赘述。

为了更好的理解上述操作,我做了一组简单的示意图,以“异步channel”(即带缓冲区的channel)为例,来描述这一过程:

Go#1#1#1Go#1

Go#4#4

Go#5

Go#8Go#4#4Go#4

Select操作

介绍

Go在语言级提供了一个select语句用来支持同时在多个channel上进行操作的功能,很类似于Unix系统上的select系统调用,只不过后者操作的类型时文件描述符而已。

与Unix的select操作类似,任意一个注册的channel操作返回则select操作亦返回。 Unix的select通过逐一测试文件描述符集合判断是当前操作,而Go则通过不同的case语句指示当前响应的channel操作。

两外,二者也都支持超时控制。但是Unix的超时控制是建立在分时调度机制上的,更加可靠; 而Go的调度器如前所述,是协作式的,因此超时控制不可能精确的实现。

主要数据结构

ScaseSelect
Scase
struct Scase
{
  SudoG    sg;           // must be first member (cast to Scase)
  Hchan*    chan;     // chan
  byte* pc;           // return pc
  uint16   kind;
  uint16   so;           // vararg of selected bool
  bool*    receivedp;    // pointer to received bool 
}
ScaseSudoGsgchanpckindCaseRecvCaseSendCaseDefaultrecievedpsoSelect
struct Select
{
  uint16   tease;        // total count of scare[]
  uint16   ncase;        // currently filled scare[]
  uint16*   pollorder;    // case poll order
  Hcan**    lockorder;    // channel lock order
  Scase    scase[1];  // one per case (in order of appearance )
}
SelectteaseSelectncasetcasepollorderlockorderscase[1]ScaseSelect

代码生成

我们知道,Go的parser遇到select语句时会将其翻译成对应的runtime库函数调用以实现其功能,那么这种映射是如何实现的呢?

void runtime·newselect(int size, …)SelectSelectScasevoid runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected)void runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)void runtime·selectdefault*(Select *sel, bool selected)void * runtime·selectgo(Select ** selp)

流程分析完了,是否觉得有点奇怪?问题出在哪儿?

对了,就是在selectgo返回时,程序如何判断目前是在注册阶段Scase阶段返回的,还是从selectgo返回的呢?

runtime·selectsend
selectsend(del *byte, hchan chan<- any, elem *any) (selected bool);
selectsendselectgoScaseso
selectgoScase

下面列出一个Go中的select语句翻译为C语言后的形式,作为参考 (事实上Go编译器会直接将Go代码翻译成汇编语言,这里用C描述主要是为了简化描述):

// Go 语言版代码片段
... ...
select {
  case ch01 <- x:
      foo() /* do something of case #1 */
  case y <- ch02:
      bar() /* do something of case #2 */
  default:
      def() /* do something for default  */
}
... ...
// 对应C代码片段示意
Select *sel = newselect(3);
... ...
if (selectsend(sel, ch01, &x)) {
  foo() ; // do something of case #1 
} else if (selectrecv(sel, ch02, &y)) {
  bar() ; // do something of case #2
} else if (selectdefault(sel)) {
  def(); // do something of default
} else {
  void (*rpc) (void);
  rpc = selectgo (&sel);
  (*rpc)(); //goto rpc;
}
... ...

算法实现

runtime·selectgo

应该说该函数实际上类似于上节介绍的Send、Recv操作在多channel情况下的扩展,基本流程也比较类似。 但是需要特别注意以下几点:

gg
selectgo
runtime·parkScase

整个过程大体如下图所示:

selectgoSudoG
dequeuedequeuedequeue
SudoGselgenGselgenselgenNOSELGENdequeueGselgenSudoGselgendequeueSudoGselgenNOSELGEN
SudoG *sgp = q->first;
... ...
if ( CAS(&sgp->g->selgen, sgp->g->selgen, sgp->selgen+2) )
  return sgp;
else
... ...

由 CAS 原子操作定义可知,该阻塞的 goroutine 尽可能被唤醒一次。

初窥“超时控制”机制

这里我们先简单介绍一下Go语言select中的超时控制机制,实际上,在Go中,定时控制并非为select语句所独有,而是一种通用的机制。 在这里我们仅针对select的例子简单的介绍一下Go的定时控制机制,后续有空时再作深入分析。

回到开始时那个超时控制的例子:

... ...
select {
  case v := <- ch:
            ... ...
  case <- time.After(5 * time.Second):
            ... ...
}
... ...

在第二个case语句中,我们调用了time.After()函数定义了一个5秒钟的定时器,我们来看看这个函数的原型定义:

fund After(d Duration) <- chan Time {
  return NewTimer(d).C
}

这个函数实际上返回一个channel类型,并在定义的时间到时,向该channel发送一个Time型数据。

了解了这些,上面的例子就被统一起来了,每个case仍然是针对channel的,如果ch在5秒内响应,则执行ch对应case的语句; 否则第二个case的channel响应,也就进入了超时的处理过程中。

time.Timertime.TickerruntimeTimerruntimeTimer
src/pkg/runtime/time.goc

总结

通过上面分析,我们不难得出两种语言在消息传递上的区别与共性, 我总结了以下几点,请大家补充、指正。

区别:

  • Go基于channel进行通信,而channel引用必须在任务间共享访问; Erlang利用Pid进行发送,而接收时仅根据消息内容区别后续处理的方式,过程中没有任何形式的数据共享

  • Go的Send/Recv操作都可能阻塞; Erlang一般仅有接收操作可能引起阻塞

  • Go的select可以同时等待多channel上的Send/Recv操作,并提供超时处理机制; Erlang的发送操作仅能每次针对一个进程,接收操作针对本进程的消息队列,也提供超时机制

  • Go的消息传递仅支持本机Goroutine间通信; Erlang消息传递支持在分布式环境上的进程间通信

共性:

  • Go和Erlang的消息传递都提供了语法级支持,并且都是语言的重要组成部分
  • Go和Erlang的消息传递都是通过数据的Copy实现的,因此都强调“小消息,大计算”的处理方式

结尾的思考

Go语言的并发模型源自Hoare的CSP模型(Communicating Sequential Processes, 国内译为“通信顺序进程”,台湾译为“交谈循序程序”),可以被视为一种类似 Unix 管道的东西。 它的特点是每个任务程序的编写完全按照其执行的时序进行,方便编程及调试分析。

与之相对的另一种并发编程模式就是基于异步消息及回调的模型,比如 libev 以及前阶段非常火的 Node.js, 这类模式的特点是调度以回调函数为单位,所有的消息发送都是异步的,程序员看到的就是消息类型及其对应的处理函数。

我认为两种方式各有优缺点,Go的方案更符合人类思考问题的习惯,编程和调试效率较高;但由于需要保证高并发性,就要实现用户任务层的调度,例如采用协程(如Go、rust等),或是采用基于虚拟机的方案(如 Erlang、Lua等),这无疑增加了上下文切换的开销,同时也增加了runtime或虚拟机的实现难度。

基于“异步消息/回调函数”的方案恰好相反,由于调度粒度变小为函数,导致实现上比较简单,顺序编程的固有性质使得系统开发者不需要考虑保护上下文或者类似的东西,每个回调就是依次被取出、派发、执行、返回,因此后端实现相对简单;反之,应用程序员就要小心的设计程序,考虑很多诸如功能划分、线程安全之类的细节,同时也给分析及调试程序带来了很大的问题。

参考资料

除了参考Go及Erlang的代码外,本文还参考了以下文献: