Golang 并发读取文件,并开启工作池处理数据
// main starts a fixed pool of consumer goroutines that log every line received
// on a shared channel, feeds that channel from ReadFiles2Chan, and shuts down
// once all lines have been consumed.
func main() {
	const workerCount = 20

	lines := make(chan string)
	var workers sync.WaitGroup

	// Each worker drains the channel until it is closed.
	consume := func() {
		defer workers.Done()
		for line := range lines {
			log.Info.Println(line)
		}
	}

	workers.Add(workerCount)
	for n := workerCount; n > 0; n-- {
		go consume()
	}

	// ReadFiles2Chan returns only after every file has been read, so the
	// channel can be closed right after it to let the workers finish.
	ReadFiles2Chan(filePaths, lines)
	close(lines)

	workers.Wait()
	log.Info.Println("正常退出")
}
// ReadFiles2Chan reads every file in filePaths concurrently, one goroutine per
// file, and sends each line to strCh. File content is passed through a "gbk"
// decoder before being split into lines, and the first two lines of every file
// are skipped.
//
// The function blocks until all files have been read. It does not close strCh;
// the caller should close it after this function returns. Sends on strCh block
// until a receiver takes the value (unless the channel has free buffer space),
// so consumers must already be running when this is called.
//
// A file that cannot be opened is reported through log.Error.Fatal. A read
// error hit while scanning a file is logged and stops reading of that file
// only.
func ReadFiles2Chan(filePaths []string, strCh chan string) {
	// Number of leading lines dropped from every file.
	const skipLines = 2

	var wg sync.WaitGroup
	for _, filePath := range filePaths {
		wg.Add(1)
		go func(path string) {
			defer wg.Done()

			f, err := os.Open(path)
			if err != nil {
				log.Error.Fatal(err)
			}
			// Release the file descriptor as soon as this file is done;
			// otherwise every opened file stays open until process exit.
			defer f.Close()

			scanner := bufio.NewScanner(mahonia.NewDecoder("gbk").NewReader(f))
			scanner.Split(bufio.ScanLines)

			skipped := 0
			for scanner.Scan() {
				if skipped < skipLines {
					skipped++
					continue
				}
				strCh <- scanner.Text()
			}
			// Scan returns false on both EOF and failure; without this check
			// a read error or an over-long line would silently truncate the
			// output for this file.
			if err := scanner.Err(); err != nil {
				log.Error.Println(path, err)
			}
		}(filePath)
	}
	wg.Wait()
}