channel是用于 goroutine 之间的同步、通信的数据结构。它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全。本文通过示例为大家详细介绍了channel的应用,需要的可以参考一下
目录
- 前言
- 整体结构
- 创建
- 发送
- 接收
- 关闭
前言
channel是用于 goroutine 之间的同步、通信的数据结构
channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率
channel的用途包括但不限于以下几点:
- 协程间通信,同步
- 定时任务:和timer结合
- 解耦生产方和消费方,实现阻塞队列
- 控制并发数
本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑
整体结构
Go channel的数据结构如下所示:
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters lock mutex }
qcount:已经存储了多少个元素
dataqsie:最多存储多少个元素,即缓冲区容量
buf:指向缓冲区的位置,实际上是一个数组
elemsize:每个元素占多大空间
closed:channel能够关闭,这里记录其关闭状态
elemtype:保存数据的类型信息,用于go运行时使用
sendx,recvx:
- 记录下一个要发送到的位置,下一次从哪里还是接收
- 这里用数组模拟队列,这两个变量即表示队列的队头,队尾
- 因此channel的缓冲也被称为环形缓冲区
recvq,sendq:
当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送
lock:channel支持协程间并发访问,因此需要一把锁来保护
创建
makechan
// 无缓冲通道 ch1 := make(chan int) // 有缓冲通道 ch2 := make(chan int, 10)
会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值
可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针
func makechan(t *chantype, size int) *hchan { elem := t.elem // mem:缓冲区大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError( "makechan: size out of range" )) } var c *hchan switch { // 缓冲区大小为空,只申请hchanSize大小的内存 case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存 case elem.ptrdata == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) // 否则就是带缓存,且有指针,分配两次内存 default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } // 保存元素类型,元素大小,容量 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) return c }
发送
执行以下代码时:
ch <- 3
编译器会转化为对chansend的调用
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 如果channel是空 if c == nil { // 非阻塞,直接返回 if !block { return false } // 否则阻塞当前协程 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw( "unreachable" ) } // 非阻塞,没有关闭,且容量满了,无法发送,直接返回 if !block && c.closed == 0 && full(c) { return false } // 加锁 lock(&c.lock) // 如果已经关闭,无法发送,直接panic if c.closed != 0 { unlock(&c.lock) panic(plainError( "send on closed channel" )) } // 从接收队列弹出一个协程的包装结构sudog if sg := c.recvq.dequeue(); sg != nil { // 如果能弹出,即有等到接收的协程,说明: // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待 // 将要发送的数据拷贝到该协程的接收指针上 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 缓冲区还有空间 if c.qcount < c.dataqsiz { // qp:计算要发送到的位置的地址 qp := chanbuf(c, c.sendx) // 将数据从ep拷贝到qp typedmemmove(c.elemtype, qp, ep) // 待发送位置移动 c.sendx++ // 由于是数组模拟队列,sendx到顶了需要归零 if c.sendx == c.dataqsiz { c.sendx = 0 } // 缓冲区数量++ c.qcount++ unlock(&c.lock) return true } // 往下就是缓冲区无数据,也没有等到接收协程的情况了 // 如果是非阻塞模式,直接返回 if !block { unlock(&c.lock) return false } // 将当前协程包装成sudog,阻塞到channel上 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 当前协程进入发送等待队列 c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 被唤醒后从这里开始执行 KeepAlive(ep) if mysg != gp.waiting { throw( "G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) // 被唤醒后发现channel关闭了,panic if closed { if c.closed == 0 { throw( "chansend: spurious wakeup" ) } panic(plainError( "send on closed channel" )) } return true }
整体流程为:
如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回
从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:
- 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
- 将要发送的数据拷贝到该协程的接收指针上,返回
- 这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝
否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回
接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上
将协程阻塞到channel的等待队列时,将其包装成了sudog结构:
type sudog struct { // 协程 g *g // 前一个,后一个指针 next *sudog prev *sudog // 等到发送的数据在哪,等待从哪个位置接收数据 elem unsafe.Pointer acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot // 在哪个channel上等待 c *hchan // channel }
其目的是:
- g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列
- elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝
来看看一些子函数:
1.判断channel是否是满的
func full(c *hchan) bool { // 无缓冲 if c.dataqsiz == 0 { // 并且没有其他协程在等待 return c.recvq.first == nil } // 有缓冲,但容量装满了 return c.qcount == c.dataqsiz }
2.send方法:
/** c:要操作的channel sg:弹出的接收者协程 ep:要发送的数据在的位置 */ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒该接收者协程 goready(gp, skip+1) }
接收
从channel中接收数据有几种写法:
- 带不带ok
- 接不接收返回值
根据带不带ok,决定用下面哪个方法
func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
根据接不接收返回值,决定elem是不是nil
最终都会调用chanrecv方法:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw( "unreachable" ) } // 非阻塞,并且channel为空 if !block && empty(c) { // 如果还没关闭,直接返回 if atomic.Load(&c.closed) == 0 { return } // 否则已经关闭, // 如果为空,返回该类型的零值 if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } lock(&c.lock) // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 如果有发送者正在阻塞,说明: // 1.无缓冲 // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待 if sg := c.sendq.dequeue(); sg != nil { // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 如果缓存区有数据, if c.qcount > 0 { // qp为缓冲区中下一次接收的位置 qp := chanbuf(c, c.recvx) // 将数据从qp拷贝到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } // 接下来就是既没有发送者在等待,也缓冲区也没数据 if !block { unlock(&c.lock) return false, false } // 将当前协程包装成sudog,阻塞到channel中 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 记录接收地址 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 从这里唤醒 if mysg != gp.waiting { throw( "G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }
接收流程如为:
如果channel为nil,根据参数中是否阻塞来决定是否阻塞
如果channel已经关闭,且缓冲区没有元素,返回该类型零值
如果有发送者正在阻塞,说明:
- 要么是无缓冲
- 有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
- 将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者
如果缓存区有数据, 则从缓冲区将数据复制到ep,返回
接下来就是既没有发送者在等待,也缓冲区也没数据的情况:
将当前协程包装成sudog,阻塞到channel中
来看其中的子函数recv():
/** c:操作的channel sg:阻塞的发送协程 ep:接收者接收数据的地址 */ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep if c.dataqsiz == 0 { if ep != nil { recvDirect(c.elemtype, sg, ep) } // 接下来是有缓冲,且缓冲区满的情况 } else { // qp为channel缓冲区中,接收者下一次接收的地址 qp := chanbuf(c, c.recvx) // 将数据从qp拷贝到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将发送者的数据从sg.elem拷贝到qp typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒发送者 goready(gp, skip+1) }
关闭
func closechan(c *hchan) { // 不能关闭空channel if c == nil { panic(plainError( "close of nil channel" )) } lock(&c.lock) // 不能重复关闭 if c.closed != 0 { unlock(&c.lock) panic(plainError( "close of closed channel" )) } // 修改关闭状态 c.closed = 1 var glist gList // 释放所有的接收者协程,并为它们赋予零值 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } // 释放所有的发送者协程 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } unlock(&c.lock) // 执行唤醒操作 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
关闭的流程比较简单,可以看出:
不能关闭空channel,不能重复关闭channel
先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:
接收者:会收到该类型的零值
这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑
发送者:会被唤醒,然后panic
因此不能在有多个sender的时候贸然关闭channel