对于大型的互联网应用程序,如电商平台、社交网络、金融交易平台等,每秒钟都会收到大量的请求。在这些应用程序中,需要使用高效的技术来应对高并发的请求,尤其是在短时间内处理大量的请求,如1分钟百万请求。
同时,为了降低用户的使用门槛和提升用户体验,前端需要实现参数的无感知传递。这样用户在使用时,无需担心参数传递的问题,能够轻松地享受应用程序的服务。
在处理1分钟百万请求时,需要使用高效的技术和算法,以提高请求的响应速度和处理能力。Go语言以其高效性和并发性而闻名,因此成为处理高并发请求的优秀选择。Go中有多种模式可供选择,如基于goroutine和channel的并发模型、使用池技术的协程模型等,以便根据具体应用的需要来选择适合的技术模式。
本文代码参考搬至
W1
W1 结构体类型,它有五个成员:
- WgSend 用于等待任务发送的 goroutine 完成。
- Wg 用于等待任务处理的 goroutine 完成。
- MaxNum 表示 goroutine 池的大小。
- Ch 是一个字符串类型的通道,用于传递任务。
- DispatchStop 是一个空结构体类型的通道,用于停止任务分发。
接下来是 Dispatch 方法,它将任务发送到通道 Ch 中。它通过 for 循环来发送 10 倍于 MaxNum 的任务,每个任务都是一个 goroutine。defer 语句用于在任务完成时减少 WgSend 的计数。select 语句用于在任务分发被中止时退出任务发送。
Dispatch
StartPool
StartPool
如果通道 Ch 还没有被创建,那么它将被创建。如果计数器 WgSend 还没有被创建,那么它也将被创建。如果计数器 Wg 还没有被创建,那么它也将被创建。
如果通道 DispatchStop 还没有被创建,那么它也将被创建。
MaxNum 个 goroutine
Stop
最后是 Stop 方法,它停止任务分发并等待所有任务完成。
它关闭了通道 DispatchStop,等待 WgSend 中的任务发送 goroutine 完成,然后关闭通道 Ch,等待 Wg 中的任务处理 goroutine 完成。
W2
SubWorker
子协程,它有一个 JobChan,用于接收任务。
Run:SubWorker 的方法,用于启动一个子协程,从 JobChan 中读取任务并执行。
W2
Dispatch
Dispatch:W2 的方法,用于从 ChPool 中获取 TaskChan,将任务发送给一个 SubWorker 执行。
StartPool
StartPool:W2 的方法,用于初始化协程池,启动所有子协程并把 TaskChan 存储在 ChPool 中。
Stop
Stop:W2 的方法,用于停止协程的工作,并等待所有协程结束。
DealW2 函数则是整个协程池的入口,它通过 NewWorker 方法创建一个 W2 实例,然后调用 StartPool 启动协程池,并通过 Dispatch 发送任务,最后调用 Stop 停止协程池。
个人见解
w.Wg, w.ChPool, w.QuitChan
原来是golang里如果方法传递的不是地址,那么就会做一个拷贝,所以这里调用的wg根本就不是一个对象。
传递的地方传递地址就可以了,如果不传递地址,将会出现死锁
go doSomething(i, &wg, ch)
func doSomething(index int, wg *sync.WaitGroup, ch chan int) {
造成 Goroutine 数量的剧增消耗过多系统资源,程序可能会崩溃
探究原文
在这段代码中,poolCh代表工作者池,sw.JobChan代表工作者的工作通道。当一个工作者完成了工作后,它会将工作结果发送到sw.JobChan,此时可以通过case res := <-sw.JobChan:来接收该工作的结果。
在这个代码块中,还需要处理一个退出信号quitCh。因此,第二个case <-quitCh:用于检测是否接收到了退出信号。如果接收到了退出信号,程序将打印出消息并结束。
需要注意的是,这两个case语句是互斥的,只有当工作者完成工作或收到退出信号时,才会进入其中一个语句。因此,这个循环可以保证在工作者完成工作或收到退出信号时退出。
需要读取两次sw.JobChan的原因是:第一次读取用于将工作者的工作通道放回工作者池中,这样其他工作者就可以使用该通道。第二次读取用于接收工作者的工作结果或退出信号。因此,这两次读取是为了确保能够在正确的时刻将工作者的工作通道放回工作者池中并正确地处理工作结果或退出信号。
根据w2的特点 我自己写了一个w2
但是有几个点需要注意
1.没有考虑JobChan通道的缓冲区大小,如果有大量任务被并发分配,容易导致内存占用过高;
2.每个线程都会执行无限循环,此时线程退出的条件是接收到QuitChan通道的信号,可能导致线程的阻塞等问题;
3.Dispatch函数的默认情况下只会输出"All workers busy",而不是阻塞,这意味着当所有线程都处于忙碌状态时,任务会丢失
4.线程池启动后无法动态扩展或缩小。
优化
这个优化版本改了很多次。有一些需要注意的点是,不然会一直死锁
1.使用sync.WaitGroup来确保线程池中所有线程都能够启动并运行;
2.在Stop函数中,先向SubWorker的JobChan中发送一个关闭信号,再等待所有SubWorker线程退出;
3.在Dispatch函数中,将默认情况下的输出改为阻塞等待可用通道;
w2new
AddWorkerRemoveWorker
AddWorkerRemoveWorker
测试用例
当Dispatch函数向ChPool通道获取可用通道时,会从通道中取出一个SubWorker的JobChan通道,并将任务发送到该通道中。而对于SubWorker来说,并没有进行任务的使用次数限制,所以它可以处理多个任务。
在这个例子中,当任务数量比SubWorker数量多时,一个SubWorker的JobChan通道会接收到多个任务,它们会在SubWorker的循环中按顺序依次处理,直到JobChan中没有未处理的任务为止。因此,如果任务数量特别大,可能会导致某些SubWorker的JobChan通道暂时处于未处理任务状态,而其他的SubWorker在执行任务。
在测试结果中,最后三行中出现了多个"SubWorker 0 processing job",说明SubWorker 0的JobChan通道接收了多个任务,并且在其循环中处理这些任务。下面的代码片段显示了这个过程:
SubWorker 0 的循环部分