背景

最近使用go开发后端服务,服务关闭需要保证channel中的数据都被读取完,理由很简单,在收到系统的中断信号后,系统需要做收尾工作,保证channel的数据都要被处理掉,然后才可以关闭系统。

后面我会给出方案,见示例代码,但在解决这个问题之前我们先了解下close channel的一些特性。

channel
  • 关闭channel
ch := make(chan bool) 
close(ch) 
close(ch)  // 这样会panic的,channel不能close两次 
  • 向已经关闭的channel写数据
ch := make(chan string) 
close(ch) 
ch <- "good" // 会panic的
  • 从已经关闭的channel读取数据
    需要分两种情况:
    • 无缓冲channel或者缓冲channel已经读取完毕
    • 缓冲channel未读取完毕,可以继续读取channel中的剩余的数据
//无缓冲channel
ch := make(chan string) 
close(ch) 
i := <- ch // 不会panic, i读取到的值是空 "",  如果channel是bool的,那么读取到的是false 
  • 判断channel是否关闭
i, ok := <- ch 
if ok { 
    println(i) 
} else { 
    println("channel closed") 
} 
方案

我直接上示例代码

package main

import (
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"  
)

func main() {
    var wg sync.WaitGroup

    ch := make(chan int, 100)
    chSend := make(chan int)
    chConsume := make(chan int)

    sc := make(chan os.Signal, 1)
    signal.Notify(sc,
        os.Kill,
        os.Interrupt,
        syscall.SIGHUP,
        syscall.SIGINT,
        syscall.SIGTERM,
        syscall.SIGQUIT)

    go func(ch, quit chan int) {
        defer func() {
            if err := recover(); err != nil {
                fmt.Println("send to ch panic.===", err)
            }
        }()

        i := 0
        for {
            select {
            case ch <- i:
                fmt.Println("send", i)
                time.Sleep(time.Second)
                i++
            case <-quit:
                fmt.Println("send quit.")
                return
            }

        }
    }(ch, chSend)

    go func(ch, quit chan int) {
        wg.Add(1)
        for {
            select {
            case i, ok := <-ch:
                if ok {
                    fmt.Println("read1", i)
                    time.Sleep(time.Second * 2)
                } else {
                    fmt.Println("close ch1.")
                }

            case <-quit:
                for {
                    select {
                    case i, ok := <-ch:
                        if ok {
                            fmt.Println("read2", i)
                            time.Sleep(time.Second * 2)
                        } else {
                            fmt.Println("close ch2.")
                            goto L
                        }
                    }
                }
            L:
                fmt.Println("consume quit.")
                wg.Done()
                return

            }
        }
    }(ch, chConsume)

    <-sc

    close(ch)
    fmt.Println("close ch ")
    close(chSend)
    close(chConsume)
    wg.Wait()
}

输出结果:
send 0
read1 0
send 1
send 2
read1 1
send 3
send 4
read1 2
send 5
close ch
send quit.
read1 3
read2 4
read2 5
close ch2.
consume quit.

说明
收到中断信号后,会关闭带缓冲的channel ch、无缓冲的chSend、chConsume.从打印的日志可以看出 close ch之后,send的goroutine就结束了(可能打印send quit,也可能打印send to ch panic,可以多执行几次就会发现这种情况,原因就是select case有多个case满足条件会随机执行一个case),此时还可以从ch继续读取channel中的数据(打印了read1 3 read2 4 read2 5),后面就打印了close ch2 说明ok是false,此时才知道ch已经关闭。通过这个特性可以很优雅的关闭服务。

如果服务被强行kill掉或者机器异常等情况,channel中的未读取数据还是会丢失,系统设计需要允许这种情况的发生

后话

说实话,上面示例的做法虽然能达到安全关闭服务的效果,但个人觉得实现不够优雅,具体也说不出为什么。

如果各位有更好的实现方式,请给我留言,谢谢。