1 前置知识点
1.1 sync.Locker
sync.Locker 是 go 标准库 sync 下定义的锁接口:
任何实现了 Lock 和 Unlock 两个方法的类,都可以作为一种锁的实现,最常见的为标准库实现的 sync.Mutex.
在 ants 中,作者不希望使用 Mutex 这种重锁,而是自定义实现了一种轻量级的自旋锁:
该锁实现原理:
(1)通过一个整型状态值标识锁的状态:0-未加锁;1-加锁;
(2)加锁成功时,即把 0 改为 1;解锁时则把 1 改为 0;改写过程均通过 atomic 包保证并发安全;
(3)加锁通过 for 循环 + cas 操作实现自旋,无需操作系统介入执行 park 操作;
(4)通过变量 backoff 反映抢锁激烈度,每次抢锁失败,执行 backoff 次让 cpu 时间片动作;backoff 随失败次数逐渐升级,封顶 16.
1.2 sync.Cond
sync.Cond 是 golang 标准库提供的并发协调器,用于支援开放人员在指定条件下阻塞和唤醒协程的操作.
1.2.1 数据结构与构造器方法
(1)成员变量 noCopy + checker 是一套组合拳,保证 Cond 在第一次使用后不允许被复制;
(2)核心变量 L,一把锁,用于实现阻塞操作;
(3)核心变量 notify,阻塞链表,分别存储了调用 Cond.Wait() 方法的次数、goroutine 被唤醒的次数、一把系统运行时的互斥锁以及链表的头尾节点.
1.2.2 Cond.Wait
(1)检查 Cond 是否在使用过后被拷贝,是则 panic;
(2)该 Cond 阻塞链表 wait 统计数加 1;
(3)当前协程释放锁,因为接下来即将被操作系统 park;
(4)将当前协程包装成节点,添加到 Cond 的阻塞队列当中,并调用 park 操作将当前协程挂起;
(5)协程被唤醒后,重新尝试获取锁.
1.2.3 Cond.Signal
(1)检查 Cond 是否在首次使用后被拷贝,是则 panic;
(2)该 Cond 阻塞链表 notify 统计数加 1;
(3)从头开始遍历阻塞链表,唤醒一个等待时间最长的 goroutine.
1.2.4 Cond.BroadCast
(1)检查 Cond 是否在首次使用后被拷贝,是则 panic;
(2)取 wait 值赋值给 notify;
(3)唤醒阻塞链表所有节点.
1.3 sync.Pool
sync.Pool 是 golang 标准库下并发安全的对象池,适合用于有大量对象资源会存在被反复构造和回收的场景,可缓存资源进行复用,以提高性能并减轻 GC 压力.
1.3.1 gmp 原理简述
g:goroutine;
m:类比内核线程;
p:调取器,通常 p 的数量等于 cpu 核数.
(1)p 为中枢,m 通过与 p 的结合,调度 g;
(2)p 有本地 g 队列和全局 g 队列,前者取 g 不加锁,后者加锁;
(3)抢占式调度,g 因为阻塞或者时间片耗尽,可能回到等待队列,最终前后可能被不同的 g 和 m 执行.
1.3.2 sync.Pool数据结构
(1)noCopy 防拷贝标志;
(2)local 类型为 [P]poolLocal 的数组,数组容量 P 为 goroutine 处理器 P 的个数;
(3)victim 为经过一轮 gc 回收,暂存的上一轮 local;
(4)New 为用户指定的工厂函数,当 Pool 内存量元素不足时,会调用该函数构造新的元素.
(1)poolLocal 为 Pool 中对应于某个 P 的缓存数据;
(2)poolLocalInternal.private:对应于某个 P 的私有元素,操作时无需加锁;
(3)poolLocalInternal.shared: 某个 P 下的共享元素链表,由于各 P 都有可能访问,因此需要加锁.
1.3.3 核心方法
I Pool.pin
(1)pin 方法内部通过 native 方法 runtime_procPin 取出当前 P 的 index,并且将当前 goroutine 与 P 进行绑定,短暂处于不可抢占状态;
(2)如果是首次调用 pin 方法,则会走进 pinSlow 方法;
(3)在pinSlow 方法中,会完成 Pool.local 的初始化,并且将当前 Pool 添加到全局的 allPool 数组中,用于 gc 回收;
II Pool.Get
(1)调用 Pool.pin 方法,绑定当前 goroutine 与 P,并且取得该 P 对应的缓存数据;
(2)尝试获取 P 缓存数据的私有元素 private;
(3)倘若前一步失败,则尝试取 P 缓存数据中共享元素链表的头元素;
(4)倘若前一步失败,则走入 Pool.getSlow 方法,尝试取其他 P 缓存数据中共享元素链表的尾元素;
(5)同样在 Pool.getSlow 方法中,倘若前一步失败,则尝试从上轮 gc 前缓存中取元素(victim);
(6)调用 native 方法解绑 当前 goroutine 与 P
(7)倘若(2)-(5)步均取值失败,调用用户的工厂方法,进行元素构造并返回.
III Put
(1)判断存入元素 x 非空;
(2)调用 Pool.pin 绑定当前 goroutine 与 P,并获取 P 的缓存数据;
(3)倘若 P 缓存数据中的私有元素为空,则将 x 置为其私有元素;
(4)倘若未走入(3)分支,则将 x 添加到 P 缓存数据共享链表的末尾;
(5)解绑当前 goroutine 与 P.
1.3.4 回收机制
存入 pool 的对象会不定期被 go 运行时回收,因此 pool 没有容量概念,即便大量存入元素,也不会发生内存泄露.
具体回收时机是在 gc 时执行的:
(1)每个 Pool 首次执行 Get 方法时,会在内部首次调用 pinSlow 方法内将该 pool 添加到迁居的 allPools 数组中;
(2)每次 gc 时,会将上一轮的 oldPools 清空,并将本轮 allPools 的元素赋给 oldPools,allPools 置空;
(3)新置入 oldPools 的元素统一将 local 转移到 victim,并且将 local 置为空.
综上可以得见,最多两轮 gc,pool 内的对象资源将会全被回收.
2 Ants
2.1 基本信息
2.2 为什么用协程池?
(1)提升性能;
(2)有一个并发资源控制的概念;
(3)协程生命周期控制.
2.3 核心数据结构
2.3.1 goWorker
goWorker 可以简单理解为一个长时间运行而不回收的协程,用于反复处理用户提交的异步任务,其核心字段包含:
(1)pool:goWorker 所属的协程池;
(2)task:goWorker 用于接收异步任务包的管道;
(3)recycleTime:goWorker 回收到协程池的时间.
2.3.2 Pool
Pool 就是所谓的协程池,其包含的成员字段如下:
(1)capacity:池子的容量
(2)running:出于运行中的协程数量
(3)lock:自制的自旋锁,保证取 goWorker 时并发安全
(4)workers:goWorker 列表,即“真正意义上的协程池”
(5)state:池子状态标识,0-打开;1-关闭
(6)cond:并发协调器,用于阻塞模式下,挂起和唤醒等待资源的协程
(7)workerCache:存放 goWorker 的对象池,用于缓存释放的 goworker 资源用于复用. 对象池需要区别于协程池,协程池中的 goWorker 仍存活,进入对象池的 goWorker 严格意义上已经销毁;
(8)waiting:标识出于等待状态的协程数量;
(9)heartbeatDone:标识回收协程是否关闭;
(10)stopHeartbeat:用于关闭回收协程的控制器函数;
(11)options:一些定制化的配置.
2.3.3 options
协程池定制化参数集合,包含配置项如下:
(1)DisablePurge:是否允许回收空闲 goWorker;
(2) ExpiryDuration: 空闲 goWorker 回收时间间隔;仅当 DisablePurge 为 false 时有效;
(3)Nonblocking:是否设置为非阻塞模式,若是,goWorker 不足时不等待,直接返回 err;
(4)MaxBlockingTasks:阻塞模式下,最多阻塞等待的协程数量;仅当 Nonblocking 为 false 时有效;
(5)PanicHandler:提交任务发生 panic 时的处理逻辑;
2.3.4 workerArray
(1)workerArray 是一个 interface,其实现包含 stack 栈版本和 queue 队列包含;
(2)该 interface 主要定义了作为数据集合的几个通用 api,以及用于回收过期 goWorker 的 api.
此处仅展示基于 stack 数据结构实现的 goWorker 列表:
(1)items:存放的 goWorker 列表;
(2)expire:用于临时存放已过期的 goWorker 集合;
下面几个方法,是 workerStack 作为一个栈的数据结构所提供的一些能力,核心方法为 insert 和 detach 分别为对栈插入或者取出一个 goWorker.
下述的 retrieveExpire 方法是从 workerStack 中获取到已经过期的 goWorker 集合;其中 goWorker 的回收时间与入栈先后顺序相关,因此可以借助 binarySearch 方法基于二分法快速获取到目标集合.
2.4 核心 api
2.4.1 pool 构造器方法
(1)读取用户传的配置参数,做一些校验和默认赋值的前处理动作;
(2)构造好 Pool 数据结构;
(3)构造好 goWorker 对象池 workerCache,声明好工厂函数;
(4)构造好 Pool 内部的 goWorker 列表;
(5)构造好 Pool 的并发协调器 cond;
(6)异步启动 goWorker 过期销毁协程.
2.4.2 pool 提交任务
(1)从 Pool 中取出一个可用的 goWorker;
(2)将用户提交的任务包添加到 goWorker 的 channel 中.
(1)声明了一个构造 goWorker 的函数 spawnWorker 用于兜底,内部实际上是从对象池 workerCache 中获取 goWorker;
(2)接下来的核心逻辑就是加锁,然后尝试从池子中取出 goWorker 执行任务;
(3)倘若池子容量超限,且池子为阻塞模式,则基于并发协调器 cond 挂起协程阻塞等待;
(4)倘若池子容量超限,且池子为非阻塞模式,直接抛回错误;
(5)倘若池子容量未超限,且未取到 goWorker,调用 spawnWorker 构造新的 goWorker 用于执行任务.
2.4.3 goWorker 运行
(1)循环 + 阻塞等待,直到获取到用户提交的异步任务包 task 并执行;
(2)执行完成 task 后,会将自己交还给协程池;
(3)倘若回归协程池失败,或者用户提交了一个空的任务包,则该 goWorker 会被销毁,销毁方式是将自身放回协程池的对象池 workerCache. 并且会调用协调器 cond 唤醒一个阻塞等待的协程.
2.4.4 pool 回收协程
Pool.revertWorker 方法用于回收 goWorker 回到协程池:
(1)回收时更新 goWorker 回收时间,用于 goWorker 的定期清理;
(2)加锁后,将 goWorker 添加回协程池;
(3)通过协调器 cond 唤醒下一个阻塞等待的协程,并解锁.
2.4.5 定期回收过期 goWorker
(1)purgePeriodically 方法开启了一个 ticker,按照用户预设的过期时间间隔轮询回收过期的 goWorker;
(2)回收的方式是往对应 goWorker 的 channel 中注入一个空值,goWorker 将会自动将自身放回协程池的对象池 workerCache 当中;
(3)倘若当前存在空闲的 goWorker 且有协程阻塞等待,会唤醒所有阻塞协程.
文末小广告:
欢迎老板们关注我的个人公众号:小徐先生的编程世界
我会不定期更新个人纯原创的编程技术博客,技术栈以 go 语言为主,让我们一起点亮更多的编程技能树吧!