go在语言层面支持并发编程,也就是goroutine,可以看做一种轻量级的线程。程序启动时,其主函数即在一个单独的goruntine中运行,叫做main goruntine,在程序中通过关键字go跟上函数(支持匿名函数)就可以启动一个新的goroutine,可以叫做sub goruntine。

在基于多线程设计的并发编程模型中,线程间的通信往往通过共享数据来实现,而保证共享数据的一致性非常关键。如果线程间有竞争条件,那么对共享数据的访问往往需要加锁来保证一致性,而针对不同的访问竞争,比如读/读、读/写、写/写,需要用不同的锁机制,要想兼顾性能和一致性保证需要煞费苦心,尤其是线程间共享数据比较多的时候。

为了更简单的并发编程,go语言提出了自己的信仰:用通信来共享内存,而不要用共享内存来通信。对于goroutine之间的通信,channel是最好的选择,铭记这句原则:用通信来共享内存,而不要用共享内存来通信,可以帮助我们更好的理解channel。

channel状态

channel作为go的一种基本数据类型,它有3种基本状态:nil、open、closed:

/* nil channel */
var ch = chan string // A channel is in a nil state when it is declared to its zero value
ch = nil // A channel can be placed in a nil state

/* open channel */
ch := make(chan string) // A channel is in a open state when it’s made using the built-in function make.

/* closed channel */
close(ch) // A channel is in a closed state when it’s closed using the built-in function close.

 当channel处于这3种不同的状态时,对于channel上的操作也会有不同的行为,理解这些行为对于正确的使用channel非常重要。 

上面这张图总结了这些行为,需要注意的是处于closed状态的channel,执行send操作(ch <- data)将会触发panic异常,而receive操作(<- ch)则不会,这表明了在channel被close之后,goruntine仍然可以从channel取走数据,如果channel中没有数据可取时,receive操作会立刻返回零值(nil)。仔细思考,这样的设计是非常合理的。

range循环可以直接在channel上迭代,当channel被关闭并且没有数据时可以直接跳出循环。

另外,对于nil和closed状态的channel执行close操作也会触发panic异常。

unbufferd channel和bufferd channel

虽然channel最常用于goroutine之间的通信,但是channel上的send和receive操作并不一定需要携带数据。根据channel是否需要传递数据,可以区分出一些channel的使用场景。

没有数据的channel的使用场景:

  • goroutine A通过channel告诉goroutine B:”请停止正在做的事情“
  • goroutine A通过channel告诉goroutine B:”我完成了要做的事情,但是没有任何结果需要反馈“

通知的方式一般是close操作,goroutine A对channel执行了close操作,而goruntine B得到channel已经被关闭这个信息后可以执行相应的处理。使用没有数据的channel的好处:一个goroutine可以同时给多个goroutine发送消息,只是这个消息不携带额外的数据,所以常被用于批量goruntine的退出。

对于这种不携带数据,只是作为信号的channel,一般使用如下:

ch := make(chan struct{})
ch <- struct{}{}
<- ch

带有数据的channel的使用场景:

  • goroutine A通过channel告诉goroutine B:”请根据我传递给你的数据开始做一件事情“
  • goroutine A通过channel告诉goroutine B:”我完成了要做的事情,请接收我传递的数据(结果)“

通知的方式就是goroutine A执行send发送数据,而goroutine B执行receive接收数据。channel携带的数据只能被一个goruntine得到,一个goruntine取走数据后这份数据在channel里就不复存在了。

对于需要携带数据的channel,一般又可以分成带有buffer的channel(bufferd channel)和不带buffer的channel(unbufferd channel)。

unbufferd channel

对于unbufferd channel,不存储任何数据,只负责数据的流通,并且数据的接收一定发生在数据发送完成之前。更详细的解释是,goroutine A在往channel发送数据完成之前,一定有goroutine B在等着从这个channel接收数据,否则发送就会导致发送的goruntine被block住,所以发送和接收的goruntine是耦合的。

看下面这个例子,往ch发送数据时就使main gouruntine被永久block住,导致程序死锁。

func main() {
	var ch = make(chan string)
	ch <- "hello" //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]:
	fmt.Println(<-ch)
}

有人可能会考虑将接收操作放到前面,不幸的是仍然导致了死锁,因为channel里没有数据,当前goruntine也会被block住,导致程序死锁。

func main() {
	var ch = make(chan string)
	fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]:
	ch <- "hello"
}

这次,我们在另一个goruntine中执行receive,程序就可以正常工作了。因为在main goruntine发送完数据之前,sub goroutine已经在等待接收数据。

func main() {
	var ch = make(chan string)
	go func() {
		fmt.Println(<-ch) //out: hello
	}()
	ch <- "hello"
}

再看下面这个例子,我们期望在sub goruntine中打印10个数,实际上却只有main goruntine打印了hello。因为在sub goruntine打印之前,main goruntine就已经执行完成并退出了。

func main() {
	go func() {
		for i := 0; i < 10; i++ {
			fmt.Printf("%d ", i)
		}
	}()
	fmt.Println("hello")
}

/* Output */
M310144TCG8WP:work hunk.he$ go run channel1.go
hello

这个时候就可以用一个unbufferd channel来让两个goruntine之间有一些通信,让main goruntine收到sub goruntine通知后再退出。在这种场景中,channel并不携带任何数据,只是起到一个信号的作用。

func main() {
	var ch = make(chan string)
	go func() {
		for i := 0; i < 10; i++ {
			fmt.Printf("%d ", i)
		}
		ch <- "exit"
	}()
	fmt.Println("hello")
	<-ch
}

bufferd channel

对带有缓冲区的channel执行send和receive操作,其行为和不带缓冲区的channel不太一样。继续讨论最开始的例子,不过这次的channel是一个size=1的bufferd channel,将数据发送给channel后,数据被拷贝到channel的缓冲区,goruntine继续往后执行,所以程序可以正常工作。

func main() {
	var ch = make(chan string, 1)
	ch <- "hello"
	fmt.Println(<-ch) //hello
}

但是当我们调换发送和接收的顺序时,程序又进入了死锁。因为当channel里没有数据时(干涸),执行receive的goroutine也会被block住,最终导致了死锁。

func main() {
	var ch = make(chan string, 1)
	fmt.Println(<-ch) //fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]:
	ch <- "hello"
}

此外,buffer size=1和buffer size>1的channel对于数据的交付也有一些细微的不同:

  • 对于buffer size=1的channel,第二个数据发送完成之前,之前发送的第一个数据一定被取走了,否则发送也会被block住,这其实说明了数据的交付得到了延迟保证
  • 对于buffer size>1的channel,发送数据时,之前发送的数据不能保证一定被取走了,并且buffer size越大,数据的交付得到的保证越少。也正是由于这种无保证交付,减少了goroutine之间通信时的阻塞延迟,根据发送数据、接收数据、数据处理的速度合理的设计buffer size,甚至可以在不浪费空间的情况下做到没有任何延迟。

如果channel buffer已经塞满了数据,继续执行发送会导致当前goruntine被block住(阻塞),直到channel中的数据被取走一部分才可以继续向channel发送数据。

通过channel buffer,解耦了发送和接收的goruntine。需要小心的是,buffered channel虽然可以看做一个缓存消息的队列,但是其主要用途还是用于多个goruntine之间的通信,单个goruntine中不要使用buffered channel来做缓存队列,send和receive操作很容让goruntine被永久block住导致整个程序死锁,上面的demo也说明了这件事情。

再看下面这个例子,一个简单的生产消费者模型,manager每200ms有一个新的work需要分发给3个worker来完成,manager每次都只是将work发送到一个channel中,work自动从channel中取出work并处理,每个worker完成一个work需要1s的时间,manager累计分发10个work,这个时候我们发现没有阻塞。但是如果manager继续不停地分发work,就会发现channel缓冲区被塞满,manager总是在等待worker。所以,根据处理需求,合理的设计worker(goruntine)数量和channel buffer size非常重要。

func main() {
	const cap = 3
	ch := make(chan string, cap)

	for index := 0; index < cap; index++ {
		go func() {
			for p := range ch {
				fmt.Println("Worker received: ", p)
				time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			}
		}()
	}

	worknum := 10
	for index := 0; index < worknum; index++ {
		time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
		work := "work " + strconv.Itoa(index)
		select {
		case ch <- work:
			fmt.Println("Manager: send a work")
		default:
			fmt.Println("Manager: wait worker")
		}
	}

	close(ch)
}

/* Output
M310144TCG8WP:work hunk.he$ go run channel1.go
Worker received:  work 0  //这里worker先打印有一定的随机性
Manager: send a work
Manager: send a work
Worker received:  work 1
Manager: send a work
Worker received:  work 2
Manager: send a work
Worker received:  work 3
Manager: send a work
Manager: send a work
Worker received:  work 4
Manager: send a work
Worker received:  work 5
Manager: send a work
Worker received:  work 6
Manager: send a work
Manager: send a work
*/

goruntine泄露

对于下面这个demo,3个sub goruntine向unbuffered channel发送数据,但是main goruntine只接收一次,所以只有最早执行完send的sub goruntine的数据能得到交付,另外两个慢一点的sub goruntine将会被永远block住直到整个程序退出。这种情况也是一个BUG,称为goruntine泄露,泄露的goruntine并不会被自动回收,因此确保每个不再需要的goruntine正常退出非常重要,尤其是常驻内存的后台程序。最需要注意的场景就是负责接收的goruntine在永久退出(return)接收处理时,要确保发送的goruntine不会因为继续发送数据被block住。

func main() {
	ch := make(chan string)
	count := 3
	for index := 0; index < count; index++ {
		go func() {
			time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
			ch <- "hello"
		}()
	}
	fmt.Println(<-ch)
	time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
	//other work
}

改进后

func main() {
	ch := make(chan string)
	count := 3
	for index := 0; index < count; index++ {
		go func() {
			time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
			ch <- "hello"
		}()
	}
	for index := 0; index < count; index++ {
		fmt.Println(<-ch)
	}
	time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
	//other work
}

cap()和len()

cap和len都是go的内置函数,可以用于获取channel的容量和存储的数据个数。

func main() {
	var ch = make(chan string, 10)
	for index := 0; index < 6; index++ {
		ch <- "helo"
	}
	fmt.Println(cap(ch))
	fmt.Println(len(ch))
}
/* Output 
10
6
*/

range和select

range可以对channel进行迭代,不断接收channel里的数据(没有数据时阻塞),直到channel被关闭后自动退出迭代。demo如下:

func main() {
	var ch = make(chan string, 2)
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(500 * time.Millisecond)
			ch <- "hello"
		}
		close(ch)
	}()

	for recv := range ch {
		fmt.Println(recv)
	}
}

select是一种多路复用技术,就像POSIX接口里的select可以同时对多个文件描述符进行监控一样,go提供的select可以同时对多个channel进行监控,实现并发接收。当多个case里的channel同时有数据ready的时候,select会随机选择一个case进行处理。

需要注意的是,range操作可以在channel关闭后自动退出,而select不会。所以在用for循环搭配select实现轮询时,select的case语句中必须显示的判断channel是否已经关闭,并做相应的处理,否则select每次从处于closed状态的channel中取出空值,并且继续执行case语句包含的body,程序的运行就可能与期望不一致。demo如下:

func main() {
	var ch = make(chan string, 2)
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(1000 * time.Millisecond)
			ch <- "hello"
		}
		close(ch)
	}()

	for {
		select {
		case recv, ok := <-ch:
			if !ok {
				os.Exit(0)
			}
			fmt.Println(recv)
		}
	}

}

并发循环

在下面这个demo中,根据list的size启动多个sub goruntine,每个sub gouruntine作的仅有处理就是向unbuffered channel中写入一个数据,main goruntine取到所有数据后打印total退出。我们使用了sync.WaitGroup来做计数,每启动一个sub goruntine计数器加1,每个goruntine完成时用wg.Done()将计数器减1,还有一个额外的sub goruntine在监控计数器的变化(阻塞直到计数器减到0),如果所有的sub goruntine都完成了数据发送并退出,额外的goruntine将会关闭channel,main goruntine对channel的range操作一旦检测到channel被关闭便会立即退出。

func main() {
	list := []int{0, 1, 2, 3, 4}
	sizes := make(chan int)
	var wg sync.WaitGroup
	for e := range list {
		wg.Add(1)
		go func(e int) {
			defer wg.Done()
			sizes <- e
		}(e)
	}

	go func() {
		wg.Wait()
		close(sizes)
	}()

	var total int
	for num := range sizes {
		total += num
	}
	fmt.Println(total)
}

这种写法常用于这种场景:我们希望使用并发循环,但是不知道迭代次数时,这里的list可能是channel,slice等。需要注意的是:

  • 监控计数器的处理必须放到一个单独的goruntine中,因为Wait()会阻塞当前goruntine直到计数器变为0。
  • Add()操作可以加负数。无论是Add()还是Done(),如果操作会使计数器变为负数则会出发panic异常。

goruntine退出

在前面goruntine泄露的改进demo中,为了保证3个sub goruntine能正确退出,我们在main goruntine中根据sub goruntine的数量取走了所有数据后。这种方式遗留的问题在于,如果sub goruntine是根据任务临时启动的,main goruntine并不知道到底有多少个goruntine需要退出,所以需要用一个channel做取消信号,但是对于open状态的channel,其中的数据被goruntine取走后就不存在了,无法做到通知到所有goruntine。

考虑文章开头描述的channel被close之后的操作行为:goruntine可以继续从closed channel中接收数据,如果没有实际数据,立即返回零值,这正好符合我们取消信号的需求。仍然有几个需要考虑的问题:

  • sub goruntine如果通过channel向main goruntine发送数据,而任务取消后main goruntine可能不再接收channel中的数据,需要保证已经启动的sub goruntine不会因为发送操作被block住。
  • 刚启动的sub goruntine在收到取消信号后,可以直接退出,但是对于正在执行任务的goruntine可能无法及时退出,造成取消延时,所以在耗时严重的地方最好也处理取消信号。

下面的扫描磁盘空间占用情况的demo来自go语言圣经这本书,很好的演示了这些问题:

package walk

import (
	"bufio"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

//限制同时打开的目录文件数,防止启动过多的goruntine
var sema = make(chan struct{}, 20)

//用于取消整个磁盘扫描任务
var done = make(chan struct{})

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()
	//收到cancel信号后,goruntine直接退出
	if cancelled() {
		return
	}
	for _, entry := range dirents(dir) {
		if entry.IsDir() {
			n.Add(1)
			subdir := filepath.Join(dir, entry.Name())
			go walkDir(subdir, n, fileSizes)
		} else {
			fileSizes <- entry.Size()
		}
	}
}

func dirents(dir string) []os.FileInfo {
	select {
	case sema <- struct{}{}:
	case <-done:
		//发出cancel后,我们期望是这个工作能立即被终止,但是对于已经启动的goruntine,dirents会继续执行并耗费不少时间
		//所以这里添加对cancel信号的处理,可以减少cancel操作的时延
		return nil
	}

	defer func() {
		<-sema
	}()

	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du1: %v\n", err)
		return nil
	}
	return entries
}

func inputDir() string {
	var (
		inputReader *bufio.Reader
		dir         string
		err         error
	)

	inputReader = bufio.NewReader(os.Stdin)
	fmt.Printf("Please input a directory:")
	dir, err = inputReader.ReadString('\n')
	if err != nil {
		os.Exit(1)
	}
	dir = strings.Replace(dir, "\n", "", -1)
	return dir
}

func printDiskUsage(nFiles int64, nBytes int64) {
	fmt.Printf("%d files %.1f GB\n", nFiles, float64(nBytes)/1e9)
}

func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println("=========Enter, walk.Test.()==========")
	defer fmt.Println("=========Exit, walk.Test.()==========")
	dir := inputDir()

	//从标准输入读到任何内容后,关闭done这个channel
	go func() {
		os.Stdin.Read(make([]byte, 1))
		close(done)
	}()

	fileSizes := make(chan int64)
	var n sync.WaitGroup
	n.Add(1)
	go walkDir(dir, &n, fileSizes)

	go func() {
		n.Wait()
		close(fileSizes)
	}()

	var nFiles, nBytes int64
	var tick <-chan time.Time
	tick = time.Tick(100 * time.Millisecond)

loop:
	for {
		select {
		case <-done:
			for range fileSizes {
				//do nothing,这个for循环的意义在于:收到cancel信号后,统计工作马上结束,我们会退出对fileSizes这个channel的接收操作,但是整个程序未必退出
				//这时将fileSizes排空可以防止还在运行walkDir的goruntine因为向fileSies发送数据被阻塞(没有buffer或者buffer已满),导致goruntine泄露
			}
			return
		case size, ok := <-fileSizes:
			if !ok { //这里必须显示的判断fileSizes是否已经被close
				break loop
			}
			nFiles++
			nBytes += size
		case <-tick: //每0.1S产生一次时钟信号,打印一次当前统计数字
			printDiskUsage(nFiles, nBytes)
		}
	}

	printDiskUsage(nFiles, nBytes)
}

References

《go语言圣经》