前言
asongGochannelGoGoroutineGoroutineGoroutineGoGoroutinechannelGO1.15channel
什么是channel
channelgoroutineGogoroutine+channel
func GoroutineOne(ch chan <-string) {
fmt.Println("GoroutineOne running")
ch <- "asong真帅"
fmt.Println("GoroutineOne end of the run")
}
func GoroutineTwo(ch <- chan string) {
fmt.Println("GoroutineTwo running")
fmt.Printf("女朋友说:%s\n",<-ch)
fmt.Println("GoroutineTwo end of the run")
}
func main() {
ch := make(chan string)
go GoroutineOne(ch)
go GoroutineTwo(ch)
time.Sleep(3 * time.Second)
}
// 运行结果
// GoroutineOne running
// GoroutineTwo running
// 女朋友说:asong真帅
// GoroutineTwo end of the run
// GoroutineOne end of the run
GoroutineGoroutineOnechannelGoroutineTwochannel
channelchannel
channel
channel
channelchanchan <-<- chan<-channelchannel
chan T // 接收和发送类型为T的数据
chan<- T // 只可以用来发送 T 类型的数据
<- chan T // 只可以用来接收 T 类型的数据
channel
makechannelchannelchannelchannel
示例:
ch_no_buffer := make(chan int)
ch_no_buffer := make(chan int, 0)
ch_buffer := make(chan int, 100)
0channel0channelbuffer
channel
func main() {
var ch chan string
ch <- "asong真帅"
fmt.Println(<- ch)
}
// 运行报错
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send (nil chan)]:
channel
channel
"channel" <- "要入队的值(可以是表达式)"
channelchannelchannel
channel
nilchannel
channel
channel
<- "channel"
channel
nilchannel
channel
val, ok := <-ch
channelchannelokfalseval
channel
for-rangechannel
func main() {
ch := make(chan int,10)
go func() {
for i:=0;i<10;i++{
ch <- i
}
close(ch)
}()
for val := range ch{
fmt.Println(val)
}
fmt.Println("over")
}
range chchannelchannelfor-range
select
GoselectGoroutinechannelchannelselectGoroutine
func fibonacci(ch chan int, done chan struct{}) {
x, y := 0, 1
for {
select {
case ch <- x:
x, y = y, x+y
case <-done:
fmt.Println("over")
return
}
}
}
func main() {
ch := make(chan int)
done := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
done <- struct{}{}
}()
fibonacci(ch, done)
}
selectswitchswitchselectcasechannelselectcasecasecase
selectchannelselectdefaultselect
ChannelChannelcaseChanneldefault
nil channeldefault casenil channelselect
channel
closechannelchannelpanicchannel
func main() {
ch := make(chan int, 10)
ch <- 10
ch <- 20
close(ch)
fmt.Println(<-ch) //1
fmt.Println(<-ch) //2
fmt.Println(<-ch) //0
fmt.Println(<-ch) //0
}
channel
channel
这个思想大家是否理解呢?我在这里分享一下我的理解(查找资料+个人理解),有什么不对的,留言区指正或开喷!
什么是使用共享内存来通信?其实就是多个线程/协程使用同一块内存,通过加锁的方式来宣布使用某块内存,通过解锁来宣布不再使用某块内存。 什么是通过通信来实现共享内存?其实就是把一份内存的开销变成两份内存开销而已,再说的通俗一点就是,我们使用发送消息的方式来同步信息。 为什么鼓励使用通过通信来实现共享内存?使用发送消息来同步信息相比于直接使用共享内存和互斥锁是一种更高级的抽象,使用更高级的抽象能够为我们在程序设计上提供更好的封装,让程序的逻辑更加清晰;其次,消息发送在解耦方面与共享内存相比也有一定优势,我们可以将线程的职责分成生产者和消费者,并通过消息传递的方式将它们解耦,不需要再依赖共享内存。 对于这个理解更深的文章,建议读一下这篇文章:为什么使用通信来共享内存
channel
源码剖析
数据结构
src/runtime/chan.gohchan
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
hchan
qcountdataqsizbufchannelelemsizeclosedchannelelemtypechannelsendxrecvxrecvqgoroutinesenqgoroutinelockhchanchannel
这个结构结合上面那个图理解就更清晰了:
bufdataqsizqcountchannelelemsizeelemtypechannelsendqrecvqgoroutinechannelchannel
对于上面的描述,我们可以画出来这样的一个理解图:
channel
channelmakemakeruntime.makechanruntime.makechan64
// go 1.15.7
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}
runtime.makechan64makechanruntime.makechan64makechanmakechan
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 对发送元素进行限制 1<<16 = 65536
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 检查是否对齐
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 判断是否会发生内存溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 构造hchan对象
var c *hchan
switch {
// 说明是无缓冲的channel
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
// 元素类型不包含指针,只进行一次内存分配
// 如果hchan结构体中不含指针,gc就不会扫描chan中的元素,所以我们只需要分配
// "hchan 结构体大小 + 元素大小*个数" 的内存
case elem.ptrdata == 0:
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
// 元素包含指针,进行两次内存分配操作
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化hchan中的对象
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
注释我都添加上了,应该很容易懂吧,这里在特殊说一下分配内存这块的内容,其实归一下类,就只有两块:
channelchannelmallocgcchannelhchan
mallocgcchannelclose
channel
channelruntime.chansend1runtime.chansend
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
chansend
channelchannelchannel
前置检查
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
channelnilnilchannelgoparkgoroutinerunningwaitingpanic
channelfullhchan
func full(c *hchan) bool {
if c.dataqsiz == 0 {
return c.recvq.first == nil
}
return c.qcount == c.dataqsiz
}
这里快速失败校验逻辑如下:
qcountdataqsizdataqsiz0channel
加锁/异常检查
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
channelChannelChannel“send on closed channel”
channel
goroutinesrecvqgoroutine
if sg := c.recvq.dequeue(); sg != nil {
//找到一个等待的接收器。我们将想要发送的值直接传递给接收者,绕过通道缓冲区(如果有的话)。
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
Send
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 静态竞争省略掉
// elem是指接收到的值存放的位置
if sg.elem != nil {
// 调用sendDirect方法直接进行内存拷贝
// 从发送者拷贝到接收者
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 绑定goroutine
gp := sg.g
// 解锁
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒接收的 goroutine
goready(gp, skip+1)
}
SendDirect
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
memovegoroutinegoroutinecopychannelbuf
channel
channel
// 判断通道缓冲区是否还有可用空间
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
// 指向下一个待发送元素在循环数组中的位置
c.sendx++
// 因为存储数据元素的结构是循环队列,所以当当前索引号已经到队末时,将索引号调整到队头
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 当前循环队列中存储元素数+1
c.qcount++
// 释放锁,发送数据完毕
unlock(&c.lock)
return true
}
这里的几个步骤还是挺好理解的,注释已经添加到代码中了,我们再来详细解析一下:
chanbufsendxtypedmemmovesendx
channel
缓冲区空间也会有满了的时候,这是有两种方式可以选择,一种是直接返回,另外一种是阻塞等待。
直接返回的代码就很简单了,做一个简单的是否阻塞判断,不阻塞的话,直接释放锁,返回即可。
if !block {
unlock(&c.lock)
return false
}
阻塞的话代码稍微长一点,我们来分析一下:
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
gettggoroutineacquireSudogsudoggoroutinesudogggoroutinegoroutinewaitingsudogsudogelemc.sendq.enqueuegoparkgoroutinewait
KeepAlive
goroutinewait
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 唤醒后channel被关闭了,直接panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉mysg上绑定的channel
mysg.c = nil
// 释放sudog
releaseSudog(mysg)
return true
goroutinechannelpanicmysgchannelsudog
goroutinewaitgch <- "asong"wait
接收数据
channel
val := <- ch
val, ok := <- ch
runtime.chanrecv1runtime.chanrecv2
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
chanrecv
channelchannelchannel
前置检查
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
channelnil channelnil channelnil channelgoparkgoroutine
channel
func empty(c *hchan) bool {
// c.dataqsiz is immutable.
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}
0sendqgoroutinechannelchannelepepepepval := <-ch valepchannel
加锁和提前返回
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
channelchannelchannelep
channel
channelchannel
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
goroutine
channelchannel
recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 非缓冲channel
if c.dataqsiz == 0 {
// 未忽略接收值
if ep != nil {
// 直接从发送方拷贝数据到接收方
recvDirect(c.elemtype, sg, ep)
}
} else { // 有缓冲channel,但是缓冲区满了
// 缓冲区满时,接收方和发送方游标重合了
// 因为是循环队列,都是游标0的位置
// 获取当前接收方游标位置下的值
qp := chanbuf(c, c.recvx)
// 未忽略值的情况下直接从发送方拷贝数据到接收方
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将发送者数据拷贝到缓冲区中
typedmemmove(c.elemtype, qp, sg.elem)
// 自增到下一个待接收位置
c.recvx++
// 如果下一个待接收位置等于队列长度了,则下一个待接收位置为队头,因为是循环队列
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 上面已经将发送者数据拷贝到缓冲区中了,所以缓冲区还是满的,所以发送方位置仍然等于接收方位置。
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
// 绑定发送方goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒发送方的goroutine
goready(gp, skip+1)
}
代码中的注释已经很清楚了,但还是想在解释一遍,这里主要就是分为两种情况:
channelrecvDirectgoroutinegoroutinechannelchanbufrecvsendxrecvx
goreadygoroutine
channel
channel
// 缓冲channel,buf里有可用元素,发送方也可以正常发送
if c.qcount > 0 {
// 直接从循环队列中找到要接收的元素
qp := chanbuf(c, c.recvx)
// 未忽略接收值,直接把缓冲区的值拷贝到接收方中
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循环数组里相应位置的值
typedmemclr(c.elemtype, qp)
// 接收游标向前移动
c.recvx++
// 超过循环队列的长度时,接收游标归0(循环队列)
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 循环队列中的数据数量减1
c.qcount--
// 接收数据完毕,释放锁
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
这段代码没什么难度,就不再解释一遍了。
channel
channelgoroutine
if !block {
unlock(&c.lock)
return false, false
}
非阻塞接收数据的话,直接返回即可;否则则进入阻塞接收模式:
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
goroutinesudoggoroutinesudoggoroutine
接下来的环境逻辑也没有特别要说的,与发送方唤醒部分一模一样,不懂的可以看前面。唤醒后的主要工作就是恢复现场,释放绑定信息。
channel
closechannelruntime.closechan
func closechan(c *hchan) {
// 对一个nil的channel进行关闭会引发panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 加锁
lock(&c.lock)
// 关闭一个已经关闭的channel也会引发channel
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
// 关闭channnel标志
c.closed = 1
// Goroutine集合
var glist gList
// 接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 发送方的sudog也加入到到待清除队列 glist 中
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 要关闭的goroutine,发送的值设为nil
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 释放了发送方和接收方后,释放锁就可以了。
unlock(&c.lock)
// 将所有 glist 中的 goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度。
// 我们既然是从sendq和recvq中获取的goroutine,状态都是挂起状态,所以需要唤醒他们,走后面的流程。
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
这里逻辑还是比较简单,归纳总结一下:
nilchannelchannelgoroutinechannelval,ok := <-chchannel
总结
哇塞,开往幼儿园的车终于停了,小松子唠唠叨叨一路了,你们学会了吗?
channel
channelchannel