前言
gochannel
ch := make(chan interface{})
另一种是 buffered channel,其声明方式为
bufferSize := 5ch := make(chan interface{},bufferSize)
对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的。这个极限就是该 channel 最初被 make 时,所指定的 bufferSize 。
jojo,buffer channel 的大小是有极限的,我不做 channel 了。
channel
so how can we make a infinite buffer channel?
本文参考了 medinum 上面的一篇文章,有兴趣的同学可以直接阅读原文。
实现
接口的设计
structstructInfiniteChannel
type InfiniteChannel struct {}
channel
func (c *InfiniteChannel) In(val interface{}) { // todo}func (c *InfiniteChannel) Out() interface{} { // todo}
内部实现
In()sliceIn()append()sliceslicechannelinfiniteInfiniteChannel
type InfiniteChannel struct { data []interface{}}
In()Out()gochannelchannelinChanoutChaninChanoutChan
type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{}}func (c *InfiniteChannel) In(val interface{}) { c.inChan <- val}func (c *InfiniteChannel) Out() interface{} { return <-c.outChan}
inChanoutChan
selectinChanoutChanselect
func (c *InfiniteChannel) background() { for true { select { case newVal := <-c.inChan: c.data = append(c.data, newVal) case c.outChan <- c.pop(): // pop() 将取出队列的首个元素 } }}func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), } go c.background() // 注意这里另起了一个协程 return c}
ps:感觉这也算是 go 并发编程的一个套路了。即
- 在 new struct 的时候,顺手 go 一个 select 协程,select 协程内执行一个 for 循环,不停的 select,监听一个或者多个 channel 的事件。
- struct 对外提供的 method,只会操作 struct 内的 channel(在本例中就是 inChan 和 outChan),不会操作 struct 内的其他数据(在本例中,In() 和 Out() 都没有直接操作 data)。
- 触发 channel 的事件后,由 select 协程进行数据的更新(在本例中就是 data )。因为只有 select 协程对除 channel 外的数据成员进行读写操作,且 go 保证了对于 channel 的并发读写是安全的,所以代码是并发安全的。
- 如果 struct 是 exported ,用户或许会越过 new ,直接手动 make 一个 struct,可以考虑将 struct 设置为 unexported,把它的首字母小写即可。
pop()
// 取出队列的首个元素,如果队列为空,将会返回一个 nilfunc (c *InfiniteChannel) pop() interface{} { if len(c.data) == 0 { return nil } val := c.data[0] c.data = c.data[1:] return val}
测试一下
用一个协程每秒钟生产一条数据,另一个协程每半秒消费一条数据,并打印。
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { c.In(i) time.Sleep(time.Second) } }() for i := 0; i < 50; i++ { val := c.Out() fmt.Print(val) time.Sleep(time.Millisecond * 500) }}
// out012345678910111213141516171819Process finished with the exit code 0
InfiniteChannelOut()nilpop()
InfiniteChannelchannelgochannel
优化
我认为此处是是整篇文章最有技巧的地方,我第一次看到时忍不住拍案叫绝。
background()
func (c *InfiniteChannel) background() { for true { select { case newVal := <-c.inChan: c.data = append(c.data, newVal) case c.outChan <- c.pop(): } }}
outChan
func (c *InfiniteChannel) background() { for true { select { case newVal := <-c.inChan: c.data = append(c.data, newVal) case c.outChanWrapper() <- c.pop(): } }}func (c *InfiniteChannel) outChanWrapper() chan interface{} { return c.outChan}
目前为止,一切照旧。
点睛之笔来了:
func (c *InfiniteChannel) outChanWrapper() chan interface{} { if len(c.data) == 0 { return nil } return c.outChan}
c.datanil
background()case c.outChan <- c.pop():
case nil <- nil:
gonilchannel
func main() { var c chan interface{} select { case c <- 1: }}// fatal error: all goroutines are asleep - deadlock!func main() { var c chan interface{} select { case c <- 1: default: fmt.Println("hello world") }}// hello world
因此,对于
select {case newVal := <-c.inChan: c.data = append(c.data, newVal)case c.outChanWrapper() <- c.pop():}
selectinChan
再测试一下
012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!
panic
补充
channelIn()Out()close()
func main() { c := make(chan interface{}) close(c) for true { v := <-c fmt.Println(v) time.Sleep(time.Second) }}// output// // // // func main() { c := make(chan interface{}) close(c) for true { v, isOpen := <-c fmt.Println(v, isOpen) time.Sleep(time.Second) }}// output// false// false// false// false
我们也需要实现相同的效果。
func (c *InfiniteChannel) Close() { close(c.inChan)}func (c *InfiniteChannel) background() { for true { select { case newVal, isOpen := <-c.inChan: if isOpen { c.data = append(c.data, newVal) } else { c.isOpen = false } case c.outChanWrapper() <- c.pop(): } }}func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), isOpen: true, } go c.background() return c}func (c *InfiniteChannel) outChanWrapper() chan interface{} { // 这里添加了对 c.isOpen 的判断 if c.isOpen && len(c.data) == 0 { return nil } return c.outChan}
再测试一下
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { c.In(i) time.Sleep(time.Second) } c.Close() // 这里调用了 Close }() for i := 0; i < 50; i++ { val := c.Out() fmt.Print(val) time.Sleep(time.Millisecond * 500) }}
// output012345678910111213141516171819Process finished with the exit code 0
符合预期
遗憾
channelchannel
v,isOpen := <- ch
isOpenchannel
InfiniteChannelmethod
func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) { // todo}
InfiniteChannelOpenInfiniteChannelisOpen
type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{} isOpen bool}
isOpenchannelchannelselect
我不能接受!所以干脆不提供这个 method 了,嘿嘿。
完整代码
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { c.In(i) time.Sleep(time.Second) } c.Close() }() for i := 0; i < 50; i++ { val := c.Out() fmt.Print(val) time.Sleep(time.Millisecond * 500) }}type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{} isOpen bool}func (c *InfiniteChannel) In(val interface{}) { c.inChan <- val}func (c *InfiniteChannel) Out() interface{} { return <-c.outChan}func (c *InfiniteChannel) Close() { close(c.inChan)}func (c *InfiniteChannel) background() { for true { select { case newVal, isOpen := <-c.inChan: if isOpen { c.data = append(c.data, newVal) } else { c.isOpen = false } case c.outChanWrapper() <- c.pop(): } }}func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), isOpen: true, } go c.background() return c}// 取出队列的首个元素,如果队列为空,将会返回一个 nilfunc (c *InfiniteChannel) pop() interface{} { if len(c.data) == 0 { return nil } val := c.data[0] c.data = c.data[1:] return val}func (c *InfiniteChannel) outChanWrapper() chan interface{} { if c.isOpen && len(c.data) == 0 { return nil } return c.outChan}
参考
https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd
原文链接:https://www.cnblogs.com/XiaoXiaoShuai-/p/14878525.html