一、利用chan实现并发协程优雅的退出
1、使用一个专门的退出通道,接收退出的信号。 当启动了多个worker工作协程时,只要main()执行关闭退出通道,每一个worker都会都到信号,进而关闭。
func worker(stopCh <-chan struct{}) {
go func() {
defer fmt.Println("worker exit")
// Using stop channel explicit exit
for {
select {
case <-stopCh:
fmt.Println("Recv stop signal")
return
case <-t.C:
fmt.Println("Working .")
}
}
}()
return
}
//main中
close(stopCh)
for-rangerange
go func(in <-chan int) {
// Using for-range to exit goroutine
// range has the ability to detect the close/end of a channel
for x := range in {
fmt.Printf("Process %d\n", x)
}
}(inCh)
for-select
,okok=false
go func() {
// in for-select using ok to exit goroutine
for {
select {
case x, ok := <-in:
if !ok {
return
}
fmt.Printf("Process %d\n", x)
processedCnt++
case <-t.C:
fmt.Printf("Working, processedCnt = %d\n", processedCnt)
}
}
}()
第二种:如果某个通道关闭了,不再处理该通道,而是继续处理其他case,退出是等待所有的可读通道关闭。我们需要使用select的一个特征:select不会在nil的通道上进行等待。这种情况,把只读通道设置为nil即可解决。
go func() {
// in for-select using ok to exit goroutine
for {
select {
case x, ok := <-in1:
if !ok {
in1 = nil
}
// Process
case y, ok := <-in2:
if !ok {
in2 = nil
}
// Process
case <-t.C:
fmt.Printf("Working, processedCnt = %d\n", processedCnt)
}
// If both in channel are closed, goroutine exit
if in1 == nil && in2 == nil {
return
}
}
}()
最佳实践
- 发送协程主动关闭通道,接收协程不关闭通道。技巧:把接收方的通道入参声明为只读,如果接收协程关闭只读协程,编译时就会报错。
- 协程处理1个通道,并且是读时,协程优先使用for-range,因为range可以关闭通道的关闭自动退出协程。
- for-select中的x, ok := <-chan可以处理多个读通道关闭,可以关闭当前使用for-select的协程或循环中的特定分支。
- 主协程显式关闭通道stopCh可以处理主动通知多work协程同时退出的场景。
4、使用go context进行多嵌套的goroutine协程间的退出
如果我们可以在简单的通知上附加传递额外的信息来控制取消:为什么取消,或者有一个它必须要完成的最终期限,更或者有多个取消选项,我们需要根据额外的信息来判断选择执行哪个取消选项。
考虑下面这种情况:假如主协程中有多个任务1, 2, …m,主协程对这些任务有超时控制;而其中任务1又有多个子任务1, 2, …n,任务1对这些子任务也有自己的超时控制,那么这些子任务既要感知主协程的取消信号,也需要感知任务1的取消信号。
如果还是使用done channel的用法,我们需要定义两个done channel,子任务们需要同时监听这两个done channel。嗯,这样其实好像也还行哈。但是如果层级更深,如果这些子任务还有子任务,那么使用done channel的方式将会变得非常繁琐且混乱。我们需要一种优雅的方案来实现这样一种机制:
- 上层任务取消后,所有的下层任务都会被取消;
- 中间某一层的任务取消后,只会将当前任务的下层任务取消,而不会影响上层的任务以及同级任务。
参考链接1
参考链接2
参考链接3
参考链接4
二、利用chan实现生产者消费者模式
通过定义一个channel 变量接收某种事件,然后通过工作 goroutine 消费执行这个 channel 中的事件。
produce_goroutine->event -> consumer_goroutine
| |
produce_goroutine->event -> channel -> consumer_goroutine
| |
produce_goroutine->event -> consumer_goroutine
生产者消费者的使用场景:
1、生成协程生产数据到管道中
2、消费协程在管道中取数据进行处理
通过捕捉特定信号对程序进行相关处理,当某个信号进行触发的时候,主协程将向各个协程发送退出指令,当数据管道处理完成时,若接收到退出指令 将结束协程的执行。
package main
import (
"fmt"
"git.code.oa.com/gongyi/gongyi_base/log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
var dataChan chan int
/*************************生产类*************************/
type Producer struct {
closedChan chan struct{}
wg sync.WaitGroup
}
func (producer *Producer)Produce() {
defer producer.wg.Done()
data := 1
for {
dataChan<-data
log.Infof("push data:%d succ", data)
data++
time.Sleep(time.Second * 1)
select {
// 若关闭了通道,直接退出
case <-producer.closedChan:
return
// 不可阻塞
default:
continue
}
}
}
func (producer *Producer) Stop() {
close(producer.closedChan)
producer.wg.Wait()
log.Infof("producer has stoped...")
}
/*************************消费类*************************/
type Consumer struct {
workNo int
closedChan chan struct{}
wg sync.WaitGroup
}
func (test* Consumer)Work() {
defer test.wg.Done()
for {
select {
case data := <-dataChan:
//process data....
continue
case <-test.closedChan:
log.Infof("worker %d exit...", test.workNo)
return
}
}
}
func (test* Consumer)Stop() {
close(test.closedChan)
test.wg.Wait()
log.Infof("%d has stoped...", test.workNo)
}
/*************************主逻辑*************************/
func main() {
dataChan = make(chan int, 86400)
// 创建生产协程,并启动
producer := Producer{
closedChan: make(chan struct{}),
}
producer.wg.Add(1)
go producer.Produce()
// 创建消费协程,并启动
consumerNumber := 7
var consumers []Consumer
for i := 0; i < consumerNumber; i++ {
workers = append(consumers, Consumer{
workNo: i,
closedChan: make(chan struct{}),
})
}
for i := 0; i < consumerNumber; i++ {
consumers[i].wg.Add(1)
go consumers[i].Work()
}
// 信号处理
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
select {
case sig := <-c:
fmt.Printf("Got %s signal. Aborting...\n", sig)
producer.Stop()
for i := 0; i < consumerNumber; i++ {
consumers[i].Stop()
}
}
}
三、利用chan实现任务的分发
设计一个分发器,当有数据进入分发器后,需要将数据分发到多个处理器处理,每个处理器可以想象为一个协程,处理器在没有数据的时候要阻塞。代码参考
分发器的角色介绍:
Worker: 工作处理者,真正处理业务逻辑的地方,负责接收并处理任务,同时它需要告诉“调度员”是否做好了 接收更多任务的准备。
WorkerPool: 我们把它比喻为“工人队列”,具体对应GO里的buffered channel of channels。
work: 工作请求
WorkQueue:工作请求队列。chan可以是有缓冲的,进而实现任务队列workqueue。
步骤一 : 定义工作任务请求(WorkRequest)数据结构。通过dispatcher把任务请求分发给worker。定义work channel的统一接收类型的数据结构。(WorkRequest结构体)。
package main
import "time"
type WorkRequest struct {
Name string
Delay time.Duration
}
步骤二 : Collector(工作收集者)接收客户的任务请求,将请求封装成worker能够理解的 WorkRequest,然后将其添加到全局任务队列的尾部。 Collector需要辨别哪些客户请求才是合理的。具体而言collector需要校验请求类型,必要数据 字段,数据边界等等。collector定义了工作队列workqueue,采用http的形式用于从多个客户处收集任务请求并缓存到workqueue。
package main
import (
"fmt"
"net/http"
"time"
)
// A buffered channel that we can send work requests on.
var WorkQueue = make(chan WorkRequest, 100)
func Collector(w http.ResponseWriter, r *http.Request) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Parse the delay.
delay, err := time.ParseDuration(r.FormValue("delay"))
if err != nil {
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
return
}
// Check to make sure the delay is anywhere from 1 to 10 seconds.
if delay.Seconds() < 1 || delay.Seconds() > 10 {
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
return
}
// Now, we retrieve the person's name from the request.
name := r.FormValue("name")
// Just do a quick bit of sanity checking to make sure the client actually provided us with a name.
if name == "" {
http.Error(w, "You must specify a name.", http.StatusBadRequest)
return
}
// Now, we take the delay, and the person's name, and make a WorkRequest out of them.
work := WorkRequest{Name: name, Delay: delay}
// Push the work onto the queue.
WorkQueue <- work
fmt.Println("Work request queued")
// And let the user know their work request was created.
w.WriteHeader(http.StatusCreated)
return
}
步骤三: 工作者(worker)需要一个专门的工作通道(Work chan WorkRequest)用于接收dispatcher分发给它的任务请求(WorkRequest)。worker的工作通道是一个不带缓冲的任务通道(WorkRequest channel),用来接收Work Request,之所以不带缓冲是因为一个worker一次只能处理一个任务,dispatcher负责将任务请求分发给空闲等待的worker。另外我们将赋予每个工人 一个ID,以便区分哪一个工人正在处理某项任务。 另外多个worker共享一个worker队列WorkerQueue(WorkerQueue chan chan WorkRequest ),用于将worker空闲时将自身的工作通道(chan WorkRequest)注册到WorkerQueue,从而worker能接收任务请求。并且每个worker共享一个退出通道(QuitChan),用于工作协程的集中优雅退出。
package main
import (
"fmt"
"time"
)
// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workerQueue chan chan WorkRequest,stopCh chan bool) Worker {
// Create, and return the worker.
worker := Worker{
ID: id,
Work: make(chan WorkRequest),
WorkerQueue: workerQueue,
QuitChan: stopCh}
return worker
}
type Worker struct {
ID int
Work chan WorkRequest
WorkerQueue chan chan WorkRequest
QuitChan chan bool
}
// This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w *Worker) Start() {
go func() {
for {
// Add ourselves into the worker queue.
w.WorkerQueue <- w.Work
select {
case work := <-w.Work:
// Receive a work request.
fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())
time.Sleep(work.Delay)
fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)
case <-w.QuitChan:
// We have been asked to stop.
fmt.Printf("worker%d stopping\n", w.ID)
return
}
}
}()
}
// Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its work.
func (w *Worker) Stop() {
go func() {
w.QuitChan <- true
}()
}
步骤四: dispatch任务分发的实现 ,dispatcher声明了全局的工作者队列(WorkerQueue),它用来接收每个空闲worker注册的工作通道(WorkRequest channel)。StartDispatcher方法里根据需要的worker数量,初始化worker队列。然后我们创建所需数量的 worker实例并“启动”它们。 最后一段代码,起了一个匿名协程,负责从全局任务队列(work queue)获取任务请求 (WorkRequest);然后在另一个匿名协程中,从全局工人队列(worker queue)“唤醒”一个 worker;最后将任务请求分发给worker。我们通过另外起一 个协程来“唤醒”worker并分配任务的原因在于,该操作(worker := <-WorkerQueue)是有可能阻塞的,将其异步化可以确保任务队列不会堆积。
package main
import "fmt"
var WorkerQueue chan chan WorkRequest
var stopCh chan bool
func StartDispatcher(nworkers int) {
// First, initialize the channel we are going to but the workers' work channels into.
WorkerQueue = make(chan chan WorkRequest, nworkers)
stopCh = make(chan bool)
// Now, create all of our workers.
for i := 0; i<nworkers; i++ {
fmt.Println("Starting worker", i+1)
worker := NewWorker(i+1, WorkerQueue,stopCh)
worker.Start()
}
go func() {
for {
select {
case work := <-WorkQueue:
fmt.Println("Received work requeust")
go func() {
worker := <-WorkerQueue
fmt.Println("Dispatching work request")
worker <- work
}()
}
}
}()
}
步骤五: 聚合组件处理请求。main函数是分发器的运行入口,允许客户指定worker的数量,以及HTTP server需要监听的地址。这两个命令行参数是可选的,我们提供了默认值。
package main
import (
"flag"
"fmt"
"net/http"
)
var (
NWorkers = flag.Int("n", 4, "The number of workers to start")
HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
)
func main() {
// Parse the command-line flags.
flag.Parse()
// Start the dispatcher.
fmt.Println("Starting the dispatcher")
StartDispatcher(*NWorkers)
// Register our collector as an HTTP handler function.
fmt.Println("Registering the collector")
http.HandleFunc("/work", Collector)
go func(){
// Start the HTTP server!
fmt.Println("HTTP server listening on", *HTTPAddr)
if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {
fmt.Println(err.Error())
}
}()
// 信号处理
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
select {
case sig := <-c:
fmt.Printf("Got %s signal. Aborting...\n", sig)
close(stopCh)
}
}
四、chan中常见的bug
4.1 无缓冲的通道须知
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
上面这段代码能够通过编译,但是执行的时候会出现以下错误:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../main.go:8 +0x54
deadlockch := make(chan int)ch <- 10
goroutine
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
ch <- 10
fmt.Println("发送成功")
}
goroutinegoroutinegoroutinegoroutine同步通道
4.2 无缓冲通道由于通道接收者退出导致发送端侧阻塞
本意是想调用process()时加上超时的功能,如果process()在超时时间没有返回,则返回nil。但是当超时发 生的时候,针对代码中第二行创建的ch来说,由于已经没有receiver了,第5行将会被block 住,导致这个goroutine永远不会退出。
func test(timeout time.Duration) ob {
ch := make(chan ob)
go func() {
result := process()
ch <- result // block
}()
select {
case result = <-ch:
return result
case <-time.After(timeout):
return nil
}
}
这个bug的修复方式也是非常的简单,把unbuffered channel修改成buffered channel。在上面的例子中,虽然这样不会block了,但是channel一直没有被关闭,channel保持不关闭会导致资源的泄漏。channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 一般是用来通知其他协程某个任务已经完成了。
func test(timeout time.Duration) ob {
ch := make(chan ob, 1)
go func() {
result := process()
ch <- result // block
}()
select {
case result = <-ch:
return result
case <-time.After(timeout):
return nil
}
}
4.3 切忌重复关闭channel
select {
case <-c.closed:
default:
close(c.closed)
}
上面这块代码可能会被多个goroutine同时执行,这段代码的逻辑是,case这个分支判断closed这个channel是否被关闭了,如果被关闭的话,就什么都不做;如果closed没有被关闭的话,就执行default分支关闭这个channel,多个goroutine并发执行的时候,有可能会导致closed这个channel被关闭多次。
这个bug的修复方式是:
Once.Do(func() {
close(c.closed)
})
或者只允许在发送端关闭。
4.4 for range 的bug
使用for range可以自动的从channel中读取,当channel被关闭时,for循环退出,否则一直挂起。由于for range不等到信道关闭,不会结束读取,所以可能阻塞当前的gorouting,导致死锁。被关闭的信道会禁止数据流入,是只读的,我们仍然可以从关闭的信道中取数据,但是不能在写入数据了,被关闭的信道禁止数据写入,是可读的,可以从关闭的信道中读取数据。
func main() {
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
//死锁
for c := range ch {
fmt.Println(c)
}
}
func main() {
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
//close解除死锁
close(ch)
for c := range ch {
fmt.Println(c)
}
}
循环读取关闭的channel不会阻塞,会一直读取空值。可以通过读取结果的bool值判断该channel是否关闭。
//从关闭的
data, ok := <-nochan
if !ok {
fmt.Println("receive close chan")
}
4.5 只有一个协程(main)时,只往channel读数据或者写数据。比如一直向无数据流入的空信道取数据,就会引起死锁。
package main
import "fmt"
func main() {
ch1 := make(chan int)
fmt.Println(<-ch1)
ch := make(chan int, 3)
// 在无数据填入ch时,执行fmt.Println(<-ch),会发生死锁
// fmt.Println(<-ch)
ch <- 1
ch <- 2
ch <- 3
for v := range ch {
fmt.Println(v)
//在读取完ch时,若不关闭ch,继续读取,也同样会发生死锁
if v==3{
close(ch)
}
}
}
4.6 main协程一般要有永久循环。否则主协程退出,子协程和程序都将退出。通常有专门的退出通道或信号接收等联动程序整体退出。
//通道阻塞
func mainloop() {
shouldExist := false
for !shouldExist {
shouldExist = <-shutdown
}
}
//信号阻塞
func main(){
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
select {
case sig := <-c:
fmt.Printf("Got %s signal. Aborting...\n", sig)
close(stopCh)
}
}