Golang被证明非常适合并发编程,goroutine比异步编程更容易阅读、优雅、高效。 本文提出基于Golang的Pipeline执行模型,提出适合批量处理大量数据(ETL )的方案。

想象一下这样的应用情景。 ((学习推荐: go ) ) ) ) ) )。

从数据库a(cassandra )加载用户评论(数量巨大,例如10亿条); 根据各注释用户ID,从数据库b(MySQL )调用与用户资料相关联的NLP服务(自然语言处理),处理各注释; 将处理结果写入数据库c(elasticsearch )。

由于APP应用中遇到的各种问题,我们总结了这些需求。

需求一:数据必须分割处理。 例如,每批规定100瓶。 如果出现问题(例如,其中一个数据库出现故障),则中断,并在下次程序启动时使用checkpoint从中断位置恢复。

要求2 )为每个进程设置适当的并发行数,并对数据库和NLP服务施加适当的负载(消耗尽可能多的资源来提高ETL性能,而不影响其他业务)。 例如,步骤(1)-4 )分别设定并行数1、4、8、2 .

这是典型的Pipeline (流水线)执行模型。 将每批数据(例如100条)视为生产线上的产品,四个步骤对应于生产线上的四个处理工序,每个工序处理后将半成品传递给下一个工序。 每个工序可同时处理的产品数量各不相同。

首先,您可能考虑启用1、4、8和2个goroutine,然后使用channel传递数据。 我也曾经这样做过。 结论是,这样做会让程序员昏厥。 过程的并发控制代码非常复杂,特别是必须处理异常、执行时间超出预期、可控中断等问题。 我自己都不记得有什么用,就要参加很多通道。

可复用的Pipeline模块

为了更高效地执行ETL工作,我们将Pipeline抽象到模块中。 粘贴代码,然后分析其含义。 模块可以直接使用。 主要使用的接口是新pipeline、异步和等待。

使用此Pipeline组件,ETL程序简单、高效、可靠,使程序员摆脱了繁琐的并发过程控制。 包主

import 'log '

func main () }

//恢复上次执行的checkpoint,首次执行时获取初始值。

check point :=加载检查点(

//工序(1)在pipeline外执行,最后工序保存检查点

pipeline :=新pipeline (4、8、2、1 ) ) )。

for {

//(1) )。

加载//100个数据并更改变量checkpoint

//data是数组,每个元素是注释,然后是内联表,NLP直接修改data中的每个记录。

数据,err :=extractreviewsfroma (检查点,100 ) ) ) ) ) ) ) ) ) )。

if err!=nil {

log.Print(err )

布雷克

}

//这里有谷歌有名的漏洞。

//“checkpoint”是循环体外的变量,内存中只有一个实例,在循环中不断更改,不能异步使用。

//在此处创建curCheckpoint的副本,并保存此周期的checkpoint。

curCheckpoint :=checkpoint

ok:=Pipeline.Async(func ) )错误

//(2) )。

返回joinuserfromb (数据) )。

、func () error { )。

//(3) )。

返回NLP (数据)

、func () error { )。

//(4) )。

恢复负载数据(数据)。

、func () error { )。

//5 )保存检查点

log.print('done: ',curCheckpoint ) )。

returnsavecheckpoint (cur check point )。

() )

if! 确定ok { break }

iflen(data ) 100 { break } //已处理

}

err :=pipeline.Wait (

if err!=nil{log.print(err ) }

}