}
这种方式最简单,但是只适用于中小流量的业务,一旦业务请求的处理内容比较多、流量比较大,程序很快就撑不住了。
goroutine 之间进行通信使用 channel:
funcmain{
channel := make( chanstring, MAX_QUEUE)
gofunc(msg string) {
channel <- msg //向信道中存消息
}( "ping")
msg := <- channel //从信道中去消息
fmt.Println(msg)
}
第二种方式就是使用 channel,加入缓冲队列的内容,每次收到一个请求,将一些工作放入到队列中,每次从队列中拿出工作进行处理:
constMAX_QUEUE = 256
varchannel chanstring
funcmain{
gostartProcessor
router := gin.Default
router.Handle( "POST", "/submit", submit)
router.Run( ":8080")
}
funcinit{
channel = make( chanstring, MAX_QUEUE)
}
funcsubmit(ctx *gin.Context){
iferr := ctx.Request.ParseForm; err != nil{
ctx.String(http.StatusBadRequest, "%s", "failure")
return
}
message := ctx.PostForm( "message")
channel <- message
ctx.String(http.StatusOK, "%s", "success")
}
funcstartProcessor{
for{
select{
casemsg := <-channel:
fmt.Println( "上传信息的处理 ", msg)
}
}
}
这种方式适用于请求访问的速率小于等于队列执行的任务速率的情况,如果请求访问的速率远远大于队列执行任务的速率,很快缓冲队列就满了,后续的请求就会阻塞。
使用 Job/Worker 模式,这种可以认为是 go 使用 channel 实现了一个工作线程池,两层的通道系统,一个通道用作任务队列,一个通道用到控制处理任务时的并发量
MAX_QUEUE = 256
MAX_WORKER = 32
MAX_WORKER_POOL_SIZE = 5
)
varJobQueue chanstring
//用来管理执行管道中的任务
typeWorker struct{
WorkerPool chanchanstring
JobChannel chanstring
quit chanbool
}
funcNewWorker(workerPool chanchanstring) * Worker{
return&Worker{
WorkerPool:workerPool,
JobChannel: make( chanstring),
quit: make( chanbool),
}
}
//开启当前的 worker,循环上传 channel 中的 job;并同时监听 quit 退出标识位,判断是否关闭该 channel
func(w *Worker)Start{
gofunc{
for{
w.WorkerPool <- w.JobChannel
select{
casejob := <-w.JobChannel:
fmt.Println(job)
case<-w.quit: //收到停止信号,退出
return
}
}
}
}
//关闭该 channel,这里是将 channel 标识位设置为关闭,实际在 worker 执行中关闭
func(w *Worker)Stop{
gofunc{
w.quit <- true
}
}
typeDispatcher struct{
WorkerPool chanchanstring
quit chanbool
}
funcNewDispatcher(maxWorkers int) * Dispatcher{
pool := make( chanchanstring, maxWorkers)
return&Dispatcher{WorkerPool: pool}
}
func(d *Dispatcher)dispatcher{
for{
select{
casejob := <-JobQueue:
gofunc(job string) {
jobChannel := <-d.WorkerPool
jobChannel <- job
}(job)
case<-d.quit:
return
}
}
}
func(d *Dispatcher)Run{
fori := 0; i < MAX_WORKER_POOL_SIZE; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start
}
god.dispatcher
}
funcmain{
dispatcher := NewDispatcher(MAX_WORKER) //创建任务分派器
dispatcher.Run //任务分派器开始工作
router := gin.Default
router.Handle( "POST", "/submit", submit)
router.Run( ":8080")
}
funcinit{
JobQueue = make( chanstring, MAX_QUEUE)
}
funcsubmit(ctx *gin.Context){
iferr := ctx.Request.ParseForm; err != nil{
ctx.String(http.StatusBadRequest, "%s", "failure")
return
}
message := ctx.PostForm( "message")
JobQueue <- message
ctx.String(http.StatusOK, "%s", "success")
}
转自:github.com/guishenbumie/MyBlog/wiki
转自:github.com/guishenbumie/MyBlog/wiki
- EOF -
点击标题可跳转
1、 Go 处理 PNG 图片及 Mac 图片集转 PDF
2、 支持分布式的 go 实现即时通讯系统
3、 Golang 闭包的实现
看完本文有收获?请分享给更多人
推荐关注「Linux 爱好者」,提升Linux技能