Don’t communicate by sharing memory, share memory by communicating。相信学过 Go 的同学都知道这句名言, 可以说 channel 就是后边这句话的具体实现。channel 是一个类型安全的循环队列, 能够控制 groutine 在它上面读写消息的行为, 比如阻塞某个 groutine , 或者唤醒某个 groutine。
1.1. chan 注意事项
make
ch := make(chan int) // 阻塞
ch := make(chan int, 5) // 有 buffer
- 如果调试时出现莫名其妙的问题, 除了上述可能性外, 还可能是 其它地方 有 panic。在调试的时候不是所有的 panic 都会被报出来, 今天就遇到一个, 坑!
1.2. 基本特征
一个通道相当于一个先进先出(FIFO)的队列, 各个元素值都是严格地按照发送的顺序排列的, 先被发送通道的元素值一定会先被接收, 一个左尖括号紧接着一个减号形象地代表了元素值的传输方向。下面是创建几种不同的通道:
ch1 := make(chan int) // 无缓冲通道
ch2 := make(chan int, 3) // 有缓冲通道
ch3 := make(chan<- int, 1) // 单向通道: 只能发送不能接收
ch4 := make(<-chan int, 1) // 单向通道: 只能接收不能发送
下面举一个简单的示例:
package main
func main() {
done := make(chan struct{})
c := make(chan string)
go func() {
s := <-c // 接收消息
println(s)
close(done) // 关闭通道, 作为结束通知
}()
c <- "abc" // 发送消息
<-done // 阻塞, 知道有数据或者通道关闭
}
//最后输出: abc
通道发送和接收操作基本特性:
- 元素复制: 进入通道的并不是在接收操作符右边的那个元素值, 而是它的副本(发送操作包括"复制元素值"和"放入通道"2 步, 接收操作包括"复制通道内的元素值"、"放置副本到接收方"和"删掉原值"3 步);
- 不可分割: 一个数据进入通道时, 不会存在还没有复制完毕, 就被接收的情况。
1.3. 底层原理
1.3.1. 数据结构
channel 的数据结构如下:
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度, 即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标, 指示元素写入时存放到队列中的位置
recvx uint // 队列下标, 指示元素从队列的该位置读出
recvq waitq // 等待读消息的 goroutine 队列
sendq waitq // 等待写消息的 goroutine 队列
lock mutex // 互斥锁, chan 不允许并发读写
}
chan 内部实现了一个环形队列作为其缓冲区, 队列的长度是创建 chan 时指定的。
1.3.2. 发送
向一个 channel 中写数据简单过程如下:
- 如果等待接收队列 recvq 不为空, 说明缓冲区中没有数据或者没有缓冲区, 此时直接从 recvq 取出 G, 并把数据写入, 最后把该 G 唤醒, 结束发送过程;
- 如果缓冲区中有空余位置, 将数据写入缓冲区, 结束发送过程;
- 如果缓冲区中没有空余位置, 将待发送数据写入 G, 将当前 G 加入 sendq, 进入睡眠, 等待被读 goroutine 唤醒。
1.3.3. 接收
从一个 channel 读数据简单过程如下:
- 如果等待发送队列 sendq 不为空, 且没有缓冲区, 直接从 sendq 中取出 G, 把 G 中数据读出, 最后把 G 唤醒, 结束 读取过程;
- 如果等待发送队列 sendq 不为空, 此时说明缓冲区已满, 从缓冲区中首部读出数据, 把 G 中数据写入缓冲区尾部, 把 G 唤醒, 结束读取过程;
- 如果缓冲区中有数据, 则从缓冲区取出数据, 结束读取过程;
- 将当前 goroutine 加入 recvq, 进入睡眠, 等待被写 goroutine 唤醒。
1.3.4. 关闭
关闭 channel 时会把 recvq 中的 G 全部唤醒, 本该写入 G 的数据位置为 nil。把 sendq 中的 G 全部唤醒, 但这些 G 会 panic。
1.4. 核心知识
1.4.1. 发送
阻塞情况:
nil
ch := make(chan int, 2)ch = nilch <- 4 // all goroutines are asleep - deadlock!
- 无缓冲 channel + 读未 ready: 向无缓冲 channel 写数据, 如果读协程没有准备好, 会阻塞。
- 有缓冲 channel + 缓冲已满: 向有缓冲 channel 写数据, 如果缓冲已满, 会阻塞。
重要知识点:
- panic: closed 的 channel, 写数据会 panic。
ch := make(chan int, 2)
ch <- 4
close(ch)
ch <- 3 // panic: send on closed channel
- 资源回收: channel 使用完后, 需要 close 掉, 否则资源不会回收(包括 channel 资源, 以及 channel 里面存储的数据资源)。
- 数据交换: 就算是有缓冲的 channel , 也不是每次发送、接收都要经过缓存, 如果发送的时候, 刚好有等待接收的协程, 那么会直接交换数据。【这块我存在质疑, 待定! ! ! 】
1.4.2. 接收
阻塞情况:
- nil 阻塞: 从 nil 通道接收数据会被阻塞。
- 无缓冲 channel + 写未 ready: 从无缓冲 channel 读数据, 如果写协程没有准备好, 会阻塞。
- 有缓冲 channel + 缓冲为空: 从有缓冲 channel 读数据, 如果缓冲为空, 会阻塞。
重要知识点:
- 关闭 channel 数据接收: 从已关闭 channel 接收数据, 如果通道有数据, 会返回已缓冲数据; 如果没有数据, 会读到通道传输数据类型的零值, 比如指针类型, 读到 nil。(可以通过 x, ok:=<-c 中的 ok, 判断数据是否读取完毕)
c := make(chan int, 3)
c <- 11
c <- 12
close(c)
for i := 0; i < cap(c)+1; i++ {
x, ok := <-c
println(i, ":", ok, x)
}
// 输出
// 0: true 11
// 1: true 12
// 2: false 0
// 3: false 0
1.4.3. 关闭
重要知识点:
- close panic: 重复关闭, 或关闭 nil 通道会引发 panic。
ch := make(chan int, 2
ch <- 4close(ch)
close(ch) // panic: close of closed channel
- 多线程通道关闭原则: 由于 close 的 channel, 写数据是会 panic, 所以在多线程写入和读取时, 需要遵循"谁写入, 谁负责关闭"原则。(后面会通过完整的示例讲解该知识)
1.4.4. for-range 读取
我们常常会用 for-range 来读取 channel 的数据
ch := make(chan int, 1)
go func(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}(ch)
for val := range ch {
fmt.Println(val)
}
重要知识点:
- 如果 channel 已经关闭, 它还是会继续执行, 直到所有值被取完, 然后退出执行;
- 如果 channel 没有关闭, 但是 channel 没有可读取的数据, 它则会阻塞在 range 这句位置, 直到被唤醒;
- 如果 channel 是 nil, 读取会被阻塞, 也就是会一直阻塞在 range 位置。
1.4.5. select
select 是跟 channel 关系最亲密的语句, 它是被专门设计出来处理通道的, 因为每个 case 后面跟的都是通道表达式, 可以是读, 也可以是写。下面看一个简单的示例:
// 准备好几个通道。
intChannels := [3]chan int{make(chan int, 1), make(chan int, 1), make(chan int, 1)}
// 随机选择一个通道, 并向它发送元素值。
index := rand.Intn(3)
fmt.Printf("The index: %d\n", index)
intChannels[index] <- index
// 哪一个通道中有可取的元素值, 哪个对应的分支就会被执行。
select {
case <-intChannels[0]:
fmt.Println("The first candidate case is selected.")
case <-intChannels[1]:
fmt.Println("The second candidate case is selected.")
case elem := <-intChannels[2]:
fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)
default:
fmt.Println("No candidate case is selected!")
}
我们用一个包含了三个候选分支的 select 语句, 分别尝试从上述三个通道中接收元素值, 哪一个通道中有值, 哪一个对应的候选分支就会被执行。后面还有一个默认分支, 不过在这里它是不可能被选中的。在使用 select 语句的时候, 我们需要注意下面几个事情:
- 有 default 情况: select 只要有默认语句, 就不会被阻塞, 换句话说, 如果没有 default, 然后 case 又都不能读或者写, 则会被阻塞。
- 无 default 情况: 如果没有加入默认分支, 那么一旦所有的 case 表达式都没有满足求值条件, 那么 select 语句就会被阻塞。直到至少有一个 case 表达式满足条件为止。
- multi-valued assignment: select 不能够像 for-range 一样发现 channel 被关闭而终止执行, 我们可能会因为通道关闭了, 而直接从通道接收到一个其元素类型的零值。所以, 在很多时候, 我们需要通过接收表达式的第二个结果值来判断通道是否已经关闭。一旦发现某个通道关闭了, 我们就应该及时地屏蔽掉对应的分支或者采取其他措施。这对于程序逻辑和程序性能都是有好处的。
- select + for: select 语句只能对其中的每一个 case 表达式各求值一次。所以, 如果我们想连续或定时地操作其中的通道的话, 就往往需要通过在 for 语句中嵌入 select 语句的方式实现。但这时要注意, 简单地在 select 语句的分支中使用 break 语句, 只能结束当前的 select 语句的执行, 而并不会对外层的 for 语句产生作用。这种错误的用法可能会让这个 for 语句无休止地运行下去。
intChan := make(chan int, 1) // 一秒后关闭通道。
time.AfterFunc(time.Second, func() {
close(intChan)
})
select {
case _, ok := <-intChan:
if !ok {
fmt.Println("The candidate case is closed.")
break
}
fmt.Println("The candidate case is selected.")
}
- 随机选择 case: 如果同时有多个 case 足了条件, 会使用伪随机选择一个 case 来执行。
- 先全部扫描, 再选择: 每次 select 语句的执行, 是会扫码完所有的 case 后才确定如何执行, 而不是说遇到合适的 case 就直接执行了。
- nil 阻塞: nil 的 channel, 不管读写都会被阻塞。
上面的知识需要牢记, 面试常考, 下面是讲解 select 执行的流程:
- 对于每一个 case 表达式, 都至少会包含一个代表发送操作的发送表达式或者一个代表接收操作的接收表达式, 同时也可能会包含其他的表达式。比如, 如果 case 表达式是包含了接收表达式的短变量声明时, 那么在赋值符号左边的就可以是一个或两个表达式, 不过此处的表达式的结果必须是可以被赋值的。当这样的 case 表达式被求值时, 它包含的多个表达式总会以从左到右的顺序被求值。
- select 语句包含的候选分支中的 case 表达式都会在该语句执行开始时先被求值, 并且求值的顺序是依从代码编写的顺序从上到下的。结合上一条规则, 在 select 语句开始执行时, 排在最上边的候选分支中最左边的表达式会最先被求值, 然后是它右边的表达式。仅当最上边的候选分支中的所有表达式都被求值完毕后, 从上边数第二个候选分支中的表达式才会被求值, 顺序同样是从左到右, 然后是第三个候选分支、第四个候选分支, 以此类推。
- 对于每一个 case 表达式, 如果其中的发送表达式或者接收表达式在被求值时, 相应的操作正处于阻塞状态, 那么对该 case 表达式的求值就是不成功的。在这种情况下, 我们可以说, 这个 case 表达式所在的候选分支是不满足选择条件的。
- 仅当 select 语句中的所有 case 表达式都被求值完毕后, 它才会开始选择候选分支。这时候, 它只会挑选满足选择条件的候选分支执行。如果所有的候选分支都不满足选择条件, 那么默认分支就会被执行。如果这时没有默认分支, 那么 select 语句就会立即进入阻塞状态, 直到至少有一个候选分支满足选择条件为止。一旦有一个候选分支满足选择条件, select 语句(或者说它所在的 goroutine)就会被唤醒, 这个候选分支就会被执行。
- 如果 select 语句发现同时有多个候选分支满足选择条件, 那么它就会用一种伪随机的算法在这些分支中选择一个并执行。注意, 即使 select 语句是在被唤醒时发现的这种情况, 也会这样做。
- 一条 select 语句中只能够有一个默认分支。并且, 默认分支只在无候选分支可选时才会被执行, 这与它的编写位置无关。
- select 语句的每次执行, 包括 case 表达式求值和分支选择, 都是独立的。不过, 至于它的执行是否是并发安全的, 就要看其中的 case 表达式以及分支中, 是否包含并发不安全的代码了。
上面写的有些多, 简单总结一下: 执行 select 时, 会从左到右, 从上到下, 对每个 case 表达式求值, 当所有 case 求值完毕后, 会挑选满足的 case 执行, 如果有多条都满足, 就随机选择一条; 如果都没有满足, 就执行 default; 如果连 default 都没有, 就阻塞住, 等有满足条件的 case 出现时, 再执行。
1.5. 并发实例: 海外商城 Push
关于 channel, 零碎的知识点非常多, 我还是想通过一个完整的示例, 将这些知识点全部串起来, 下面就以海外商城 Push 为例, 将上面知识应用到实际场景中。
1.5.1. 示例介绍
海外商城需要对 W 个业务方发送 Push, 针对每个业务方, 为了提高 Push 的并发能力, 采用 N 个协程从 EMQ 中读取数据(EMQ 中都一个消息队列, 里面缓存了大量的 Push 数据), 数据读取后进行处理, 然后将处理后的数据写到 channel 中。同时, 服务有 M 个协程从 channel 中取出数据并消费, 然后通过小米 Push SDK, 给用户发送 Push。整体发送链路如下:
在看后面的内容前, 我先抛出几个问题:
- 生成者往关闭的 Channel 写数据, 会 Panic, 那么 Channel 该如何关闭呢?
- 当 Channel 关闭后(比如服务重启), 需要继续消费 Channel 里面的 Push, 该如何操作呢?
- 每消费一条 Channel 数据, 需要记录 Push 发送成功, 但是一条 Channel 数据包含 2-3 个 Push 内容(IOS/Android/PC), 程序记录 Push 成功前, 如何保证这 2-3 个 Push 都发送完毕了呢?
1.5.2. 初始化
初始化 channel 数组, 数组里面是每个业务方 appTypes 的 channel, channel 的缓存区大小为 30, 并启动 10 个消费者协程:
var (
messageChan map[string]chan *WorkMessage // channel
stopMasterChan chan bool // 消费者结束通知
appTypes = map[int32]string{1: "shop", 2: "bbs", 3: "sharesave"}
)
func initPushChannel() {
maxSize = 30 // channel 缓存区大小
workNum = 10 // goroutine 个数
stopMasterChan = make(chan bool)
messageChan = make(map[string]chan *WorkMessage)
for _, name := range appTypes {
workChan := make(chan *WorkMessage, maxSize)
messageChan[name] = workChan
for i := 0; i < workNum; i++ {
go startMaster(name, workChan) // 启动消费者协程
}
}
}
func startMaster(name string, workChan chan *WorkMessage) {
for {
if exit := dostartMaster(name, workChan); exit {
return
}
}
}
初始化 EMQ 的 Client, 并启动 10 个生产者协程:
var (
clientFactory client.ClientFactory // EMQ Client
stopChan chan bool // 生产者结束通知
)
func initEmq() { // 初始化 EMQ 的 Client 和单次读取数据条数, 该处代码省略。..
maxConsumerNum := 10
stopChan = make(chan bool)
for i := 0; i < maxConsumerNum; i++ {
go receiveMsg(i) // 启动生产者协程
}
}
func receiveMsg(queueID int) {
for {
if exit := doReceiveMsg(queueID); exit {
logz.Info("stop receive msg ...", logz.F("queueID", queueID))
return
}
}
}
主方法调用:
func InitWorker() { // 初始化 push SDK, 逻辑省略。..
initPushChannel() // 初始化 Channel, 启动消费者
initEmq() // 启动生产者
}
1.5.3. 发送
func doReceiveMsg(queueID int) bool {
defer func() {
if err := recover(); err != nil {
println("[panic] recover from error.")
}
}()
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
// 1. 从 EMQ 获取数据 List, 逻辑省略。..
// 2. 遍历 List, 获取业务类型, 逻辑省略。..
// 3. 根据业务类型, 获取对应的 channel
name := "sharesave" // 示例数据
pushChannel, _ := messageChan[name]
// 4. 构造 Push 数据, 然后放入 channel
pushData := &WorkMessage{AppLocal: "id", AppType: 1}
// 示例数据
pushChannel <- pushData
case <-stopChan:
println("stop to send data to channel.")
return true
}
}
}
这部分代码我做了大量简化, 这里主要做了 2 件事情:
- 通过 select + 定时器, 每隔 1S 就会从 EMQ 中获取数据, 然后将构造后的数据放入对应业务的 channel;
- 当收到 stopChan 事件时, 会通知所有的生产者协程, 退出 goroutine, 这里其实就是协程退出的方式之一。
1.5.4. 接收
func dostartMaster(name string, workChan chan *WorkMessage) bool {
defer func() {
if err := recover(); err != nil {
println("[panic] recover from error.")
}
}()
for {
select {
case t := <-workChan:
if t != nil {
for _, message := range t.PushMessages {
// 接受 channel 数据 t, 将数据推给 Push SDK
// 逻辑省略。..
}
}
case <-stopMasterChan:
println("stop to get data from channel.")
return true
}
}
}
这部分代码同样做了大量简化, 这里主要做了 2 件事情:
- 通过 select, 如果 channel 里面有数据, 直接读取, 然后给用户发送 Push;
- 当收到 stopMasterChan 事件时, 会通知所有的生产者协程, 退出 goroutine。
1.5.5. 关闭
// 通知生产者协程关闭, 协程不再写 channel
func stopRecvMsgFromQueue() {
close(stopChan)
}
// 通知消费者协程关闭, 协程不再读 channel, 并关闭 channel, 消费完 channel 中剩余消息
func stopPushChannel() {
close(stopMasterChan)
time.Sleep(time.Second)
for _, c := range messageChan {
close(c)
for msg := range c {
if msg != nil {
for _, message := range msg.PushMessages {
// 接受 channel 数据 t, 将数据推给 Push SDK
// 逻辑省略。..
}
}
}
}
}
// 主方法调用
func StopWorker() {
stopRecvMsgFromQueue()
time.Sleep(time.Second * 2)
stopPushChannel()
}
比如服务重启, 需要关闭协程时, 主要做以下事情:
- 执行 close(stopChan), 先通知生产者协程, 不再往 channel 里面写数据;
- 执行 close(stopMasterChan), 通知消费者协程, 不再从 channel 里面读取数据;
- 关闭数组 messageChan 的每个 channel;
- 继续读取 channel 中剩余的数据, 因为使用的是 for-range 方式, 所以当 channel 里面所有的数据读取完毕后, for-range 会自动退出。
这里有两个地方 sleep 了一下, 分别有以下作用:
- 调用 stopPushChannel() 前 sleep: 关闭生成者后, 消费者继续消费剩余的数据;
1.6. 总结
本章基本都是干货, 上面总结的比较全面, 这里就不再重复了, 如果你能回答我提的这些问题, 你应该就掌握了本章的内容:
- 发送和接收时, 分别有哪些情况会导致 channel 阻塞呢?
- 对于发送和关闭 channel, 有哪些情况会导致 panic 呢?
- 当 channel 关闭后, 继续读取里面的数据, 能读取到么? 如何保证数据读取完毕呢?
- 对于生产者和消费者模型, 如何才能优雅关闭 channel, 避免写 channel 导致的 panic 呢?
- for-range 读取 channel 数据, 对于 channel 关闭和未关闭的情况, 是如何处理的呢? 会存在阻塞情况么?
- 使用 select 时, 有哪些注意事项呢? 你知道 select 执行的流程么?
最后就是 Push 的并发示例, 强烈建议大家能掌握, 掌握了这个示例, 后续你应该也能很容易通过 channel 实现数据共享, 并结合 goroutine 写出你自己的高并发程序。
1.7. 有缓冲 channel 和无缓冲 channel
ch1:=make(chan int)// 无缓冲
ch2:=make(chan int,1)// 有缓冲
无缓冲: 当向 ch1 中存值后需要其他协程取值, 否则一直阻塞
有缓冲: 不会阻塞, 因为缓冲大小是 1, 只有当放第二个值的时候, 第一个还没被人拿走, 才会阻塞。
1.8. 测试
1.8.1. 测试 1: 声明无缓冲channel
func Test_1(t *testing.T) {
// 无缓冲
ch := make(chan int)
// fatal error: all goroutines are asleep - deadlock! 协程阻塞,需要另一个协程取走channel中的值
ch <- 1
}
1.8.2. 测试 2: 开启协程取值
func Test_2(t *testing.T) {
// 无缓冲
ch := make(chan int)
go func() {
// 睡眠1秒,等待主协程在channel写入值
time.Sleep(time.Second * 1)
fmt.Println("开始取值。。。")
<-ch
}()
fmt.Println("开始存值。。。")
ch <- 1
time.Sleep(time.Second * 5)
fmt.Println("结束。。。")
}
运行结果:
=== RUN Test_2
开始存值。。。
开始取值。。。
结束。。。
--- PASS: Test_2 (6.00s)
PASS
1.8.3. 测试 3: 声明有缓冲 channel
func Test_3(t *testing.T) {
// 有缓冲
ch1 := make(chan int, 1)
// 缓冲大小为1 即是当channel中有一个值时不会被阻塞,当再塞入一个时前一个没被其他协程取走才会阻塞
ch1 <- 2
// 此时主协程也可取出值
fmt.Println(<-ch1)
}
运行结果:
=== RUN Test_3
2
--- PASS: Test_3 (0.00s)
PASS
1.8.4. 测试 4: 存入超过缓冲数量的值
func Test_4(t *testing.T) {
// 有缓冲
ch1 := make(chan int, 1)
// 缓冲大小为1 即是当channel中有一个值时不会被阻塞,当再塞入一个时前一个没被其他携程取走才会阻塞
ch1 <- 1
// fatal error: all goroutines are asleep - deadlock!
ch1 <- 2
}