go语言以优异的并发特性而闻名,刚好手上有个小项目比较适合。

项目背景:

公司播控平台的数据存储包括MySQL和ElasticSearch(ES)两个部分,编辑、运营的数据首先保存在MySQL中,为了实现模糊搜索和产品关联推荐,特别增加了ES,ES中保存的是节目集的基本信息。

本项目是为了防止实时同步数据出现问题或者系统重新初始化时的全量数据同步而做的。项目主要是从MySQL读取所有的节目集数据写入到ES中。

项目特点:

因为节目集数量较大,不能一次性的读入内存,因此每次读出一部分记录写入ES。ORM使用的是beego。为了提高性能使用了协程,其中读MySQL的部分最大开启20个协程,ES写入部分开启了15个协程(因为ES分片设置的问题,5个协程和15个协程性能映像不大)。

项目主要包括三个文件:

1、PrdES_v3.go,项目的入口,负责协调MySQL读取和ES写入。

 在run函数里可以看到使用了reflect.SelectCase,使用reflect.SelectCase的原因是读MySQL数据是多个协程,不可预计哪个会首先返回,selectCase是任何一个处理完毕reflect.Select函数就会返回,MySQL读取的数据放在channel中宕Select函数返回时chosen, recv, ok := reflect.Select(selectCase)判断ok是否未true            chosen代表的是协程id通过result = recv.Interface().([]*prd.Series)获得返回的数据,因为MySQL读取的数据是对象的结果集,因次使用recv.Interface函数,如果是简单类型可以使用recv.recvInt(),recv.recvString()等函数直接获取channel返回数据。 

这里通过counter控制协程的数量,也可以通过channel,用select的方式控制协程的数量,之所以用counter计数器的方式控制协程数量是我想知道同时有多少协程在运行。

注:此处channel可以不用创建数组形式,channel带回来的数据也没有顺序问题。

2、es.go,负责写入ES和es的写入协程调度

 在es.go里有两把锁lock和lockTotal,前者是针对counter变量,记录es正在写入的协程数量的;后者为记录总共写入多少条记录到es而增加的。

这里必须要提到的是Over函数,初步使用协程的容易忽略。golang的原则是当main函数运行结束后,所有正在运行的协程都会终止,因袭在MySQL读取数据完毕必须调用Over函数,等待所有的协程结束。这里使用sync.waiGrooup。每次协程启动执行下面两个语句:

//增加计数器
this.wg.Add(1)
//减少计数器,函数结束时自动执行
defer this.wg.Done()
Over函数中调用wg.Wait()等待计数器为0时返回,否则就一直阻塞。当然读者也可以看到通过检查counter是否小于等于0也可以判断协程是否都结束(Over函数被注释的部分),显然使用waitGroup更优雅和高效。

 

3、db.go,负责MySQL数据的读取

 从项目上看。go语言开发还是比较简洁的,多协程实现也相对容易,但要求开发者必须对概念非常清晰,像select和selectCase理解和defer的理解要很到位,个人层经做过多年的C/C++程序员,从经验上看,C/C++的经验(多线程的理解)对运用go语言还是很有帮助的。