前言

gochannel
ch := make(chan interface{})

另一种是 buffered channel,其声明方式为

bufferSize := 5ch := make(chan interface{},bufferSize)

对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的。这个极限就是该 channel 最初被 make 时,所指定的 bufferSize 。

jojo,buffer channel 的大小是有极限的,我不做 channel 了。

channel

so how can we make a infinite buffer channel?

本文参考了 medinum 上面的一篇文章,有兴趣的同学可以直接阅读原文。

实现

接口的设计

structstructInfiniteChannel
type InfiniteChannel struct {}
channel
func (c *InfiniteChannel) In(val interface{}) {    // todo}func (c *InfiniteChannel) Out() interface{} {    // todo}

内部实现

In()sliceIn()append()sliceslicechannelinfiniteInfiniteChannel
type InfiniteChannel struct {    data    []interface{}}
In()Out()gochannelchannelinChanoutChaninChanoutChan
type InfiniteChannel struct {    inChan  chan interface{}    outChan chan interface{}    data    []interface{}}func (c *InfiniteChannel) In(val interface{}) {    c.inChan <- val}func (c *InfiniteChannel) Out() interface{} {    return <-c.outChan}
inChanoutChan
selectinChanoutChanselect
func (c *InfiniteChannel) background() {    for true {        select {        case newVal := <-c.inChan:            c.data = append(c.data, newVal)        case c.outChan <- c.pop():       // pop() 将取出队列的首个元素        }    }}func NewInfiniteChannel() *InfiniteChannel {    c := &InfiniteChannel{        inChan:  make(chan interface{}),        outChan: make(chan interface{}),    }    go c.background()   // 注意这里另起了一个协程    return c}

ps:感觉这也算是 go 并发编程的一个套路了。即

  1. 在 new struct 的时候,顺手 go 一个 select 协程,select 协程内执行一个 for 循环,不停的 select,监听一个或者多个 channel 的事件。
  2. struct 对外提供的 method,只会操作 struct 内的 channel(在本例中就是 inChan 和 outChan),不会操作 struct 内的其他数据(在本例中,In() 和 Out() 都没有直接操作 data)。
  3. 触发 channel 的事件后,由 select 协程进行数据的更新(在本例中就是 data )。因为只有 select 协程对除 channel 外的数据成员进行读写操作,且 go 保证了对于 channel 的并发读写是安全的,所以代码是并发安全的。
  4. 如果 struct 是 exported ,用户或许会越过 new ,直接手动 make 一个 struct,可以考虑将 struct 设置为 unexported,把它的首字母小写即可。
pop()
// 取出队列的首个元素,如果队列为空,将会返回一个 nilfunc (c *InfiniteChannel) pop() interface{} {    if len(c.data) == 0 {        return nil    }    val := c.data[0]    c.data = c.data[1:]    return val}

测试一下

用一个协程每秒钟生产一条数据,另一个协程每半秒消费一条数据,并打印。

func main() {    c := NewInfiniteChannel()    go func() {        for i := 0; i < 20; i++ {            c.In(i)            time.Sleep(time.Second)        }    }()    for i := 0; i < 50; i++ {        val := c.Out()        fmt.Print(val)        time.Sleep(time.Millisecond * 500)    }}
// out012345678910111213141516171819Process finished with the exit code 0
InfiniteChannelOut()nilpop()
InfiniteChannelchannelgochannel

优化

我认为此处是是整篇文章最有技巧的地方,我第一次看到时忍不住拍案叫绝。

background()
func (c *InfiniteChannel) background() {    for true {        select {        case newVal := <-c.inChan:            c.data = append(c.data, newVal)        case c.outChan <- c.pop():        }    }}
outChan
func (c *InfiniteChannel) background() {    for true {        select {        case newVal := <-c.inChan:            c.data = append(c.data, newVal)        case c.outChanWrapper() <- c.pop():        }    }}func (c *InfiniteChannel) outChanWrapper() chan interface{} {    return c.outChan}

目前为止,一切照旧。

点睛之笔来了:

func (c *InfiniteChannel) outChanWrapper() chan interface{} {    if len(c.data) == 0 {        return nil    }    return c.outChan}
c.datanil
background()case c.outChan <- c.pop():
case nil <- nil:
gonilchannel
func main() {    var c chan interface{}    select {    case c <- 1:    }}// fatal error: all goroutines are asleep - deadlock!func main() {    var c chan interface{}    select {    case c <- 1:    default:        fmt.Println("hello world")    }}// hello world

因此,对于

select {case newVal := <-c.inChan:    c.data = append(c.data, newVal)case c.outChanWrapper() <- c.pop():}
selectinChan

再测试一下

012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!
panic

补充

channelIn()Out()close()
func main() {    c := make(chan interface{})    close(c)    for true {        v := <-c        fmt.Println(v)        time.Sleep(time.Second)    }}// output// // // // func main() {    c := make(chan interface{})    close(c)    for true {        v, isOpen := <-c        fmt.Println(v, isOpen)        time.Sleep(time.Second)    }}// output//  false//  false//  false//  false

我们也需要实现相同的效果。

func (c *InfiniteChannel) Close() {    close(c.inChan)}func (c *InfiniteChannel) background() {    for true {        select {        case newVal, isOpen := <-c.inChan:            if isOpen {                c.data = append(c.data, newVal)            } else {                c.isOpen = false            }        case c.outChanWrapper() <- c.pop():        }    }}func NewInfiniteChannel() *InfiniteChannel {    c := &InfiniteChannel{        inChan:  make(chan interface{}),        outChan: make(chan interface{}),        isOpen:  true,    }    go c.background()    return c}func (c *InfiniteChannel) outChanWrapper() chan interface{} {    // 这里添加了对 c.isOpen 的判断    if c.isOpen && len(c.data) == 0 {        return nil    }    return c.outChan}

再测试一下

func main() {    c := NewInfiniteChannel()    go func() {        for i := 0; i < 20; i++ {            c.In(i)            time.Sleep(time.Second)        }        c.Close()       // 这里调用了 Close    }()    for i := 0; i < 50; i++ {        val := c.Out()        fmt.Print(val)        time.Sleep(time.Millisecond * 500)    }}
// output012345678910111213141516171819Process finished with the exit code 0

符合预期

遗憾

channelchannel
v,isOpen := <- ch
isOpenchannel
InfiniteChannelmethod
func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) {    // todo}
InfiniteChannelOpenInfiniteChannelisOpen
type InfiniteChannel struct {    inChan  chan interface{}    outChan chan interface{}    data    []interface{}    isOpen  bool}
isOpenchannelchannelselect

我不能接受!所以干脆不提供这个 method 了,嘿嘿。

完整代码

func main() {    c := NewInfiniteChannel()    go func() {        for i := 0; i < 20; i++ {            c.In(i)            time.Sleep(time.Second)        }        c.Close()    }()    for i := 0; i < 50; i++ {        val := c.Out()        fmt.Print(val)        time.Sleep(time.Millisecond * 500)    }}type InfiniteChannel struct {    inChan  chan interface{}    outChan chan interface{}    data    []interface{}    isOpen  bool}func (c *InfiniteChannel) In(val interface{}) {    c.inChan <- val}func (c *InfiniteChannel) Out() interface{} {    return <-c.outChan}func (c *InfiniteChannel) Close() {    close(c.inChan)}func (c *InfiniteChannel) background() {    for true {        select {        case newVal, isOpen := <-c.inChan:            if isOpen {                c.data = append(c.data, newVal)            } else {                c.isOpen = false            }        case c.outChanWrapper() <- c.pop():        }    }}func NewInfiniteChannel() *InfiniteChannel {    c := &InfiniteChannel{        inChan:  make(chan interface{}),        outChan: make(chan interface{}),        isOpen:  true,    }    go c.background()    return c}// 取出队列的首个元素,如果队列为空,将会返回一个 nilfunc (c *InfiniteChannel) pop() interface{} {    if len(c.data) == 0 {        return nil    }    val := c.data[0]    c.data = c.data[1:]    return val}func (c *InfiniteChannel) outChanWrapper() chan interface{} {    if c.isOpen && len(c.data) == 0 {        return nil    }    return c.outChan}

参考

https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd

原文链接:https://www.cnblogs.com/XiaoXiaoShuai-/p/14878525.html