for {
    buf := make([]byte, 4*1024) // the chunk size

    n, err := r.Read(buf) // read a chunk into the buffer
    buf = buf[:n]

    if n == 0 {
        if err != nil {
            fmt.Println(err)
            break
        }
        if err == io.EOF {
            break
        }
        return err
    }
}
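For context, the reader `r` in the loop above is assumed to be a buffered reader wrapped around the already-opened log file (the complete program further down does exactly this). A minimal setup sketch, with a hypothetical file name, might look like:

f, err := os.Open("large.log") // hypothetical file name
if err != nil {
    fmt.Println("cannot open the file", err)
    return
}
defer f.Close()

r := bufio.NewReader(f) // buffered reader consumed by the chunked read loop above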

Once we have the file in chunks, we can fork off a thread, i.e. a goroutine, and process multiple chunks of the file concurrently. The code above then becomes:

// sync.Pools to reuse memory and reduce pressure on the garbage collector
linesPool := sync.Pool{New: func() interface{} {
    lines := make([]byte, 500*1024)
    return lines
}}

stringPool := sync.Pool{New: func() interface{} {
    lines := ""
    return lines
}}

slicePool := sync.Pool{New: func() interface{} {
    lines := make([]string, 100)
    return lines
}}

r := bufio.NewReader(f)

var wg sync.WaitGroup // waitgroup to keep track of all spawned goroutines

for {
    buf := linesPool.Get().([]byte)

    n, err := r.Read(buf)
    buf = buf[:n]

    if n == 0 {
        if err != nil {
            fmt.Println(err)
            break
        }
        if err == io.EOF {
            break
        }
        return err
    }

    nextUntillNewline, err := r.ReadBytes('\n') // read the rest of the current line
    if err != io.EOF {
        buf = append(buf, nextUntillNewline...)
    }

    wg.Add(1)
    go func() {
        // process each chunk concurrently
        // start -> log start time, end -> log end time
        ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
        wg.Done()
    }()
}

wg.Wait()
}

The code above introduces two optimizations:

  • sync.Pool is a powerful object pool that reuses objects to reduce pressure on the garbage collector. We reuse the memory allocated for each chunk, which lowers memory consumption and speeds the job up considerably (see the sketch after this list).
  • Goroutines let us process multiple buffered chunks concurrently, which greatly increases processing speed.
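To make the Get/Put pattern concrete, here is a minimal, self-contained sketch of sync.Pool reuse; the pool name and buffer size are illustrative and not part of the original program:

bufPool := sync.Pool{New: func() interface{} {
    // New is called only when the pool has no free object to hand out
    return make([]byte, 4*1024)
}}

buf := bufPool.Get().([]byte) // reuse a previously returned buffer if one is available
// ... fill buf and process it ...
bufPool.Put(buf) // hand the buffer back so a later Get can reuse its memory

Note that objects held by a sync.Pool may be dropped at any garbage collection, so the pool is a cache for allocations rather than a guarantee of reuse.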

Now let's implement the ProcessChunk function, which will process log lines in the following format:

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
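The leading timestamp is parsed with Go's reference-time layout, the "2006-01-02T15:04:05.0000Z" string used throughout the code below. A small standalone sketch, using a made-up log line, shows how the timestamp is cut off at the first comma and parsed:

package main

import (
    "fmt"
    "strings"
    "time"
)

func main() {
    line := "2020-01-31T20:12:38.1234Z, Some Field, Other Field" // made-up sample line
    ts := strings.SplitN(line, ",", 2)[0]                        // the timestamp is everything before the first comma
    t, err := time.Parse("2006-01-02T15:04:05.0000Z", ts)
    if err != nil {
        fmt.Println("could not parse the time:", err)
        return
    }
    fmt.Println(t)
}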

We will extract only the log lines whose timestamps fall between the timestamps supplied on the command line.

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {

    // another waitgroup to process every chunk further
    var wg2 sync.WaitGroup

    logs := stringPool.Get().(string)
    logs = string(chunk)

    linesPool.Put(chunk) // put the chunk back into the pool

    // split the string by "\n", so that we have a slice of log lines
    logsSlice := strings.Split(logs, "\n")

    stringPool.Put(logs) // put the string back into the pool

    chunkSize := 100 // process a bunch of 100 log lines per goroutine

    n := len(logsSlice)
    noOfThread := n / chunkSize
    if n%chunkSize != 0 { // account for the remainder
        noOfThread++
    }

    // traverse the chunk, one bunch of lines per goroutine
    for i := 0; i < noOfThread; i++ {

        wg2.Add(1)

        // process each bunch of lines in a separate goroutine
        go func(s int, e int) {
            defer wg2.Done()
            for i := s; i < e; i++ {
                text := logsSlice[i]
                if len(text) == 0 {
                    continue
                }

                logParts := strings.SplitN(text, ",", 2)
                logCreationTimeString := logParts[0]
                logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                if err != nil {
                    fmt.Printf("\n Could not parse the time :%s for log : %v", logCreationTimeString, text)
                    return
                }

                // check if the log's timestamp is within our desired period
                if logCreationTime.After(start) && logCreationTime.Before(end) {
                    fmt.Println(text)
                }
            }
        }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(n))))
        // passing the start and end indexes of each bunch
    }

    wg2.Wait() // wait for all bunches of this chunk to finish
    logsSlice = nil
}

Benchmarking the code above: with a 16 GB log file, extracting the logs takes roughly 25 seconds.

The complete code example is as follows:

package main

import (
    "bufio"
    "fmt"
    "io"
    "math"
    "os"
    "strings"
    "sync"
    "time"
)

func main() {

    s := time.Now()

    args := os.Args[1:]
    if len(args) != 6 { // expected format: LogExtractor.exe -f "From Time" -t "To Time" -i "Log file location"
        fmt.Println("Please give proper command line arguments")
        return
    }

    startTimeArg := args[1]
    finishTimeArg := args[3]
    fileName := args[5]

    file, err := os.Open(fileName)
    if err != nil {
        fmt.Println("cannot read the file", err)
        return
    }
    defer file.Close() // close after checking err

    queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
    if err != nil {
        fmt.Println("Could not parse the start time", startTimeArg)
        return
    }

    queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
    if err != nil {
        fmt.Println("Could not parse the finish time", finishTimeArg)
        return
    }

    filestat, err := file.Stat()
    if err != nil {
        fmt.Println("Could not get the file stat")
        return
    }

    fileSize := filestat.Size()
    offset := fileSize - 1
    lastLineSize := 0

    // scan backwards from the end of the file until the previous newline,
    // to find the size of the last log line
    for {
        b := make([]byte, 1)
        n, err := file.ReadAt(b, offset)
        if err != nil {
            fmt.Println("Error reading file ", err)
            break
        }
        char := string(b[0])
        if char == "\n" {
            break
        }
        offset--
        lastLineSize += n
    }

    lastLine := make([]byte, lastLineSize)
    _, err = file.ReadAt(lastLine, offset+1)
    if err != nil {
        fmt.Println("Could not read the last line with offset", offset, "and last line size", lastLineSize)
        return
    }

    logSlice := strings.SplitN(string(lastLine), ",", 2)
    logCreationTimeString := logSlice[0]

    lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
    if err != nil {
        fmt.Println("can not parse time : ", err)
    }

    // only scan the whole file if its newest log line could still fall inside the query window
    if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
        Process(file, queryStartTime, queryFinishTime)
    }

    fmt.Println("\nTime taken - ", time.Since(s))
}

func Process(f *os.File, start time.Time, end time.Time) error {

    linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 250*1024)
        return lines
    }}

    stringPool := sync.Pool{New: func() interface{} {
        lines := ""
        return lines
    }}

    r := bufio.NewReader(f)

    var wg sync.WaitGroup

    for {
        buf := linesPool.Get().([]byte)

        n, err := r.Read(buf)
        buf = buf[:n]

        if n == 0 {
            if err != nil {
                fmt.Println(err)
                break
            }
            if err == io.EOF {
                break
            }
            return err
        }

        nextUntillNewline, err := r.ReadBytes('\n') // read the rest of the current line
        if err != io.EOF {
            buf = append(buf, nextUntillNewline...)
        }

        wg.Add(1)
        go func() {
            ProcessChunk(buf, &linesPool, &stringPool, start, end)
            wg.Done()
        }()
    }

    wg.Wait()
    return nil
}

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

    var wg2 sync.WaitGroup

    logs := stringPool.Get().(string)
    logs = string(chunk)

    linesPool.Put(chunk)

    logsSlice := strings.Split(logs, "\n")

    stringPool.Put(logs)

    chunkSize := 300

    n := len(logsSlice)
    noOfThread := n / chunkSize
    if n%chunkSize != 0 {
        noOfThread++
    }

    for i := 0; i < noOfThread; i++ {

        wg2.Add(1)

        go func(s int, e int) {
            defer wg2.Done() // to avoid deadlocks
            for i := s; i < e; i++ {
                text := logsSlice[i]
                if len(text) == 0 {
                    continue
                }

                logSlice := strings.SplitN(text, ",", 2)
                logCreationTimeString := logSlice[0]

                logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                if err != nil {
                    fmt.Printf("\n Could not parse the time :%s for log : %v", logCreationTimeString, text)
                    return
                }

                if logCreationTime.After(start) && logCreationTime.Before(end) {
                    // fmt.Println(text)
                }
            }
        }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
    }

    wg2.Wait()
    logsSlice = nil
}
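Assuming the argument layout that the length check in main expects (the flag values are read positionally, so only their order matters), an invocation would look roughly like: LogExtractor -f "2020-01-31T20:00:00.0000Z" -t "2020-01-31T21:00:00.0000Z" -i "/path/to/logfile.log", where the timestamps and the path are placeholders.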

Original article: https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2
