使用箭头 <- 来读写一个channel,无缓冲的channel 常用于同步的场景,当向一个无缓冲的channel 发送消息,另一端没有人接收,或者当从一个无缓冲的channel 接收消息,另一端没有人发送时,当前goroutine都会阻塞。下面是一个简单的例子,主goroutine会在接收c的时候阻塞,直到匿名函数中向c写入一个0。
c := make(chanint)
gofunc(ch chanint) { ch <-0}(c) <-c
也可以使用range读取一个channel,循环会持续到channel关闭
c := make(chanint)
gofunc(ch chanint) { c <-1close(c) }(c)
fori := rangec { fmt.Println(i) }
而缓冲的channel只有当缓冲区满或者空,写入和读取的时候才会发生阻塞,所以缓冲的channel可以当做简单的队列来使用
c := make( chanint, 10) c <- 1c <- 2fmt.Println(<-c, <-c)
如果以上代码稍作更改,在channel为空时再读取一次,则会发生死锁,代码如下
c := make( chanint, 10) c <- 1c <- 2fmt.Println(<-c, <-c,<-c)
而运行错误信息为
fatal error: all goroutines are asleep - deadlock!
死锁的原因在于channel的另一端没有goroutine写入了,当缓冲的channel为空时,主goroutine会一直阻塞在读取,同时也没有其他goroutine可以调度。
安全的关闭channel
go提供了一个内置的方法close()用于关闭一个channel,需要注意的是:
只能关闭一个双向或者可写的channel。
对于同一个channel,多次调用close(),会导致panic。
对一个已关闭的channel写数据,会导致panic。
从一个已关闭的channel中读数据,不会panic,会读到channel对应类型的0值,比如int为0,bool就为false,但是这样无法确定读取到的是否是正确的数据,所以一般会使用channel返回的第二个可选参数来判断channel是否关闭。
只能关闭一个双向或者可写的channel。
对于同一个channel,多次调用close(),会导致panic。
对一个已关闭的channel写数据,会导致panic。
从一个已关闭的channel中读数据,不会panic,会读到channel对应类型的0值,比如int为0,bool就为false,但是这样无法确定读取到的是否是正确的数据,所以一般会使用channel返回的第二个可选参数来判断channel是否关闭。
close(c)
if_, ok := <-c; !ok { fmt.Println( "channel closed") }
鉴于向一个已关闭的channel发送数据会导致panic,所以一般由发送者调用close()关闭channel,因为发送数据的一方清楚什么时候应该停止数据的写入;同时,在channel关闭之后,所有因为读取这个channel而阻塞的goroutine会立即往后执行,利用这一点可以实现简单的广播。
stopCh := make( chanstruct{})
fori := 0; i < 5; i++ {
gofunc(i int) { <-stopCh fmt.Printf( "goroutine %d stoppedn", i) }(i) }
close(stopCh) time.Sleep(time.Second * 1)
以上代码在主goroutine中关闭channel,其他goroutine会立即退出
goroutine 2stoppedgoroutine 4stoppedgoroutine 1stoppedgoroutine 0stoppedgoroutine 3stopped
需要注意的一种情况是只声明未赋值的channel,即 nil channel,close 一个nil channel会导致panic,而读写一个nil channel会永久阻塞。
Select
从一个无缓冲的channel读取数据会阻塞,如果需要从多个channel读取数据呢?这个时候就需要配合select关键词使用
select关键字的灵感来源与unix中的I/O多路复用函数select(现已被epoll、kqueue等替代),unix中的select函数监听多个文件描述符,当select返回时,会得到可读或可写的描述符集合,这种技术实现了在一个线程内处理多个套接字连接。
在go中使用select - case 可以在多个channel上监听读写事件,某个case产生了读写事件时,则执行相应case中的代码
下面例子中第二个case从c1读取,c1另一端有一个goroutine写入,所以执行第二个case中的代码
c1 := make( chanint) c2 := make( chanint)
gofunc() { c1 <-1}()
select{
casec2 <- 1: fmt.Println( "write to c2")
case<-c1: fmt.Println( "read from c1") }
在实际的应用场景中,需要循环对多个channel中的某一个读写数据,比如服务器编程中,经常使用for - select的方式循环检测多个channel的事件,下面是一个简单的tcp服务的例子。
初始化了两个channel,一个用于读写tcp连接的地址,另一个用于接收系统信号
每建立一个tcp连接,发送连接的远端地址到msg中
在主goroutine中使用for select 循环检测msg和sig的事件,并优先匹配系统信号的事件。
初始化了两个channel,一个用于读写tcp连接的地址,另一个用于接收系统信号
每建立一个tcp连接,发送连接的远端地址到msg中
在主goroutine中使用for select 循环检测msg和sig的事件,并优先匹配系统信号的事件。
deferconn.Close() io.WriteString(conn, "hello") msg <- conn.RemoteAddr().String()}
funcmain() { msg := make( chanstring) sig := make( chanos.Signal) signal.Notify(sig, os.Interrupt)
gofunc() { l, err := net.Listen( "tcp", ":8000")
iferr != nil{
panic(err) }
for{ conn, err := l.Accept()
iferr != nil{
ifnetErr, ok := err.(net.Error); ok && netErr.Temporary() {
continue} else{
panic(err) } }
gohandleConn(conn, msg) } }()
for{
select{
case<-sig: fmt.Println( "exit") os.Exit (0)
caseaddr := <-msg: fmt.Println(addr) } }}
Worker pool
在web开发的时候,可以很容易地使用关键字go将长耗时的任务放到其他goroutine中执行,比如
http.HandleFunc( "/", func(rw http.ResponseWriter, r *http.Request) { //long time taskgofunc() { //...}() rw.WriteHeader(http.StatusOK) })
但是这样做的缺点也很明显:无法控制并发数量,标准库的http server在一个单独的goroutine中处理每一个用户请求,每个请求又可能会创建1到多个goroutine,如果某一段时间用户请求量很高,会导致服务器在短时间内创建大量的goroutine,在高并发的场景中,这样做是会影响性能的,这个时候,我们希望用一些手段来控制并发数量,比较常见的做法是使用queue+worker pool的方式,这样后台可以任意调整worker的数量来控制任务的处理速度。
以下是一个简单的worker pool,以一个缓冲的channel作为任务队列,用一个函数来表示相应的任务,worker循环从queue中读取并执行相应的任务,直到收到stop的信号。
typeTask func()
typePool struct{ maxWorkers intqueue chanTask done chanstruct{}}
func(p Pool) Start() {
fori := 0; i < p.maxWorkers; i++ { worker := Worker{i}
goworker.Consume(p.queue, p.done) }}
func(p Pool) Stop() {
close(p.done)}
typeWorker struct{ id int}
func(w Worker) Consume(queue chanTask, stopCh chanstruct{}) {
for{
select{
case<-stopCh: fmt.Printf( "worker %d stoppedn", w.id)
returncasetask := <-queue: task() } }}
funcmain() { done := make( chanstruct{}) queue := make( chanTask, 100)
fori := 0; i < 100; i++ { i := i queue <- func() {
fmt.Println(i)
} //just print something} pool := Pool{ maxWorkers: 5, queue: queue, done: done, } pool.Start() //stop pool after 5 secondstime.Sleep(time.Second * 5) pool.Stop()}
并行运算
go 1.5之后,GOMAXPROCS被默认设置为CPU的核心数量,goroutine会被调度到多个系统线程之上,这意味着我们可以利用多核CPU的性能做一些并行运算,比如:计算10000个文件的MD5值,生成10W个图片的缩略图等等,在这些场景中,我们可以将任务切分为小块,分派个多个goroutine执行,最后通过channel将结果汇合到一起。
下面是一个使用蒙特卡洛法计算PI近似值的一个例子,基本的思想是利用圆的面积与其外接正方形的面积之比为PI/4,通过产生大量均匀分布的二维点,计算落在圆和正方形内的点的比例,再乘以4,就可以得到PI的近似值,随着样本数量的增加,结果会越来越接近PI。
首先是一个单核的版本
funcMonteCarloPI(samples int) float64{ inside := 0//indicates point is inside the circler := rand.New(rand.NewSource(time.Now().UnixNano()))
fori := 0; i < samples; i++ { x := r.Float64() y := r.Float64()
ifx*x+y*y < 1{ inside++ } }
returnfloat64(inside) / float64(samples) * 4}
funcmain() { fmt.Println(MonteCarloPI (10000000)) fmt.Println(MonteCarloPI (100000000)) fmt.Println(math.Pi)}
结果如下,前两个分别为一千万和一亿样本时的结果,最后一行为go标准库math中的PI
3.14172843.14161463.141592653589793...
当样本数量为1亿时,耗时已经达到了数秒钟。以下例子取CPU核心数作为worker的数量,每个goroutine计算出一个PI的近似值,通过results这个channel汇集goroutine计算的结果,最后求平均值。
funcMonteCarloMultiCore(samples int) float64{ workers := runtime.NumCPU() results := make( chanfloat64, workers) threadSamples := samples / workers
fori := 0; i < workers; i++ {
gofunc() { results<-MonteCarloPI(threadSamples) }() }
vartotal float64fori := 0; i < workers; i++ { total += <-results }
returntotal / float64(workers)}
在10亿的样本情况下,单核和多核的结果如下,可以看到提升是很明显的
MonteCarloPI 27.233842675s MonteCarloMultiCore 5.598302862s
更多关于channel的资料,可以参考如下链接
Effective go
(https://golang.org/doc/effective_go.html#concurrency)
Go concurrency patterns
(https://blog.golang.org/pipelines)
Concurrency is not parallelism
(https://blog.golang.org/concurrency-is-not-parallelism)
Effective go
(https://golang.org/doc/effective_go.html#concurrency)
Go concurrency patterns
(https://blog.golang.org/pipelines)
Concurrency is not parallelism
(https://blog.golang.org/concurrency-is-not-parallelism)