1、代码
从一段代码入手,分析 chan 的原理(代码拷贝直接能运行)
wg := sync.WaitGroup{}
wg.Add(3)
ch := make(chan uint16, 2)
G1 := func() {
defer wg.Done()
fmt.Println("G1 read: ", <- ch)
}
G2 := func() {
defer wg.Done()
fmt.Println("G2 read: ", <- ch)
}
G3 := func() {
defer wg.Done()
defer func() {
if exception := recover(); exception != nil {
fmt.Println("panic: ", exception)
}
}()
time.Sleep(2 * time.Second)
ch <- 1
fmt.Println("G3 send: ", 1)
ch <- 2
fmt.Println("G3 send: ", 2)
ch <- 3
fmt.Println("G3 send: ", 3)
ch <- 4
fmt.Println("G3 send: ", 4)
ch <- 5
fmt.Println("G3 send: ", 5)
}
G4 := func() {
timeout := time.After(5 * time.Second)
<- timeout
fmt.Println("G4 close chan")
close(ch)
}
go G1()
go G2()
go G3()
go G4()
wg.Wait()
fmt.Println("main exit")
2、创建 chan
初始化带缓冲的 chan
ch := make(chan uint16, 2)
make 底层调用 makechan,对于元素非指针或者不带缓冲的 channel,一次开辟好 hchan 和 buf 的内存(即在一段连续空间里),然后 buf 指向相对 hchan 的地址即可;若元素是指针时,hchan 和 buf 单独开辟。
看此时 hchan 的结构
make 要点
make 返回的是指针,因此 chan 是引用类型 元素大小为 0 表示 struct{} 非缓冲 chan 底层不会为buf单独开辟内存
接收流程
① 读 chan 时,编译翻译如下
// 翻译伪代码 ok = chanrecv2(ch, &val)
val, ok := <- ch
// 翻译伪代码 chanrecv1(ch, &val)
val := <-ch
② 读 nil chan 协程将会被挂起
③ 两个条件必须满足,若是不阻塞,且没有数据,则直接返回
④ 从已关闭的通道读数据,将返回 chan 元素类型的 0 值
⑤ 从发送队列取协程,若取成功,代表两种情况:对于不带缓冲buf,说明该发送协程没有对应的接收协程而挂起并放入到了 c.sendq 中, 此时直接从发送协程的栈中把数据拷贝到接收协程的栈中;对于带缓冲buf,说明由于 c.buf 满,导致发送协程被挂起并被放入到 c.sendq 中,此时从 c.buf 的c.recvx位置取数据拷贝到当前接收协程的栈中,同时把发送协程的数据拷贝到 c.buf 的c.sendx位置处
⑥ c.sendx 为空,则看缓冲区是否有数据,有的话,直接从 c.buf 的位置 c.recvx 处取数据,拷贝到当前调用读chan的协程的栈中,同时更新 hchan 的属性(零值化c.recvx 对应的c.buf 位置,c.recvx 移到下一个位置,c.qcount 减1表示数据被读走了)
⑦ 缓冲区没数据或者不带缓冲,此时将当前调用读chan的协程挂起并加入到c.recvq 的队列中
从读chan逻辑分析代码
// G1 和 G2 读取 ch
G1 := func() {
defer wg.Done()
fmt.Println("G1 read: ", <- ch)
}
G2 := func() {
defer wg.Done()
fmt.Println("G2 read: ", <- ch)
}
go G1()
go G2()
G1 和 G2 消费通道 ch,此时没有发送(发送协程 G3 在2s 后才开始发送数据), c.sendq 为空,同时c.buf 缓冲区也没有可读数据,所以 G1, G2 读时,被挂起(waiting 状态)并阻塞并加入到 c.recvq 队列中,此时相当于走到了图中 ⑦ 的位置,hchan 的状态如下
4、向 chan 发送数据
发送数据流程
⑤ 接收队列c.recvq有协程,两种情况:对于非缓冲chan,表示发送协程没有准备好,此时当前发送的数据直接拷贝到取出的等待接收协程中;对于缓冲chan表示缓冲区没有数据,此时将数据拷贝到接收协程的栈中
// G3 数据发送
G3 := func() {
defer wg.Done()
// G4 超时 5s 后会关闭通道,关闭后 G3 会收到
// panic,详见 close 关闭逻辑分析
defer func() {
if exception := recover(); exception != nil {
fmt.Println("panic: ", exception)
}
}()
time.Sleep(2 * time.Second)
// 发送 1 时,G1 和 G2 在 c.recvq 中
// 此时相当于走了 ⑤ 的步骤,写成功并返回
ch <- 1
fmt.Println("G3 send: ", 1)
// 发送 1 时,G2 在 c.recvq 中
// 此时相当于走了 ⑤ 的步骤,写成功并返回
ch <- 2
fmt.Println("G3 send: ", 2)
// 发送 3 时, c.recvq 已经为空,此时由于
// c.buf 没有满,所以写成功,相当于走⑥步骤
ch <- 3
fmt.Println("G3 send: ", 3)
// 发送 4 时, c.recvq 已经为空,此时由于
// c.buf 没有满,所以写成功,相当于走⑥步骤
ch <- 4
fmt.Println("G3 send: ", 4)
// 发送 5 时,c.recvq 为空,c.buf 满,
// 并且是阻塞,无法写入数据,G3 被挂起,并
// 放入到 c.sendq 中,相当于走 ⑦ 步骤
// G3 会加入到 c.sendq 队列中,同时G3
// 被挂起,直到 G4 关闭抛出异常
ch <- 5
fmt.Println("G3 send: ", 5)
}
G3 运行后,hchan 的状态如下
5、关闭 close chan
关闭流程相对简单,首先把接收队列 c.recvq 中的协程要读的数据地址设置为0,其次情况 c.sendq 队列中的协程,并将它们加入到可运行队列中,被调度运行。
已关闭的chan再次关闭时,抛出异常
c.recvq 中读数据协程的存储数据的地址处设置零值,一旦被调度运行,则该协程得到零值
c.sendq 中写数据协程被清空,设置可运行状态,一旦被调度,则抛出异常(写已关闭的协程抛出异常)
G3 := func() {
defer wg.Done()
defer func() {
// G4 关闭时,G3 从 s.sendq 上清楚,被调度时抛出异常
// panic: send on closed channel
if exception := recover(); exception != nil {
fmt.Println("panic: ", exception)
}
}()
.........
// 写 5 时被挂起,并加入到 c.sendq 中
ch <- 5
fmt.Println("G3 send: ", 5)
}
G4 := func() {
// G4 超时5s后关闭通道,此时 G3 会被调度运行,
// 并捕获异常
timeout := time.After(5 * time.Second)
<- timeout
fmt.Println("G4 close chan")
close(ch)
}
chan 操作 | chan 为 nil | chan 已关闭 | 打开的 chan |
读:<- ch | 挂起 | 读到零值 | 非缓冲:有发送协程,则读到数据,否则阻塞或失败返回 带缓冲:buf 有数据,从buf中读到数据;buf无数据,则阻塞或失败返回; |
写:ch <- val | 挂起 | panic | 非缓冲:有收数据协程,则写成功,否则阻塞或失败返回 带缓冲:buf不满,写入成功;buf 满,被挂起;等待接收队列不空,则写成功返回 |
关闭:close(ch) | panic | panic | 正常关闭 |