// Read the input through r in fixed-size chunks until a read yields no data.
// This excerpt sits inside a function that declares r and returns an error;
// that function's header is not part of the excerpt.
for {
	// The chunk size.
	buf := make([]byte, 4*1024)
	// Load the next chunk into the buffer; n is the number of bytes read.
	n, err := r.Read(buf)
	buf = buf[:n]
	if n == 0 {
		// io.EOF is the normal end of input. It has to be tested before the
		// generic error branch: io.EOF is itself a non-nil error, so with
		// the checks the other way round this branch can never run and the
		// end of the input is printed as if it were a failure.
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println(err)
			break
		}
		// Zero bytes and no error: err is nil at this point.
		return err
	}
}
将文件分块之后,我们就可以为每个区块启动一个 goroutine(Go 的轻量级线程),并发地处理多个文件区块。上述代码修改如下:
// sync pools to reuse the memory and decrease the pressure on the garbage collector
linesPool := sync.Pool{New: func() interface{} {
	lines := make([]byte, 500*1024)
	return lines
}}
stringPool := sync.Pool{New: func() interface{} {
	lines := ""
	return lines
}}
slicePool := sync.Pool{New: func() interface{} {
	lines := make([]string, 100)
	return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup // wait group to keep track of all goroutines
for {
	buf := linesPool.Get().([]byte)
	// A recycled buffer comes back with whatever length it had when it was
	// put into the pool; restore the full capacity so every read can fill
	// the whole buffer.
	buf = buf[:cap(buf)]
	n, err := r.Read(buf)
	buf = buf[:n]
	if n == 0 {
		// io.EOF is the normal end of input, so test for it before the
		// generic error branch; io.EOF is itself a non-nil error.
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println(err)
			break
		}
		// Zero bytes and no error: err is nil at this point.
		return err
	}

	// Extend the chunk to the end of the current line so that no log line
	// is split across two chunks.
	nextUntillNewline, err := r.ReadBytes('\n')
	// ReadBytes returns the data read before an error together with the
	// error, so an unterminated final line arrives alongside io.EOF and
	// must still be kept.
	buf = append(buf, nextUntillNewline...)

	wg.Add(1)
	go func() {
		// Deferred so the wait group is released even if processing panics.
		defer wg.Done()
		// process each chunk concurrently
		// start -> log start time, end -> log end time
		ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
	}()

	if err != nil && err != io.EOF {
		fmt.Println(err)
		break
	}
}
wg.Wait()
} // closes the enclosing function, whose header is not part of this excerpt
上面的代码,引入了两个优化点:
- sync.Pool是一个强大的对象池,可以重用对象来减轻垃圾收集器的压力。我们将重用各个分片的内存,以减少内存消耗,大大加快我们的工作。
- goroutine 让我们可以并发处理各个缓冲区块,从而大大提高处理速度。
现在让我们实现ProcessChunk函数,它将处理以下格式的日志行。
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
我们将根据命令行提供的时间戳提取日志。
// ProcessChunk prints to standard output every log line in chunk whose
// timestamp lies strictly between start and end.
//
// chunk is split on "\n". Each non-empty line is expected to start with a
// timestamp in the layout "2006-01-02T15:04:05.0000Z" followed by a comma.
// A line whose timestamp cannot be parsed is reported and skipped; the
// remaining lines are still processed.
//
// The lines are handled in batches of 100, one goroutine per batch, so the
// order of the printed lines across batches is not defined. ProcessChunk
// returns once every batch has finished.
//
// chunk is handed back to linesPool as soon as its contents have been copied
// into a string, so the caller must not touch chunk after the call.
// stringPool and slicePool are kept for signature compatibility and are not
// used: a string taken from a pool cannot be refilled in place, so pooling
// one saves no allocation and only keeps the copied chunk reachable.
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
	logs := string(chunk)
	// Return the buffer at full capacity so the next user gets all of it.
	linesPool.Put(chunk[:cap(chunk)])

	// Split the string by "\n" so that we have a slice of log lines.
	logsSlice := strings.Split(logs, "\n")

	// Number of log lines handled by one goroutine.
	const chunkSize = 100

	// Second wait group, tracking the per-batch goroutines.
	var wg2 sync.WaitGroup
	for lo := 0; lo < len(logsSlice); lo += chunkSize {
		// lo is already a line offset, so the batch is simply [lo, hi).
		hi := lo + chunkSize
		if hi > len(logsSlice) {
			hi = len(logsSlice)
		}

		wg2.Add(1)
		go func(batch []string) {
			// Deferred so that no exit path can leave wg2.Wait blocked.
			defer wg2.Done()
			for _, text := range batch {
				if len(text) == 0 {
					continue
				}
				logParts := strings.SplitN(text, ",", 2)
				logCreationTimeString := logParts[0]
				logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
				if err != nil {
					// The newline terminates the message so it cannot fuse
					// with the next printed log line.
					fmt.Printf("Could not able to parse the time :%s for log : %v\n", logCreationTimeString, text)
					continue
				}
				// Check whether the log's timestamp is inside the desired period.
				if logCreationTime.After(start) && logCreationTime.Before(end) {
					fmt.Println(text)
				}
			}
		}(logsSlice[lo:hi])
	}
	// Wait for every batch of this chunk to finish.
	wg2.Wait()
}
对上面的代码进行基准测试。以16 GB的日志文件为例,提取日志所需的时间约为25秒。
完整的代码示例如下:
func main{
s := time.Now
args := os.Args[1:]
iflen(args) != 6 { // forformat LogExtractor.exe -f "From Time"-t "To Time"-i "Log file directory location"
fmt.Println( "Please give proper command line arguments")
return
}
startTimeArg := args[1]
finishTimeArg := args[3]
fileName := args[5]
file, err := os.Open(fileName)
iferr != nil {
fmt.Println( "cannot able to read the file", err)
return
}
defer file.Close //close after checking err
queryStartTime, err := time.Parse( "2006-01-02T15:04:05.0000Z", startTimeArg)
iferr != nil {
fmt.Println( "Could not able to parse the start time", startTimeArg)
return
}
queryFinishTime, err := time.Parse( "2006-01-02T15:04:05.0000Z", finishTimeArg)
iferr != nil {
fmt.Println( "Could not able to parse the finish time", finishTimeArg)
return
}
filestat, err := file.Stat
iferr != nil {
fmt.Println( "Could not able to get the file stat")
return
}
fileSize := filestat.Size
offset := fileSize - 1
lastLineSize := 0
for{
b := make([]byte, 1)
n, err := file.ReadAt(b, offset)
iferr != nil {
fmt.Println( "Error reading file ", err)
break
}
char := string(b[0])
ifchar == "\n"{
break
}
offset--
lastLineSize += n
}
lastLine := make([]byte, lastLineSize)
_, err = file.ReadAt(lastLine, offset+1)
iferr != nil {
fmt.Println( "Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
return
}
logSlice := strings.SplitN(string(lastLine), ",", 2)
logCreationTimeString := logSlice[0]
lastLogCreationTime, err := time.Parse( "2006-01-02T15:04:05.0000Z", logCreationTimeString)
iferr != nil {
fmt.Println( "can not able to parse time : ", err)
}
iflastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
Process(file, queryStartTime, queryFinishTime)
}
fmt.Println( "\nTime taken - ", time.Since(s))
}
func Process(f *os.File, start time.Time, end time.Time) error {
linesPool := sync.Pool{New: func interface{} {
lines := make([]byte, 250*1024)
returnlines
}}
stringPool := sync.Pool{New: func interface{} {
lines := ""
returnlines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup
for{
buf := linesPool.Get.([]byte)
n, err := r.Read(buf)
buf = buf[:n]
ifn == 0 {
iferr != nil {
fmt.Println(err)
break
}
iferr == io.EOF {
break
}
returnerr
}
nextUntillNewline, err := r.ReadBytes( '\n')
iferr != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func{
ProcessChunk(buf, &linesPool, &stringPool, start, end)
wg.Done
}
}
wg.Wait
returnnil
}
// ProcessChunk prints to standard output every log line in chunk whose
// timestamp lies strictly between start and end.
//
// chunk is split on "\n". Each non-empty line is expected to start with a
// timestamp in the layout "2006-01-02T15:04:05.0000Z" followed by a comma.
// A line whose timestamp cannot be parsed is reported and skipped; the
// remaining lines are still processed.
//
// The lines are handled in batches of 300, one goroutine per batch, so the
// order of the printed lines across batches is not defined. ProcessChunk
// returns once every batch has finished.
//
// chunk is handed back to linesPool as soon as its contents have been copied
// into a string, so the caller must not touch chunk after the call.
// stringPool is kept for signature compatibility and is not used: a string
// taken from a pool cannot be refilled in place, so pooling one saves no
// allocation and only keeps the copied chunk reachable.
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {
	logs := string(chunk)
	// Return the buffer at full capacity so the next user gets all of it.
	linesPool.Put(chunk[:cap(chunk)])

	logsSlice := strings.Split(logs, "\n")

	// Number of log lines handled by one goroutine.
	const chunkSize = 300

	var wg2 sync.WaitGroup
	for lo := 0; lo < len(logsSlice); lo += chunkSize {
		hi := lo + chunkSize
		if hi > len(logsSlice) {
			hi = len(logsSlice)
		}

		wg2.Add(1)
		go func(batch []string) {
			defer wg2.Done() // to avoid deadlocks
			for _, text := range batch {
				if len(text) == 0 {
					continue
				}
				logSlice := strings.SplitN(text, ",", 2)
				logCreationTimeString := logSlice[0]
				logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
				if err != nil {
					// The newline terminates the message so it cannot fuse
					// with the next printed log line.
					fmt.Printf("Could not able to parse the time :%s for log : %v\n", logCreationTimeString, text)
					continue
				}
				// Emit the line when its timestamp is inside the desired period.
				if logCreationTime.After(start) && logCreationTime.Before(end) {
					fmt.Println(text)
				}
			}
		}(logsSlice[lo:hi])
	}
	wg2.Wait()
}
原文链接:https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2
Kubernetes线下培训
本次培训在 上海开班, 基于最新考纲,通过 线下授课、考题解读、模拟演练等方式,帮助学员快速掌握Kubernetes的理论知识和专业技能,并针对考试做特别强化训练,让学员能从容面对CKA认证考试,使学员既能掌握Kubernetes相关知识,又能通过CKA认证考试, 学员可多次参加培训,直到通过认证。点击下方图片或者阅读原文链接查看详情。 返回搜狐,查看更多