背景:

    读取一个500w行的大文件,将每一行的数据读取出来做数据整合归并之后,再按照一定的逻辑和算法进行处理后存入redis。

文件格式:

    url地址                                                                    用户32位标识               点击次数

    http://jingpin.pgc.panda.tv/hd/xiaopianpian.html    aaaaaaaaaaaaaaaaaaaa    5

具体场景:    

    本节先看一下大文件处理最简单的情况,即在读文件的过程中针对文件每一行都开启一个协程来做数据合并,看看这种情况下的定位以及优化的思路。

问题现象:

    如果将整个文件串行执行来做数据整合的话,只需要4 or 5s就可以完成,但是每行并发处理却需要几十秒到几十分钟不等。

 

代码如下:

simple.go

package main

import (
	"bufio"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"singleflight"
	"strconv"
	"strings"
	"sync"
	"time"
	"utils"
)

var (
	wg     sync.WaitGroup
	mu     sync.RWMutex //全局锁
	single = &singleflight.Task{}
)

func main() {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err)
		}
	}()

	go func() {
		log.Println(http.ListenAndServe("localhost:8080", nil))
	}()

	file := "/data/origin_data/part-r-00000"
	if fp, err := os.Open(file); err != nil {
		panic(err)
	} else {
		start := time.Now()
		defer fp.Close()
		defer func() { //时间消耗
			fmt.Println("time cost:", time.Now().Sub(start))
		}()

		//统计下每个url的点击用户数
		hostNums := hostsStat()

		buf := bufio.NewReader(fp)
		hostToFans := make(map[string]utils.MidList) //[url][]用户id

		for {
			line, err := buf.ReadString('\n')
			if err != nil {
				if err == io.EOF { //遭遇行尾
					fmt.Println("meet the end")
					break //跳出死循环
				}
				panic(err)
			}

			//每一行单独处理
			wg.Add(1)
			go handleLine(line, hostToFans, hostNums)
		}

		wg.Wait()
        fmt.Println("*************************handle file data complete************************")
	}
}

//处理每一行的数据
func handleLine(line string, hostToFans map[string]utils.MidList, hostNums map[string]int) {
	defer wg.Done()

	line = strings.TrimSpace(line)
	components := strings.Split(line, "\t")
	//先判断是否是合法网站的url
	schemes := strings.Split(components[0], "/")
	if utils.In_array(utils.ValidPlatforms, schemes[2]) == false {
		fmt.Println("invalid url: ", components[0])
		return
	}

	mu.RLock()
	if _, ok := hostToFans[components[0]]; ok {
		mu.RUnlock()
		click_times, _ := strconv.Atoi(components[2])
		mu.Lock()
		hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times)
		mu.Unlock()
	} else { //下一个url
		mu.RUnlock()
		startElement :=  false //用以标识是否是某个url统计的初始元素
        //singleflight代码  防止有多个相同url同时访问时,该url对应的[]string还没有初始化,导致多次make代码的执行
		single.Do(components[0], func() (interface{}, error) {
			mu.RLock()
			if _, ok := hostToFans[components[0]]; ok { //再判断一遍, 防止高并发的情形下,多个相同url的写map操作,都会进入重新分配空间的步骤
				mu.RUnlock()
				return nil, nil
			}
			mu.RUnlock()

			mu.Lock()
			click_times, _ := strconv.Atoi(components[2])
			hostToFans[components[0]] = utils.NewMidList(hostNums[components[0]])
			hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times)
			mu.Unlock()
			startElement = true
			return nil, nil
		})

		if !startElement {
			mu.Lock()
			click_times, _ := strconv.Atoi(components[2])
			hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times)
			mu.Unlock()
		}
	}
}

//针对url:用户的统计文件  该文件列出每个url对应的独立用户个数
func hostsStat() map[string]int {
	hostStats := "./scripts/data/stat.txt"
	bytes, _ := ioutil.ReadFile(hostStats)
    //....some code....
	return hostNums
}

    执行一下这个代码之后,发现程序执行不久,内存占用就噌噌噌的涨到90%+,过了一会cpu占用下降到极低,但是load一直保持再过载的水平,看下图。所以大胆猜测因为gc导致进程夯住了。之后用pprof和gctrace也印证了这个想法,如果对pprof和gctrace不太清楚的同学可以看笔者之前的文章 golang 如何排查和定位GC问题。

       

    其实在笔者最开始的代码中,已经有一些地方在注意降低内存的消耗了,比如说在初始化每个url对应的用户id集合时借鉴groupcache的singleflight,确保不会多次重复申请空间;比如url对应的用户id切片,先算好具体大小再make。虽然代码很简单, 但是上文中的代码显然还是有一些问题。

    通过pprof,我发现程序执行的过程中,大部分时间都消耗在gc上,如下图。划红线的都是和gc有关的函数。所以问题就变成排查为什么gc会这么长时间。

     

    大部分情况下gc被导致的原因是分配的内存达到某个阈值,很显然,本例属于这种情况,前文提到内存占用稳定在90+。那么为什么这个进程会占用这么多的内存呢?笔者一直试图用pprof的heap和profile来分析出这个问题,但是一直无果。直到有一次通过pprof查看goroutine的状态时,发现当前正在工作的协程高达几十万,甚至有时能到达接近150w的量级, 如下图。这样就能够解释一部分问题了,单个协程如果是3K大小,那么当协程数量到达百万时,就算协程里什么都没有也会占用4G的内存。而笔者在做实验的机器只有8g的内存,所以肯定会出现内存被吃满频繁gc导致进程夯住。

       

    所以第一步,肯定是要控制一下当前的协程数,不能无限的增长。在读取文本内容的loop里,加上对行数的计算,这样每到一个阈值时,就可以休息一下,暂缓下协程增长的速率。加上限制之后,进程不会再卡死,整个的执行时间稳定在20~30s之间。

iterator := 0
for {
        line, err := buf.ReadString('\n')
        if err != nil {
                if err == io.EOF { //遭遇行尾
                        fmt.Println("meet the end")
                        break //跳出死循环
                }   
                panic(err)
        }   

        //每一行单独处理 这里需要加逻辑防止并发过大导致大量占用cpu和内存,使得整个进程因为gc夯住,
        //可以每读10w行左右就休息一会降低一下程序同时在线的协程
        iterator++
        if iterator <= 120000 {
                wg.Add(1)
                go handleLine(line, hostToFans, hostNums)
        } else {
                iterator = 1
                <-time.After(130 * time.Millisecond) //暂停需要5s左右
                wg.Add(1)
                go handleLine(line, hostToFans, hostNums)
        }
}
wg.Wait()

    对比一下串行执行的结果,可以发现虽然现在并发执行已经稳定,但是就算刨去休眠时间,和串行执行相比还是慢很多,所以肯定还有优化的空间。这个时候pprof的profile以及heap分析就有了施展拳脚的地方了。看下面两张图,分别是内存消耗和cpu消耗图:

   cpu耗时分析

    

内存占用分析

   

 

    上面只截取了一部分的图,但是从中我们已经能够找到需要优化的地方了。可以看到strings.Split函数耗时和耗内存都很严重,主要是它会生成slice。分析一下前文的代码可以发现至少判断url是否是合法网站这一块的strings.Split是可以不要的。这里不光会有额外的运行时间还会生成slice占用内存导致gc。所以对这块功能进行改造:

//先判断是否是合法平台的主播
if is_valid_platform(utils.ValidPlatforms, components[0]) == false {
        fmt.Println("invalid url: ", components[0])
        return nil, errors.New("invalid url")
}

func is_valid_platform(platforms []string, hostUrl string) bool {                                                                                                                                                 
        for _, platform := range platforms {
                if strings.Index(hostUrl, platform) != -1 {
                        return true
                }
        }

        return false
}

这样的就可以减少不必要的slice引起的分配空间。改完之后再执行,整个任务稳定在15s左右,减去休眠时间的话就是10s。到这里其实已经算是优化的差不多了,但是其实还有一个地方可以看一下。

    上面的heap分析图可以看到其实singleflight.(*Task).Do函数占用更多的内存,并且也占用了很多的cpu时间,如下:

    

   除了一些原生函数之外,就属它最高了,而且该函数也只会在每次新的url出现的时候才会执行。可以看下singleflight的主要结构体,会发现使用了指针变量,而指针变量在gc的时候会导致二次遍历,使得整个gc变慢。虽然笔者此处用的singleflight肯定是不能修改,  但是如果有可能的话,尽量还是要少用指针。

       

 

这一节只能算是简单讲了下优化的思路和过程,希望下一节能把完整版的优化方案写出来。