一、概念

协程是独立执行的,他们之间没有通信。协程间必须通过通信协调/同步他们的工作。

1、协程间通过通道(channel)来通信

协程可以使用共享变量来通信,但是在Go中并不提倡这样做,因为这种方式给所有的共享内存的多线程都带来了困难。

而Go是通过一种特殊的类型,通道(channel),一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种通过通道进行通信的方式保证了同步性。

数据在通道中进行传递:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。 数据的所有权(可以读写数据的能力)也因此被传递。

2、通道的声明和初始化

声明:

var identifier chan datatype

初始化:

identifier = make(chan datatype)
//or
identifier := make(chan datatype)
  • 未初始化的通道的值是nil。
  • 通道也是引用类型,所以我们使用 make() 函数来给它分配内存。
  • 通道只能传输一种类型的数据。
  • 所有的类型都可以用于通道,空接口 interface{} 也可以。或者函数通道:funcChan := make(chan func()),甚至可以(有时非常有用)创建通道的通道。
  • 通道实际上是类型化消息的队列:使数据得以传输。它是先进先出的结构所以可以保证发送给他们的元素的顺序。

二、怎么用通道实现通信?

1、通信操作符 <-

流向通道(发送):

ch <- int1 表示:用通道 ch 发送变量 int1

从通道流出(接收):

int2 = <- ch 表示:变量 int2 从通道 ch接收数据,如果int2没有声明过,可以写成int2 := <- ch

直接获取

<- ch 可以单独调用获取通道的(下一个)值,获取到的值会被丢弃,但是可以用来验证,例如:

if <- ch != 1000{
...
}


2、通信操作符的使用示例:

package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan string)

go sendData(ch)
go getData(ch)

time.Sleep(1e9)
}

func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokyo"
}

func getData(ch chan string) {
var input string
// time.Sleep(2e9)
for {
input = <-ch
fmt.Printf("%s ", input)
}
}

输出:

Washington Tripoli London Beijing tokyo

示例解释:

  • main() 函数中启动了两个协程:sendData() 通过通道 ch 发送了 5 个字符串,getData() 按顺序接收它们并打印出来。
  • 通道的发送和接收都是原子操作:它们总是互不干扰的完成的。
  • 如果我们移除一个或所有 go 关键字,程序无法运行,Go 运行时会抛出 panic:all goroutines are asleep-deadlock!。因为运行时(runtime)会检查所有的协程是否在等待着什么东西(可从某个通道读取或者写入某个通道),这意味着程序将无法继续执行。这是死锁(deadlock)的一种形式。

三、通道阻塞

默认情况下,通信是同步且无缓冲的:在有接受者接收数据之前,发送不会结束。一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。

所以通道的发送/接收操作在对方准备好之前是阻塞的:

1、对于同一个通道,发送操作(协程或者函数中的)在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时就可以再传入变量。

2、对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。

示例:

package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
go pump(ch1)
fmt.Println(<-ch1)
// prints only 0
time.Sleep(1e9)
}

func pump(ch chan int) {
for i := 0; ; i++ {
ch <- i
fmt.Println("次数:", i)
}
}

输出:

次数: 0
0

我们可以看到接收了一次,后面并无接收者接收,所以在ch1通道里,发送操作是阻塞的。

当为通道解除阻塞定义了 suck 函数来在无限循环中读取通道:

func suck(ch chan int) {
for {
fmt.Println(<-ch)
}
}

在 main() 中使用协程开始它:

go pump(ch1)
go suck(ch1)
time.Sleep(1e9)

给程序 1 秒的时间来运行:输出了上万个整数。

四、通过一个/多个通道交换数据进行协程同步

通信是一种同步形式:通过通道,两个协程在通信中某刻同步交换数据。无缓冲通道成为了多个协程同步的完美工具。

甚至可以在通道两端互相阻塞对方,形成了叫做死锁的状态。Go 运行时会检查并 panic,停止程序。

go协程的死锁例子:

package main

import (
"fmt"
)

func f1(in chan int) {
fmt.Println(<-in)
}

func main() {
out := make(chan int)
out <- 2
go f1(out)
}

Go 运行时检查并 panic: fatal error: all goroutines are asleep - deadlock! (所有的协程都休眠了 - 死锁!)

死锁原因:在out无缓冲通道还没准备好接收的时候,在主线程中进行了发送操作,导致阻塞在这,无法执行到下一步以致所有的协程都进入休眠。

解决死锁办法1:将创建协程进行通道接收的操作放到发送操作之前

package main

import (
"fmt"
)

func f1(in chan int) {
fmt.Println(<-in)
}

func main() {
out := make(chan int)
go f1(out)
out <- 2
}

输出:2

五、同步通道-使用带缓冲的通道

解决死锁办法2:将通道设置为缓冲通道

一个无缓冲通道只能包含 1 个元素,有时显得很局限。我们给通道提供了一个缓存,可以在扩展的 make 命令中设置它的容量,如下:

buf := 100
ch1 := make(chan string, buf)
  • buf 是通道可以同时容纳的元素个数。
  • 在缓冲满载(缓冲被全部使用)之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。
  • 缓冲容量和类型无关,所以可以给一些通道设置不同的容量,只要他们拥有同样的元素类型。内置的 cap 函数可以返回缓冲区的容量。
  • 如果容量大于 0,通道就是异步的:因为缓冲满载(发送)或变空(接收)之前通信不会阻塞,元素会按照发送的顺序被接收。
  • 如果容量是0或者未设置,通道就是同步的:通信仅在收发双方准备好的情况下才可以成功。

所以下面是使用缓冲通道解决死锁的例子:

package main

import (
"fmt"
"time"
)

func f1(in chan int) {
fmt.Println(<-in)
}

func main() {
out := make(chan int, 1)
out <- 2
go f1(out)
time.Sleep(1e7)
}

输出:2

六、协程中用通道输出结果

为了知道计算何时完成,可以通过信道回报。在例子 go sum(bigArray) 中,要这样写:

package main

import "fmt"

func main() {
ch := make(chan int)
bigArray := []int{1,2,3,4}
go sum(bigArray, ch)
sum := <- ch
fmt.Println(sum)
}

func sum(bigArray []int, ch chan int) {
sum := 0
for _,v := range bigArray{
sum += v
}
ch <- sum
}

输出:10

也可以使用通道来达到同步的目的,这个很有效的用法在传统计算机中称为信号量。或者换个方式:通过通道发送信号告知处理已经完成(在协程中)。

在其他协程运行时让 main 程序无限阻塞的通常做法是在 main 函数的最后放置一个 select {}。

也可以使用通道让 main 程序等待协程完成,就是所谓的信号量模式。

七、实现并行的 for 循环

以下示例中,for 循环的每一个迭代是并行完成的:

for i, v := range data {
go func (i int, v float64) {
doSomething(i, v)
...
} (i, v)
}

在 for 循环中并行计算迭代可能带来很好的性能提升。不过所有的迭代都必须是独立完成的。

八、给通道使用 for 循环

for 循环的 range 语句可以用在通道 ch 上,便可以从通道中获取值,像这样:

for v := range ch {
fmt.Printf("The value is %v\n", v)
}

它从指定通道中读取数据直到通道关闭,才继续执行下边的代码。很明显,另外一个协程必须写入 ch(不然代码就阻塞在 for 循环了),而且必须在写入完成后才关闭。

上面通道阻塞的例子可以修改成如下:

package main

import (
"fmt"
"time"
)

func main() {
suck(pump())
time.Sleep(1e9)
}

func pump() chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}

func suck(ch chan int) {
go func() {
for v := range ch {
fmt.Println(v)
}
}()
}

九、通道的方向

通道类型可以用注解来表示它只发送或者只接收:

var send_only chan<- int        // 只能往通道里发送数据
var recv_only <-chan int // 只能接受通道的数据

等待被接收数据的通道是无法关闭的,因为关闭通道是发送者用来表示不再给通道发送值了,所以对等待被接收通道是没有意义的。

通道创建的时候都是双向的,但也可以分配有方向的通道变量:

var c = make(chan int) // bidirectional
go source(c)
go sink(c)

func source(ch chan<- int){
for { ch <- 1 }
}

func sink(ch <-chan int) {
for { <-ch }

十、channel的注意事项

  1. channel 在 Golang 中是一等公民,它是线程安全的,面对并发问题,应首先想到 channel
  2. 关闭一个未初始化的 channel 会产生 panic
  3. 重复关闭同一个 channel 会产生 panic
  4. 向一个已关闭的 channel 发送消息会产生 panic
  5. 从已关闭的 channel 读取消息不会产生 panic,且能读出 channel 中还未被读取的消息,若消息均已被读取,则会读取到该类型的零值。
  6. 从已关闭的 channel 读取消息永远不会阻塞,并且会返回一个为 false 的值,用以判断该 channel 是否已关闭(x,ok := <- ch)
  7. 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息