一、利用chan实现并发协程优雅的退出

1、使用一个专门的退出通道,接收退出的信号。 当启动了多个worker工作协程时,只要main()执行关闭退出通道,每一个worker都会都到信号,进而关闭。

func worker(stopCh <-chan struct{}) {
    go func() {
        defer fmt.Println("worker exit")
        // Using stop channel explicit exit
        for {
            select {
            case <-stopCh:
                fmt.Println("Recv stop signal")
                return
            case <-t.C:
                fmt.Println("Working .")
            }
        }
    }()
    return
}
//main中
close(stopCh)
for-rangerange
go func(in <-chan int) {
    // Using for-range to exit goroutine
    // range has the ability to detect the close/end of a channel
    for x := range in {
        fmt.Printf("Process %d\n", x)
    }
}(inCh)
for-select
,okok=false
go func() {
    // in for-select using ok to exit goroutine
    for {
        select {
        case x, ok := <-in:
            if !ok {
                return
            }
            fmt.Printf("Process %d\n", x)
            processedCnt++
        case <-t.C:
            fmt.Printf("Working, processedCnt = %d\n", processedCnt)
        }
    }
}()

第二种:如果某个通道关闭了,不再处理该通道,而是继续处理其他case,退出是等待所有的可读通道关闭。我们需要使用select的一个特征:select不会在nil的通道上进行等待。这种情况,把只读通道设置为nil即可解决。

go func() {
    // in for-select using ok to exit goroutine
    for {
        select {
        case x, ok := <-in1:
            if !ok {
                in1 = nil
            }
            // Process
        case y, ok := <-in2:
            if !ok {
                in2 = nil
            }
            // Process
        case <-t.C:
            fmt.Printf("Working, processedCnt = %d\n", processedCnt)
        }

        // If both in channel are closed, goroutine exit
        if in1 == nil && in2 == nil {
            return
        }
    }
}()
最佳实践
  • 发送协程主动关闭通道,接收协程不关闭通道。技巧:把接收方的通道入参声明为只读,如果接收协程关闭只读协程,编译时就会报错。
  • 协程处理1个通道,并且是读时,协程优先使用for-range,因为range可以关闭通道的关闭自动退出协程。
  • for-select中的x, ok := <-chan可以处理多个读通道关闭,可以关闭当前使用for-select的协程或循环中的特定分支。
  • 主协程显式关闭通道stopCh可以处理主动通知多work协程同时退出的场景。

4、使用go context进行多嵌套的goroutine协程间的退出

如果我们可以在简单的通知上附加传递额外的信息来控制取消:为什么取消,或者有一个它必须要完成的最终期限,更或者有多个取消选项,我们需要根据额外的信息来判断选择执行哪个取消选项。

考虑下面这种情况:假如主协程中有多个任务1, 2, …m,主协程对这些任务有超时控制;而其中任务1又有多个子任务1, 2, …n,任务1对这些子任务也有自己的超时控制,那么这些子任务既要感知主协程的取消信号,也需要感知任务1的取消信号。

如果还是使用done channel的用法,我们需要定义两个done channel,子任务们需要同时监听这两个done channel。嗯,这样其实好像也还行哈。但是如果层级更深,如果这些子任务还有子任务,那么使用done channel的方式将会变得非常繁琐且混乱。我们需要一种优雅的方案来实现这样一种机制:

  • 上层任务取消后,所有的下层任务都会被取消;
  • 中间某一层的任务取消后,只会将当前任务的下层任务取消,而不会影响上层的任务以及同级任务。

在这里插入图片描述
参考链接1
参考链接2
参考链接3
参考链接4


二、利用chan实现生产者消费者模式

通过定义一个channel 变量接收某种事件,然后通过工作 goroutine 消费执行这个 channel 中的事件。

produce_goroutine->event ->						consumer_goroutine
       					 	 |				|
produce_goroutine->event ->		channel -> 		consumer_goroutine
        					 |				|
produce_goroutine->event ->						consumer_goroutine

生产者消费者的使用场景:
1、生成协程生产数据到管道中
2、消费协程在管道中取数据进行处理

通过捕捉特定信号对程序进行相关处理,当某个信号进行触发的时候,主协程将向各个协程发送退出指令,当数据管道处理完成时,若接收到退出指令 将结束协程的执行。

package main
import (
	"fmt"
	"git.code.oa.com/gongyi/gongyi_base/log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)
var dataChan chan int
/*************************生产类*************************/
type Producer struct {
	closedChan chan struct{}
	wg sync.WaitGroup
}
func (producer *Producer)Produce() {
	defer producer.wg.Done()
	data := 1
	for { 
		dataChan<-data
		log.Infof("push data:%d succ", data)
		data++
		time.Sleep(time.Second * 1)
		select {
			// 	若关闭了通道,直接退出
			case <-producer.closedChan:
				return
			// 不可阻塞
			default:
				continue
		}
	}
}
func (producer *Producer) Stop() {
	close(producer.closedChan)
	producer.wg.Wait()
	log.Infof("producer has stoped...")
}
/*************************消费类*************************/
type Consumer struct {
	workNo int
	closedChan chan struct{}
	wg sync.WaitGroup
}
func (test* Consumer)Work() {
	defer test.wg.Done()
	for {
		select {
			case data := <-dataChan:
				//process data....
				continue
			case <-test.closedChan:
				log.Infof("worker %d exit...", test.workNo)
				return
		}
	}
}
func (test* Consumer)Stop() {
	close(test.closedChan)
	test.wg.Wait()
	log.Infof("%d has stoped...", test.workNo)
}
/*************************主逻辑*************************/
func main() {
	dataChan = make(chan int, 86400)
	// 创建生产协程,并启动
	producer := Producer{
		closedChan: make(chan struct{}),
	}
	producer.wg.Add(1)
	go producer.Produce()
	// 创建消费协程,并启动
	consumerNumber := 7
	var consumers []Consumer
	for i := 0; i < consumerNumber; i++ {
		workers = append(consumers, Consumer{
			workNo:    i,
			closedChan: make(chan struct{}),
		})
	}
	for i := 0; i < consumerNumber; i++ {
		consumers[i].wg.Add(1)
		go consumers[i].Work()
	}
	// 信号处理
	c := make(chan os.Signal)
	signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
	select {
	case sig := <-c:
		fmt.Printf("Got %s signal. Aborting...\n", sig)
		producer.Stop()
		for i := 0; i < consumerNumber; i++ {
			consumers[i].Stop()
		}
	}
}

三、利用chan实现任务的分发

在这里插入图片描述

设计一个分发器,当有数据进入分发器后,需要将数据分发到多个处理器处理,每个处理器可以想象为一个协程,处理器在没有数据的时候要阻塞。代码参考

分发器的角色介绍:

Worker: 工作处理者,真正处理业务逻辑的地方,负责接收并处理任务,同时它需要告诉“调度员”是否做好了 接收更多任务的准备。

WorkerPool: 我们把它比喻为“工人队列”,具体对应GO里的buffered channel of channels。

work: 工作请求

WorkQueue:工作请求队列。chan可以是有缓冲的,进而实现任务队列workqueue。

步骤一 : 定义工作任务请求(WorkRequest)数据结构。通过dispatcher把任务请求分发给worker。定义work channel的统一接收类型的数据结构。(WorkRequest结构体)。

package main
import "time"
type WorkRequest struct {
  Name  string
  Delay time.Duration
}

步骤二 : Collector(工作收集者)接收客户的任务请求,将请求封装成worker能够理解的 WorkRequest,然后将其添加到全局任务队列的尾部。 Collector需要辨别哪些客户请求才是合理的。具体而言collector需要校验请求类型,必要数据 字段,数据边界等等。collector定义了工作队列workqueue,采用http的形式用于从多个客户处收集任务请求并缓存到workqueue。

package main

import (
  "fmt"
  "net/http"
  "time"
)

// A buffered channel that we can send work requests on.
var WorkQueue = make(chan WorkRequest, 100)

func Collector(w http.ResponseWriter, r *http.Request) {
  // Make sure we can only be called with an HTTP POST request.
  if r.Method != "POST" {
    w.Header().Set("Allow", "POST")
    w.WriteHeader(http.StatusMethodNotAllowed)
    return
  }

  // Parse the delay.
  delay, err := time.ParseDuration(r.FormValue("delay"))
  if err != nil {
    http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
    return
  }
  
  // Check to make sure the delay is anywhere from 1 to 10 seconds.
  if delay.Seconds() < 1 || delay.Seconds() > 10 {
    http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
    return
  }
  
  // Now, we retrieve the person's name from the request.
  name := r.FormValue("name")
  
  // Just do a quick bit of sanity checking to make sure the client actually provided us with a name.
  if name == "" {
    http.Error(w, "You must specify a name.", http.StatusBadRequest)
    return
  }
  
  // Now, we take the delay, and the person's name, and make a WorkRequest out of them.
  work := WorkRequest{Name: name, Delay: delay}
  
  // Push the work onto the queue.
  WorkQueue <- work
  fmt.Println("Work request queued")
  
  // And let the user know their work request was created.
  w.WriteHeader(http.StatusCreated)
  return
}

步骤三: 工作者(worker)需要一个专门的工作通道(Work chan WorkRequest)用于接收dispatcher分发给它的任务请求(WorkRequest)。worker的工作通道是一个不带缓冲的任务通道(WorkRequest channel),用来接收Work Request,之所以不带缓冲是因为一个worker一次只能处理一个任务,dispatcher负责将任务请求分发给空闲等待的worker。另外我们将赋予每个工人 一个ID,以便区分哪一个工人正在处理某项任务。 另外多个worker共享一个worker队列WorkerQueue(WorkerQueue chan chan WorkRequest ),用于将worker空闲时将自身的工作通道(chan WorkRequest)注册到WorkerQueue,从而worker能接收任务请求。并且每个worker共享一个退出通道(QuitChan),用于工作协程的集中优雅退出。

package main

import (
  "fmt"
  "time"
)

// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workerQueue chan chan WorkRequest,stopCh chan bool) Worker {
  // Create, and return the worker.
  worker := Worker{
    ID:          id,
    Work:        make(chan WorkRequest),
    WorkerQueue: workerQueue,
    QuitChan:    stopCh}
  
  return worker
}

type Worker struct {
  ID          int
  Work        chan WorkRequest
  WorkerQueue chan chan WorkRequest
  QuitChan    chan bool
}

// This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w *Worker) Start() {
    go func() {
      for {
        // Add ourselves into the worker queue.
        w.WorkerQueue <- w.Work
        
        select {
        case work := <-w.Work:
          // Receive a work request.
          fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds())
          
          time.Sleep(work.Delay)
          fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name)
          
        case <-w.QuitChan:
          // We have been asked to stop.
          fmt.Printf("worker%d stopping\n", w.ID)
          return
        }
      }
    }()
}

// Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its work.
func (w *Worker) Stop() {
  go func() {
    w.QuitChan <- true
  }()
}

步骤四: dispatch任务分发的实现 ,dispatcher声明了全局的工作者队列(WorkerQueue),它用来接收每个空闲worker注册的工作通道(WorkRequest channel)。StartDispatcher方法里根据需要的worker数量,初始化worker队列。然后我们创建所需数量的 worker实例并“启动”它们。 最后一段代码,起了一个匿名协程,负责从全局任务队列(work queue)获取任务请求 (WorkRequest);然后在另一个匿名协程中,从全局工人队列(worker queue)“唤醒”一个 worker;最后将任务请求分发给worker。我们通过另外起一 个协程来“唤醒”worker并分配任务的原因在于,该操作(worker := <-WorkerQueue)是有可能阻塞的,将其异步化可以确保任务队列不会堆积。

package main
import "fmt"
var WorkerQueue chan chan WorkRequest
var stopCh chan bool
func StartDispatcher(nworkers int) {
  // First, initialize the channel we are going to but the workers' work channels into.
  WorkerQueue = make(chan chan WorkRequest, nworkers)
  stopCh = make(chan bool)
  // Now, create all of our workers.
  for i := 0; i<nworkers; i++ {
    fmt.Println("Starting worker", i+1)
    worker := NewWorker(i+1, WorkerQueue,stopCh)
    worker.Start()
  }
  
  go func() {
    for {
      select {
      case work := <-WorkQueue:
        fmt.Println("Received work requeust")
        go func() {
          worker := <-WorkerQueue
          
          fmt.Println("Dispatching work request")
          worker <- work
        }()
      }
    }
  }()
}

步骤五: 聚合组件处理请求。main函数是分发器的运行入口,允许客户指定worker的数量,以及HTTP server需要监听的地址。这两个命令行参数是可选的,我们提供了默认值。

package main

import (
  "flag"
  "fmt"
  "net/http"
)

var (
  NWorkers = flag.Int("n", 4, "The number of workers to start")
  HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
)

func main() {
  // Parse the command-line flags.
  flag.Parse()
  
  // Start the dispatcher.
  fmt.Println("Starting the dispatcher")
  StartDispatcher(*NWorkers)
  
  // Register our collector as an HTTP handler function.
  fmt.Println("Registering the collector")
  http.HandleFunc("/work", Collector)
    
  go func(){
 	 // Start the HTTP server!
  	 fmt.Println("HTTP server listening on", *HTTPAddr)
  	if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {
   		 fmt.Println(err.Error())
     }
  }()
   // 信号处理
  c := make(chan os.Signal)
  signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
  select {
	case sig := <-c:
	  fmt.Printf("Got %s signal. Aborting...\n", sig)
      close(stopCh)
  }
}

四、chan中常见的bug

4.1 无缓冲的通道须知

func main() {
	ch := make(chan int)
	ch <- 10
	fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
        .../main.go:8 +0x54
deadlockch := make(chan int)ch <- 10
goroutine
func recv(c chan int) {
	ret := <-c
	fmt.Println("接收成功", ret)
}
func main() {
	ch := make(chan int)
	go recv(ch) // 启用goroutine从通道接收值
	ch <- 10
	fmt.Println("发送成功")
}
goroutinegoroutinegoroutinegoroutine同步通道

4.2 无缓冲通道由于通道接收者退出导致发送端侧阻塞

本意是想调用process()时加上超时的功能,如果process()在超时时间没有返回,则返回nil。但是当超时发 生的时候,针对代码中第二行创建的ch来说,由于已经没有receiver了,第5行将会被block 住,导致这个goroutine永远不会退出。

func test(timeout time.Duration) ob {
	ch := make(chan ob)
	go func() {
		result := process()
		ch <- result // block
	}()
	select {
		case result = <-ch:
			return result
		case <-time.After(timeout):
			return nil
	}
}

这个bug的修复方式也是非常的简单,把unbuffered channel修改成buffered channel。在上面的例子中,虽然这样不会block了,但是channel一直没有被关闭,channel保持不关闭会导致资源的泄漏。channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 一般是用来通知其他协程某个任务已经完成了。

func test(timeout time.Duration) ob {
	ch := make(chan ob, 1)
	go func() {
		result := process()
		ch <- result // block
	}()
	select {
		case result = <-ch:
			return result
		case <-time.After(timeout):
			return nil
	}
}

4.3 切忌重复关闭channel

select {
case <-c.closed:
default:
    close(c.closed)
}

上面这块代码可能会被多个goroutine同时执行,这段代码的逻辑是,case这个分支判断closed这个channel是否被关闭了,如果被关闭的话,就什么都不做;如果closed没有被关闭的话,就执行default分支关闭这个channel,多个goroutine并发执行的时候,有可能会导致closed这个channel被关闭多次。

这个bug的修复方式是:

Once.Do(func() {
    close(c.closed)
})

或者只允许在发送端关闭。

4.4 for range 的bug

使用for range可以自动的从channel中读取,当channel被关闭时,for循环退出,否则一直挂起。由于for range不等到信道关闭,不会结束读取,所以可能阻塞当前的gorouting,导致死锁。被关闭的信道会禁止数据流入,是只读的,我们仍然可以从关闭的信道中取数据,但是不能在写入数据了,被关闭的信道禁止数据写入,是可读的,可以从关闭的信道中读取数据。

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
//死锁
	for c := range ch {
	    fmt.Println(c)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
//close解除死锁
	close(ch)
	for c := range ch {
		fmt.Println(c)
	}
}

循环读取关闭的channel不会阻塞,会一直读取空值。可以通过读取结果的bool值判断该channel是否关闭。

 //从关闭的
    data, ok := <-nochan
    if !ok {
        fmt.Println("receive close chan")
    }

4.5 只有一个协程(main)时,只往channel读数据或者写数据。比如一直向无数据流入的空信道取数据,就会引起死锁。

package main
import "fmt"
func main() {
	ch1 := make(chan int)
	fmt.Println(<-ch1)
	ch := make(chan int, 3)
	// 在无数据填入ch时,执行fmt.Println(<-ch),会发生死锁
   // fmt.Println(<-ch)
	ch <- 1
	ch <- 2
	ch <- 3
	for v := range ch {
		fmt.Println(v)
		//在读取完ch时,若不关闭ch,继续读取,也同样会发生死锁
		if v==3{
			close(ch)
		}
	}
}

4.6 main协程一般要有永久循环。否则主协程退出,子协程和程序都将退出。通常有专门的退出通道或信号接收等联动程序整体退出。

//通道阻塞
func mainloop() {
	shouldExist := false
	for !shouldExist {
		shouldExist = <-shutdown
	}
}
//信号阻塞
func main(){
  c := make(chan os.Signal)
  signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
  select {
	case sig := <-c:
	  fmt.Printf("Got %s signal. Aborting...\n", sig)
      close(stopCh)
  }
}