Go 语言最大的特色之一,就是其从语言的层面支持并发。Go 语言使用了其特有的 goroutine 作为最基本的并发执行单元,以协程的方式,实现了更加轻量和高效的并发执行。然而,goroutine 缺乏一个高级的管理机制,原生情况下使用,要实现动态调整数量、内存资源复用、错误处理等,往往需要编写比较多的底层代码逻辑。Ants,这个 goroutine 池实现,提供了对于大规模 goroutine 的管理功能,相比原生实现,资源使用率和执行性能都有了很大的提升。
简介Ants,是 panjf2000 在 Github 上开源的高性能 goroutine 池,项目位于 ,目前版本为 v2.4.0。Ants 实现了对于大规模 goroutine 的调度管理和复用,允许使用者在开发 Golang 并发程序时限制 goroutine 数量,复用资源,达到更高效执行任务的效果。Ants 提供了大量有用的接口,包括:任务提交、获取运行中的 goroutine 数量、动态调整池带下、释放和重启池等。Ants 通过优秀的资源复用策略,极大地节省内存使用量,在大规模批量并发任务场景下,比原生的 goroutine 实现的并发具有更高的性能。
安装Ants 使用 Go 语言开发,需要 Go 1.8.x 以上。Ants 目前同时维护 v1 和 v2 版本,安装 v1 版本:
go get -u github.com/panjf2000/ants
v2 版本需要使用 go module 支持,开启 GO111MODULE=on:
go get -u github.com/panjf2000/ants/v2
示例
Ants 对于任务的执行原理比较直观,通过一个工作池的形式维护 goroutine 集合。当向工作池提交任务时,从池中取出 worker 来执行。如果已经存在可用的 goroutine 了,那么直接开始执行,如果没有,则需要判断是否已经达到容量上限。如果还没有超过,那就意味着可用的 worker 比容量更少,此时启动新的 worker 来执行。而如果容量已经用完,就依据是否为阻塞模式,来马上返回,或是阻塞等待。
当任务执行完毕,对应的 worker 就会得到释放,重新回到池中,等待下一个任务的调度,实现 goroutine 的复用。
完整的工作池 worker 调度的逻辑和流程如下:
Ants 支持不同的使用方式,可以直接使用 Submit 接口,使用默认配置的工作池完成任务执行。Submit 函数的定义如下:
func Submit(task func()) error
通过提供一个函数类型的任务参数,来把任务提交到工作池执行。我们来看一个简单的使用例子:
package main
import (
"fmt"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
runTimes := 1000
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
}
在这个例子中,定义了一个简单的任务函数 demoFunc,短暂休眠后打印 Hello World。在 main 函数中,使用了 sync.WaitGroup 来进行并发控制,把 demoFunc 包裹成为一个并发任务函数 syncCalculateSum。我们要把这个任务执行 1000 次,就可以通过循环,进行 1000 次的 ants.Submit 调用,把所有任务都提交到工作池执行。提交完成后,等待任务完成。程序在完成了 1000 次的 Hello World 打印后,最终完成了任务执行。
除了使用默认的工作池外,我们还可以自己实例化一个工作池,并提供容量和任务函数,使用 NewPoolWithFunc 简单完成 goroutine 池的创建:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(∑, n)
fmt.Printf("run with %d\n", n)
}
func main() {
runTimes := 1000
// 创建一个容量为10的goroutine池
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
}
可以看到,使用 ants.NewPoolWithFunc,创建了一个自定义容量和任务的函数工作池,任务函数可以提供一个 interface{} 参数,方便传递数据。然后,通过函数工作池的 Invoke 接口,完成任务参数的传递和任务的提交。在这个例子中,实现了从 0 到 1000 的并发求和,最终打印出计算结果。
此外,我们还可以使用最基础的方法 NewPool 来进行 ants.Pool 结构的实例化:
p, _ := ants.NewPool(10000)
NewPool 的函数签名如下:
func NewPool(size int, options ...Option) (*Pool, error)
其接收一个容量参数,以及其他配置参数,返回指向 Pool 类型实例的指针和错误。我们可以使用 options 参数进行更为细化的配置,配置参数包括:
- ExpiryDuration:清理 goroutine 的时间间隔。每隔一段时间,Ants 就会对池中未被使用的 goroutine 进行清理,减少内存占用;
- PreAlloc:是否在初始化工作池时预分配内存。对于一个超大容量,且任务耗时长的工作池来说,预分配内存可以大幅降低 goroutine 池中的内存重新分配损耗;
- MaxBlockingTasks:阻塞任务的最大数,0代表无限制;
- Nonblocking:工作池是否是非阻塞的,这决定了 Pool.Submit 接口在提交任务时是否会被阻塞;
- PanicHandler:任务崩溃时的处理函数;
- Logger:日志记录器
这些参数既可以在初始化的时候通过 Option 传递,也可以使用链式调用的方法实现配置叠加,利用 WithExpiryDuration、WithPreAlloc 等方法实现。
Ants 的工作池的容量需要在初始化的时候提供,但它并不是一成不变的,可以通过 Tune 接口实现 goroutine 池容量的动态调整:
pool.Tune(1000)
pool.Tune(100000)
这个方法时线程安全的,不必担心动态调整带来的数据并发问题。
在使用完成后,需要对工作池进行资源释放,一般通过 defer 机制调用:
pool.Release()
也可以通过 Reboot 方法,把一个已经释放资源被销毁的池重新激活,投入使用:
pool.Reboot()
Ants 以其高性能和低消耗著称,自然有测试依据。项目作者进行了 1000 万大规模并发任务执行的性能测试,Ants 使用 70 万的 goroutine 就完成了全部任务,执行速度比原生 goroutine 提高了 100%,且内存消耗保持在不使用 Pool 的 40%。此外,还进行了吞吐量测试,使用 Ants 的吞吐性能达到了原生 goroutine 的 2 到 6 倍,而内存消耗则达到 10 到 20 倍的降低。从测试结果来看,Ants 的高性能特性名不虚传。
总结Ants 作为一个高性能 goroutine 池,提供了比原生 goroutine 实现更为高级的调度管理和复用机制,抽象层次更高,且充分利用池化策略,使用尽可能少的 goroutine 数量和内存占用,以更快的速度完成并发任务的执行,在大规模和高吞吐场景下,具备很强的性能优势。Ants 项目代码整洁,注释详尽,文档丰富,对于 goroutine 并发模型有较深的理解,对相关领域感兴趣的开发者可以进行参考学习。