Golang被证明非常适合并发编程,goroutine比异步编程更容易阅读、优雅、高效。 本文提出基于Golang的Pipeline执行模型,提出适合批量处理大量数据(ETL )的方案。
想象一下这样的应用情景。 ((学习推荐: go ) ) ) ) ) )。
从数据库a(cassandra )加载用户评论(数量巨大,例如10亿条); 根据各注释用户ID,从数据库b(MySQL )调用与用户资料相关联的NLP服务(自然语言处理),处理各注释; 将处理结果写入数据库c(elasticsearch )。
由于APP应用中遇到的各种问题,我们总结了这些需求。
需求一:数据必须分割处理。 例如,每批规定100瓶。 如果出现问题(例如,其中一个数据库出现故障),则中断,并在下次程序启动时使用checkpoint从中断位置恢复。
要求2 )为每个进程设置适当的并发行数,并对数据库和NLP服务施加适当的负载(消耗尽可能多的资源来提高ETL性能,而不影响其他业务)。 例如,步骤(1)-4 )分别设定并行数1、4、8、2 .
这是典型的Pipeline (流水线)执行模型。 将每批数据(例如100条)视为生产线上的产品,四个步骤对应于生产线上的四个处理工序,每个工序处理后将半成品传递给下一个工序。 每个工序可同时处理的产品数量各不相同。
首先,您可能考虑启用1、4、8和2个goroutine,然后使用channel传递数据。 我也曾经这样做过。 结论是,这样做会让程序员昏厥。 过程的并发控制代码非常复杂,特别是必须处理异常、执行时间超出预期、可控中断等问题。 我自己都不记得有什么用,就要参加很多通道。
可复用的Pipeline模块
为了更高效地执行ETL工作,我们将Pipeline抽象到模块中。 粘贴代码,然后分析其含义。 模块可以直接使用。 主要使用的接口是新pipeline、异步和等待。
使用此Pipeline组件,ETL程序简单、高效、可靠,使程序员摆脱了繁琐的并发过程控制。 包主
import 'log '
func main () }
//恢复上次执行的checkpoint,首次执行时获取初始值。
check point :=加载检查点(
//工序(1)在pipeline外执行,最后工序保存检查点
pipeline :=新pipeline (4、8、2、1 ) ) )。
for {
//(1) )。
加载//100个数据并更改变量checkpoint
//data是数组,每个元素是注释,然后是内联表,NLP直接修改data中的每个记录。
数据,err :=extractreviewsfroma (检查点,100 ) ) ) ) ) ) ) ) ) )。
if err!=nil {
log.Print(err )
布雷克
}
//这里有谷歌有名的漏洞。
//“checkpoint”是循环体外的变量,内存中只有一个实例,在循环中不断更改,不能异步使用。
//在此处创建curCheckpoint的副本,并保存此周期的checkpoint。
curCheckpoint :=checkpoint
ok:=Pipeline.Async(func ) )错误
//(2) )。
返回joinuserfromb (数据) )。
、func () error { )。
//(3) )。
返回NLP (数据)
、func () error { )。
//(4) )。
恢复负载数据(数据)。
、func () error { )。
//5 )保存检查点
log.print('done: ',curCheckpoint ) )。
returnsavecheckpoint (cur check point )。
() )
if! 确定ok { break }
iflen(data ) 100 { break } //已处理
}
err :=pipeline.Wait (
if err!=nil{log.print(err ) }
}