不要通过共享内存的方式通信,而是应该通过通信的方式共享内存
在大多数编程语言的并发模型中,在传递数据的时候一般是通过共享内存的方式,这种方式存在线程之间的竞态。而go里面Goroutine之间可以通过Channel传输数据,Channel本身也用到锁资源,但两者粒度不一样,两者都是并发安全的。
结构体
channel分为发送者sendq和接收者recvq,在没有缓冲槽情况,发送端和接收端都可能发生阻塞的状态。有缓冲槽的的情况则根据缓冲槽是否满了,来决定阻塞和非阻塞。因为发送和接收本身可以理解为是解耦的,channel相当于是队列,即FIFO,遵循的是先进先出的概念。
初始化
func makechan(t *chantype, size int) *hchan {elem := t.elem// 计算所需的空间的大小elem.size*size,并判断是否溢出mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0: // 没有缓冲槽,size是0,分配chan内存空间c = (*hchan)(mallocgc(hchanSize, nil, true))c.buf = c.raceaddr()case elem.ptrdata == 0:// 当前的chan和缓冲槽分配一块连续的内存空间c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// 指针单独为chan和缓冲槽分配内存空间c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)return c
}
发送
发送可以分为三个部分:
1.有接收者,将数据写到接受者内存地址,并唤醒对应接收者的g。
2.没有接收者,但是有缓冲区
3.没有接收者,没有缓冲区或缓冲区已满,将数据写入sudog,在发送者队列尾部插入,g进入休眠状态
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}//有等待接收的队列,直接将数据发送到接收端的内存地址中,并设置对应的g为可执行状态if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}//chan有缓存槽,把数据写入缓冲槽中if c.qcount < c.dataqsiz {//根据发送的索引计算出要写入缓冲槽的位置qp := chanbuf(c, c.sendx)//将数据拷贝到缓冲槽对应的位置typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}//如果没有缓存槽,往发送队列中加入一条等待执行的sudgogp := getg()//获取一个等待执行的sudgomysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}//将数据写入sudogmysg.elem = epmysg.waitlink = nil//sudo绑定goroutionemysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)atomic.Store8(&gp.parkingOnChan, 1)//让当前goroutine进入休眠状态,等待被唤醒gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)KeepAlive(ep)//到这里说明gorourtine被接收端唤醒,对sudog资源进行释放if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)if closed {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// 将数据拷贝到对端if sg.elem != nil {// 拷贝值到目标内存地址sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.gunlockf()gp.param = unsafe.Pointer(sg) // g也会绑定sudog信息sg.success = trueif sg.releasetime != 0 {sg.releasetime = cputicks()}// 唤醒处于读等待的Goroutinegoready(gp, skip+1)
}
func goready(gp *g, traceskip int) {// 系统栈执行调度systemstack(func() {ready(gp, traceskip, true)})
}
func ready(gp *g, traceskip int, next bool) {// 获取g的状态status := readgstatus(gp)// g0_g_ := getg()// 获取g0上的mmp := acquirem()if status&^_Gscan != _Gwaiting {dumpgstatus(gp)throw("bad g->status in ready")}// 修改g的状态为可执行状态casgstatus(gp, _Gwaiting, _Grunnable)// 放入p的优先者队列runqput(_g_.m.p.ptr(), gp, next)// 开始调度wakep()releasem(mp)
}
接收
接收也可以分为三部分:
1.有发送者队列
(1)无缓冲区产生了发送者队列
(2)有缓冲区(满了)且产生了发送者队列
2.有缓冲区,没有发送者队列
3.没有发送者队列,没有缓冲区或缓冲区没数据,将接收数据指针地址写入sudog,在接收者队列尾部插入
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {lock(&c.lock)//如果有等待的发送队列,那么直接接收if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}//没有在等待的发送队列,缓冲槽中有数据,从缓存槽中读取数据if c.qcount > 0 {qp := chanbuf(c, c.recvx)if ep != nil {typedmemmove(c.elemtype, ep, qp)}//将已经被拷贝到发送方的缓冲槽释放typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--//chan元素个数减少unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}//如果没有缓存槽,往接收队列中加入一条等待执行的sudoggp := getg()//获取一个等待执行的sudogmysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}//将对应接受者指针地址写入sudogmysg.elem = epmysg.waitlink = nilgp.waiting = mysg//sudo绑定goroutionemysg.g = gpmysg.isSelect = false//sudo绑定chanmysg.c = cgp.param = nil//将sudog加入chan的接收队列c.recvq.enqueue(mysg)//让当前goroutine进入休眠状态,等待被唤醒gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)//到这里说明gorourtine被发送端唤醒,对sudog资源进行释放if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, success
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// 如果没有缓存槽,那么直接拷贝发送队列的值if c.dataqsiz == 0 {if ep != nil {// copy data from senderrecvDirect(c.elemtype, sg, ep)}} else {// 如果有缓冲槽,说明缓存槽满了并且产生了等待发送的队列// 从缓冲槽中获取数据,并且将发送队列的头节点保存的数据写入缓冲槽中qp := chanbuf(c, c.recvx)// copy data from queue to receiver// 将缓冲槽的数据拷贝到接收者目标地址if ep != nil {typedmemmove(c.elemtype, ep, qp)}//拷贝发送者数据到缓冲区typedmemmove(c.elemtype, qp, sg.elem)c.recvx++ // 接收索引增加if c.recvx == c.dataqsiz {c.recvx = 0}// 缓冲槽满了的情况是缓冲槽发送索引等于接收的索引值c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)sg.success = trueif sg.releasetime != 0 {sg.releasetime = cputicks()}// 将阻塞的发送方头节点goroutine唤醒goready(gp, skip+1)
}
关闭
func closechan(c *hchan) {c.closed = 1//声明一个g列表var glist gList//释放所有接收者队列,把等待的接收者队列的g加入到g列表中for {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseglist.push(gp)}//释放所有发送者队列,把等待的发送者队列的g加入到g列表中for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseglist.push(gp)}unlock(&c.lock)//将所有阻塞的发送者和接收者的goroutine唤醒for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}