场景:分群用户推荐业务数据。
方案1:直接启用goroutine
高并发的场景下,不对 goroutine数进行控制,你的 CPU 使用率暴涨,内存占用暴涨,直至程序奔溃。如果此操作落地至数据库,例如 mysql,那么相应的,你数据库的服务器磁盘IO、网络带宽 、CPU负载、内存消耗都会非常高,小心崩溃。
CPU翻倍,高峰期更甚。
func RecHandler(w http.ResponseWriter, r *http.Request) {
var wg sync.WaitGroup
wg.Add(1)
go FetchUserGroup(&wg) // 请求用户分群
wg.Wait()
}
方案2: select + 带缓冲区channel
channel 满了之后, 后续的请求将会被阻塞等待。响应时间会大幅度增加, 甚至不再有任何的响应。
// var recQueue chan Rec = make(chan Rec, 200)
func RecHandler(w http.ResponseWriter, r *http.Request) {
var p Rec
recQueue <- p // 写满将堵塞。
}
/*
for {
select {
case rec := <-recQueue: // 读
FetchUserGroup()
}
}
*/
方案3(👍):
1. 请求: 把任务放入JobQueue。
package main
import (
"fmt"
"log"
"net/http"
"time"
)
const (
MaxWorker = 100 // 随便设置值
MaxQueue = 200 // 随便设置值
)
// 一个可以发送工作请求的缓冲 channel
var JobQueue chan Job
func init() {
JobQueue = make(chan Job, MaxQueue)
}
type Payload struct{}
type Job struct {
PayLoad Payload
}
// 接收请求,把任务筛入JobQueue。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
work := Job{PayLoad: Payload{}}
JobQueue <- work
_, _ = w.Write([]byte("操作成功"))
}
func main() {
// 通过调度器创建worker,监听来自 JobQueue的任务
d := NewDispatcher()
d.Run()
http.HandleFunc("/payload", payloadHandler)
log.Fatal(http.ListenAndServe(":8099", nil))
}
2. Dispatcher调度器:循环读取JobQueue
一个Dispatcher 管理 多个Worker。
// 初始化操作
type Dispatcher struct {
WorkerPool chan chan Job
}
func NewDispatcher() *Dispatcher {
pool := make(chan chan Job, MaxWorker)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// 1. 开始运行 n 个 worker
for i := 0; i < MaxWorker; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
// 2. 循环读取JobQueue,随机选取一个Worker执行任务
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
go func(job Job) {
// 阻塞直到获取一个可用的worker job channel
jobChannel := <-d.WorkerPool
// 分发任务到 worker job channel 中
jobChannel <- job
}(job)
}
}
}
3.Worker: 从Dispatcher获取任务
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool),
}
}
// 开启一个 worker 循环监听或退出channel
func (w Worker) Start() {
go func() {
for {
// 将当前的 worker 注册到 worker 队列中
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// 真正业务的地方,模拟操作耗时
time.Sleep(500 * time.Millisecond)
fmt.Printf("上传成功:%v\n", job)
case <-w.quit:
return
}
}
}()
}
func (w Worker) stop() {
go func() {
w.quit <- true
}()
}
tips:是因为提前创建好goroutine吗?
使用 Go 每分钟处理百万请求
原文:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/