本文主要分析golang实现并发基础组件channel的实现原理;
主要内容分为几个部分
Section1:channel使用实例分析
Section2:源码分析

Section1 channel使用实例

channel主要是为了实现go的并发特性,用于并发通信的,也就是在不同的协程单元goroutine之间同步通信。

下面主要从三个方面来讲解:

  • make channel,主要也就是hchan的数据结构原型;
  • 发送和接收数据时,goroutine会怎么调度;
  • 设计思考;

1.1 make channel

我们创建channel时候有两种,一种是带缓冲的channel一种是不带缓冲的channel。创建方式分别如下:

// buffered
ch := make(chan Task, 3)
// unbuffered
ch := make(chan int)

buffered channel

如果我们创建一个带buffer的channel,底层的数据模型如下图:
在这里插入图片描述
当我们向channel里面写入数据时候,会直接把数据存入circular queue(send)。当Queue存满了之后就会是如下的状态:
在这里插入图片描述

当dequeue一个元素时候,如下所示:
在这里插入图片描述
从上图可知,recvx自增加一,表示出队了一个元素,其实也就是循环数组实现FIFO语义。

那么还有一个问题,当我们新建channel的时候,底层创建的hchan数据结构是在哪里分配内存的呢?其实Section2里面源码分析时候已经做了分析,hchan是在heap里面分配的。

如下图所示:
在这里插入图片描述
当我们使用make去创建一个channel的时候,实际上返回的是一个指向channel的pointer,所以我们能够在不同的function之间直接传递channel对象,而不用通过指向channel的指针。

1.2 sends and receives

不同goroutine在channel上面进行读写时,涉及到的过程比较复杂,比如下图:
在这里插入图片描述
上图中G1会往channel里面写入数据,G2会从channel里面读取数据。

G1作用于底层hchan的流程如下图:
在这里插入图片描述

  1. 先获取全局锁;
  2. 然后enqueue元素(通过移动拷贝的方式);
  3. 释放锁;

G2读取时候作用于底层数据结构流程如下图所示:
在这里插入图片描述

  1. 先获取全局锁;
  2. 然后dequeue元素(通过移动拷贝的方式);
  3. 释放锁;

上面的读写思路其实很简单,除了hchan数据结构外,不要通过共享内存去通信;而是通过通信(复制)实现共享内存。

写入满channel的场景

如下图所示:channel写入3个task之后队列已经满了,这时候G1再写入第四个task的时候会发生什么呢?
在这里插入图片描述
G1这时候会暂停直到出现一个receiver。

这个地方需要介绍一下Golang的scheduler的。我们知道goroutine是用户空间的线程,创建和管理协程都是通过Go的runtime,而不是通过OS的thread。

但是Go的runtime调度执行goroutine却是基于OS thread的。如下图:
在这里插入图片描述

具体关于golang的scheduler的原理,可以看前面的一篇博客,关于go的scheduler原理分析。

当向已经满的channel里面写入数据时候,会发生什么呢?如下图:
在这里插入图片描述

上图流程大概如下:

  1. 当前goroutine(G1)会调用gopark函数,将当前协程置为waiting状态;
  2. 将M和G1绑定关系断开;
  3. scheduler会调度另外一个就绪态的goroutine与M建立绑定关系,然后M 会运行另外一个G。

所以整个过程中,OS thread会一直处于运行状态,不会因为协程G1的阻塞而阻塞。最后当前的G1的引用会存入channel的sender队列(队列元素是持有G1的sudog)。

那么blocked的G1怎么恢复呢?当有一个receiver接收channel数据的时候,会恢复 G1。

ch<-task4

这个时候,如果G1进行一个读取channel操作,读取前和读取后的变化图如下图:

在这里插入图片描述

整个过程如下:

t:=<-ch

这个时候将G1恢复到可运行状态需要scheduler的参与。G2会调用goready(G1)来唤醒G1。流程如下图所示:
在这里插入图片描述

  1. 首先G2会调用goready(G1),唤起scheduler的调度;
  2. 将G1设置成Runable状态;
  3. G1会加入到局部调度器P的local queue队列,等待运行。

读取空channel的场景

当channel的buffer里面为空时,这时候如果G2首先发起了读取操作。如下图:
在这里插入图片描述
会创建一个sudog,将代表G2的sudog存入recvq等待队列。然后G2会调用gopark函数进入等待状态,让出OS thread,然后G2进入阻塞态。

这个时候,如果有一个G1执行读取操作,最直观的流程就是:

  1. 将recvq中的task存入buffer;
  2. goready(G2) 唤醒G2;

但是我们有更加智能的方法:direct send; 其实也就是G1直接把数据写入到G2中的elem中,这样就不用走G2中的elem复制到buffer中,再从buffer复制给G1。如下图:
在这里插入图片描述

具体过程就是G1直接把数据写入到G2的栈中。这样 G2 不需要去获取channel的全局锁和操作缓冲。

1.3 channel主要特性

(1)goroutine-safe
hchan mutex

(2)store values, pass in FIFO.
copying into and out of hchan buffer

(3)can cause goroutines to pause and resume.
a)hchan sudog queues
b)calls into the runtime scheduler (gopark, goready)

(4)channel的高性能所在:
a)调用runtime scheduler实现,OS thread不需要阻塞;
b)跨goroutine栈可以直接进行读写;

Section2 源码分析

2.1 channel数据存储结构

runtime/chan.go
type hchan struct {
	//channel队列里面总的数据量
	qcount   uint           // total data in the queue
	// 循环队列的容量,如果是非缓冲的channel就是0
	dataqsiz uint           // size of the circular queue
	// 缓冲队列,数组类型。
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	// 元素占用字节的size
	elemsize uint16
	// 当前队列关闭标志位,非零表示关闭
	closed   uint32
	// 队列里面元素类型
	elemtype *_type // element type
	// 队列send索引
	sendx    uint   // send index
	// 队列索引
	recvx    uint   // receive index
	// 等待channel的G队列。
	recvq    waitq  // list of recv waiters
	// 向channel发送数据的G队列。
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	// 全局锁
	lock mutex
}

channel的数据结构相对比较简单,主要是两个结构:
1)一个数组实现的环形队列,数组有两个下标索引分别表示读写的索引,用于保存channel缓冲区数据。
2)channel的send和recv队列,队列里面都是持有goroutine的sudog元素,队列都是双链表实现的。
3)channel的全局锁。

2.2 make channel

make(chan, n)
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case size == 0 || elem.size == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = unsafe.Pointer(c)
	case elem.kind&kindNoPointers != 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
	}
	return c
}

函数接收两个参数,一个是channel里面保存的元素的数据类型,一个是缓冲的容量(如果为0表示是非缓冲buffer),创建流程如下:

  • 根据传递的缓冲大小size是否为零,分别创建不带buffer的channel或则带size大小的缓冲channel:
    • 对于不带缓冲channel,申请一个hchan数据结构的内存大小;
    • 对于带缓冲channel,new一个hchan对象,并初始化buffer内存;
  • 更新 chan中循环队列的关键属性:elemsize、elemtype、dataqsiz。

所以,整个创建channel的过程还是比较简单的。

2.3 协程从channel接收数据(goroutine receive data)

ep < cchanrecv函数

函数的定义如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......
}

从源码注释就可以知道,该函数从channel里面接收数据,然后将接收到的数据写入到ep指针指向的对象里面。

还有一个参数block,表示当channel无法返回数据时是否阻塞等待。当block=false并且channel里面没有数据时候,函数直接返回(false,false)。

接收channel的数据的流程如下:

lock(&c.lock)func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int)

2.4 协程向channel写入数据(goroutine sender data)

c < epchansend函数

函数的定义如下:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
}

函数有三个参数,第一个代表channel的数据结构,第二个是要指向写入的数据的指针,第三个block代表写入操作是否阻塞。

向channel写入数据主要流程如下:

lock(&c.lock)

2.5 channel close关闭channel源码分析

当我们执行channel的close操作的时候会关闭channel。

关闭的主要流程如下所示:

  • 获取全局锁;
  • 设置channel数据结构chan的关闭标志位;
  • 获取当前channel上面的读goroutine并链接成链表;
  • 获取当前channel上面的写goroutine然后拼接到前面的读链表后面;
  • 释放全局锁;
  • 唤醒所有的读写goroutine。