golang中启动一个协程不会消耗太多资源,有人认为可以不用协程池。但是当访问量增大时,可能造成内存消耗完,程序崩溃。于是写了一个协程池的Demo。
channel
基本逻辑如下:
- Jobs管道存放job,Results管道存放结果。
- 程序一启动,启动3个worker协程,等待从Jobs管道中取数据。
- 向Jobs管道中发送3个数据。
- 关闭Jobs管道。
- worker协程从Jobs管道中接收到数据以后,执行程序,把结果放到Results管道中。然后继续等待。
- 当Jobs管道中没有数据,并且Results有3个数据时。退出主程序。
代码如下:
package main
import (
"fmt"
"time"
)
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
Results <- true
}
}
}()
}
const channelLength = 3
var (
Jobs chan int
Results chan bool
)
func main() {
Jobs = make(chan int, channelLength)
Results = make(chan bool, channelLength)
// Start worker goroutines
for i:= 0; i < channelLength; i++ {
worker(i)
}
// Send to channel
time.Sleep(time.Second)
for j := 0; j < channelLength; j++ {
Jobs <- j
}
close(Jobs)
for len(Jobs) != 0 || len(Results) != channelLength {
time.Sleep(100 * time.Millisecond)
}
fmt.Println("Complete main")
}
运行结果如下:
Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started job 2
worker 2 started job 0
worker 0 started job 1
worker 0 finished job 1
Waiting for job...
worker 0 started job 0
worker 2 finished job 0
Waiting for job...
worker 2 started job 0
worker 1 finished job 2
Waiting for job...
worker 1 started job 0
Complete main
这个程序出现问题了,bug在哪里?
开始的3次,协程运行都是正常。
worker 1 started job 2
worker 2 started job 0
worker 0 started job 1
worker 0 finished job 1
worker 2 finished job 0
worker 1 finished job 2
根据设计,向Jobs管道中发送3个数据以后,就关闭了管道。此后,协程不应该再从Jobs管道中接收到数据。
for j := 0; j < channelLength; j++ {
jobs <- j
}
close(jobs)
实际运行中,协程接收完3个数据以后,worker还能不断的从Jobs管道中接收到数据。与设计不符。
worker 0 started job 0
worker 2 started job 0
worker 1 started job 0
j := <- jobworker started
for {
fmt.Println("Waiting for job...")
select {
case j := <-Jobs :
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
Results <- true
}
}
close(Jobs)
for j := 0; j < channelLength; j++ {
Jobs <- j
}
//close(Jobs)
程序居然正常了。
Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started job 0
worker 0 started job 2
worker 2 started job 1
worker 1 finished job 0
worker 0 finished job 2
Waiting for job...
Waiting for job...
worker 2 finished job 1
Waiting for job...
Complete main
close()close()senderpanic: send on closed channel
x, ok := <-cokfalse
0job 0
// The close built-in function closes a channel, which must be either
// bidirectional or send-only. It should be executed only by the sender,
// never the receiver, and has the effect of shutting down the channel after
// the last sent value is received. After the last value has been received
// from a closed channel c, any receive from c will succeed without
// blocking, returning the zero value for the channel element. The form
// x, ok := <-c
// will also set ok to false for a closed channel.
func close(c chan<- Type)
如果要使用close,应该怎么做
close()j:=<-jobsj0Jobs非指针数据00值0Jobsnil0nil
修改程序。新生成一个机构体Job。
type Job struct {
JobId int
}
Jobs保存指向Job的指针。
Jobs chan *Job
func main() {
Jobs = make(chan *Job, channelLength)
...
for j := 0; j < channelLength; j++ {
Jobs <- &Job{JobId:j}
}
close(Jobs)
...
}
在worker协程中,从管道取出Job指针以后,判断指针是否为nil。如果为nil,说明管道已经关闭,协程退出。
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
if j == nil {
fmt.Println("Close the worker", id)
return
}
fmt.Println("worker", id, "started job", j.JobId)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j.JobId)
Results <- true
}
}
}()
}
运行结果达到预期。
Waiting for job...
Waiting for job...
Waiting for job...
worker 0 started job 0
worker 1 started job 1
worker 2 started job 2
worker 2 finished job 2
worker 0 finished job 0
Waiting for job...
Waiting for job...
Close the worker 2
Close the worker 0
worker 1 finished job 1
Waiting for job...
Close the worker 1
Complete main
附上最终的代码。
package main
import (
"fmt"
"time"
)
type Job struct {
JobId int
}
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
if j == nil {
fmt.Println("Close the worker", id)
return
}
fmt.Println("worker", id, "started job", j.JobId)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j.JobId)
Results <- true
}
}
}()
}
const channelLength = 3
var (
Jobs chan *Job
Results chan bool
)
func main() {
Jobs = make(chan *Job, channelLength)
Results = make(chan bool, channelLength)
// Start worker goroutines
for i:= 0; i < channelLength; i++ {
worker(i)
}
// Send to channel
time.Sleep(time.Second)
for j := 0; j < channelLength; j++ {
Jobs <- &Job{JobId:j}
}
close(Jobs)
for len(Jobs) != 0 || len(Results) != channelLength {
time.Sleep(100 * time.Millisecond)
}
fmt.Println("Complete main")
}