channel是用于 goroutine 之间的同步、通信的数据结构。它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全。本文通过示例为大家详细介绍了channel的应用,需要的可以参考一下

目录
  • 前言
  • 整体结构
  • 创建
  • 发送
  • 接收
  • 关闭

前言

channel是用于 goroutine 之间的同步、通信的数据结构

channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率

channel的用途包括但不限于以下几点:

  • 协程间通信,同步
  • 定时任务:和timer结合
  • 解耦生产方和消费方,实现阻塞队列
  • 控制并发数

本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑

整体结构

Go channel的数据结构如下所示:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock mutex
}

qcount:已经存储了多少个元素

dataqsie:最多存储多少个元素,即缓冲区容量

buf:指向缓冲区的位置,实际上是一个数组

elemsize:每个元素占多大空间

closed:channel能够关闭,这里记录其关闭状态

elemtype:保存数据的类型信息,用于go运行时使用

sendx,recvx:

  • 记录下一个要发送到的位置,下一次从哪里还是接收
  • 这里用数组模拟队列,这两个变量即表示队列的队头,队尾
  • 因此channel的缓冲也被称为环形缓冲区

recvq,sendq:

当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送

lock:channel支持协程间并发访问,因此需要一把锁来保护

创建

makechan
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值

可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针

func makechan(t *chantype, size int) *hchan {
   elem := t.elem
    
   // mem:缓冲区大小 
   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError( "makechan: size out of range" ))
   }

   var c *hchan
   switch {
   // 缓冲区大小为空,只申请hchanSize大小的内存
   case mem == 0:
       c = (*hchan)(mallocgc(hchanSize, nil, true))
       c.buf = c.raceaddr()
   // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存
   case elem.ptrdata == 0:
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       c.buf = add(unsafe.Pointer(c), hchanSize)
   // 否则就是带缓存,且有指针,分配两次内存
   default:
      // Elements contain pointers.
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
   }
    
   // 保存元素类型,元素大小,容量
   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)
   
   return c
}

发送

执行以下代码时:

ch <- 3

编译器会转化为对chansend的调用

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 如果channel是空
   if c == nil {
      // 非阻塞,直接返回
      if !block {
         return  false
      }
      // 否则阻塞当前协程
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,没有关闭,且容量满了,无法发送,直接返回
   if !block && c.closed == 0 && full(c) {
      return  false
   }

   // 加锁
   lock(&c.lock)

   // 如果已经关闭,无法发送,直接panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "send on closed channel" ))
   }

   // 从接收队列弹出一个协程的包装结构sudog
   if sg := c.recvq.dequeue(); sg != nil {
      // 如果能弹出,即有等到接收的协程,说明:
      // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
      // 将要发送的数据拷贝到该协程的接收指针上
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true
}

   // 缓冲区还有空间
   if c.qcount < c.dataqsiz {
      // qp:计算要发送到的位置的地址
      qp := chanbuf(c, c.sendx)
      // 将数据从ep拷贝到qp
      typedmemmove(c.elemtype, qp, ep)
      // 待发送位置移动
      c.sendx++
      // 由于是数组模拟队列,sendx到顶了需要归零
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // 缓冲区数量++
      c.qcount++
      unlock(&c.lock)
      return  true
}

   // 往下就是缓冲区无数据,也没有等到接收协程的情况了
   
   // 如果是非阻塞模式,直接返回
   if !block {
      unlock(&c.lock)
      return  false
    }

   // 将当前协程包装成sudog,阻塞到channel上
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
  
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   
   // 当前协程进入发送等待队列
   c.sendq.enqueue(mysg)
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   
 // 被唤醒后从这里开始执行
   
   KeepAlive(ep)

   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   // 被唤醒后发现channel关闭了,panic
   if closed {
      if c.closed == 0 {
         throw( "chansend: spurious wakeup" )
      }
      panic(plainError( "send on closed channel" ))
   }
   return  true
}

整体流程为:

如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回

从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:

  • 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
  • 将要发送的数据拷贝到该协程的接收指针上,返回
  • 这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝

否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回

接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上

将协程阻塞到channel的等待队列时,将其包装成了sudog结构:

type sudog struct {
   // 协程
   g *g
   // 前一个,后一个指针
   next *sudog
   prev *sudog
   // 等到发送的数据在哪,等待从哪个位置接收数据
   elem unsafe.Pointer
   acquiretime int64
   releasetime int64
   ticket      uint32
   isSelect bool
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   // 在哪个channel上等待
   c        *hchan // channel
}

其目的是:

  • g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列
  • elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝

来看看一些子函数:

1.判断channel是否是满的

func full(c *hchan) bool {
   // 无缓冲
   if c.dataqsiz == 0 {
      // 并且没有其他协程在等待
      return c.recvq.first == nil
   }
   // 有缓冲,但容量装满了
   return c.qcount == c.dataqsiz
}

2.send方法:

/**
c:要操作的channel
sg:弹出的接收者协程
ep:要发送的数据在的位置
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 唤醒该接收者协程
   goready(gp, skip+1)
}

接收

从channel中接收数据有几种写法:

  • 带不带ok
  • 接不接收返回值

根据带不带ok,决定用下面哪个方法

func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
}

根据接不接收返回值,决定elem是不是nil

最终都会调用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞
   if c == nil {
      if !block {
         return
   }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,并且channel为空
   if !block && empty(c) {
      // 如果还没关闭,直接返回
   if atomic.Load(&c.closed) == 0 {
      return
   }
      // 否则已经关闭,
      // 如果为空,返回该类型的零值
   if empty(c) {
     if ep != nil {
        typedmemclr(c.elemtype, ep)
     }
     return  true, false
       }
   }

   lock(&c.lock)
   
   // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return  true, false
}
    
   // 如果有发送者正在阻塞,说明:
   // 1.无缓冲
   // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
   if sg := c.sendq.dequeue(); sg != nil {
      // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true, true
}
    
   // 如果缓存区有数据, 
   if c.qcount > 0 {
      // qp为缓冲区中下一次接收的位置
      qp := chanbuf(c, c.recvx)
      // 将数据从qp拷贝到ep
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return  true, true
}

   // 接下来就是既没有发送者在等待,也缓冲区也没数据
   if !block {
      unlock(&c.lock)
      return  false, false
}

   // 将当前协程包装成sudog,阻塞到channel中
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // 记录接收地址
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)

   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)

   // 从这里唤醒
   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   success := mysg.success
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return  true, success
}

接收流程如为:

如果channel为nil,根据参数中是否阻塞来决定是否阻塞

如果channel已经关闭,且缓冲区没有元素,返回该类型零值

如果有发送者正在阻塞,说明:

  • 要么是无缓冲
  • 有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
  • 将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者

如果缓存区有数据, 则从缓冲区将数据复制到ep,返回

接下来就是既没有发送者在等待,也缓冲区也没数据的情况:

将当前协程包装成sudog,阻塞到channel中

来看其中的子函数recv():

/**
c:操作的channel
sg:阻塞的发送协程
ep:接收者接收数据的地址
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep
   if c.dataqsiz == 0 {
      if ep != nil {
         recvDirect(c.elemtype, sg, ep)
      }
   // 接下来是有缓冲,且缓冲区满的情况   
   } else {
      // qp为channel缓冲区中,接收者下一次接收的地址
   qp := chanbuf(c, c.recvx)
      // 将数据从qp拷贝到ep
   if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
    }
    // 将发送者的数据从sg.elem拷贝到qp
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
       c.recvx = 0
    }
    // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx
    c.sendx = c.recvx 
}
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 唤醒发送者
   goready(gp, skip+1)
}

关闭

func closechan(c *hchan) {
   // 不能关闭空channel
   if c == nil {
      panic(plainError( "close of nil channel" ))
   }

   lock(&c.lock)
   // 不能重复关闭
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "close of closed channel" ))
   }

   // 修改关闭状态
   c.closed = 1

   var glist gList

   // 释放所有的接收者协程,并为它们赋予零值
 for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }

   // 释放所有的发送者协程
 for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
     }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }
   unlock(&c.lock)

   // 执行唤醒操作
 for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

关闭的流程比较简单,可以看出:

不能关闭空channel,不能重复关闭channel

先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:

接收者:会收到该类型的零值

这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑

发送者:会被唤醒,然后panic

因此不能在有多个sender的时候贸然关闭channel