GoLang协程与通道---中
协程的同步:关闭通道-测试阻塞的通道通道可以被显式的关闭;尽管它们和文件不同:不必每次都关闭。只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。只有发送者需要关闭通道,接收者永远不会需要。
继续看示例 goroutine2.go:我们如何在通道的 sendData() 完成的时候发送一个信号,getData() 又如何检测到通道是否关闭或阻塞?
第一个可以通过函数 close(ch) 来完成:这个将通道标记为无法通过发送操作 <- 接受更多的值;给已经关闭的通道发送或者再次关闭都会导致运行时的 panic。在创建一个通道后使用 defer 语句是个不错的办法(类似这种情况):
ch := make(chan float64)
defer close(ch)
第二个问题可以使用逗号,ok 操作符:用来检测通道是否被关闭。
如何来检测可以收到没有被阻塞(或者通道没有被关闭)?
v, ok := <-ch // ok is true if v received value
通常和 if 语句一起使用:
if v, ok := <-ch; ok {
process(v)
}
或者在 for 循环中接收的时候,当关闭或者阻塞的时候使用 break:
v, ok := <-ch
if !ok {
break
}
process(v)
在示例程序中使用这些可以改进为版本 goroutine3.go,输出相同。
实现非阻塞通道的读取,需要使用 select。
package main
import "fmt"
func main() {
ch := make(chan string)
go sendData(ch)
getData(ch)
}
func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokio"
close(ch)
}
func getData(ch chan string) {
for {
input, open := <-ch
if !open {
break
}
fmt.Printf("%s ", input)
}
}
改变了以下代码:
现在只有 sendData() 是协程,getData() 和 main() 在同一个线程中:
go sendData(ch)
getData(ch)
在 sendData() 函数的最后,关闭了通道:
func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokio"
close(ch)
}
在 for 循环的 getData() 中,在每次接收通道的数据之前都使用 if !open 来检测:
for {
input, open := <-ch
if !open {
break
}
fmt.Printf("%s ", input)
}
使用 for-range 语句来读取通道是更好的办法,因为这会自动检测通道是否关闭:
for input := range ch {
process(input)
}
关于通道的关闭的小结
- 只有接收方goroutine所有的数据都发送完毕后才会关闭
- 通道是种类型,是可以被垃圾回收机制回收的;通道的关闭不是必须的
- 对一个关闭的通道再发送值就会导致panic
- 对一个关闭的通道进行接收会一直获取值直到通道为空。
- 对一个关闭的并且没有值的通道执行接收操作,会得到对应类型的零值。
- 关闭一个已经关闭的通道会导致panic。
阻塞和生产者-消费者模式:
在通道迭代器中,两个协程经常是一个阻塞另外一个。如果程序工作在多核心的机器上,大部分时间只用到了一个处理器。可以通过使用带缓冲(缓冲空间大于 0)的通道来改善。比如,缓冲大小为 100,迭代器在阻塞之前,至少可以从容器获得 100 个元素。如果消费者协程在独立的内核运行,就有可能让协程不会出现阻塞。
由于容器中元素的数量通常是已知的,需要让通道有足够的容量放置所有的元素。这样,迭代器就不会阻塞(尽管消费者协程仍然可能阻塞)。然而,这实际上加倍了迭代容器所需要的内存使用量,所以通道的容量需要限制一下最大值。记录运行时间和性能测试可以帮助你找到最小的缓存容量带来最好的性能。
使用 select 切换协程从不同的并发执行的协程中获取值可以通过关键字select来完成,它和switch控制语句非常相似也被称作通信开关;它的行为像是“你准备好了吗”的轮询机制;select监听进入通道的数据,也可以是用通道发送值的时候。
select {
case u:= <- ch1:
...
case v:= <- ch2:
...
...
default: // no value ready to be received
...
}
defaultfallthroughcasebreakreturnselect
select
defaultdefault
selectdefaultdefaultselect
selectbreak
实例演示:
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go pump1(ch1)
go pump2(ch2)
go suck(ch1, ch2)
time.Sleep(1e9)
}
func pump1(ch chan int) {
for i := 0; ; i++ {
ch <- i * 2
}
}
func pump2(ch chan int) {
for i := 0; ; i++ {
ch <- i + 5
}
}
func suck(ch1, ch2 chan int) {
for {
select {
case v := <-ch1:
fmt.Printf("Received on channel 1: %d\n", v)
case v := <-ch2:
fmt.Printf("Received on channel 2: %d\n", v)
}
}
}
输出:
Received on channel 2: 5
Received on channel 2: 6
Received on channel 1: 0
Received on channel 2: 7
Received on channel 2: 8
Received on channel 2: 9
Received on channel 2: 10
Received on channel 1: 2
Received on channel 2: 11
...
Received on channel 2: 47404
Received on channel 1: 94346
Received on channel 1: 94348
注意:
channel的零值是nil。也许会让你觉得比较奇怪,nil的channel有时候也是有一些用处的。因为对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。
通道、超时和计时器(Ticker)time 包中有一些有趣的功能可以和通道组合使用。
其中就包含了 time.Ticker 结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值:
type Ticker struct {
C <-chan Time // the channel on which the ticks are delivered.
// contains filtered or unexported fields
...
}
time.NewTickerDurationfunc NewTicker(dur) *Ticker
在协程周期性的执行一些事情(打印状态日志,输出,计算等等)的时候非常有用。
Stop()deferselect
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
...
case v:= <-ch2:
...
case <-ticker.C:
logState(status) // call some logging function logState
default: // no value ready to be received
...
}
time.Tick()Tick(d Duration) <-chan Timedd
import "time"
rate_per_sec := 10
var dur Duration = 1e9 / rate_per_sec
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
<- chRate // rate limit our Service.Method RPC calls
go client.Call("Service.Method", req, ...)
}
这样只会按照指定频率处理请求:chRate 阻塞了更高的频率。每秒处理的频率可以根据机器负载(和/或)资源的情况而增加或减少。
定时器(Timer)结构体看上去和计时器(Ticker)结构体的确很像(构造为 NewTimer(d Duration)),但是它只发送一次时间,在 Dration d 之后。
还有 time.After(d) 函数,声明如下:
func After(d Duration) <-chan Time
在 Duration d 之后,当前时间被发到返回的通道;所以它和 NewTimer(d).C 是等价的;它类似 Tick(),但是 After() 只发送一次时间。下边有个很具体的示例,很好的阐明了 select 中 default 的作用:
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(1e8)
boom := time.After(5e8)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(5e7)
}
}
}
输出:
.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
BOOM!
习惯用法:简单超时模式
要从通道 ch 中接收数据,但是最多等待1秒。先创建一个信号通道,然后启动一个 lambda 协程,协程在给通道发送数据之前是休眠的:
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // one second
timeout <- true
}()
然后使用 select 语句接收 ch 或者 timeout 的数据:如果 ch 在 1 秒内没有收到数据,就选择到了 time 分支并放弃了 ch 的读取。
select {
case <-ch:
// a read from ch has occured
case <-timeout:
// the read from ch has timed out
break
}
第二种形式:取消耗时很长的同步调用
也可以使用 time.After() 函数替换 timeout-channel。可以在 select 中通过 time.After() 发送的超时信号来停止协程的执行。以下代码,在 timeoutNs 纳秒后执行 select 的 timeout 分支后,执行client.Call 的协程也随之结束,不会给通道 ch 返回值:
ch := make(chan error, 1)
go func() { ch <- client.Call("Service.Method", args, &reply) } ()
select {
case resp := <-ch
// use resp and reply
case <-time.After(timeoutNs):
// call timed out
break
}
注意缓冲大小设置为 1 是必要的,可以避免协程死锁以及确保超时的通道可以被垃圾回收。此外,需要注意在有多个 case 符合条件时, select 对 case 的选择是伪随机的,如果上面的代码稍作修改如下,则 select 语句可能不会在定时器超时信号到来时立刻选中 time.After(timeoutNs) 对应的 case,因此协程可能不会严格按照定时器设置的时间结束。
ch := make(chan int, 1)
go func() { for { ch <- 1 } } ()
L:
for {
select {
case <-ch:
// do something
case <-time.After(timeoutNs):
// call timed out
break L
}
}
第三种形式:假设程序从多个复制的数据库同时读取。只需要一个答案,需要接收首先到达的答案,Query 函数获取数据库的连接切片并请求。并行请求每一个数据库并返回收到的第一个响应:
func Query(conns []Conn, query string) Result {
ch := make(chan Result, 1)
for _, conn := range conns {
go func(c Conn) {
select {
case ch <- c.DoQuery(query):
default:
}
}(conn)
}
return <- ch
}
再次声明,结果通道 ch 必须是带缓冲的:以保证第一个发送进来的数据有地方可以存放,确保放入的首个数据总会成功,所以第一个到达的值会被获取而与执行的顺序无关。正在执行的协程可以总是可以使用 runtime.Goexit() 来停止。
在应用中缓存数据:
应用程序中用到了来自数据库(或者常见的数据存储)的数据时,经常会把数据缓存到内存中,因为从数据库中获取数据的操作代价很高;如果数据库中的值不发生变化就没有问题。但是如果值有变化,我们需要一个机制来周期性的从数据库重新读取这些值:缓存的值就不可用(过期)了,而且我们也不希望用户看到陈旧的数据。
协程和恢复(recover)一个用到 recover 的程序停掉了服务器内部一个失败的协程而不影响其他协程的工作。
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work) // start the goroutine for that work
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Printf("Work failed with %s in %v", err, work)
}
}()
do(work)
}
上边的代码,如果 do(work) 发生 panic,错误会被记录且协程会退出并释放,而其他协程不受影响。
因为 recover 总是返回 nil,除非直接在 defer 修饰的函数中调用,defer 修饰的代码可以调用那些自身可以使用 panic 和 recover 避免失败的库例程(库函数)。举例,safelyDo() 中 defer 修饰的函数可能在调用 recover 之前就调用了一个 logging 函数,panicking 状态不会影响 logging 代码的运行。因为加入了恢复模式,函数 do(以及它调用的任何东西)可以通过调用 panic 来摆脱不好的情况。但是恢复是在 panicking 的协程内部的:不能被另外一个协程恢复。