使用箭头 <- 来读写一个channel,无缓冲的channel 常用于同步的场景,当向一个无缓冲的channel 发送消息,另一端没有人接收,或者当从一个无缓冲的channel 接收消息,另一端没有人发送时,当前goroutine都会阻塞。下面是一个简单的例子,主goroutine会在接收c的时候阻塞,直到匿名函数中向c写入一个0。

c := make(chanint)

gofunc(ch chanint) { ch <-0}(c) <-c

也可以使用range读取一个channel,循环会持续到channel关闭

c := make(chanint)

gofunc(ch chanint) { c <-1close(c) }(c)

fori := rangec { fmt.Println(i) }

而缓冲的channel只有当缓冲区满或者空,写入和读取的时候才会发生阻塞,所以缓冲的channel可以当做简单的队列来使用

c := make( chanint, 10) c <- 1c <- 2fmt.Println(<-c, <-c)

如果以上代码稍作更改,在channel为空时再读取一次,则会发生死锁,代码如下

c := make( chanint, 10) c <- 1c <- 2fmt.Println(<-c, <-c,<-c)

而运行错误信息为

fatal error: all goroutines are asleep - deadlock!

死锁的原因在于channel的另一端没有goroutine写入了,当缓冲的channel为空时,主goroutine会一直阻塞在读取,同时也没有其他goroutine可以调度。

安全的关闭channel

go提供了一个内置的方法close()用于关闭一个channel,需要注意的是:

  • 只能关闭一个双向或者可写的channel。

  • 对于同一个channel,多次调用close(),会导致panic。

  • 对一个已关闭的channel写数据,会导致panic。

  • 从一个已关闭的channel中读数据,不会panic,会读到channel对应类型的0值,比如int为0,bool就为false,但是这样无法确定读取到的是否是正确的数据,所以一般会使用channel返回的第二个可选参数来判断channel是否关闭。

只能关闭一个双向或者可写的channel。

对于同一个channel,多次调用close(),会导致panic。

对一个已关闭的channel写数据,会导致panic。

从一个已关闭的channel中读数据,不会panic,会读到channel对应类型的0值,比如int为0,bool就为false,但是这样无法确定读取到的是否是正确的数据,所以一般会使用channel返回的第二个可选参数来判断channel是否关闭。

close(c)

if_, ok := <-c; !ok { fmt.Println( "channel closed") }

鉴于向一个已关闭的channel发送数据会导致panic,所以一般由发送者调用close()关闭channel,因为发送数据的一方清楚什么时候应该停止数据的写入;同时,在channel关闭之后,所有因为读取这个channel而阻塞的goroutine会立即往后执行,利用这一点可以实现简单的广播。

stopCh := make( chanstruct{})

fori := 0; i < 5; i++ {

gofunc(i int) { <-stopCh fmt.Printf( "goroutine %d stoppedn", i) }(i) }

close(stopCh) time.Sleep(time.Second * 1)

以上代码在主goroutine中关闭channel,其他goroutine会立即退出

goroutine 2stoppedgoroutine 4stoppedgoroutine 1stoppedgoroutine 0stoppedgoroutine 3stopped

需要注意的一种情况是只声明未赋值的channel,即 nil channel,close 一个nil channel会导致panic,而读写一个nil channel会永久阻塞。

Select

从一个无缓冲的channel读取数据会阻塞,如果需要从多个channel读取数据呢?这个时候就需要配合select关键词使用

select关键字的灵感来源与unix中的I/O多路复用函数select(现已被epoll、kqueue等替代),unix中的select函数监听多个文件描述符,当select返回时,会得到可读或可写的描述符集合,这种技术实现了在一个线程内处理多个套接字连接。

在go中使用select - case 可以在多个channel上监听读写事件,某个case产生了读写事件时,则执行相应case中的代码

下面例子中第二个case从c1读取,c1另一端有一个goroutine写入,所以执行第二个case中的代码

c1 := make( chanint) c2 := make( chanint)

gofunc() { c1 <-1}()

select{

casec2 <- 1: fmt.Println( "write to c2")

case<-c1: fmt.Println( "read from c1") }

在实际的应用场景中,需要循环对多个channel中的某一个读写数据,比如服务器编程中,经常使用for - select的方式循环检测多个channel的事件,下面是一个简单的tcp服务的例子。

  • 初始化了两个channel,一个用于读写tcp连接的地址,另一个用于接收系统信号

  • 每建立一个tcp连接,发送连接的远端地址到msg中

  • 在主goroutine中使用for select 循环检测msg和sig的事件,并优先匹配系统信号的事件。

初始化了两个channel,一个用于读写tcp连接的地址,另一个用于接收系统信号

每建立一个tcp连接,发送连接的远端地址到msg中

在主goroutine中使用for select 循环检测msg和sig的事件,并优先匹配系统信号的事件。

deferconn.Close() io.WriteString(conn, "hello") msg <- conn.RemoteAddr().String()}

funcmain() { msg := make( chanstring) sig := make( chanos.Signal) signal.Notify(sig, os.Interrupt)

gofunc() { l, err := net.Listen( "tcp", ":8000")

iferr != nil{

panic(err) }

for{ conn, err := l.Accept()

iferr != nil{

ifnetErr, ok := err.(net.Error); ok && netErr.Temporary() {

continue} else{

panic(err) } }

gohandleConn(conn, msg) } }()

for{

select{

case<-sig: fmt.Println( "exit") os.Exit (0)

caseaddr := <-msg: fmt.Println(addr) } }}

Worker pool

在web开发的时候,可以很容易地使用关键字go将长耗时的任务放到其他goroutine中执行,比如

http.HandleFunc( "/", func(rw http.ResponseWriter, r *http.Request) { //long time taskgofunc() { //...}() rw.WriteHeader(http.StatusOK) })

但是这样做的缺点也很明显:无法控制并发数量,标准库的http server在一个单独的goroutine中处理每一个用户请求,每个请求又可能会创建1到多个goroutine,如果某一段时间用户请求量很高,会导致服务器在短时间内创建大量的goroutine,在高并发的场景中,这样做是会影响性能的,这个时候,我们希望用一些手段来控制并发数量,比较常见的做法是使用queue+worker pool的方式,这样后台可以任意调整worker的数量来控制任务的处理速度。

以下是一个简单的worker pool,以一个缓冲的channel作为任务队列,用一个函数来表示相应的任务,worker循环从queue中读取并执行相应的任务,直到收到stop的信号。

typeTask func()

typePool struct{ maxWorkers intqueue chanTask done chanstruct{}}

func(p Pool) Start() {

fori := 0; i < p.maxWorkers; i++ { worker := Worker{i}

goworker.Consume(p.queue, p.done) }}

func(p Pool) Stop() {

close(p.done)}

typeWorker struct{ id int}

func(w Worker) Consume(queue chanTask, stopCh chanstruct{}) {

for{

select{

case<-stopCh: fmt.Printf( "worker %d stoppedn", w.id)

returncasetask := <-queue: task() } }}

funcmain() { done := make( chanstruct{}) queue := make( chanTask, 100)

fori := 0; i < 100; i++ { i := i queue <- func() {

fmt.Println(i)

} //just print something} pool := Pool{ maxWorkers: 5, queue: queue, done: done, } pool.Start() //stop pool after 5 secondstime.Sleep(time.Second * 5) pool.Stop()}

并行运算

go 1.5之后,GOMAXPROCS被默认设置为CPU的核心数量,goroutine会被调度到多个系统线程之上,这意味着我们可以利用多核CPU的性能做一些并行运算,比如:计算10000个文件的MD5值,生成10W个图片的缩略图等等,在这些场景中,我们可以将任务切分为小块,分派个多个goroutine执行,最后通过channel将结果汇合到一起。

下面是一个使用蒙特卡洛法计算PI近似值的一个例子,基本的思想是利用圆的面积与其外接正方形的面积之比为PI/4,通过产生大量均匀分布的二维点,计算落在圆和正方形内的点的比例,再乘以4,就可以得到PI的近似值,随着样本数量的增加,结果会越来越接近PI。

首先是一个单核的版本

funcMonteCarloPI(samples int) float64{ inside := 0//indicates point is inside the circler := rand.New(rand.NewSource(time.Now().UnixNano()))

fori := 0; i < samples; i++ { x := r.Float64() y := r.Float64()

ifx*x+y*y < 1{ inside++ } }

returnfloat64(inside) / float64(samples) * 4}

funcmain() { fmt.Println(MonteCarloPI (10000000)) fmt.Println(MonteCarloPI (100000000)) fmt.Println(math.Pi)}

结果如下,前两个分别为一千万和一亿样本时的结果,最后一行为go标准库math中的PI

3.14172843.14161463.141592653589793...

当样本数量为1亿时,耗时已经达到了数秒钟。以下例子取CPU核心数作为worker的数量,每个goroutine计算出一个PI的近似值,通过results这个channel汇集goroutine计算的结果,最后求平均值。

funcMonteCarloMultiCore(samples int) float64{ workers := runtime.NumCPU() results := make( chanfloat64, workers) threadSamples := samples / workers

fori := 0; i < workers; i++ {

gofunc() { results<-MonteCarloPI(threadSamples) }() }

vartotal float64fori := 0; i < workers; i++ { total += <-results }

returntotal / float64(workers)}

在10亿的样本情况下,单核和多核的结果如下,可以看到提升是很明显的

MonteCarloPI 27.233842675s MonteCarloMultiCore 5.598302862s

更多关于channel的资料,可以参考如下链接

  • Effective go

    (https://golang.org/doc/effective_go.html#concurrency)

  • Go concurrency patterns

    (https://blog.golang.org/pipelines)

  • Concurrency is not parallelism

    (https://blog.golang.org/concurrency-is-not-parallelism)

Effective go

(https://golang.org/doc/effective_go.html#concurrency)

Go concurrency patterns

(https://blog.golang.org/pipelines)

Concurrency is not parallelism

(https://blog.golang.org/concurrency-is-not-parallelism)