or-done channel
donedoneor-doneor-done

给出代码示例:

package main
func main() {
    orDone := func(done, c <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                select {
                case <-done:
                    return
                case v, ok := <-c:
                    if ok == false {  // 外界关闭数据流
                        return
                    }
                    select {  // 防止写入阻塞
                    case valStream <- v:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }
}
tee-channel
teechannelchannel
package main
import "fmt"
func main() {
    tee := func(done <-chan interface{}, in <-chan interface{}) (<-chan interface{}, <-chan interface{}) {
        out1 := make(chan interface{})
        out2 := make(chan interface{})
        go func() {
            defer close(out1)
            defer close(out2)
            for val := range in {
                var out1, out2 = out1, out2  // 私有变量覆盖
                for i := 0; i < 2; i++ {
                    select {
                    case <-done:
                        return
                    case out1 <- val:
                        out1 = nil  // 置空阻塞机制完成select轮询
                    case out2 <- val:
                        out2 = nil
                    }
                }
            }
        }()
        return out1, out2
    }
    genetor := func(done <-chan interface{}, n int) <-chan interface{} {
        genCh := make(chan interface{})
        go func() {
            defer close(genCh)
            for i := 0; i < n; i++ {
                select {
                case <-done:
                    return
                case genCh <- i:
                }
            }
        }()
        return genCh
    }
    done := make(chan interface{})
    defer close(done)
    out1, out2 := tee(done, genetor(done, 10))
    for val1 := range out1 {
        val2 := <-out2
        fmt.Printf("%v, %v\n", val1, val2)
    }
}
桥接channel

扇出扇出模式中,输入不需要有序。而桥接channel正好与这个相反,该模式适用于输入有序的情况,及多个数据流的输入需要有序的获取。我们必须先处理完一个channel中的数据,然后才能继续向下处理,此时可以把这个多个输入的channel,作为集合单独放到channel中。

给出代码示例:

package main

func main() {
    bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                var stream <-chan interface{}
                select {
                case maybeStream, ok := <-chanStream:
                    if ok == false {  // 只要channel是来自外界的,那么使用时就先判断是否关闭
                        return
                    }
                    stream = maybeStream
                case <-done:
                    return
                }
                // orDone机制同样的,防止外界关闭造成channel无法释放
                for val := range orDone(done, stream) {
                    select {
                    case valStream <- val:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }
}