这就是一个典型的Pipeline(流水线)执行模型。把每一批数据(例如100条)看作流水线上的产品,4个步骤对应流水线上4个处理工序,每个工序处理完毕后就把半成品交给下一个工序。每个工序可以同时处理的产品数各不相同。
你可能首先想到启用1+4+8+2个goroutine,使用channel来传递数据。我也曾经这么干,结论就是这么干会让程序员疯掉:流程并发控制代码非常复杂,特别是你得处理异常、执行时间超出预期、可控中断等问题,你不得不加入一堆channel,直到你自己都不记得有什么用。
可重用的Pipeline模块
为了更高效完成ETL工作,我将Pipeline抽象成模块。我先把代码粘贴出来,再解析含义。模块可以直接使用,主要使用的接口是:NewPipeline、Async、Wait。
使用这个Pipeline组件,我们的ETL程序将会简单、高效、可靠,让程序员从繁琐的并发流程控制中解放出来:
package main import "log" func main() { //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。 checkpoint := loadCheckpoint() //工序(1)在pipeline外执行,最后一个工序是保存checkpoint pipeline := NewPipeline(4, 8, 2, 1) for { //(1) //加载100条数据,并修改变量checkpoint //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } //这里有个Golang著名的坑。 //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。 //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。 curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) <100 { break } //处理完毕 } err := pipeline.Wait() if err != nil { log.Print(err) } }