本文介绍了使用 golang channel 的诸多特性和技巧,已经熟悉了 go 语言特性的小伙伴也可以看看,很有启发。
不同于传统的多线程并发模型使用共享内存来实现线程间通信的方式,golang 的哲学是通过 channel 进行协程 (goroutine) 之间的通信来实现数据共享:
Do not communicate by sharing memory; instead, share memory by communicating.
这种方式的优点是通过提供原子的通信原语,避免了竞态情形 (race condition) 下复杂的锁机制。channel 可以看成一个 FIFO 队列,对 FIFO 队列的读写都是原子的操作,不需要加锁。对 channel 的操作行为结果总结如下:
读取一个已关闭的 channel 时,总是能读取到对应类型的零值,为了和读取非空未关闭 channel 的行为区别,可以使用两个接收值:
// ok is false when ch is closed
v, ok := <-ch
golang 中大部分类型都是值类型(只有 slice / channel / map 是引用类型),读/写类型是值类型的 channel 时,如果元素 size 比较大时,应该使用指针代替,避免频繁的内存拷贝开销。
内部实现
$GOROOT/src/runtime/chan.go
- 读等待协程队列 recvq,维护了阻塞在读此 channel 的协程列表
- 写等待协程队列 sendq,维护了阻塞在写此 channel 的协程列表
- 缓冲数据队列 buf,用环形队列实现,不带缓冲的 channel 此队列 size 则为 0
当协程尝试从未关闭的 channel 中读取数据时,内部的操作如下:
<- ch<- ch<- ch
类似的,当协程尝试往未关闭的 channel 中写入数据时,内部的操作如下:
ch <-ch <-ch <-
当关闭 non-nil channel 时,内部的操作如下:
- 当队列 recvq 非空时,此时 buf 必为空,recvq 中的所有协程都将收到对应类型的零值然后结束阻塞状态;
- 当队列 sendq 非空时,此时 buf 必为满,sendq 中的所有协程都会产生 panic ,在 buf 中数据仍然会保留直到被其他协程读取。
使用场景
除了常规的用来在协程之间传递数据外,本节列出了一些特殊的使用 channel 的场景。
futures / promises
golang 虽然没有直接提供 futrue / promise 模型的操作原语,但通过 goroutine 和 channel 可以实现类似的功能:
package main
import (
"io/ioutil"
"log"
"net/http"
)
// RequestFuture, http request promise.
func RequestFuture(url string) <-chan []byte {
c := make(chan []byte, 1)
go func() {
var body []byte
defer func() {
c <- body
}()
res, err := http.Get(url)
if err != nil {
return
}
defer res.Body.Close()
body, _ = ioutil.ReadAll(res.Body)
}()
return c
}
func main() {
future := RequestFuture("https://api.github.com/users/octocat/orgs")
body := <-future
log.Printf("reponse length: %d", len(body))
}
条件变量 (condition variable)
strct {}
一对一通知
pthread_cond_signal()
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan struct{})
nums := make([]int, 100)
go func() {
time.Sleep(time.Second)
for i := 0; i < len(nums); i++ {
nums[i] = i
}
// send a finish signal
ch <- struct{}{}
}()
// wait for finish signal
<-ch
fmt.Println(nums)
}
广播通知
pthread_cond_broadcast()
package main
import (
"fmt"
"time"
)
func main() {
N := 10
exit := make(chan struct{})
done := make(chan struct{}, N)
// start N worker goroutines
for i := 0; i < N; i++ {
go func(n int) {
for {
select {
// wait for exit signal
case <-exit:
fmt.Printf("worker goroutine #%d exit\n", n)
done <- struct{}{}
return
case <-time.After(time.Second):
fmt.Printf("worker goroutine #%d is working...\n", n)
}
}
}(i)
}
time.Sleep(3 * time.Second)
// broadcast exit signal
close(exit)
// wait for all worker goroutines exit
for i := 0; i < N; i++ {
<-done
}
fmt.Println("main goroutine exit")
}
信号量
channel 的读/写相当于信号量的 P / V 操作,下面的示例程序中 channel 相当于信号量:
package main
import (
"log"
"math/rand"
"time"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeConsumer(customerId int) {
log.Print("-> consumer#", customerId, " enters the bar")
seat := <-bar // need a seat to drink
log.Print("consumer#", customerId, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
log.Print("<- consumer#", customerId, " frees seat#", seat)
bar <- seat // free the seat and leave the bar
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10) // the bar has 10 seats
// Place seats in an bar.
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId) // none of the sends will block
}
// a new consumer try to enter the bar for each second
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
go bar24x7.ServeConsumer(customerId)
}
}
互斥量
互斥量相当于二元信号里,所以 cap 为 1 的 channel 可以当成互斥量使用:
package main
import "fmt"
func main() {
mutex := make(chan struct{}, 1) // the capacity must be one
counter := 0
increase := func() {
mutex <- struct{}{} // lock
counter++
<-mutex // unlock
}
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
increase()
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
<-done; <-done
fmt.Println(counter) // 2000
}
关闭 channel
关闭不再需要使用的 channel 并不是必须的。跟其他资源比如打开的文件、socket 连接不一样,这类资源使用完后不关闭后会造成句柄泄露,channel 使用完后不关闭也没有关系,channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 一般是用来通知其他协程某个任务已经完成了。golang 也没有直接提供判断 channel 是否已经关闭的接口,虽然可以用其他不太优雅的方式自己实现一个:
func isClosed(ch chan int) bool {
select {
case <-ch:
return true
default:
}
return false
}
isClosed()isClosed()
- 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
- 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
- 如果只有一个写入端,可以在这个写入端放心关闭 channel 。
sync
一写多读
for rangefor range
package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
send := func() {
for i := 0; i < 100; i++ {
ch <- i
}
// signal sending finish
close(ch)
}
recv := func(id int) {
defer wg.Done()
for i := range ch {
fmt.Printf("receiver #%d get %d\n", id, i)
}
fmt.Printf("receiver #%d exit\n", id)
}
wg.Add(3)
go recv(0)
go recv(1)
go recv(2)
send()
wg.Wait()
}
多写一读
sync.Once
package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func() {
count := 0
for i := range ch {
fmt.Printf("receiver get %d\n", i)
count++
if count >= 1000 {
// signal recving finish
close(done)
return
}
}
}
wg.Add(3)
go send(0)
go send(1)
go send(2)
recv()
wg.Wait()
}
多写多读
这种场景稍微复杂,和上面的例子一样,也需要设置一个额外 channel 用来通知多个写入端和读取端。另外需要起一个额外的协程来通过关闭这个 channel 来广播通知:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func(id int) {
defer wg.Done()
for {
select {
case <-done:
// get exit signal
fmt.Printf("receiver #%d exit\n", id)
return
case i := <-ch:
fmt.Printf("receiver #%d get %d\n", id, i)
time.Sleep(time.Millisecond)
}
}
}
wg.Add(6)
go send(0)
go send(1)
go send(2)
go recv(0)
go recv(1)
go recv(2)
time.Sleep(time.Second)
// signal finish
close(done)
// wait all sender and receiver exit
wg.Wait()
}
总结
channle 作为 golang 最重要的特性,用起来还是比较爽的。传统的 C 里要实现类型的功能的话,一般需要用到 socket 或者 FIFO 来实现,另外还要考虑数据包的完整性与并发冲突的问题,channel 则屏蔽了这些底层细节,使用者只需要考虑读写就可以了。channel 是引用类型,了解一下 channel 底层的机制对更好的使用 channel 还是很用必要的。虽然操作原语简单,但涉及到阻塞的问题,使用不当可能会造成死锁或者无限制的协程创建最终导致进程挂掉。channel 除在可以用来在协程之间通信外,其阻塞和唤醒协程的特性也可以用作协程之间的同步机制,文中也用示例简单介绍了这种场景下的用法。关闭 channel 并不是必须的,只要没有协程没用引用 channel ,最终会被 GC 清理。所以使用的时候要特别注意,不要让协程阻塞在 channel 上,这种情况很难检测到,而且会造成 channel 和阻塞在 channel 的协程占有的资源无法被 GC 清理最终导致内存泄露。channle 方便 golang 程序使用 CSP 的编程范形,但是 golang 是一种多范形的编程语言,golang 也支持传统的通过共享内存来通信的编程方式。终极的原则是根据场景选择合适的编程范型,不要因为 channel 好用而滥用 CSP 。
转自:ExplorerMan
文章转载:Go开发大全