Golang中channel的优雅设计
1. 前言
这篇文章将会介绍Golang并发编程CSP模型中最核心的数据结构及其实现,即channel。若你对协程/线程以及多线程编程没有任何了解,则不建议阅读本篇文章。
2. 简介
Channel在Golang中常被用为Goroutine间的通信工具,当然Golang也能通过共享内存加互斥锁的方式来通信。但是在Go中常提及的设计模式是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。这句话有点绕,让我们来看一个例子,这个例子也经常会被用来考量一个Golang工程师对协程间通信的数量度。
如何使用两个协程有序打印0-9,即“0123456789”?想一想,然后我们直接看代码。
const (
LOOPS = 10
)
func main() {
chan1 := make(chan int, 1)
chan2 := make(chan int, 1)
chan3 := make(chan struct{})
chan1 <- 1
//打印协程A
go func() {
for i := 0; i < LOOPS; i += 2 {
<- chan1
fmt.Println(i)
chan2 <- 1
}
}()
//打印协程B
go func() {
for i := 1; i < LOOPS; i += 2 {
<-chan2
fmt.Println(i)
chan1 <- 1
}
chan3 <- struct{}{}
}()
<-chan3
}
这段代码使用了三个channel:
- chan1和chan2,用来控制交替打印的顺序
- chan3,阻塞主协程,等待两个打印协程完成工作
以上就是channel的一个用法,其中设计了多协程以及协程中的通信,接下来我们看看channel的数据结构是怎样的。
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
- qcount —— channel中存在的元素个数
- dataqsiz —— 循环队列的大小
- buf —— 实际存放元素的数组指针
- closed —— channel关闭标志,1为关闭
- sendx —— channel发送处理到的位置
- recvx —— channel接受处理到的位置
- recvq,sendq —— 阻塞在此channel的goroutine
3. 创建Channel
在Go中通常使用make来创建一个管道,如代码所示:
chan1 := make(chan int) //无缓冲
chan2 := make(chan int, 10) //有缓冲
在编译期会被转化为runtime.makechan, 缓冲大小将作为参数传入,若是无缓冲,则参数中的size为0 :
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
makechan的逻辑也比较直白: 1. 计算所需的缓冲区内存空间mem,判断是否溢出 2. 如果缓冲区大小为0,则只为hchan分配大小为hchanSize的空间,hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1));如果缓冲区不为0,且元素类型不为指针,则分配一段连续的空间;否则分别为缓冲区和hchan分配空间。 3. 设置hchan的其他参数。
4. 向channel发送数据
chan <-
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
chansend1中调用了chansend函数,我们分成几段来看:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
}
chansend参数中c为待操作channel,ep为待插入元素,block用来表示此操作是否允许阻塞,若不允许阻塞,则一旦不满足插入条件则立马返回false,例如缓冲区已满。这段代码中,对于关闭的channel,插入时会直接panic退出。
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
若在recvq队列中有等待接收的goroutine,则使用send函数将元素直接发送至等待goroutine中,例如我们在开头给出的示例,则是通过这种方式。send函数如下:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
在send中,会使用sendDirect将元素ep直接拷贝至等待队列sq的插入位置,并且使用goready将goroutine标记为runnable可运行状态,加入调度。回到chansend:
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
当recvq中没有等待接收的goroutine且缓冲区未填满时,通过chanbuf计算出下一个接收位置qp,并将元素拷贝至qp,同时调整sendx以及qcount。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// 被唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
当recvq没有goroutine等待接收且缓冲区已填满时,send操作进入阻塞过程: 1. 使用getg()获得当前goroutine 2. 使用acquireSudog()获得sudog结构,并且填充相关字段,例如插入元素,当前goroutine,channel等。 3. 将2中的sudog插入等待接收队列,并使用gopark()将此goroutine改为沉睡,等待唤醒。 4. 当goroutine被唤醒后,清除之前阻塞设置的状态。
到此,channel就完成了一次插入操作,让我们总结一下send操作可能进入分支: 1. 由于channel关闭,或者block字段导致send操作直接失败。 2. 当recvq存在等待接收的goroutine,则直接发送至此goroutine并且唤醒之。 3. recvq不存在值且缓冲区未填满,则插入缓冲区buf。 4. recvq不存在且缓冲区被填满,则插入sendq并且阻塞当前goroutine。
5. 从channel接收数据
在Go中可以通过两种方式接收数据:
value <- ch
value, ok <- ch
这两种方法分别被转换成runtime.chanrecv1和runtime.chanrecv2,而chanrecv1和chanrecv2最终都是调用runtime.chanrecv。在实现上,chanrecv与chansend非常相似,先判断有无等待中的goroutine,再判断缓存是否被填满,由此决定chanrecv的操作: 1. 从等待发送的Goroutine上接收数据; 2. 从缓冲区接收数据; 3. 等待其他Goroutine向channel发送数据;
从等待发送的Goroutine上接收数据:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
value, ok <- ch_, ok <- ch
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
...
goready(gp, skip+1)
}
根据缓冲区的情况做不同的处理: - 缓冲区大小为0: 通过recvDirect直接将元素拷贝至目的地址。 - 缓冲区大小不为0:将发送队列头的数据拷贝至发送方,并且将发送方加入调度。
从缓冲区接收数据:
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
计算出接收位置qp,若ep不为空则拷贝一份至ep。紧接着清空qp位置内容,修改recvx指向位置。
等待其他Goroutine向channel发送数据:
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
若以上条件都不满足,则将此goroutine加入等待接收队列并且阻塞,等待唤醒。
可以看到,chanrecv的逻辑与chansend有一定的相似性,理解了chansend之后,chanrecv就变得非常容易阅读。
6. 关闭channel
在Go中可以通过close来关闭channel,虽然通常我们不建议这么做。在close channel之后,会唤醒并释放所有的等待接收者和等待发送者。