Channel是Golang实现CSP的核心。
基于channel通信主要涉及buf(数据)和sendq、recvq(维护阻塞的G),lock保证并发访问安全;
本质是一个基于环形缓存的有锁队列,但G的阻塞是在用户空间;
目录
新建channel
发送数据
协程直接发送数据
接收数据
协程直接接收数据
关闭channel
Select原理
新建channel
runtime.hchan
type hchan struct {
qcount uint // total data in the queue 长度
dataqsiz uint // size of the circular queue 容量
buf unsafe.Pointer // points to an array of dataqsiz elements 环形队列
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index 环形数组的index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
//FIFO的队列
type waitq struct {
first *sudog //sudog represents a g in a wait list, such as for sending/receiving on a channel.
last *sudog
}
发送数据
- 加锁;
- 存在等待的接受者时,直接发给接收者;
- 缓冲区存在剩余空间时,写入缓冲区;
- 不存在缓冲区或者满了的情况下,挂在sendq上;
- 被阻塞的发送者,接收者会负责消息的传输,所以被唤醒后进行收尾工作;
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//向nil的channel发消息会持续阻塞
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//获取channel的锁
lock(&c.lock)
//向close的channel发消息会Panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//已有g阻塞在接收队列,直接发消息,绕过channel的buf; (没有缓冲也就是这样了)
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 没有阻塞的
// 没满,加入buf,然后返回;
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) //返回位置的指针
typedmemmove(c.elemtype, qp, ep) //数据拷贝
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 满了,发送方会阻塞
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep //发送的数据地址
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) //当前g+数据封装的mysg,挂在channel的发送队列上;
//当前协程用户态阻塞,释放lock
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 重新恢复调度,此时以及不需要传输数据了,因为数据以及被接受了,释放资源即可;
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
协程直接发送数据
如果存在挂在channel的接收者时,发送者直接将数据传输给最早的接收者FIFO,绕过环形缓存;
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil { //接收者的变量
sendDirect(c.elemtype, sg, ep)//直接拷贝过去
sg.elem = nil
}
gp := sg.g
unlockf()//拷贝完毕再释放channel锁,避免多个发送者;
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)//唤醒接受者
}
接收数据
- 加锁;
- channel关闭&数据为空,返回零值;
- 如果有挂在sendq的发送者,从环形缓存拿到第一个数据,然后帮发送者将数据写入环形缓存的末尾;和发送时绕过缓存不同,保证消息FIFO,避免缓存的数据被饿死;
- 从环形缓存中接收数据;
- 数据为空,挂在recvq上;被唤醒,收尾工作;
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 向nil发消息普通会阻塞,select直接返回;
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 获取channel的锁
lock(&c.lock)
// case1:channel关闭&数据为空,清空ep->拿到零值,返回;
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// channel关闭&数据不为空 || channel没关闭
// channel已满的情况,直接接收阻塞的发送者消息,绕过channel;
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//有数据
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx) //位置
if ep != nil {
typedmemmove(c.elemtype, ep, qp) //数据copy
}
typedmemclr(c.elemtype, qp)//清楚buf的数据
c.recvx++ //更改位置
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//没数据
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) //封装mysg信息,阻塞在recvq队列;
//让出调度
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
//恢复调度,此时已经接受了数据,做收尾工作。
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil //没关闭会赋值mysg的地址
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
协程直接接收数据
对于带缓冲的channel,此处接收者和发送者并没有直接数据传输。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
// written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
//接收先拿buf的数据,然后将发送者的数据放到buf中。
//避免数据buf的数据被饿死;发的时候不用,因为buf是空的。
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf() //释放锁
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) //唤醒发送者
}
关闭channel
主要是处理channel的recvq和sendq队列:recvq会拿到零值,sendq中的G都是在关闭之前阻塞的;
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
closechan(c)
}
func closechan(c *hchan) {
// 关闭nil,panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 加锁
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel")) //重复关闭
}
c.closed = 1
var glist gList
// release all readers
//如果有recvq,此时的buf肯定是空的,相当于给零值然后唤醒;
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil //此时才为nil,被唤醒的g就知道是否关闭了。
glist.push(gp)
}
// release all writers (they will panic)
// 如果有sendq,
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
//释放锁
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock. 唤醒
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
Select原理
pollOrderlockOrderpollOrdercaselockOrdercaseruntime.sudog
// compiler implements
//
// select {
// case v = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbrecv(&v, c) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false) //非阻塞
return
}