早在2014年,官方就撰文介绍了golang pipeline,借助pipeline可以完成程序良好的解耦和扩展设计,结合golang 的特性可以设计出高效利用 IO 和多核 CPU 的pipeline,尤其是数据处理/任务分发等场景非常适用。
本文结合实际生产实践,讲解不同场景下pipeline的最佳实践,经过良好的封装,pipeline完全可以达到流处理框架(Flink)/Java Streams/Workflow编排框架的效果,甚至更灵活,掌握这个大杀器,向golang高手出发!
要是本文对您有帮助的话,欢迎【关注】作者,【点赞】+【收藏】,保持交流!
基础概念
pipeline(管道),顾名思义,一个大任务被拆分为多个小任务,像流水线一样,一级处理后传给下一级,类比设计模式中的责任链。
基本组成
参考如下,一个成熟的pipeline构成如下
- 1.输入源(Source)
可以是批数据(文件等)也可以是流数据(kakfa消费等) 也称为生产者
- 2.处理阶段(Stage)
多个组合,每个stage可能存在多个并发算子(Processor)
- 3.输出(Sink)
可指定不同的输出位置(命令行/日志/kakfa等) 也称为消费者
- 4.错误处理
关键技术点
如下,这里大部分技术,适用于Go的很多场景,感兴趣的可以查看参考文章,之前文章有深入探讨。可以看到pipeline相当于于Go中很多关键技术点的综合使用,算是集大成者。
- 不同的任务类型对应不同的pipeline编排模式
这里分为如下两种典型需求
- 同步pipeline
- 异步pipeline
- 如何控制任务的并发度
同步和异步pipeline处理不一样 核心是最优的利用Go的并发能力
- 任何处理任务的停止
不同的任务要求不一样,典型两种场景
- 一旦通知停止/出现错误时,pipeline立即停止,上游已发的数据不处理
- 一旦通知停止时,上游已发送的数据消费处理完成后,pipeline才停止
- 如何处理任务的错误
- 简单处理,出现错误打日志立即退出
- 更灵活的,日志统一收集到一个channel(类似图中error bus),灵活控制处理策略(比如错误分等级处理/统一入库/出现多少错误通知退出等等)
同步pipeline
场景
以数据处理的典型WordCount程序为例,如下图
对输入的文章,如下处理
- 按行分割后按照空格分割
- 统计每个单词出现的次数
- 按照次数排序
- 输出出现次数最多的前三单词
一个典型函数实现如下
func TestSimpleWordCount(t *testing.T) {
r := bufio.NewReader(strings.NewReader(LINES))
wordStat := make(map[string]int)
for {
// 按行分割
l, _, err := r.ReadLine()
if err != nil {
if err != io.EOF {
t.Log(err.Error())
}
break
}
// 拆分单词并统计词频
words := strings.Split(string(l), " ")
for _, word := range words {
if v, ok := wordStat[word]; ok {
wordStat[word] = v + 1
} else {
wordStat[word] = 1
}
}
}
// 倒排
var wordStatSlice []struct {
cnt int
word string
}
for k, v := range wordStat {
wordStatSlice = append(wordStatSlice, struct {
cnt int
word string
}{cnt: v, word: k})
}
sort.Slice(wordStatSlice, func(i, j int) bool {
return wordStatSlice[i].cnt > wordStatSlice[j].cnt
})
// 输出TOP 3
for i := 0; i < 3; i++ {
t.Logf("单词:%s - 出现次数:%d", wordStatSlice[i].word, wordStatSlice[i].cnt)
}
}
可以看到,对输入的每一个文章(LINES),都需要进行四个阶段处理,同步pipeline适用的场景就是多个stage阶段必须是在同一个协程中有序进行的,一般处理顺序是FIFO(先进先出),前一个算子处理结果直接传递给后续算子;如果是并发处理,那么同一输入的每个stage阶段只能由同一协程处理。
生产实践中,我们利用pipeline拆分各个阶段,使用统一的编排和错误处理来完成封装,一般常用
- 高阶函数封装,适用于简单场景
- 面向对象封装,利用OO思想,更加模块化
高阶函数封装
- 定义输入
返回的是一个输入数据channel,如果中间出现错误,返回error
type PipeSourceFunc func(ctx context.Context) (chan any, error)
- 定义处理函数和输出函数
接受上游数据处理后输出数据供下游函数调用,如果中间出现错误,返回error
type PipeProcessFunc func(ctx context.Context, params any) (any, error)
注意这里输入输出使用any来完成一定的泛化,下游接受上游数据时,最好做下类型断言,比如按行分割实现如下
// 按行分割
func splitByLine(ctx context.Context, params any) (any, error) {
if v, ok := params.(string); !ok {
return nil, errors.New("Split Input type error ")
} else {
return strings.Split(v, "\n"), nil
}
}
- 定义流程编排
单个并发度,示意图如下 这里的错误处理,返回后集中处理,所有的Stage处理算子,依次调用传递
func PipeProcessBuildAndRun(ctx context.Context, input PipeSourceFunc, funcs ...PipeProcessFunc) {
var err error = nil
// 输入
dataChan, err := input(ctx)
if err != nil {
log.Printf("source error-%v\n", err.Error())
return
}
// pipeline构建和执行
for data := range dataChan {
// 依次执行函数,上一个函数返回结果当做下个函数参数
for _, processFunc := range funcs {
data, err = processFunc(ctx, data)
// 错误集中处理,这里选择提前退出
if err != nil {
log.Printf("process error-%v\n", err.Error())
return
}
}
}
}
多并发度版本,示意图如下 这里并发度通过指定并发度,开启指定个数协程,每个协程中包含所有的Stage处理算子,依次调用传递
func PipeProcessBuildAndRunN(ctx context.Context, input PipeSourceFunc, maxCnt int, funcs ...PipeProcessFunc) {
var err error = nil
// 输入
dataChan, err := input(ctx)
if err != nil {
log.Printf("source error-%v\n", err.Error())
return
}
var wg = sync.WaitGroup{}
wg.Add(maxCnt)
// pipeline构建和执行
// maxCnt个协程同时消费处理
for i := 0; i < maxCnt; i++ {
go func() {
defer wg.Done()
var err error = nil
for data := range dataChan {
// 依次执行函数,上一个函数返回结果当做下个函数参数
for _, processFunc := range funcs {
data, err = processFunc(ctx, data)
// 错误集中处理,这里选择提前退出
if err != nil {
log.Printf("process error-%v\n", err.Error())
return
}
}
}
}()
}
wg.Wait()
}
- 调用演示
单个并发度
func TestSimpleWordCount2(t *testing.T) {
ctx := context.Background()
PipeProcessBuildAndRun(ctx, dataSource, splitByLine, countByWord, sortByCount, outTop3)
}
多个并发度 这里通过conetxt显式通知退出,立即停止
func TestSimpleWordCount3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// 定时通知退出
go func() {
time.Sleep(5 * time.Second)
cancel()
}()
PipeProcessBuildAndRunN(ctx, dataTimerSource, 3, splitByLine, countByWord, sortByCount, outTop3)
}
可以看到对使用者非常友好,各个阶段实现拆分到函数中,可以扩展更多的处理函数和输入输出函数,这里流程编排仅仅演示了最常用的方式,生产实践中可以灵活定制更复杂的处理流程。
面向对象封装
和函数封装类似,使用接口来标准化,更容易模块化(推荐输入输出和处理算子单独目录/文件中实现) 整个程序完全解耦,输入输出可以实现多种,各个算子实现都可以替换。
- 输入和输出接口
type ISource interface {
Process(ctx context.Context) (<-chan any, error)
}
type ISink interface {
Process(ctx context.Context, params any) error
}
- 处理接口
type IProcessor interface {
Process(ctx context.Context, params any) (any, error)
}
- 对象定义和方法
type ProcessorManager struct {
source ISource
sink ISink
ps []IProcessor
}
func NewProcessorManager() *ProcessorManager {
return &ProcessorManager{}
}
func (m *ProcessorManager) AddProcessor(processor IProcessor) {
m.ps = append(m.ps, processor)
}
func (m *ProcessorManager) AddSource(source ISource) {
m.source = source
}
func (m *ProcessorManager) AddSink(sink ISink) {
m.sink = sink
}
- 流程编排
单个并发度
func (m *ProcessorManager) Run(ctx context.Context) error {
var err error
in, err := m.source.Process(ctx)
if err != nil {
return err
}
// pipeline构建和执行
for data := range in {
for _, v := range m.ps {
data, err = v.Process(ctx, data)
// 错误集中处理,这里选择提前退出
if err != nil {
log.Printf("Process err %s\n", err)
return nil
}
}
err := m.sink.Process(ctx, data)
if err != nil {
log.Printf("Sink err %s\n", err)
return nil
}
}
return nil
}
多个并发度,示意图如下
func (m *ProcessorManager) RunN(ctx context.Context, maxCnt int) error {
var err error
in, err := m.source.Process(ctx)
if err != nil {
return err
}
// pipeline构建和执行
syncProcess := func(data any) {
for _, v := range m.ps {
data, err = v.Process(ctx, data)
// 错误集中处理,这里选择提前退出
if err != nil {
log.Printf("Process err %s\n", err)
return
}
}
err := m.sink.Process(ctx, data)
if err != nil {
log.Printf("Sink err %s\n", err)
return
}
}
wg := sync.WaitGroup{}
wg.Add(maxCnt)
// 多个协程消费同一个channel
for i := 0; i < maxCnt; i++ {
go func() {
defer wg.Done()
for data := range in {
syncProcess(data)
}
}()
}
wg.Wait()
return nil
}
- 调用演示
单个并发度
func TestSimpleWordCount4(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
m := NewProcessorManager()
// pipeline组装
m.AddSource(NewTimerSource(LINES))
m.AddSink(NewConsoleSink())
m.AddProcessor(&SplitProcessor{})
m.AddProcessor(&CountProcessor{})
m.AddProcessor(&SortProcessor{})
// 定时通知退出
go func() {
time.Sleep(3 * time.Second)
cancel()
}()
err := m.Run(ctx)
if err != nil {
t.Logf("Run error %v", err.Error())
return
}
}
多个并发度
func TestSimpleWordCount5(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
m := NewProcessorManager()
// pipeline组装
m.AddSource(NewTimerSource(LINES))
m.AddSink(NewConsoleSink())
m.AddProcessor(&SplitProcessor{})
m.AddProcessor(&CountProcessor{})
m.AddProcessor(&SortProcessor{})
// 定时通知退出
go func() {
time.Sleep(3 * time.Second)
cancel()
}()
err := m.RunN(ctx, 2)
if err != nil {
t.Logf("Run error %v", err.Error())
return
}
}
异步pipeline
场景
参考官方文档,一个典型的计算任务需求如下图,输入一串数字,先分别计算平方然后计算累加值
不同于同步pipeline,异步pipeline输入输出和各个Stage算子之间数据传递都是通过channel,每个stage都可以独立控制自己的并发度。 一个简单实现如下
// 生成器 输入数据依次放入输出通道
func producer(nums ...int) <-chan int {
outChannel := make(chan int)
go func() {
defer close(outChannel)
for _, n := range nums {
outChannel <- n
}
}()
return outChannel
}
// 计算平方
func sq(in <-chan int) <-chan int {
outChannel := make(chan int)
go func() {
defer close(outChannel)
for n := range in {
outChannel <- n * n
}
}()
return outChannel
}
// 累加
func sum(in <-chan int) <-chan int {
outChannel := make(chan int)
go func() {
defer close(outChannel)
var sum = 0
for n := range in {
sum += n
}
outChannel <- sum
}()
return outChannel
}
// 输出到命令行
func sink(in <-chan int) <-chan int {
for val := range in {
fmt.Printf("sink value: %v\n", val)
}
return nil
}
func TestSimpleAsync(t *testing.T) {
sink(sum(sq(producer(1, 2, 3, 4, 5))))
}
高阶函数封装
异步pipieline封装很简单,就是把各个channel串联起来,重点在处理停止和并行度控制上。如下,
- 定义输入
type SourceFunc func(context.Context, ...int) (<-chan int, error)
这里简答实现定时输出
// 生成器 输入数据依次放入输出通道
func producer(ctx context.Context, nums ...int) (<-chan int, error) {
outChannel := make(chan int)
go func() {
defer close(outChannel)
for _, s := range nums {
// 定时间隔输出
time.Sleep(time.Second * 1)
select {
case outChannel <- s:
case <-ctx.Done():
log.Printf("producer Receive exit msg %v\n", ctx.Err())
return
}
}
}()
return outChannel, nil
}
- 定义处理函数和输出函数
type PipeFunc func(context.Context, <-chan int) <-chan int
这里使用context通知立即停止,比如sq
// 计算平方
func sq(ctx context.Context, in <-chan int) <-chan int {
outChannel := make(chan int)
go func() {
defer close(outChannel)
for s := range in {
select {
case outChannel <- s * s:
case <-ctx.Done():
log.Printf("sq Receive exit msg %v\n", ctx.Err())
return
}
}
}()
return outChannel
}
- 流程编排
这里错误处理依赖各个算子,一旦出错立即打日志退出
func PipeProcessBuildAndRun(ctx context.Context, nums []int, sourceFunc SourceFunc, pipeFncs ...PipeFunc) (<-chan int, error) {
ch, err := sourceFunc(ctx, nums...)
if err != nil {
return ch, err
}
for i := range pipeFncs {
ch = pipeFncs[i](ctx, ch)
}
return ch, nil
}
- 调用演示
func TestSimpleAsync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 定时取消
go func() {
time.Sleep(time.Second * 3)
cancel()
}()
// 组装和执行pipeline
var nums = []int{1, 2, 3, 4, 5}
_, err := PipeProcessBuildAndRun(ctx, nums, producer, sq, sum, sink[int])
if err != nil {
return
}
}
并行度控制
异步pipeline各个算子间通过channel解耦,因此可以很方便的对每个算子独立控制处理并发度 这里以sq演示常见的几种并发度控制实现,深入探讨可查看参考文章
- 不限制并发度
// 不限制并发度,每来一个起一个协程处理
func sqInfinitePool(ctx context.Context, in <-chan int) <-chan int {
// merge后输出chan
outChannel := make(chan int)
go func() {
defer close(outChannel)
var wg sync.WaitGroup
for s := range in {
// 每个输入起协程处理
wg.Add(1)
go func() {
defer wg.Done()
select {
case outChannel <- calc(s):
case <-ctx.Done():
log.Printf("sqDynamicPool Receive exit msg %v\n", ctx.Err())
return
}
}()
}
// 全部处理完毕
wg.Wait()
}()
return outChannel
}
- 固定并发度
// Bounded parallelism
// 固定并发度,共享一个输出channel 同时处理
func sqFixedPool(ctx context.Context, in <-chan int) <-chan int {
maxWorker := 2
// merge后输出chan
outChannel := make(chan int)
go func() {
defer close(outChannel)
var wg sync.WaitGroup
wg.Add(maxWorker)
// 指定个数协程运行
for i := 0; i < maxWorker; i++ {
go func() {
defer wg.Done()
for s := range in {
select {
case outChannel <- calc(s):
case <-ctx.Done():
log.Printf("sq Receive exit msg %v\n", ctx.Err())
return
}
}
}()
}
// 全部处理完毕
wg.Wait()
}()
return outChannel
}
- 动态并发度
// 动态并发度,指定最大允许并发度,根据任务数据多少自行控制
func sqDynamicPool(ctx context.Context, in <-chan int) <-chan int {
maxWorker := 5
// merge后输出chan
outChannel := make(chan int)
go func() {
defer close(outChannel)
var wg sync.WaitGroup
sm := semaphore.NewWeighted(int64(maxWorker))
for s := range in {
// 信号量检查,保证最大并发度限制
sm.Acquire(ctx, 1)
// 每个输入起协程处理
wg.Add(1)
go func() {
defer sm.Release(1)
defer wg.Done()
select {
case outChannel <- calc(s):
case <-ctx.Done():
log.Printf("sqDynamicPool Receive exit msg %v\n", ctx.Err())
return
}
}()
}
// 全部处理完毕
wg.Wait()
}()
return outChannel
}
- Fanout/Fanin灵活控制
// 可指定并发策略,借助Fanout/Fanin Split/Clone+Merge动态调配数据分发过程
func sqFanOutInPool(ctx context.Context, in <-chan int) <-chan int {
maxWorker := 2
splitChan := Split(in, maxWorker)
cs := make([]chan int, 0)
for i := 0; i < maxWorker; i++ {
// 每个函数创建一个输出通道
out := make(chan int)
cs = append(cs, out)
// 多个函数共享一个输入通道
go func(index int) {
defer close(out)
for data := range splitChan[index] {
out <- calc(data)
}
}(i)
}
return Merge(cs)
}
- 调用演示
只需要替换算子即可,非常灵活
func TestFanAsync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runNumGoroutineMonitor()
// 定时取消
go func() {
time.Sleep(time.Second * 15)
cancel()
}()
// 组装和执行pipeline
var nums = []int{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}
_, err := PipeProcessBuildAndRun(ctx, nums, producer, sqFanOutInPool, sink[int])
if err != nil {
return
}
}
面向对象封装
类比函数封装,不同的是
- 这里使用单独的error channel来统一汇总错误控制处理
- 要求处理数据不能丢失,因此context通知关闭时,只有Source响应,然后上游通道关闭逐级通知下游关闭
- 输入和输出接口
这里使用接口来标准化,更容易模块化(推荐输入/输出/错误处理和处理算子单独目录/文件中实现) 整个程序完全解耦,输入/输出/错误处理可以实现多种,各个算子实现都可以替换。
type ISource interface {
Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error) <-chan int
}
type ISink interface {
Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan int, errChan chan error)
}
- 处理接口
type IProcessor interface {
Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan int, errChan chan error) <-chan int
}
- 错误处理接口
type IError interface {
Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error, cancel context.CancelFunc)
}
这里在error channel上一旦发现错误,cancel通知整个处理pipeline停止,如下实现
// 错误处理
type ErrorPolicyExit struct {
}
func NewErrorPolicyExit() *ErrorPolicyExit {
return &ErrorPolicyExit{}
}
func (p *ErrorPolicyExit) Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error, cancel context.CancelFunc) {
for {
select {
case err, ok := <-errChan:
if !ok {
log.Println("error channel closed and exit!")
return
}
log.Printf("Receive error %v\n", err)
cancel()
}
}
}
- 对象定义和方法
type ProcessorManager struct {
source ISource
sink ISink
err IError
ps []IProcessor
errChan chan error
}
func NewProcessorManager() *ProcessorManager {
return &ProcessorManager{errChan: make(chan error, 1)}
}
func (m *ProcessorManager) AddProcessor(processor IProcessor) {
m.ps = append(m.ps, processor)
}
func (m *ProcessorManager) AddSource(source ISource) {
m.source = source
}
func (m *ProcessorManager) AddSink(sink ISink) {
m.sink = sink
}
func (m *ProcessorManager) AddError(err IError) {
m.err = err
}
- 流程编排
注意这里错误通道阻塞,直到处理完成关闭错误通道
func (m *ProcessorManager) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg = sync.WaitGroup{}
// 组装pipeline
wg.Add(1)
dataChan := m.source.Process(ctx, &wg, m.errChan)
for _, v := range m.ps {
wg.Add(1)
dataChan = v.Process(ctx, &wg, dataChan, m.errChan)
}
wg.Add(1)
m.sink.Process(ctx, &wg, dataChan, m.errChan)
go func() {
wg.Wait()
close(m.errChan)
}()
// 错误通道阻塞,错误处理集中处理
// 出现错误则通知退出,可灵活定制处理策略
m.err.Process(ctx, &wg, m.errChan, cancel)
}
- 调用演示
func TestProcessorManager(t *testing.T) {
m := NewProcessorManager()
// pipeline组装
//m.AddSource(NewTimerSource(1, 2, 3, 4, 5))
m.AddSource(NewTimerSource(1, 2, -1, 3, 4, 5))
m.AddSink(NewConsoleSink())
m.AddError(NewErrorPolicyExit())
m.AddProcessor(&SqProcessor{})
m.AddProcessor(&SumProcessor{})
// 执行
m.Run(context.Background())
}
这里实现,一旦输入出现负数则立即退出,运行如下 可以看到协程依次退出
=== RUN TestProcessorManager
sink value: 1
sink value: 5
2023/01/13 10:13:11 Receive error Invalid Num
2023/01/13 10:13:11 Notify to exit source!!!
2023/01/13 10:13:11 sq data channel closed!
2023/01/13 10:13:11 sum data channel closed!
2023/01/13 10:13:11 sink data channel closed!
2023/01/13 10:13:11 error channel closed and exit!
--- PASS: TestProcessorManager (3.00s)
扩展
混合pipeline
生产场景中,很多时候会遇到同步同步pipeline和异步pipeline混合的场景,典型拓扑图如下
这时候一个Stage中可能包括多个算子流程,但是核心不变,此时Stage1相当于之前的同步pipeline 具体的封装过程就不再赘述
开源库
围绕pipeline功能,有不少优秀的Golang库,推荐几个如下:
- Grab公司在用的ETL库
原始版本参考官方实现扩展包来,Github版本较老,可能需要自己扩展开发 https://github.com/taggledevel2/ratchet
- 类似js lodash库的操作库
基于Go 1.18泛型实现,生产可用 如果你在寻找Go的lambda或者类似 Java Streams的成熟库,不妨看下这个 https://github.com/samber/lo
- 单机流处理框架
类似单机版本Flink 或 Java Streams,持续维护中,生产可用
- 类似Flink的分布式流处理框架
项目基本不维护,代码值得参考
- 工作流(Workflow)框架
参考新版代码分支,生产可用
欢迎关注我 @文大侠666,拒绝灌水,只输出对工作有用的技术文章!
如果本文对你有帮助,欢迎点赞、收藏、转发给朋友,让我有持续创作的动力!
参考
演示代码
官方pipeline文档
值得参考的pipeline文档
之前针对关键技术点的深入实践文章