前言:

       很无聊,闲来无事看golang一些开源代码,发现有些人作为生产者把消息扔到channel就直接顺手给close掉了,也不等消费者消费完。有些惊奇,这不会丢数据么?按照字面上的意思,我既然close了,呢么消费者应该被唤醒退出。 但经过我的测试发了大量 ch <- data 之后,立马close的话,消费者还是照样会消费数据,不会因为close channel丢消息。

     那么我们来分析下 channel 为什么不会丢数据 ?  涉及到原理性的话题,不要猜,只能看golang源码了。 golang channel源码里面的hchan结构,close方法及chanrecv方法就可以完美的解释刚才的问题。  channel源码地址, https://golang.org/src/runtime/chan.go,大家可以对照下源码。

Hchan

Hchan是channel的主要数据结构,我们关心的 qcount用来表示消息的个数,closed int32标识用来表示chan的开关,1为关闭.

# xiaorui.cc

type hchan struct {
  qcount   uint           // total data in the queue
  ...
  closed   uint32
  ...

Closechan

Closechan是chan.go里的关闭channel的方法,该方法除了将 c.closed 设置为 1。还需要唤醒 recvq和sendq 队列里面的阻塞 goroutine. 如果你不唤醒的化,因为没有生产者send数据,也就无法通过事件来唤醒goroutine,所以这时候需要把他们都唤醒起来,判断是否退出的逻辑。

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    c.closed = 1
    ....

Chanrecv

消费者接收事件相关的逻辑,不是简单的判断closed标志位,而且会判断channel消息个数…  就因为不好好看源码,导致我想让消费者退出的时候,用另外一个exit channel来通信,这样每个消费者用select监听两个channel,一个业务数据,一个通知退出…  

玩了几年channel,居然不知道这特性,也是醉了。 这里我给自己解释下,我开发的后端服务基本是那种channel作为队列常住的那样,不会像有些服务动不动就new一个channel,所以…. 

# xiaorui.cc

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...

    lock(&c.lock)


    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
}

...

通过上面的channel源码,我们知道了channel是安全退出的。下面是我测试的脚本,我兴趣的可以跑跑。

// xiaorui.cc 

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    ch := make(chan int, 2000)

    var wg sync.WaitGroup

    go func() {
        for i := 0; i < 200; i++ {
            ch <- i
        }
        close(ch)
    }()

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go work(&wg, ch)
    }
    wg.Wait()
    fmt.Println("exit")
}

func work(wg *sync.WaitGroup, ch chan int) {
    defer wg.Done()
    for {
        time.Sleep(100 * time.Millisecond)
        select {
        case data, ok := <- ch:
            fmt.Println(data, ok)
        default:
            fmt.Println("not data")
        }

        item, ok := <-ch
        if !ok {
            break
        }
    }
}

总结:

     多看源码,多读书 .  golang社区里的源码质量大多质量比较高,建议大家多看点源码。 先前看了 golang nsq, boltdb, influxdb的一些源码,感触很多.  当你不知道该学点什么的时候,可以随意在github中闲逛下,总有点项目会让你眼前一亮。