早在2014年,官方就撰文介绍了golang pipeline,借助pipeline可以完成程序良好的解耦和扩展设计,结合golang 的特性可以设计出高效利用 IO 和多核 CPU 的pipeline,尤其是数据处理/任务分发等场景非常适用。

本文结合实际生产实践,讲解不同场景下pipeline的最佳实践,经过良好的封装,pipeline完全可以达到流处理框架(Flink)/Java Streams/Workflow编排框架的效果,甚至更灵活,掌握这个大杀器,向golang高手出发!

要是本文对您有帮助的话,欢迎【关注】作者,【点赞】+【收藏】,保持交流!

基础概念

pipeline(管道),顾名思义,一个大任务被拆分为多个小任务,像流水线一样,一级处理后传给下一级,类比设计模式中的责任链。

基本组成

参考如下,一个成熟的pipeline构成如下

  • 1.输入源(Source)

可以是批数据(文件等)也可以是流数据(kakfa消费等) 也称为生产者

  • 2.处理阶段(Stage)

多个组合,每个stage可能存在多个并发算子(Processor)

  • 3.输出(Sink)

可指定不同的输出位置(命令行/日志/kakfa等) 也称为消费者

  • 4.错误处理


image.png


关键技术点

如下,这里大部分技术,适用于Go的很多场景,感兴趣的可以查看参考文章,之前文章有深入探讨。可以看到pipeline相当于于Go中很多关键技术点的综合使用,算是集大成者。

  • 不同的任务类型对应不同的pipeline编排模式

这里分为如下两种典型需求

  1. 同步pipeline
  2. 异步pipeline
  • 如何控制任务的并发度

同步和异步pipeline处理不一样 核心是最优的利用Go的并发能力

  • 任何处理任务的停止

不同的任务要求不一样,典型两种场景

  1. 一旦通知停止/出现错误时,pipeline立即停止,上游已发的数据不处理
  2. 一旦通知停止时,上游已发送的数据消费处理完成后,pipeline才停止
  • 如何处理任务的错误
  1. 简单处理,出现错误打日志立即退出
  2. 更灵活的,日志统一收集到一个channel(类似图中error bus),灵活控制处理策略(比如错误分等级处理/统一入库/出现多少错误通知退出等等)

同步pipeline

场景

以数据处理的典型WordCount程序为例,如下图

image.png

对输入的文章,如下处理

  1. 按行分割后按照空格分割
  2. 统计每个单词出现的次数
  3. 按照次数排序
  4. 输出出现次数最多的前三单词

一个典型函数实现如下

func TestSimpleWordCount(t *testing.T) {
    r := bufio.NewReader(strings.NewReader(LINES))

    wordStat := make(map[string]int)

    for {
        // 按行分割
        l, _, err := r.ReadLine()
        if err != nil {
            if err != io.EOF {
                t.Log(err.Error())
            }

            break
        }

        // 拆分单词并统计词频
        words := strings.Split(string(l), " ")
        for _, word := range words {
            if v, ok := wordStat[word]; ok {
                wordStat[word] = v + 1
            } else {
                wordStat[word] = 1
            }
        }
    }

    // 倒排
    var wordStatSlice []struct {
        cnt  int
        word string
    }

    for k, v := range wordStat {
        wordStatSlice = append(wordStatSlice, struct {
            cnt  int
            word string
        }{cnt: v, word: k})
    }

    sort.Slice(wordStatSlice, func(i, j int) bool {
        return wordStatSlice[i].cnt > wordStatSlice[j].cnt
    })

    // 输出TOP 3
    for i := 0; i < 3; i++ {
        t.Logf("单词:%s - 出现次数:%d", wordStatSlice[i].word, wordStatSlice[i].cnt)
    }
}

可以看到,对输入的每一个文章(LINES),都需要进行四个阶段处理,同步pipeline适用的场景就是多个stage阶段必须是在同一个协程中有序进行的,一般处理顺序是FIFO(先进先出),前一个算子处理结果直接传递给后续算子;如果是并发处理,那么同一输入的每个stage阶段只能由同一协程处理。

生产实践中,我们利用pipeline拆分各个阶段,使用统一的编排和错误处理来完成封装,一般常用

  • 高阶函数封装,适用于简单场景
  • 面向对象封装,利用OO思想,更加模块化

高阶函数封装

  • 定义输入

返回的是一个输入数据channel,如果中间出现错误,返回error

type PipeSourceFunc func(ctx context.Context) (chan any, error)
  • 定义处理函数和输出函数

接受上游数据处理后输出数据供下游函数调用,如果中间出现错误,返回error

type PipeProcessFunc func(ctx context.Context, params any) (any, error)

注意这里输入输出使用any来完成一定的泛化,下游接受上游数据时,最好做下类型断言,比如按行分割实现如下

// 按行分割
func splitByLine(ctx context.Context, params any) (any, error) {
    if v, ok := params.(string); !ok {
        return nil, errors.New("Split Input type error ")
    } else {
        return strings.Split(v, "\n"), nil
    }
}
  • 定义流程编排

单个并发度,示意图如下 这里的错误处理,返回后集中处理,所有的Stage处理算子,依次调用传递

image.png


func PipeProcessBuildAndRun(ctx context.Context, input PipeSourceFunc, funcs ...PipeProcessFunc) {
    var err error = nil

    // 输入
    dataChan, err := input(ctx)
    if err != nil {
        log.Printf("source error-%v\n", err.Error())
        return
    }

    // pipeline构建和执行
    for data := range dataChan {
        // 依次执行函数,上一个函数返回结果当做下个函数参数
        for _, processFunc := range funcs {
            data, err = processFunc(ctx, data)

            // 错误集中处理,这里选择提前退出
            if err != nil {
                log.Printf("process error-%v\n", err.Error())
                return
            }
        }
    }
}

多并发度版本,示意图如下 这里并发度通过指定并发度,开启指定个数协程,每个协程中包含所有的Stage处理算子,依次调用传递

image.png


func PipeProcessBuildAndRunN(ctx context.Context, input PipeSourceFunc, maxCnt int, funcs ...PipeProcessFunc) {

    var err error = nil

    // 输入
    dataChan, err := input(ctx)
    if err != nil {
        log.Printf("source error-%v\n", err.Error())
        return
    }

    var wg = sync.WaitGroup{}
    wg.Add(maxCnt)

    // pipeline构建和执行
    // maxCnt个协程同时消费处理
    for i := 0; i < maxCnt; i++ {
        go func() {
            defer wg.Done()

            var err error = nil

            for data := range dataChan {
                // 依次执行函数,上一个函数返回结果当做下个函数参数
                for _, processFunc := range funcs {
                    data, err = processFunc(ctx, data)

                    // 错误集中处理,这里选择提前退出
                    if err != nil {
                        log.Printf("process error-%v\n", err.Error())
                        return
                    }
                }
            }

        }()
    }

    wg.Wait()
}
  • 调用演示

单个并发度

func TestSimpleWordCount2(t *testing.T) {
    ctx := context.Background()
    PipeProcessBuildAndRun(ctx, dataSource, splitByLine, countByWord, sortByCount, outTop3)
}

多个并发度 这里通过conetxt显式通知退出,立即停止

func TestSimpleWordCount3(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())

    // 定时通知退出
    go func() {
        time.Sleep(5 * time.Second)
        cancel()
    }()

    PipeProcessBuildAndRunN(ctx, dataTimerSource, 3, splitByLine, countByWord, sortByCount, outTop3)
}

可以看到对使用者非常友好,各个阶段实现拆分到函数中,可以扩展更多的处理函数和输入输出函数,这里流程编排仅仅演示了最常用的方式,生产实践中可以灵活定制更复杂的处理流程

面向对象封装

和函数封装类似,使用接口来标准化,更容易模块化(推荐输入输出和处理算子单独目录/文件中实现) 整个程序完全解耦,输入输出可以实现多种,各个算子实现都可以替换

  • 输入和输出接口
type ISource interface {
    Process(ctx context.Context) (<-chan any, error)
}

type ISink interface {
    Process(ctx context.Context, params any) error
}
  • 处理接口
type IProcessor interface {
    Process(ctx context.Context, params any) (any, error)
}
  • 对象定义和方法
type ProcessorManager struct {
    source ISource
    sink   ISink
    ps     []IProcessor
}

func NewProcessorManager() *ProcessorManager {
    return &ProcessorManager{}
}

func (m *ProcessorManager) AddProcessor(processor IProcessor) {
    m.ps = append(m.ps, processor)
}

func (m *ProcessorManager) AddSource(source ISource) {
    m.source = source
}

func (m *ProcessorManager) AddSink(sink ISink) {
    m.sink = sink
}
  • 流程编排

单个并发度

func (m *ProcessorManager) Run(ctx context.Context) error {
    var err error

    in, err := m.source.Process(ctx)
    if err != nil {
        return err
    }

    // pipeline构建和执行
    for data := range in {
        for _, v := range m.ps {
            data, err = v.Process(ctx, data)

            // 错误集中处理,这里选择提前退出
            if err != nil {
                log.Printf("Process err %s\n", err)
                return nil
            }
        }

        err := m.sink.Process(ctx, data)
        if err != nil {
            log.Printf("Sink err %s\n", err)
            return nil
        }
    }

    return nil
}

多个并发度,示意图如下

image.png


func (m *ProcessorManager) RunN(ctx context.Context, maxCnt int) error {
    var err error

    in, err := m.source.Process(ctx)
    if err != nil {
        return err
    }

    // pipeline构建和执行
    syncProcess := func(data any) {
        for _, v := range m.ps {
            data, err = v.Process(ctx, data)

            // 错误集中处理,这里选择提前退出
            if err != nil {
                log.Printf("Process err %s\n", err)
                return
            }
        }

        err := m.sink.Process(ctx, data)
        if err != nil {
            log.Printf("Sink err %s\n", err)
            return
        }
    }

    wg := sync.WaitGroup{}
    wg.Add(maxCnt)

    // 多个协程消费同一个channel
    for i := 0; i < maxCnt; i++ {
        go func() {
            defer wg.Done()

            for data := range in {
                syncProcess(data)
            }
        }()
    }

    wg.Wait()

    return nil
}
  • 调用演示

单个并发度

func TestSimpleWordCount4(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    m := NewProcessorManager()

    // pipeline组装
    m.AddSource(NewTimerSource(LINES))
    m.AddSink(NewConsoleSink())

    m.AddProcessor(&SplitProcessor{})
    m.AddProcessor(&CountProcessor{})
    m.AddProcessor(&SortProcessor{})

    // 定时通知退出
    go func() {
        time.Sleep(3 * time.Second)
        cancel()
    }()

    err := m.Run(ctx)
    if err != nil {
        t.Logf("Run error %v", err.Error())
        return
    }
}

多个并发度

func TestSimpleWordCount5(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    m := NewProcessorManager()

    // pipeline组装
    m.AddSource(NewTimerSource(LINES))
    m.AddSink(NewConsoleSink())

    m.AddProcessor(&SplitProcessor{})
    m.AddProcessor(&CountProcessor{})
    m.AddProcessor(&SortProcessor{})

    // 定时通知退出
    go func() {
        time.Sleep(3 * time.Second)
        cancel()
    }()

    err := m.RunN(ctx, 2)
    if err != nil {
        t.Logf("Run error %v", err.Error())
        return
    }
}

异步pipeline

场景

参考官方文档,一个典型的计算任务需求如下图,输入一串数字,先分别计算平方然后计算累加值

image.png

不同于同步pipeline,异步pipeline输入输出和各个Stage算子之间数据传递都是通过channel,每个stage都可以独立控制自己的并发度。 一个简单实现如下

// 生成器 输入数据依次放入输出通道
func producer(nums ...int) <-chan int {
    outChannel := make(chan int)

    go func() {
        defer close(outChannel)
        for _, n := range nums {
            outChannel <- n
        }
    }()

    return outChannel
}

// 计算平方
func sq(in <-chan int) <-chan int {
    outChannel := make(chan int)

    go func() {
        defer close(outChannel)
        for n := range in {
            outChannel <- n * n
        }
    }()

    return outChannel
}

// 累加
func sum(in <-chan int) <-chan int {
    outChannel := make(chan int)

    go func() {
        defer close(outChannel)

        var sum = 0
        for n := range in {
            sum += n
        }

        outChannel <- sum
    }()

    return outChannel
}

// 输出到命令行
func sink(in <-chan int) <-chan int {
    for val := range in {
        fmt.Printf("sink value: %v\n", val)
    }

    return nil
}

func TestSimpleAsync(t *testing.T) {
    sink(sum(sq(producer(1, 2, 3, 4, 5))))
}

高阶函数封装

异步pipieline封装很简单,就是把各个channel串联起来,重点在处理停止和并行度控制上。如下,

  • 定义输入
type SourceFunc func(context.Context, ...int) (<-chan int, error)

这里简答实现定时输出

// 生成器 输入数据依次放入输出通道
func producer(ctx context.Context, nums ...int) (<-chan int, error) {
    outChannel := make(chan int)

    go func() {
        defer close(outChannel)

        for _, s := range nums {
            // 定时间隔输出
            time.Sleep(time.Second * 1)

            select {
            case outChannel <- s:
            case <-ctx.Done():
                log.Printf("producer Receive exit msg %v\n", ctx.Err())
                return
            }
        }
    }()

    return outChannel, nil
}
  • 定义处理函数和输出函数
type PipeFunc func(context.Context, <-chan int) <-chan int

这里使用context通知立即停止,比如sq

// 计算平方
func sq(ctx context.Context, in <-chan int) <-chan int {
    outChannel := make(chan int)

    go func() {
        defer close(outChannel)

        for s := range in {
            select {
            case outChannel <- s * s:
            case <-ctx.Done():
                log.Printf("sq Receive exit msg %v\n", ctx.Err())
                return
            }
        }
    }()

    return outChannel
}
  • 流程编排

这里错误处理依赖各个算子,一旦出错立即打日志退出

func PipeProcessBuildAndRun(ctx context.Context, nums []int, sourceFunc SourceFunc, pipeFncs ...PipeFunc) (<-chan int, error) {
    ch, err := sourceFunc(ctx, nums...)
    if err != nil {
        return ch, err
    }

    for i := range pipeFncs {
        ch = pipeFncs[i](ctx, ch)
    }

    return ch, nil
}
  • 调用演示
func TestSimpleAsync(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 定时取消
    go func() {
        time.Sleep(time.Second * 3)
        cancel()
    }()

    // 组装和执行pipeline
    var nums = []int{1, 2, 3, 4, 5}
    _, err := PipeProcessBuildAndRun(ctx, nums, producer, sq, sum, sink[int])
    if err != nil {
        return
    }
}

并行度控制

异步pipeline各个算子间通过channel解耦,因此可以很方便的对每个算子独立控制处理并发度 这里以sq演示常见的几种并发度控制实现,深入探讨可查看参考文章

  • 不限制并发度
// 不限制并发度,每来一个起一个协程处理
func sqInfinitePool(ctx context.Context, in <-chan int) <-chan int {
    // merge后输出chan
    outChannel := make(chan int)

    go func() {
        defer close(outChannel)

        var wg sync.WaitGroup

        for s := range in {

            // 每个输入起协程处理
            wg.Add(1)
            go func() {
                defer wg.Done()

                select {
                case outChannel <- calc(s):
                case <-ctx.Done():
                    log.Printf("sqDynamicPool Receive exit msg %v\n", ctx.Err())
                    return
                }
            }()
        }

        // 全部处理完毕
        wg.Wait()
    }()

    return outChannel
}
  • 固定并发度
// Bounded parallelism
// 固定并发度,共享一个输出channel 同时处理
func sqFixedPool(ctx context.Context, in <-chan int) <-chan int {
    maxWorker := 2

    // merge后输出chan
    outChannel := make(chan int)

    go func() {
        defer close(outChannel)

        var wg sync.WaitGroup
        wg.Add(maxWorker)

        // 指定个数协程运行
        for i := 0; i < maxWorker; i++ {
            go func() {
                defer wg.Done()

                for s := range in {
                    select {
                    case outChannel <- calc(s):
                    case <-ctx.Done():
                        log.Printf("sq Receive exit msg %v\n", ctx.Err())
                        return
                    }
                }
            }()
        }

        // 全部处理完毕
        wg.Wait()

    }()

    return outChannel
}
  • 动态并发度
// 动态并发度,指定最大允许并发度,根据任务数据多少自行控制
func sqDynamicPool(ctx context.Context, in <-chan int) <-chan int {
    maxWorker := 5

    // merge后输出chan
    outChannel := make(chan int)

    go func() {
        defer close(outChannel)

        var wg sync.WaitGroup
        sm := semaphore.NewWeighted(int64(maxWorker))

        for s := range in {
            // 信号量检查,保证最大并发度限制
            sm.Acquire(ctx, 1)

            // 每个输入起协程处理
            wg.Add(1)
            go func() {
                defer sm.Release(1)
                defer wg.Done()

                select {
                case outChannel <- calc(s):
                case <-ctx.Done():
                    log.Printf("sqDynamicPool Receive exit msg %v\n", ctx.Err())
                    return
                }
            }()
        }

        // 全部处理完毕
        wg.Wait()
    }()

    return outChannel
}
  • Fanout/Fanin灵活控制
// 可指定并发策略,借助Fanout/Fanin Split/Clone+Merge动态调配数据分发过程
func sqFanOutInPool(ctx context.Context, in <-chan int) <-chan int {
    maxWorker := 2
    splitChan := Split(in, maxWorker)

    cs := make([]chan int, 0)

    for i := 0; i < maxWorker; i++ {
        // 每个函数创建一个输出通道
        out := make(chan int)
        cs = append(cs, out)

        // 多个函数共享一个输入通道
        go func(index int) {
            defer close(out)

            for data := range splitChan[index] {
                out <- calc(data)
            }
        }(i)
    }

    return Merge(cs)
}
  • 调用演示

只需要替换算子即可,非常灵活

func TestFanAsync(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go runNumGoroutineMonitor()

    // 定时取消
    go func() {
        time.Sleep(time.Second * 15)
        cancel()
    }()

    // 组装和执行pipeline
    var nums = []int{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}
    _, err := PipeProcessBuildAndRun(ctx, nums, producer, sqFanOutInPool, sink[int])
    if err != nil {
        return
    }
}

面向对象封装

类比函数封装,不同的是

  1. 这里使用单独的error channel来统一汇总错误控制处理
  2. 要求处理数据不能丢失,因此context通知关闭时,只有Source响应,然后上游通道关闭逐级通知下游关闭


  • 输入和输出接口

这里使用接口来标准化,更容易模块化(推荐输入/输出/错误处理和处理算子单独目录/文件中实现) 整个程序完全解耦,输入/输出/错误处理可以实现多种,各个算子实现都可以替换

type ISource interface {
    Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error) <-chan int
}

type ISink interface {
    Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan int, errChan chan error)
}
  • 处理接口
type IProcessor interface {
    Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan int, errChan chan error) <-chan int
}
  • 错误处理接口
type IError interface {
    Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error, cancel context.CancelFunc)
}

这里在error channel上一旦发现错误,cancel通知整个处理pipeline停止,如下实现

// 错误处理
type ErrorPolicyExit struct {
}

func NewErrorPolicyExit() *ErrorPolicyExit {
    return &ErrorPolicyExit{}
}

func (p *ErrorPolicyExit) Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error, cancel context.CancelFunc) {
    for {
        select {
        case err, ok := <-errChan:
            if !ok {
                log.Println("error channel closed and exit!")
                return
            }

            log.Printf("Receive error %v\n", err)
            cancel()
        }
    }
}
  • 对象定义和方法
type ProcessorManager struct {
    source  ISource
    sink    ISink
    err     IError
    ps      []IProcessor
    errChan chan error
}

func NewProcessorManager() *ProcessorManager {
    return &ProcessorManager{errChan: make(chan error, 1)}
}

func (m *ProcessorManager) AddProcessor(processor IProcessor) {
    m.ps = append(m.ps, processor)
}

func (m *ProcessorManager) AddSource(source ISource) {
    m.source = source
}

func (m *ProcessorManager) AddSink(sink ISink) {
    m.sink = sink
}

func (m *ProcessorManager) AddError(err IError) {
    m.err = err
}
  • 流程编排

注意这里错误通道阻塞,直到处理完成关闭错误通道

func (m *ProcessorManager) Run(ctx context.Context) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var wg = sync.WaitGroup{}

    // 组装pipeline
    wg.Add(1)
    dataChan := m.source.Process(ctx, &wg, m.errChan)

    for _, v := range m.ps {
        wg.Add(1)
        dataChan = v.Process(ctx, &wg, dataChan, m.errChan)
    }

    wg.Add(1)
    m.sink.Process(ctx, &wg, dataChan, m.errChan)

    go func() {
        wg.Wait()
        close(m.errChan)
    }()

    // 错误通道阻塞,错误处理集中处理
    // 出现错误则通知退出,可灵活定制处理策略
    m.err.Process(ctx, &wg, m.errChan, cancel)
}
  • 调用演示
func TestProcessorManager(t *testing.T) {
    m := NewProcessorManager()

    // pipeline组装
    //m.AddSource(NewTimerSource(1, 2, 3, 4, 5))
    m.AddSource(NewTimerSource(1, 2, -1, 3, 4, 5))
    m.AddSink(NewConsoleSink())

    m.AddError(NewErrorPolicyExit())

    m.AddProcessor(&SqProcessor{})
    m.AddProcessor(&SumProcessor{})

    // 执行
    m.Run(context.Background())
}

这里实现,一旦输入出现负数则立即退出,运行如下 可以看到协程依次退出

=== RUN TestProcessorManager
sink value: 1
sink value: 5
2023/01/13 10:13:11 Receive error Invalid Num
2023/01/13 10:13:11 Notify to exit source!!!
2023/01/13 10:13:11 sq data channel closed!
2023/01/13 10:13:11 sum data channel closed!
2023/01/13 10:13:11 sink data channel closed!
2023/01/13 10:13:11 error channel closed and exit!
--- PASS: TestProcessorManager (3.00s)

扩展

混合pipeline

生产场景中,很多时候会遇到同步同步pipeline和异步pipeline混合的场景,典型拓扑图如下

image.png

这时候一个Stage中可能包括多个算子流程,但是核心不变,此时Stage1相当于之前的同步pipeline 具体的封装过程就不再赘述

开源库

围绕pipeline功能,有不少优秀的Golang库,推荐几个如下:

  • Grab公司在用的ETL库

原始版本参考官方实现扩展包来,Github版本较老,可能需要自己扩展开发 https://github.com/taggledevel2/ratchet

  • 类似js lodash库的操作库

基于Go 1.18泛型实现,生产可用 如果你在寻找Go的lambda或者类似 Java Streams的成熟库,不妨看下这个 https://github.com/samber/lo

  • 单机流处理框架

类似单机版本Flink 或 Java Streams,持续维护中,生产可用

  • 类似Flink的分布式流处理框架

项目基本不维护,代码值得参考

  • 工作流(Workflow)框架

参考新版代码分支,生产可用

欢迎关注我 @文大侠666,拒绝灌水,只输出对工作有用的技术文章!
如果本文对你有帮助,欢迎点赞、收藏、转发给朋友,让我有持续创作的动力!

参考

演示代码

官方pipeline文档

值得参考的pipeline文档

之前针对关键技术点的深入实践文章