对于 Go,并发性已经是一等公民,因此将并发代码添加到 Go 项目通常是一个很低的门槛。但是正确地做到这一点仍然很困难,而且我在并发 Go 代码中看到(并犯下)了很多错误,例如:
- 没有正确清理资源。
- 造成死锁。
- 由于单个 goroutine 中的 panic 而导致整个程序崩溃。
特别是,很难编写出在出现 panic 时仍能合理运行的并发代码。
我们不希望在派生的 goroutine 中发生panic时整个进程崩溃,我们希望避免死锁或泄漏的 goroutines 等可能由panic触发的其他问题。
Go 没有提供一种简单的方法来本地执行此操作。
所以构建conc这个库,它使编写并发代码更加优雅并减少了样板代码的数量。conc下面的代码显示了使用Go 标准库代替时可以减少多少样板文件。
标准:
type propagatedPanic struct { val any stack []byte } func main() { done := make(chan *propagatedPanic) go func() { defer func() { if v := recover(); v != nil { done <- &propagatedPanic{ val: v, stack: debug.Stack(), } } else { done <- nil } }() doSomethingThatMightPanic() }() if val := <-done; val != nil { panic(val) } }
使用conc:
func main() { var wg conc.WaitGroup wg.Go(doSomethingThatMightPanic) // panics with a nice stacktrace wg.Wait() }
使用 conc 进行并行流处理
在 Sourcegraph,我们对有序流进行了大量并行处理。在搜索大量代码时,我们通常会得到要进行后处理的结果流。流中的每个结果都可能需要网络请求,例如,查找存储库的权限或获取搜索结果的完整文件内容。
为此,我们始终希望:
- 并行执行网络请求。
- 尽快将结果显示给用户。
- 保持流的顺序(因为我们已经对结果进行了排序)。
很难同时获得所有这三个权利,因此我在编写concStream 包时的目标之一是尽可能多地抽象出该工作流的复杂性。
现在我可以使用类似于下面示例的代码一次获取多个文件的内容。这样可以高效安全地获取每个文件的内容,同时仍然保持流的原始顺序。
func streamFileContents(ctx context.Context, fileNames <-chan string, fileContents chan<- string) { s := stream.New() for fileName := range fileNames { fileName := fileName s.Go(func() stream.Callback { contents := fetchFileContents(ctx, fileName) return func() { fileContents <- contents } }) } s.Wait() }
conc 的目标
conc希望用更好的方法来处理panic,避免泄漏 Goroutines,或者只是拥有更易读的并发代码。