通道声明和初始化
chan
var ch chan int // 声明一个通道类型变量 ch,并且通道中只能传递 int 类型数据
我们还可以通过如下方式声明通道数组、切片、字典,以下声明方式表示 chs 中的元素都是 chan int 类型的通道:
var chs [10]chan int
var chs []chan int
var chs map[string]chan int
不过,实际编码时,我们更多使用的是下面这种快捷方式同时声明和初始化通道类型:
ch := make(chan int)
由于在 Go 语言中,通道也是引用类型(和切片、字典一样),所以可以通过 make 函数进行初始化,在通过 make 函数初始化通道时,还可以传递第二个参数,表示通道的容量:
ch := make(chan int, 10)
ps:
chs := make([]chan int, 10) //初始化的是切片,而不是通道,只是切片中的元素类型是通道,这个时候第二个参数是切片的初始容量,而不是通道的。
通道操作符
通道类型变量只支持发送和接收操作,即往通道中写入数据和从通道中读取数据,对应的操作符都是 <-,我们判断是发送还是接收操作的依据是通道类型变量位于 <- 左侧还是右侧,位于左侧是发送操作,位于右侧是接收操作:
ch <- 1 // 往通道中写入数据 1
x := <- ch // 从通道中读取数据并赋值给变量
当我们将数据发送到通道时,发送的是数据的副本,同理,从通道中接收数据时,接收的也是数据的副本。
发送和接收数据都是原子操作,同时只能进行发送或者接收操作,不存在数据发送一半被接收,或者接收一半发送新数据的情况,并且两者都是阻塞的。如果通道中没有数据,进行读取操作的话会导致读取操作所在的协程阻塞,直到通道中写入了数据;反过来,如果通道中已经有了数据,再往里面写入数据的话,也会导致写入操作所在的协程阻塞,直到其中的数据被其他协程接收。
化的通道,则其缓冲区大小是 10,这意味着,在没有被任何其他协程接收的情况下,我们可以一直往 ch 通道中写入 10 个数据,超过 10 个数据才会阻塞当前协程,直到通道被其他协程读取,显然,合理设置缓冲区可以提高通道的操作效率,尤其是在需要持续传输大量数据的场景。
我们可以通过如下示例代码简单测试下通道的缓冲机制:
package main
import (
"fmt"
"time"
)
func test(ch chan int) {
for i := 0; i < 100; i++ {
ch <- i
}
close(ch)
}
func main() {
start := time.Now()
ch := make(chan int, 20)
go test(ch)
for i := range ch {
fmt.Println("接收到的数据:", i)
}
end := time.Now()
consume := end.Sub(start).Seconds()
fmt.Println("程序执行耗时(s):", consume)
}
close(ch)fatal error: all goroutines are asleep - deadlock!
关闭通道的操作只能执行一次,试图关闭已关闭的通道会引发 panic。此外,关闭chan的操作只能在发送数据的一方关闭,如果在接收一方关闭,会panic,因为接收方不知道发送方什么时候执行完毕,向一个已经关闭的chan发送数据会导致panic
回到主协程,我们通过 i := range ch 循环从通道中读取数据,并将其打印出来。当通道关闭后会退出循环。
单向通道及其使用
通常,管道都是支持双向操作的:既可以往管道发送数据,也可以从管道接收数据。但在某些场景下,可能我们需要限制只能往管道发送数据,或者只能从管道接收数据,这个时候,就需要用到单向通道。
不过,这里我们需要澄清一下,通道本身还是要支持读写的,如果某个通道只支持写入操作,那么即便数据写进去了,不能被读取也毫无意义,同理,如果某个通道只支持读取操作,不能写入数据,那么通道永远是空的,从一个空的通道读取数据会导致协程的阻塞,无法执行后续代码。
因此,Go 语言支持的单向管道,实际上是在使用层面对通道进行限制,而不是语法层面:即我们在某个协程中只能对通道进行写入操作,而在另一个协程中只能对该通道进行读取操作。从这个层面来说,单向通道的作用是约束在生产协程中只能发送数据到通道,而在消费协程中只能从通道接收数据,从而让代码遵循「最小权限原则」,避免误操作和通道使用的混乱,让代码更加稳健。
下面我们就来看看如何在 Go 协程之间实现单向通道的约束。
当我们将一个通道类型变量传递到一个函数时(通常是在另外一个协程中执行),如果这个函数只能发送数据到通道,可以通过如下将其指定为单向只写通道(发送通道):
func test(ch chan<- int)
反过来,如果我们将一个通道类型变量传递到一个只允许从该通道读取数据的函数,可以通过如下方式将通道指定为单向只读通道(接收通道):
func test(ch <-chan int)
声明:
var ch1 chan int
var ch2 chan<- int
var ch3 <-chan int
单向通道的初始化和双向通道一样:
ch1 := make(chan int)
ch2 := make(chan<- int)
ch3 := make(<-chan int)
此外,我们还可以通过如下方式实现双向通道和单向通道的转化:
ch1 := make(chan int)
ch2 := <-chan int(ch1)
ch3 := chan<- int(ch1)
基于双向通道 ch1,我们通过类型转化初始化了两个单向通道:单向只读的 ch2 和单向只写的 ch3。注意这个转化是不可逆的,双向通道可以转化为任意类型的单向通道,但单向通道不能转化为双向通道,读写通道之间也不能相互转化。
实际上,我们在将双向通道传递到限定通道参数操作类型的函数时,就应用到了类型转化。
package main
import (
"fmt"
"time"
)
// 子协程对通道的单向写入操作
func test(ch chan<- int) {
for i := 0; i < 100; i++ {
ch <- i
}
close(ch)
}
func main() {
start := time.Now()
ch := make(chan int, 20)
go test(ch)
for i := range ch {
fmt.Println("接收到的数据:", i)
}
end := time.Now()
consume := end.Sub(start).Seconds()
fmt.Println("程序执行耗时(s):", consume)
}
test(ch <-chan int)
# command-line-arguments
./channel3.go:10:12: invalid operation: ch <- i (send to receive-only type <-chan int)
./channel3.go:12:10: invalid operation: close(ch) (cannot close receive-only channel)
提示传入的通道是只读通道(receive-only channel),不能进行写入操作,此外,关闭通道函数 close 也不能作用到只读通道。
ch := make(chan<- int)
我们也可以定义一个返回值类型为单向只读通道的函数,以便得到该返回值的代码只能从通道中接收数据:
func test() <-chan int {
ch := make(chan int, 20)
for i := 0; i < 100; i++ {
ch <- i
}
close(ch)
return ch
}
通过 select 语句等待通道就绪
selectselectswitchselectcase
select {
case <-chan1:
// 如果从 chan1 通道成功接收数据,则执行该分支代码
case chan2 <- 1:
// 如果成功向 chan2 通道成功发送数据,则执行该分支代码
default:
// 如果上面都没有成功,则进入 default 分支处理流程
}
Go 语言的 select 语句借鉴自 Unix 的 select() 函数,在 Unix 中,可以通过调用 select() 函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了 IO 动作,该 select() 调用就会被返回(C 语言中就是这么做的),后来该机制也被用于实现高并发的 Socket 服务器程序。Go 语言直接在语言级别支持 select 关键字,用于处理并发编程中通道之间异步 IO 通信问题。
可以看出,select 不像 switch,case 后面并不带判断条件,而是直接去查看 case 语句,每个 case 语句都必须是一个面向通道的操作,比如上面的示例代码中,第一个 case 试图从 chan1 接收数据并直接忽略读到的数据,第二个 case 试图向 chan2 通道发送一个整型数据 1,需要注意的是这两个 case 的执行不是 if…else… 那种先后关系,而是会并发执行,然后 select 会选择先操作成功返回的那个 case 分支去执行,如果两者同时返回,则随机选择一个执行,如果这两者都没有返回,则进入 default 分支,这里也不会出现阻塞,如果 chan1 通道为空,或者 chan2 通道已满,就会立即进入 default 分支,但是如果没有 default 语句,则会阻塞直到某个通道操作成功。
package main
import (
"fmt"
"math/rand"
)
func main() {
chs := [3]chan int{
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
}
index := rand.Intn(5) // 随机生成0-4之间的数字
fmt.Printf("随机索引/数值: %d\n", index)
if(index < 3){
chs[index] <- index // 向通道发送随机数字
}
// 哪一个通道中有值,哪个对应的分支就会被执行
select {
case <- chs[0]:
fmt.Println("第一个条件分支被选中")
case <- chs[1]:
fmt.Println("第二个条件分支被选中")
case num := <- chs[2]:
fmt.Println("第三个条件分支被选中:", num)
default:
fmt.Println("没有分支被选中")
}
}
我们创建了一个包含 3 个 chan int 类型元素的通道数组,然后随机往某个通道中发送一个随机数据,再通过 select 语句从上面定义的三个通道中接收数据,只要是发送数据成功,就一定能将其取出来,如果通道都为空,则直接执行 default 语句。
package main
import (
"fmt"
"math/rand"
)
func main() {
chs := [3]chan int{
make(chan int, 3),
make(chan int, 3),
make(chan int, 3),
}
index1 := rand.Intn(3) // 随机生成0-2之间的数字
fmt.Printf("随机索引/数值: %d\n", index1)
chs[index1] <- rand.Int() // 向通道发送随机数字
close(chs[index1])
index2 := rand.Intn(3)
fmt.Printf("随机索引/数值: %d\n", index2)
if index2 != index1{
chs[index2] <- rand.Int()
close(chs[index2])
}
index3 := rand.Intn(3)
fmt.Printf("随机索引/数值: %d\n", index3)
if index3 != index2 && index3 != index1{
chs[index3] <- rand.Int()
close(chs[index3])
}
for {
// 哪一个通道中有值,哪个对应的分支就会被执行
// 简单地在 select 语句的分支中使用 break 语句,只能结束当前的 select 语句的执行,而并不会对外层的 for 语句产生作用,如果 for 循环本身没有退出机制的话会无休止地运行下去。
select {
case num, ok := <- chs[0]:
if !ok {
fmt.Println("0 channel closed!")
break
}
fmt.Println("第一个条件分支被选中: chs[0]=>", num)
case num, ok := <- chs[1]:
if !ok {
fmt.Println("1 channel closed!")
break
}
fmt.Println("第二个条件分支被选中: chs[1]=>", num)
case num, ok := <- chs[2]:
if !ok {
fmt.Println("2 channel closed!")
break
}
fmt.Println("第三个条件分支被选中: chs[2]=>", num)
default:
fmt.Println("没有分支被选中")
}
}
}
default与没有default
package main
import (
"fmt"
"time"
)
func main() {
chs := [3]chan int{
make(chan int, 3),
make(chan int, 3),
make(chan int, 3),
}
// 只要有一个分支被执行了,select就会跳出
// 一直阻塞,直到有一个通道有值或者关闭了某一个通道
go func() {
select {
case num := <- chs[0]:
fmt.Println("第一个条件分支被选中: chs[0]=>", num)
case num := <- chs[1]:
fmt.Println("第二个条件分支被选中: chs[1]=>", num)
case num := <- chs[2]:
fmt.Println("第三个条件分支被选中: chs[2]=>", num)
}
}()
/* go func() {
chs[0] <- rand.Int()
}()
go func() {
chs[1] <- rand.Int()
}()
go func() {
chs[2] <- rand.Int()
}()*/
fmt.Println("aaaaa");
time.Sleep(time.Second * 3)
fmt.Println("bbbbbb")
close(chs[0])
time.Sleep(time.Second * 5)
fmt.Println("vvvv")
close(chs[1])
close(chs[2])
}
package main
import (
"fmt"
"time"
)
func main() {
chs := [3]chan int{
make(chan int, 3),
make(chan int, 3),
make(chan int, 3),
}
// 只要有一个分支被执行了,select就会跳出
// 不会阻塞,直接执行default然后跳出
go func() {
select {
case num := <- chs[0]:
fmt.Println("第一个条件分支被选中: chs[0]=>", num)
case num := <- chs[1]:
fmt.Println("第二个条件分支被选中: chs[1]=>", num)
case num := <- chs[2]:
fmt.Println("第三个条件分支被选中: chs[2]=>", num)
default:
fmt.Println("default")
}
}()
/* go func() {
chs[0] <- rand.Int()
}()
go func() {
chs[1] <- rand.Int()
}()
go func() {
chs[2] <- rand.Int()
}()*/
fmt.Println("aaaaa");
time.Sleep(time.Second * 3)
fmt.Println("bbbbbb")
close(chs[0])
time.Sleep(time.Second * 5)
fmt.Println("vvvv")
close(chs[1])
close(chs[2])
}
错误和异常处理
在并发编程的通信过程中,最需要处理的就是超时问题:比如向通道发送数据时发现通道已满,或者从通道接收数据时发现通道为空。如果不正确处理这些情况,很可能会导致整个协程阻塞并产生死锁。此外,如果我们试图向一个已经关闭的通道发送数据或关闭已经关闭的通道,也会引发 panic。以上都是我们在使用通道进行并发通信时需要尤其注意的。
超时处理机制实现
Go 语言没有提供直接的超时处理机制,但我们可以借助 select 语句来实现类似机制解决超时问题,因为 select 语句的特点是只要其中一个 case 对应的通道操作已经完成,程序就会继续往下执行,而不会考虑其他 case 的情况
package main
import (
"fmt"
"time"
)
func main() {
// 初始化 ch 通道
ch := make(chan int, 1)
// 初始化 timeout 通道
timeout := make(chan bool, 1)
// 实现一个匿名超时等待函数
go func() {
time.Sleep(1e9) // 睡眠1秒钟
timeout <- true
}()
// 借助 timeout 通道结合 select 语句实现 ch 通道读取超时效果
select {
case <- ch:
fmt.Println("接收到 ch 通道数据")
case <- timeout:
fmt.Println("超时1秒,程序退出")
}
}
使用 select 语句可以避免永久等待的问题,因为程序会在从 timeout 通道中接收到数据后继续执行,无论对 ch 的读取是否还处于等待状态,从而实现 1 秒超时的效果。这种写法看起来是一个编程小技巧,但却是在 Go 语言并发编程中避免通道通信超时的最有效方法。
执行上述代码,打印结果如下:
超时1秒,程序退出
而如果没有 timeout 通道和上述 select 机制,从 ch 通道接收数据会得到如下 panic(死锁):
fatal error: all goroutines are asleep - deadlock!
避免对已关闭通道进行操作
为了避免对已关闭通道再度执行关闭操作引发 panic,一般我们约定只能在发送方关闭通道,而在接收方,我们则通过通道接收操作返回的第二个参数是否为 false 判定通道是否已经关闭,如果已经关闭,则不再执行发送操作
package main
import "fmt"
func main() {
ch := make(chan int, 2)
// 发送方
go func() {
for i := 0; i < 5; i++ {
fmt.Printf("发送方: 发送数据 %v...\n", i)
ch <- i
}
fmt.Println("发送方: 关闭通道...")
close(ch)
}()
// 接收方
for {
num, ok := <-ch
if !ok {
fmt.Println("接收方: 通道已关闭")
break
}
fmt.Printf("接收方: 接收数据: %v\n", num)
}
fmt.Println("程序退出")
}
如果我们试图在通道 ch 关闭后发送数据到该通道,则会得到如下 panic:
panic: send on closed channel
而如果我们试图在通道 ch 关闭后再次关闭它,则会得到如下 panic:
panic: close of closed channel