前言
goroutine被无限制的大量创建,造成的后果就不啰嗦了,主要讨论几种如何控制goroutine的方法
控制goroutine的数量
通过channel+sync
通过WaitGroup启动指定数量的goroutine,监听channel的通知。发送者推送信息到channel,信息处理完了,关闭channel,等待goroutine依次退出。
使用semaphore
借助于x包中的semaphore,也可以进行goroutine的数量限制。
线程池
不过原本go中的协程已经是非常轻量了,对于协程池还是要根据具体的场景分析。
对于小场景使用channel+sync就可以,其他复杂的可以考虑使用第三方的协程池库。
几个开源的线程池的设计
fasthttp中的协程池实现
fasthttp比net/http效率高很多倍的重要原因,就是利用了协程池。来看下大佬的设计思路。
1、按需增长goroutine数量,有一个最大值,同时监听channel,Server会把accept到的connection放入到channel中,这样监听的goroutine就能处理消费。
2、本地维护了一个待使用的channel列表,当本地channel列表拿不到ch,会在sync.pool中取。
3、如果workersCount没达到上限,则从生成一个workerFunc监听workerChan。
4、对于待使用的channel列表,会定期清理掉超过最大空闲时间的workerChan。
看下具体实现
Start
梳理下流程:
1、首先判断下stopCh是否为nil,不为nil表示已经started了;
2、初始化wp.stopCh = make(chan struct{}),stopCh是一个标识,用了struct{}不用bool,因为空结构体变量的内存占用大小为0,而bool类型内存占用大小为1,这样可以更加最大化利用我们服务器的内存空间;
3、设置workerChanPool的New函数,然后可以在Get不到东西时,自动创建一个;如果单核cpu则让workerChan阻塞,否则,使用非阻塞,workerChan的长度设置为1;
4、启动一个goroutine,处理clean操作,在接收到退出信号,退出。
Stop
梳理下流程:
1、判断stop只能被关闭一次;
2、关闭stopCh,设置stopCh为nil;
3、停止所有的等待获取连接的workers,正在运行的workers,不需要等待他们退出,他们会在完成connection或mustStop被设置成true退出。
clean
主要是清理掉最近最少使用的workers如果他们过了maxIdleWorkerDuration时间没有提供服务
getCh
获取一个workerChan
梳理下流程:
1、获取一个可执行的workerChan,如果ready中为空,并且workersCount没有达到最大值,增加workersCount数量,并且设置当前操作createWorker = true;
2、ready中不为空,直接在ready获取一个;
3、如果没有获取到则在sync.pool中获取一个,之后再放回到pool中;
4、拿到了就启动一个workerFunc监听workerChan,处理具体的业务逻辑。
workerFunc
梳理下流程:
1、workerFunc会监听workerChan,并且在使用完workerChan归还到ready中;
2、Serve会把connection放入到workerChan中,这样workerFunc就能通过workerChan拿到需要处理的连接请求;
3、当workerFunc拿到的workerChan为nil或wp.mustStop被设为了true,就跳出for循环。
panjf2000/ants
先看下示例
示例一
示例二
设计思路
整体的设计思路
梳理下思路:
1、先初始化缓存池的大小,然后处理任务事件的时候,一个task分配一个goWorker;
2、在拿goWorker的过程中会存在下面集中情况;
- 本地的缓存中有空闲的goWorker,直接取出;
- 本地缓存没有就去sync.Pool,拿一个goWorker;
3、如果缓存池满了,非阻塞模式直接返回nil,阻塞模式就循环去拿直到成功拿出一个;
4、同时也会定期清理掉过期的goWorker,通过sync.Cond唤醒其的阻塞等待;
5、对于使用完成的goWorker在使用完成之后重新归还到pool。
go-playground/pool
go-playground/pool会在一开始就启动
先放几个使用的demo
Per Unit Work
Batch Work
来看下实现
workUnit
workUnit作为channel信息进行传递,用来给work传递当前需要执行的任务信息。
limitedPool
梳理下流程:
1、首先初始化pool的大小;
2、然后根据pool的大小启动对应数量的worker,阻塞等待channel被塞入可执行函数;
3、然后可执行函数会被放入workUnit,然后通过channel传递给阻塞的worker。
同样这里也提供了批量执行的方法
batch
梳理下流程:
1、首先初始化Batch的大小;
2、然后Queue将一个个WorkFunc放入到WorkUnit中,执行,并将结果写入到results中,全部执行完成,调用QueueComplete,发送执行完成的通知;
3、Results会打印出所有的结果集,同时监听所有的worker执行完成,关闭channel,退出。
总结
控制goroutine数量一般使用两种方式:
- 简单的场景使用sync+channel就可以了;
- 复杂的场景可以使用goroutine pool
参考
【Golang 开发需要协程池吗?】https://www.zhihu.com/question/302981392
【来,控制一下 Goroutine 的并发数量】https://segmentfault.com/a/1190000017956396
【golang协程池设计】https://segmentfault.com/a/1190000018193161
【fasthttp中的协程池实现】https://segmentfault.com/a/1190000009133154
【panjf2000/ants】https://github.com/panjf2000/ants
【golang协程池设计】https://segmentfault.com/a/1190000018193161
原文链接:https://www.cnblogs.com/ricklz/p/14656691.html