优雅的等待goroutine退出

通过Channel传递退出信号

Go的一大设计哲学就是:通过Channel共享数据,而不是通过共享内存共享数据。主流程可以通过channel向任何goroutine发送停止信号,就像下面这样:

这种方式可以实现优雅地停止goroutine,但是当goroutine特别多的时候,这种方式不管在代码美观上还是管理上都显得笨拙不堪。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"time"
)

func run(done chan int) {
for {
select {
case <-done:
fmt.Println("exiting...")
done <- 1
break
default:
}

time.Sleep(time.Second * 1)
fmt.Println("do something")
}
}

func main() {
c := make(chan int)

go run(c)

fmt.Println("wait")
time.Sleep(time.Second * 5)

c <- 1
<-c

fmt.Println("main exited")
}

使用Waitgroup

通常情况下,我们像下面这样使用waitgroup:

  1. 创建一个Waitgroup的实例,假设此处我们叫它wg
  2. 在每个goroutine启动的时候,调用wg.Add(1),这个操作可以在goroutine启动之前调用,也可以在goroutine里面调用。当然,也可以在创建n个goroutine前调用wg.Add(n)
  3. 当每个goroutine完成任务后,调用wg.Done()
  4. 在等待所有goroutine的地方调用wg.Wait(),它在所有执行了wg.Add(1)的goroutine都调用完wg.Done()前阻塞,当所有goroutine都调用完wg.Done()之后它会返回。

那么,如果我们的goroutine是一匹不知疲倦的牛,一直孜孜不倦地工作的话,如何在主流程中告知并等待它退出呢?像下面这样做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package main

import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
)

type Service struct {
// Other things

ch chan bool
waitGroup *sync.WaitGroup
}

func NewService() *Service {
s := &Service{
// Init Other things
ch: make(chan bool),
waitGroup: &sync.WaitGroup{},
}

return s
}

func (s *Service) Stop() {
close(s.ch)
s.waitGroup.Wait()
}

func (s *Service) Serve() {
s.waitGroup.Add(1)
defer s.waitGroup.Done()

for {
select {
case <-s.ch:
fmt.Println("stopping...")
return
default:
}
s.waitGroup.Add(1)
go s.anotherServer()
}
}
func (s *Service) anotherServer() {
defer s.waitGroup.Done()
for {
select {
case <-s.ch:
fmt.Println("stopping...")
return
default:
}

// Do something
}
}

func main() {

service := NewService()
go service.Serve()

// Handle SIGINT and SIGTERM.
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
fmt.Println(<-ch)

// Stop the service gracefully.
service.Stop()
}

优雅的通知 goroutine 退出

有时候我们需要通知goroutine停止它正在干的事情,比如一个正在执行计算的web服务,然而它的客户端已经断开了和服务端的连接。

Go语言并没有提供在一个goroutine中终止另一个goroutine的方法,由于这样会导致goroutine之间的共享变量落在未定义的状态上。

在rocket launch程序中,我们往名字叫abort的channel里发送了一个简单的值,在countdown的goroutine中会把这个值理解为自己的退出信号。但是如果我们想要退出两个或者任意多个goroutine怎么办呢?

一种可能的手段是向abort的channel里发送和goroutine数目一样多的事件来退出它们。如果这些goroutine中已经有一些自己退出了,那么会导致我们的channel里的事件数比goroutine还多,这样导致我们的发送直接被阻塞。另一方面,如果这些goroutine又生成了其它的goroutine,我们的channel里的数目又太少了,所以有些goroutine可能会无法接收到退出消息。一般情况下我们是很难知道在某一个时刻具体有多少个goroutine在运行着的。

另外,当一个goroutine从abort channel中接收到一个值的时候,他会消费掉这个值,这样其它的goroutine就没法看到这条信息。为了能够达到我们退出goroutine的目的,我们需要更靠谱的策略,来通过一个channel把消息广播出去,这样goroutine们能够看到这条事件消息,并且在事件完成之后,可以知道这件事已经发生过了。

回忆一下我们关闭了一个channel并且被消费掉了所有已发送的值,操作channel之后的代码可以立即被执行,并且会产生零值。我们可以将这个机制扩展一下,来作为我们的广播机制:不要向channel发送值,而是用关闭一个channel来进行广播。

优雅的控制 goroutine 退出

Goroutine
SocketReadchannelGoroutine
Goroutine
Goroutinechannelchannelchannelchannel
GoroutineGoroutine
closechannelclosechannel
1
2
3
type routineSignal struct {
done <-chan struct{}
}
routineSignal
1
2
3
func (r *reader)init(s *routineSignal) {
r.signal = s
}

在reader的循环中,就可以这么写:

1
2
3
4
5
6
7
8
9
10
func (r *reader)loop() {
for {
select {
case <-r.signal.done:
return
case <-r.queue:
....
}
}
}
Goroutinechannel
1
close(signal.done)
closeGoroutineGoroutineGoroutinesync.WaitGrouproutineSignal
1
2
3
4
type routineSignal struct {
done chan struct{}
wg sync.WaitGroup
}
GoroutineGoroutine
1
2
3
4
5
6
7
8
9
10
11
12
func (r *reader)loop() {
r.signal.wg.Add(1)
defer r.signal.wg.Done()
for {
select {
case <-r.signal.done:
return
case <-r.queue:
....
}
}
}
WaitGroup
1
2
close(signal.done)
signal.wg.Wait()
Wait()Goroutine
routineSignalchanGoroutineWaitGroupGoroutineGoroutineGoroutine