前言
上次分享了 goroutine 的底层原理和 GMP 的调度模型,这次来看看 Golang 的 channel 底层是怎么实现的?
什么是 channel
简单地说,channel 是管道,负责 goroutine 之间的消息传递和事件通知。
如何理解 channel 的结构体
关于 channel 的底层,它是一个叫 hchan 的结构体,我们可以查看一下源码:(我的 go 版本 1.17)
/在你的 go 目录下 /go/src/runtime/chan.go
channel 的结构体如下:
type hchan struct {
qcount uint // 循环链表中,已经存在的元素个数
dataqsiz uint // 循环链表的长度
buf unsafe.Pointer // 指向底层循环链表的指针
elemsize uint16 // channel中的元素的大小
closed uint32 //channel 是否关闭的标识,
elemtype *_type // channel中的元素的类型
sendx uint // 循环链表中,已发送数据的索引位置
recvx uint // 循环链表中已接受的索引位置
recvq waitq // 等待接收的sudog 的双向链表,sudog 里面封装了 goroutine
sendq waitq // 等待发送的sudo 的双向链表,sudog 里面封装了 goroutine
lock mutex //互斥锁,读写的时候,都会加上锁
}
结合我这张手工图来理解吧,因为现有的工具中,没办法画出这个循环链表,所以就手工画出来了。
循环链表中一共有两个元素,所以 qcount 是 2,循环链表的长度是 4,所以 dataqsize 是 4,channel 还没有关闭,所以 closed 是 0
还有需要说明的是,channel 是线程安全的,因为读和写的时候,channel 都会加上锁。
make(chan int) 是如何创建 channel 的?
ch := make(chan int, 4)
ch <- 1
/在你的 go 目录下 /go/src/runtime/chan.go
make 是调用了 chan.go 文件里面的 makechan 方法,返回了 hchan 的指针。这里主要起到初始化和分配内存的作用。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
// 初始化、分配内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
channel 是如何发送数据的?
ch := make(chan int, 4)
ch <- 1
/在你的 go 目录下 /go/src/runtime/chan.go
箭头发送数据,是 golang 的语法糖,底层是调用了 chan.go 文件下的 chansend() 函数。
- 向一个nil channel发送数据,会调用gopark函数将当前goroutine挂起
- 向一个已经关闭的channel发送数据,直接会panic
- 如果channel的recvq当前队列中有被阻塞的接收者,则直接将数据发送给当前goroutine, 并将它设置成下一个运行的goroutine
- 当channel的缓冲区还有空闲空间,则将数据发送到sendx指向缓冲区的位置
- 当没有缓冲区或者缓冲区满了,则会创建一个sudog的结构体将其放到channel的sendq队列当中陷入休眠等待被唤醒
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 向关闭的 channel 发送数据会引发 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
channel 是如何关闭的?
我们关闭 channel 的代码如下,那么它底层是调用了什么函数呢?
close(ch)
close 函数底层调用了 chan.go 文件里面的 closechan 函数
从以下代码中,我们可以看出:
- 关闭 nil 的channel 会引发 panic
- 如果 channel 已经关闭,那么再次关闭会引发 panic
- 释放发送队列 sendq
- 释放接受队列 recvq
func closechan(c *hchan) {
// 关闭 nil 的channel 会引发 panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 加锁,线程安全
lock(&c.lock)
// 如果 channel 已经关闭,那么再次关闭会引发 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 修改 channel 的关闭状态
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}