go在语言层面支持并发编程,也就是goroutine,可以看做一种轻量级的线程。程序启动时,其主函数即在一个单独的goruntine中运行,叫做main goruntine,在程序中通过关键字go跟上函数(支持匿名函数)就可以启动一个新的goroutine,可以叫做sub goruntine。
在基于多线程设计的并发编程模型中,线程间的通信往往通过共享数据来实现,而保证共享数据的一致性非常关键。如果线程间有竞争条件,那么对共享数据的访问往往需要加锁来保证一致性,而针对不同的访问竞争,比如读/读、读/写、写/写,需要用不同的锁机制,要想兼顾性能和一致性保证需要煞费苦心,尤其是线程间共享数据比较多的时候。
为了更简单的并发编程,go语言提出了自己的信仰:用通信来共享内存,而不要用共享内存来通信。对于goroutine之间的通信,channel是最好的选择,铭记这句原则:用通信来共享内存,而不要用共享内存来通信,可以帮助我们更好的理解channel。
channel状态
channel作为go的一种基本数据类型,它有3种基本状态:nil、open、closed:
/* nil channel */
var ch = chan string // A channel is in a nil state when it is declared to its zero value
ch = nil // A channel can be placed in a nil state
/* open channel */
ch := make(chan string) // A channel is in a open state when it’s made using the built-in function make.
/* closed channel */
close(ch) // A channel is in a closed state when it’s closed using the built-in function close.
当channel处于这3种不同的状态时,对于channel上的操作也会有不同的行为,理解这些行为对于正确的使用channel非常重要。
上面这张图总结了这些行为,需要注意的是处于closed状态的channel,执行send操作(ch <- data)将会触发panic异常,而receive操作(<- ch)则不会,这表明了在channel被close之后,goruntine仍然可以从channel取走数据,如果channel中没有数据可取时,receive操作会立刻返回零值(nil)。仔细思考,这样的设计是非常合理的。
range循环可以直接在channel上迭代,当channel被关闭并且没有数据时可以直接跳出循环。
另外,对于nil和closed状态的channel执行close操作也会触发panic异常。
unbufferd channel和bufferd channel
虽然channel最常用于goroutine之间的通信,但是channel上的send和receive操作并不一定需要携带数据。根据channel是否需要传递数据,可以区分出一些channel的使用场景。
没有数据的channel的使用场景:
- goroutine A通过channel告诉goroutine B:”请停止正在做的事情“
- goroutine A通过channel告诉goroutine B:”我完成了要做的事情,但是没有任何结果需要反馈“
通知的方式一般是close操作,goroutine A对channel执行了close操作,而goruntine B得到channel已经被关闭这个信息后可以执行相应的处理。使用没有数据的channel的好处:一个goroutine可以同时给多个goroutine发送消息,只是这个消息不携带额外的数据,所以常被用于批量goruntine的退出。
对于这种不携带数据,只是作为信号的channel,一般使用如下:
ch := make(chan struct{})
ch <- struct{}{}
<- ch
带有数据的channel的使用场景:
- goroutine A通过channel告诉goroutine B:”请根据我传递给你的数据开始做一件事情“
- goroutine A通过channel告诉goroutine B:”我完成了要做的事情,请接收我传递的数据(结果)“
通知的方式就是goroutine A执行send发送数据,而goroutine B执行receive接收数据。channel携带的数据只能被一个goruntine得到,一个goruntine取走数据后这份数据在channel里就不复存在了。
对于需要携带数据的channel,一般又可以分成带有buffer的channel(bufferd channel)和不带buffer的channel(unbufferd channel)。
unbufferd channel
对于unbufferd channel,不存储任何数据,只负责数据的流通,并且数据的接收一定发生在数据发送完成之前。更详细的解释是,goroutine A在往channel发送数据完成之前,一定有goroutine B在等着从这个channel接收数据,否则发送就会导致发送的goruntine被block住,所以发送和接收的goruntine是耦合的。
看下面这个例子,往ch发送数据时就使main gouruntine被永久block住,导致程序死锁。
func main() {
var ch = make(chan string)
ch <- "hello" //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]:
fmt.Println(<-ch)
}
有人可能会考虑将接收操作放到前面,不幸的是仍然导致了死锁,因为channel里没有数据,当前goruntine也会被block住,导致程序死锁。
func main() {
var ch = make(chan string)
fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]:
ch <- "hello"
}
这次,我们在另一个goruntine中执行receive,程序就可以正常工作了。因为在main goruntine发送完数据之前,sub goroutine已经在等待接收数据。
func main() {
var ch = make(chan string)
go func() {
fmt.Println(<-ch) //out: hello
}()
ch <- "hello"
}
再看下面这个例子,我们期望在sub goruntine中打印10个数,实际上却只有main goruntine打印了hello。因为在sub goruntine打印之前,main goruntine就已经执行完成并退出了。
func main() {
go func() {
for i := 0; i < 10; i++ {
fmt.Printf("%d ", i)
}
}()
fmt.Println("hello")
}
/* Output */
M310144TCG8WP:work hunk.he$ go run channel1.go
hello
这个时候就可以用一个unbufferd channel来让两个goruntine之间有一些通信,让main goruntine收到sub goruntine通知后再退出。在这种场景中,channel并不携带任何数据,只是起到一个信号的作用。
func main() {
var ch = make(chan string)
go func() {
for i := 0; i < 10; i++ {
fmt.Printf("%d ", i)
}
ch <- "exit"
}()
fmt.Println("hello")
<-ch
}
bufferd channel
对带有缓冲区的channel执行send和receive操作,其行为和不带缓冲区的channel不太一样。继续讨论最开始的例子,不过这次的channel是一个size=1的bufferd channel,将数据发送给channel后,数据被拷贝到channel的缓冲区,goruntine继续往后执行,所以程序可以正常工作。
func main() {
var ch = make(chan string, 1)
ch <- "hello"
fmt.Println(<-ch) //hello
}
但是当我们调换发送和接收的顺序时,程序又进入了死锁。因为当channel里没有数据时(干涸),执行receive的goroutine也会被block住,最终导致了死锁。
func main() {
var ch = make(chan string, 1)
fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]:
ch <- "hello"
}
此外,buffer size=1和buffer size>1的channel对于数据的交付也有一些细微的不同:
- 对于buffer size=1的channel,第二个数据发送完成之前,之前发送的第一个数据一定被取走了,否则发送也会被block住,这其实说明了数据的交付得到了延迟保证。
- 对于buffer size>1的channel,发送数据时,之前发送的数据不能保证一定被取走了,并且buffer size越大,数据的交付得到的保证越少。也正是由于这种无保证交付,减少了goroutine之间通信时的阻塞延迟,根据发送数据、接收数据、数据处理的速度合理的设计buffer size,甚至可以在不浪费空间的情况下做到没有任何延迟。
如果channel buffer已经塞满了数据,继续执行发送会导致当前goruntine被block住(阻塞),直到channel中的数据被取走一部分才可以继续向channel发送数据。
通过channel buffer,解耦了发送和接收的goruntine。需要小心的是,buffered channel虽然可以看做一个缓存消息的队列,但是其主要用途还是用于多个goruntine之间的通信,单个goruntine中不要使用buffered channel来做缓存队列,send和receive操作很容让goruntine被永久block住导致整个程序死锁,上面的demo也说明了这件事情。
再看下面这个例子,一个简单的生产消费者模型,manager每200ms有一个新的work需要分发给3个worker来完成,manager每次都只是将work发送到一个channel中,work自动从channel中取出work并处理,每个worker完成一个work需要1s的时间,manager累计分发10个work,这个时候我们发现没有阻塞。但是如果manager继续不停地分发work,就会发现channel缓冲区被塞满,manager总是在等待worker。所以,根据处理需求,合理的设计worker(goruntine)数量和channel buffer size非常重要。
func main() {
const cap = 3
ch := make(chan string, cap)
for index := 0; index < cap; index++ {
go func() {
for p := range ch {
fmt.Println("Worker received: ", p)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}()
}
worknum := 10
for index := 0; index < worknum; index++ {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
work := "work " + strconv.Itoa(index)
select {
case ch <- work:
fmt.Println("Manager: send a work")
default:
fmt.Println("Manager: wait worker")
}
}
close(ch)
}
/* Output
M310144TCG8WP:work hunk.he$ go run channel1.go
Worker received: work 0 //这里worker先打印有一定的随机性
Manager: send a work
Manager: send a work
Worker received: work 1
Manager: send a work
Worker received: work 2
Manager: send a work
Worker received: work 3
Manager: send a work
Manager: send a work
Worker received: work 4
Manager: send a work
Worker received: work 5
Manager: send a work
Worker received: work 6
Manager: send a work
Manager: send a work
*/
goruntine泄露
对于下面这个demo,3个sub goruntine向unbuffered channel发送数据,但是main goruntine只接收一次,所以只有最早执行完send的sub goruntine的数据能得到交付,另外两个慢一点的sub goruntine将会被永远block住直到整个程序退出。这种情况也是一个BUG,称为goruntine泄露,泄露的goruntine并不会被自动回收,因此确保每个不再需要的goruntine正常退出非常重要,尤其是常驻内存的后台程序。最需要注意的场景就是负责接收的goruntine在永久退出(return)接收处理时,要确保发送的goruntine不会因为继续发送数据被block住。
func main() {
ch := make(chan string)
count := 3
for index := 0; index < count; index++ {
go func() {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
ch <- "hello"
}()
}
fmt.Println(<-ch)
time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
//other work
}
改进后
func main() {
ch := make(chan string)
count := 3
for index := 0; index < count; index++ {
go func() {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
ch <- "hello"
}()
}
for index := 0; index < count; index++ {
fmt.Println(<-ch)
}
time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
//other work
}
cap()和len()
cap和len都是go的内置函数,可以用于获取channel的容量和存储的数据个数。
func main() {
var ch = make(chan string, 10)
for index := 0; index < 6; index++ {
ch <- "helo"
}
fmt.Println(cap(ch))
fmt.Println(len(ch))
}
/* Output
10
6
*/
range和select
range可以对channel进行迭代,不断接收channel里的数据(没有数据时阻塞),直到channel被关闭后自动退出迭代。demo如下:
func main() {
var ch = make(chan string, 2)
go func() {
for i := 0; i < 5; i++ {
time.Sleep(500 * time.Millisecond)
ch <- "hello"
}
close(ch)
}()
for recv := range ch {
fmt.Println(recv)
}
}
select是一种多路复用技术,就像POSIX接口里的select可以同时对多个文件描述符进行监控一样,go提供的select可以同时对多个channel进行监控,实现并发接收。当多个case里的channel同时有数据ready的时候,select会随机选择一个case进行处理。
需要注意的是,range操作可以在channel关闭后自动退出,而select不会。所以在用for循环搭配select实现轮询时,select的case语句中必须显示的判断channel是否已经关闭,并做相应的处理,否则select每次从处于closed状态的channel中取出空值,并且继续执行case语句包含的body,程序的运行就可能与期望不一致。demo如下:
func main() {
var ch = make(chan string, 2)
go func() {
for i := 0; i < 5; i++ {
time.Sleep(1000 * time.Millisecond)
ch <- "hello"
}
close(ch)
}()
for {
select {
case recv, ok := <-ch:
if !ok {
os.Exit(0)
}
fmt.Println(recv)
}
}
}
并发循环
在下面这个demo中,根据list的size启动多个sub goruntine,每个sub gouruntine作的仅有处理就是向unbuffered channel中写入一个数据,main goruntine取到所有数据后打印total退出。我们使用了sync.WaitGroup来做计数,每启动一个sub goruntine计数器加1,每个goruntine完成时用wg.Done()将计数器减1,还有一个额外的sub goruntine在监控计数器的变化(阻塞直到计数器减到0),如果所有的sub goruntine都完成了数据发送并退出,额外的goruntine将会关闭channel,main goruntine对channel的range操作一旦检测到channel被关闭便会立即退出。
func main() {
list := []int{0, 1, 2, 3, 4}
sizes := make(chan int)
var wg sync.WaitGroup
for e := range list {
wg.Add(1)
go func(e int) {
defer wg.Done()
sizes <- e
}(e)
}
go func() {
wg.Wait()
close(sizes)
}()
var total int
for num := range sizes {
total += num
}
fmt.Println(total)
}
这种写法常用于这种场景:我们希望使用并发循环,但是不知道迭代次数时,这里的list可能是channel,slice等。需要注意的是:
- 监控计数器的处理必须放到一个单独的goruntine中,因为Wait()会阻塞当前goruntine直到计数器变为0。
- Add()操作可以加负数。无论是Add()还是Done(),如果操作会使计数器变为负数则会出发panic异常。
goruntine退出
在前面goruntine泄露的改进demo中,为了保证3个sub goruntine能正确退出,我们在main goruntine中根据sub goruntine的数量取走了所有数据后。这种方式遗留的问题在于,如果sub goruntine是根据任务临时启动的,main goruntine并不知道到底有多少个goruntine需要退出,所以需要用一个channel做取消信号,但是对于open状态的channel,其中的数据被goruntine取走后就不存在了,无法做到通知到所有goruntine。
考虑文章开头描述的channel被close之后的操作行为:goruntine可以继续从closed channel中接收数据,如果没有实际数据,立即返回零值,这正好符合我们取消信号的需求。仍然有几个需要考虑的问题:
- sub goruntine如果通过channel向main goruntine发送数据,而任务取消后main goruntine可能不再接收channel中的数据,需要保证已经启动的sub goruntine不会因为发送操作被block住。
- 刚启动的sub goruntine在收到取消信号后,可以直接退出,但是对于正在执行任务的goruntine可能无法及时退出,造成取消延时,所以在耗时严重的地方最好也处理取消信号。
下面的扫描磁盘空间占用情况的demo来自go语言圣经这本书,很好的演示了这些问题:
package walk
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
//限制同时打开的目录文件数,防止启动过多的goruntine
var sema = make(chan struct{}, 20)
//用于取消整个磁盘扫描任务
var done = make(chan struct{})
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
//收到cancel信号后,goruntine直接退出
if cancelled() {
return
}
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}:
case <-done:
//发出cancel后,我们期望是这个工作能立即被终止,但是对于已经启动的goruntine,dirents会继续执行并耗费不少时间
//所以这里添加对cancel信号的处理,可以减少cancel操作的时延
return nil
}
defer func() {
<-sema
}()
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
func inputDir() string {
var (
inputReader *bufio.Reader
dir string
err error
)
inputReader = bufio.NewReader(os.Stdin)
fmt.Printf("Please input a directory:")
dir, err = inputReader.ReadString('\n')
if err != nil {
os.Exit(1)
}
dir = strings.Replace(dir, "\n", "", -1)
return dir
}
func printDiskUsage(nFiles int64, nBytes int64) {
fmt.Printf("%d files %.1f GB\n", nFiles, float64(nBytes)/1e9)
}
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
func main() {
fmt.Println("=========Enter, walk.Test.()==========")
defer fmt.Println("=========Exit, walk.Test.()==========")
dir := inputDir()
//从标准输入读到任何内容后,关闭done这个channel
go func() {
os.Stdin.Read(make([]byte, 1))
close(done)
}()
fileSizes := make(chan int64)
var n sync.WaitGroup
n.Add(1)
go walkDir(dir, &n, fileSizes)
go func() {
n.Wait()
close(fileSizes)
}()
var nFiles, nBytes int64
var tick <-chan time.Time
tick = time.Tick(100 * time.Millisecond)
loop:
for {
select {
case <-done:
for range fileSizes {
//do nothing,这个for循环的意义在于:收到cancel信号后,统计工作马上结束,我们会退出对fileSizes这个channel的接收操作,但是整个程序未必退出
//这时将fileSizes排空可以防止还在运行walkDir的goruntine因为向fileSies发送数据被阻塞(没有buffer或者buffer已满),导致goruntine泄露
}
return
case size, ok := <-fileSizes:
if !ok { //这里必须显示的判断fileSizes是否已经被close
break loop
}
nFiles++
nBytes += size
case <-tick: //每0.1S产生一次时钟信号,打印一次当前统计数字
printDiskUsage(nFiles, nBytes)
}
}
printDiskUsage(nFiles, nBytes)
}
References
《go语言圣经》