在阅读本文之前,请先阅读通道一文。 那篇文章详细地解释了通道类型和通道值,以及各种通道操作的规则细节。 一个Go新手程序员可能需要反复多次阅读那篇文章和当前这篇文章来精通Go通道编程。
本文余下的内容将展示很多通道用例。 希望这篇文章能够说服你接收下面的观点:
将通道用做future/promise
很多其它流行语言支持future/promise来实现异步(并发)编程。 Future/promise常常用在请求/回应场合。
返回单向接收通道做为函数返回结果
sumSquares
将单向发送通道类型用做函数实参
sumSquareslongTimeRequest
对于上面这个特定的例子,我们可以只使用一个通道来接收回应结果,因为两个参数的作用是对等的。
这可以看作是后面将要提到的数据聚合的一个应用。
采用最快回应
本用例可以看作是上例中只使用一个通道变种的增强。
有时候,一份数据可能同时从多个数据源获取。这些数据源将返回相同的数据。 因为各种因素,这些数据源的回应速度参差不一,甚至某个特定数据源的多次回应速度之间也可能相差很大。 同时从多个数据源获取一份相同的数据可以有效保障低延迟。我们只需采用最快的回应并舍弃其它较慢回应。
注意:如果有N个数据源,为了防止被舍弃的回应对应的协程永久阻塞,则传输数据用的通道必须为一个容量至少为N-1的缓冲通道。
“采用最快回应”用例还有一些其它实现方式,本文后面将会谈及。
更多“请求/回应”用例变种
做为函数参数和返回结果使用的通道可以是缓冲的,从而使得请求协程不需阻塞到它所发送的数据被接收为止。
struct{v T; err error}
有时,一个请求可能需要比预期更长的用时才能回应,甚至永远都得不到回应。 我们可以使用本文后面将要介绍的超时机制来应对这样的情况。
有时,回应方可能会不断地返回一系列值,这也同时属于后面将要介绍的数据流的一个用例。
使用通道实现通知
struct{}
向一个通道发送一个值来实现单对单通知
我们已知道,如果一个通道中无值可接收,则此通道上的下一个接收操作将阻塞到另一个协程发送一个值到此通道为止。 所以一个协程可以向此通道发送一个值来通知另一个等待着从此通道接收数据的协程。
done
从一个通道接收一个值来实现单对单通知
如果一个通道的数据缓冲队列已满(非缓冲的通道的数据缓冲队列总是满的)但它的发送协程队列为空,则向此通道发送一个值将阻塞,直到另外一个协程从此通道接收一个值为止。 所以我们可以通过从一个通道接收数据来实现单对单通知。一般我们使用非缓冲通道来实现这样的通知。
这种通知方式不如上例中介绍的方式使用得广泛。
另一个事实是,上面的两种单对单通知方式其实并没有本质的区别。 它们都可以被概括为较快者等待较慢者发出通知。
多对单和单对多通知
略微扩展一下上面两个用例,我们可以很轻松地实现多对单和单对多通知。
sync.WaitGroup
通过关闭一个通道来实现群发通知
上一个用例中的单对多通知实现在实践中很少用,因为通过关闭一个通道的方式在来实现单对多通知的方式更简单。 我们已经知道,从一个已关闭的通道可以接收到无穷个值,我们可以利用这一特性来实现群发通知。
ready <- struct{}{}close(ready)
当然,我们也可以通过关闭一个通道来实现单对单通知。事实上,关闭通道来是实践中用得最多通知实现方式。
从一个已关闭的通道可以接收到无穷个值这一特性也将被用在很多其它在后面将要介绍的用例中。
定时通知(timer)
用通道实现一个一次性的定时通知器是很简单的。 下面是一个自定义实现:
timeAfterAfterDurationtime.After
<-time.After(aDuration)time.Sleep(aDuration)
<-time.After(aDuration)
将通道用做互斥锁(mutex)
sync
有两种方式将一个容量为1的缓冲通道用做互斥锁:
- 通过发送操作来加锁,通过接收操作来解锁;
- 通过接收操作来加锁,通过发送操作来解锁。
下面是一个通过发送操作来加锁的例子。
下面是一个通过接收操作来加锁的例子,其中只显示了相对于上例而修改了的部分。
将通道用做计数信号量(counting semaphore)
NN
计数信号量经常被使用于限制最大并发数。
和将通道用做互斥锁一样,也有两种方式用来获取一个用做计数信号量的通道的一份所有权。
- 通过发送操作来获取所有权,通过接收操作来释放所有权;
- 通过接收操作来获取所有权,通过发送操作来释放所有权。
下面是一个通过接收操作来获取所有权的例子:
在上例中,只有获得一个座位的顾客才能开始饮酒。 所以在任一时刻同时在喝酒的顾客数不会超过座位数10。
mainfor
在上例中,尽管在任一时刻同时在喝酒的顾客数不会超过座位数10,但是在某一时刻可能有多于10个顾客进入了酒吧,因为某些顾客在排队等位子。 在上例中,每个顾客对应着一个协程。虽然协程的开销比系统线程小得多,但是如果协程的数量很多,则它们的总体开销还是不能忽略不计的。 所以,最好当有空位的时候才创建顾客协程。
在上面这个修改后的例子中,在任一时刻最多只有10个顾客协程在运行。
通过发送操作来获取所有权的实现相对简单一些,省去了摆放座位的步骤。
对话(或称乒乓)
两个协程可以通过一个通道进行对话,整个过程宛如打乒乓球一样。 下面是一个这样的例子,它将打印出一系列斐波那契(Fibonacci)数。
使用通道传送传输通道
chan<- intchan chan<- int
尽管对于上面这个用例来说,使用通道传送传输通道这种方式并非是最有效的实现方式,但是这种方式肯定有最适合它的用武之地。
检查通道的长度和容量
caplencaplenlen
但有时确实有一些场景需要调用这两个函数。比如,有时一个协程欲将一个未关闭的并且不会再向其中发送数据的缓冲通道中的所有数据接收出来,在确保只有此一个协程从此通道接收数据的情况下,我们可以用下面的代码来实现之:
我们也可以用本文后面将要介绍的尝试接收机制来实现这一需求。两者的运行效率差不多,但尝试接收机制的优点是多个协程可以并发地进行读取操作。
有时一个协程欲将一个缓冲通道写满而又不阻塞,在确保只有此一个协程向此通道发送数据的情况下,我们可以用下面的代码实现这一目的:
当然,我们也可以使用后面将要介绍的尝试发送机制来实现这一需求。
使当前协程永久阻塞
Go中的选择机制(select)是一个非常独特的特性。它给并发编程带来了很多新的模式和技巧。
selectselectfor {time.Sleep(time.Second)}select{}
select{}
一个例子:
select{}
尝试发送和尝试接收
defaultcaseselectcase
caseselectcasedefaultcasecaseselectcasedefaultcase
尝试发送和尝试接收代码块永不阻塞。
caseselect
下例演示了尝试发送和尝试接收代码块的工作原理。
输出结果:
后面的很多用例还要用到尝试发送和尝试接收代码块。
无阻塞地检查一个通道是否已经关闭
假设我们可以保证没有任何协程会向一个通道发送数据,则我们可以使用下面的代码来(并发安全地)检查此通道是否已经关闭,此检查不会阻塞当前协程。
另一种“采用最快回应”的实现方式
在上面的“采用最快回应”用例一节已经提到,我们也可以使用选择机制来实现“采用最快回应”用例。 每个数据源协程只需使用一个缓冲为1的通道并向其尝试发送回应数据即可。示例代码如下:
注意,使用选择机制来实现“采用最快回应”的代码中使用的通道的容量必须至少为1,以保证最快回应总能够发送成功。 否则,如果数据请求者因为种种原因未及时准备好接收,则所有回应者的尝试发送都将失败,从而所有回应的数据都将被错过。
第三种“采用最快回应”的实现方式
select
casesource
本小节和上一小节中展示的两种方法也可以用来实现多对单通知。
超时机制(timeout)
在一些请求/回应用例中,一个请求可能因为种种原因导致需要超出预期的时长才能得到回应,有时甚至永远得不到回应。 对于这样的情形,我们可以使用一个超时方案给请求者返回一个错误信息。 使用选择机制可以很轻松地实现这样的一个超时方案。
下面这个例子展示了如何实现一个支持超时设置的请求:
脉搏器(ticker)
我们可以使用尝试发送操作来实现一个每隔一定时间发送一个信号的脉搏器。
timeTick
速率限制(rate limiting)
上面已经展示了如何使用尝试发送实现峰值限制。 同样地,我们也可以使用使用尝试机制来实现速率限制,但需要前面刚提到的定时器实现的配合。 速率限制常用来限制吞吐和确保在一段时间内的资源使用不会超标。
下面的例子借鉴了官方Go维基中的例子。 在此例中,任何一分钟时段内处理的请求数不会超过200。
time.Sleep
开关
selectcaseselect
casecase
1212...
控制代码被执行的几率
selectcase
一个例子:
fg
从动态数量的分支中选择
selectreflectselectselectselectselect
reflectTrySendTryRecv
数据流操纵
本节将介绍一些使用通道进行数据流处理的用例。
一般来说,一个数据流处理程序由多个模块组成。不同的模块执行分配给它们的不同的任务。 每个模块由一个或者数个并行工作的协程组成。实践中常见的工作任务包括:
- 数据生成/搜集/加载;
- 数据服务/存盘;
- 数据计算/处理;
- 数据验证/过滤;
- 数据聚合/分流;
- 数据组合/拆分;
- 数据复制/增殖;
- 等等。
一个模块中的工作协程从一些其它模块接收数据做为输入,并向另一些模块发送输出数据。 换句话数,一个模块可能同时兼任数据消费者和数据产生者的角色。
多个模块一起组成了一个数据流处理系统。
下面将展示一些模块工作协程的实现。这些实现仅仅是为了解释目的,所以它们都很简单,并且它们可能并不高效。
数据生成/搜集/加载
一个数据产生者可能通过以下途径生成数据:
- 加载一个文件、或者读取一个数据库、或者用爬虫抓取网页数据;
- 从一个软件或者硬件系统搜集各种数据;
- 产生一系列随机数;
- 等等。
这里,我们使用一个随机数产生器做为一个数据产生者的例子。 此数据产生者函数没有输入,只有输出。
事实上,此随机数产生器是一个多返回值的future/promise。
一个数据产生者可以在任何时刻关闭返回的通道以结束数据生成。
数据聚合
int64
一个更完美的实现需要考虑一个输入数据流是否已经关闭。(下面要介绍的其它工作协程同理。)
select
数据分流
数据分流是数据聚合的逆过程。数据分流的实现很简单,但在实践中用的并不多。
数据合成
数据合成将多个数据流中读取的数据合成一个。
uint64uint64
数据分解
数据分解是数据合成的逆过程。一个数据分解者从一个通道读取一份数据,并将此数据分解为多份数据。 这里就不举例了。
数据复制/增殖
数据复制(增殖)可以看作是特殊的数据分解。一份输入数据将被复制多份并输出给多个数据流。
一个例子:
数据计算/分析
数据计算和数据分析模块的功能因具体程序不同而有很大的差异。 一般来说,数据分析者接收一份数据并对之加工处理后转换为另一份数据。
uint64
数据验证/过滤
一个数据验证或过滤者的任务是检查输入数据的合理性并抛弃不合理的数据。 比如,下面的工作者协程将抛弃所有的非素数。
请注意这两个函数版本分别被本文下面最后展示的两个例子所使用。
数据服务/存盘
一般,一个数据服务或者存盘模块为一个数据流系统中的最后一个模块。 这里的实现值是简单地将数据输出到终端。
组装数据流系统
现在,让我们使用上面的模块工作者函数实现来组装一些数据流系统。 组装数据流仅仅是创建一些工作者协程函数调用,并为这些调用指定输入数据流和输出数据流。
数据流系统例子1(一个流线型系统)
上面这个流线型系统描绘在下图中:
数据流系统例子2(一个单向无环图系统):
上面这个单向无环图系统描绘在下图中:
更复杂的数据流系统可以表示为任何拓扑结构的图。比如一个复杂的数据流系统可能有多个输出模块。 但是有环拓扑结构的数据流系统在实践中很少用。
从上面两个例子可以看出,使用通道来构建数据流系统是很简单和直观的。
从上例可以看出,通过使用数据聚合模块,我们可以很轻松地实现各个模块的工作协程数量的扇入(fan-in)和扇出(fan-out)。
事实上,我们也可以使用一个简单的通道来代替数据聚合模块的角色。比如,下面的代码使用两个通道代替了上例中的两个数据聚合器。
修改后的数据流的拓扑结构如下图所示:
上面的代码示例并没有太多考虑如何关闭一个数据流。请阅读此篇文章来了解如何优雅地关闭通道。