开篇

golang.org/x/sync

业务研发中我们经常会遇到需要调用多个下游的场景,比如加载一个商品的详情页,你可能需要访问商品服务,库存服务,券服务,用户服务等,才能从各个数据源获取到所需要的信息,经过一些鉴权逻辑后,组装成前端需要的数据格式下发。

串行调用当然可以,但这样就潜在的给各个调用预设了【顺序】,必须执行完 A,B,C 之后才能执行 D 操作。但如果我们对于顺序并没有强需求,从语义上看两个调用是完全独立可并发的,那么我们就可以让他们并发执行。

这个时候就可以使用 errgroup 来解决问题。一定意义上讲,errgroup 是基于 WaitGroup 在错误传递上进行一些优化而提供出来的能力。它不仅可以支持 context.Context 的相关控制能力,还可以将子任务的 error 向上传递。

errgroup 源码拆解

golang.org/x/sync/errgroup

Group

Group 就是对我们上面提到的一堆子任务执行计划的抽象。每一个子任务都会有自己对应的 goroutine 来执行。

通过这个结构我们也可以看出来,errgroup 底层实现多个 goroutine 调度,等待的能力还是基于 sync.WaitGroup。

WithContext

我们可以调用 errgroup.WithContext 函数来创建一个 Group。

这里可以看到,Group 的 cancel 函数本质就是为了支持 context 的 cancel 能力,初始化的 Group 只有一个 cancel 属性,其他都是默认的。一旦有一个子任务返回错误,或者是 Wait 调用返回,这个新 Context 就会被 cancel。

Wait

本质上和 WaitGroup 的 Wait 方法语义一样,既然我们是个 group task,就需要等待所有任务都执行完,这个语义就由 Wait 方法提供。如果有多个子任务返回错误,它只会返回第一个出现的错误,如果所有的子任务都执行成功,就返回 nil。

Wait 的实现非常简单。一个前置的 WaitGroup Wait,结束后只做了两件事:

  • 如果对于公共的 Context 有 cancel 函数,就将其 cancel,因为事情办完了;
  • 返回公共的 Group 结构中的 err 给调用方。

Go

func() error

我们重点来分析下 Go 这里发生了什么。

WaitGroup 加 1 用作计数;

启动一个新的 goroutine 执行调用方传入的 f() 函数;

  • 若 err 为 nil 说明执行正常;
  • 若 err 不为 nil,说明执行出错,此时将这个返回的 err 赋值给全局 Group 的变量 err,并将 context cancel 掉。注意,这里的处理在 once 分支中,也就是只有第一个来的错误会被处理。

在 defer 语句中调用 Group 的 done 方法,底层依赖 WaitGroup 的 Done,说明这一个子任务结束。

这里也可以看到,其实所谓 errgroup,我们并不是将所有子任务的 error 拼成一个字符串返回,而是直接在 Go 方法那里将第一个错误返回,底层依赖了 once 的能力。

SetLimit

其实看到这里,你有没有觉得 errgroup 的功能有点鸡肋?底层核心技术都是靠 WaitGroup 完成的,自己只不过是起了个 goroutine 执行方法,err 还只能保留一个。而且 Group 里面的 sem 那个 chan 是用来干什么呢?

这一节我们就来看看,Golang 对 errgroup 能力的一次扩充。

到目前为止,errgroup 是可以做到一开始人们对它的期望的,即并发执行子任务。但问题在于,这里是每一个子任务都开了个goroutine,如果是在高并发的环境里,频繁创建大量goroutine 这样的用法很容易对资源负载产生影响。开发者们提出,希望有办法限制 errgroup 创建的 goroutine 数量,参照这个 proposal: #27837

g.sem = make(chan token, n)

回忆一下我们在 Go 方法 defer 调用 done 中的表现,是不是清晰了很多?我们来理一下:

首先,Group 结构体就包含了 sem 变量,只作为通信,元素是空结构体,不包含实际意义:

如果你对整个 Group 的 Limit 没有要求,which is fine,你就直接忽略这个 SetLimit,errgroup 的原有能力就可以支持你的诉求。

但是,如果你希望保持 errgroup 的 goroutine 在一个上限之内,请在调用 Go 方法前,先 SetLimit,这样 Group 的 sem 就赋值为一个长度为 n 的 channel。

那么,当你调用 Go 方法时,注意下面的框架代码,此时 g.sem 不为 nil,所以我们会塞一个 token 进去,作为占坑,语义上表示我申请一个 goroutine 用。

当然,如果此时 goroutine 数量已经达到上限,这里就会 block 住,直到别的 goroutine 干完活,sem 输出了一个 token之后,才能继续往里面塞。

g.done

这样就把 sem 的用法串起来了。我们通过创建一个定长的channel来实现对于 goroutine 数量的管控,对于channel实际包含的元素并不关心,所以用一个空结构体省一省空间,这是非常优秀的设计,大家平常也可以参考。

TryGo

TryGo 和 SetLimit 这套体系其实都是不久前欧长坤大佬提交到 errgroup 的能力。

func() error
Go

使用方法

这里我们先看一个最常见的用法,针对一组任务,每一个都单独起 goroutine 执行,最后获取 Wait 返回的 err 作为整个 Group 的 err。

你会发现,最后 err 打印出来就是第二个子任务的 err。

当然,上面这个 case 是我们正好只有一个报错,但是,如果实际有多个任务都挂了呢?

从完备性来考虑,有没有什么办法能够将多个任务的错误都返回呢?

这一点其实 errgroup 库并没有提供非常好的支持,需要开发者自行做一些改造。因为 Group 中只有一个 err 变量,我们不可能基于 Group 来实现这一点。

f()

可以看到,我们声明了一个 result slice,长度为 3。这里不用担心并发问题,因为每个 goroutine 读写的位置是确定唯一的。

f()

Go 官方文档中的利用 errgroup 实现 pipeline 的示例也很有参考意义:由一个子任务遍历文件夹下的文件,然后把遍历出的文件交给 20 个 goroutine,让这些 goroutine 并行计算文件的 md5。

这里贴出来简化代码学习一下.

其实本质上还是 channel发挥了至关重要的作用,这里建议大家有时间尽量看看源文章:pkg.go.dev/golang.org/…

对于用 errgroup 实现 pipeline 模式有很大帮助。

结束语

今天我们学习了 errgroup 的源码已经新增的 SetLimit 能力,其实看过了 sync 相关的这些库,整体用到的能力其实大差不差,很多设计思想都是非常精巧的。比如 errgroup 中利用定长 channel 实现控制 goroutine 数量,空结构体节省内存空间。

并且 sync 包的实现一般都还是非常简洁的,比如 once,singleflight,semaphore 等。建议大家有空的话自己过一遍,对并发和设计模式的理解会更上一个台阶。

errgroup 本身并不复杂,业界也有很多封装实现,大家可以对照源码再思考一下还有什么可以改进的地方。