golang中启动一个协程不会消耗太多资源,有人认为可以不用协程池。但是当访问量增大时,可能造成内存消耗完,程序崩溃。于是写了一个协程池的Demo。

channel

基本逻辑如下:

  1. Jobs管道存放job,Results管道存放结果。
  2. 程序一启动,启动3个worker协程,等待从Jobs管道中取数据。
  3. 向Jobs管道中发送3个数据。
  4. 关闭Jobs管道。
  5. worker协程从Jobs管道中接收到数据以后,执行程序,把结果放到Results管道中。然后继续等待。
  6. 当Jobs管道中没有数据,并且Results有3个数据时。退出主程序。

代码如下:

package main

import (
    "fmt"
    "time"
)

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                fmt.Println("worker", id, "started  job", j)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j)
                Results <- true
            }
        }
    }()
}

const channelLength = 3

var (
    Jobs chan int
    Results chan bool
)

func main() {
    Jobs = make(chan int, channelLength)
    Results = make(chan bool, channelLength)

    // Start worker goroutines
    for i:= 0; i < channelLength; i++ {
        worker(i)
    }

    // Send to channel
    time.Sleep(time.Second)
    for j := 0; j < channelLength; j++ {
        Jobs <- j
    }
    close(Jobs)

    for len(Jobs) != 0 || len(Results) != channelLength  {
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Println("Complete main")
}

运行结果如下:

Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started  job 2
worker 2 started  job 0
worker 0 started  job 1
worker 0 finished job 1
Waiting for job...
worker 0 started  job 0
worker 2 finished job 0
Waiting for job...
worker 2 started  job 0
worker 1 finished job 2
Waiting for job...
worker 1 started  job 0
Complete main

这个程序出现问题了,bug在哪里?

开始的3次,协程运行都是正常。

worker 1 started  job 2
worker 2 started  job 0
worker 0 started  job 1
worker 0 finished job 1
worker 2 finished job 0
worker 1 finished job 2

根据设计,向Jobs管道中发送3个数据以后,就关闭了管道。此后,协程不应该再从Jobs管道中接收到数据。

for j := 0; j < channelLength; j++ {
        jobs <- j
    }
close(jobs)

实际运行中,协程接收完3个数据以后,worker还能不断的从Jobs管道中接收到数据。与设计不符。

worker 0 started  job 0
worker 2 started  job 0
worker 1 started  job 0
j := <- jobworker started
for {
            fmt.Println("Waiting for job...")
            select {
            case j := <-Jobs :
                fmt.Println("worker", id, "started  job", j)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j)
                Results <- true
            }
        }
close(Jobs)
for j := 0; j < channelLength; j++ {
        Jobs <- j
    }
//close(Jobs)

程序居然正常了。

Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started  job 0
worker 0 started  job 2
worker 2 started  job 1
worker 1 finished job 0
worker 0 finished job 2
Waiting for job...
Waiting for job...
worker 2 finished job 1
Waiting for job...
Complete main
close()close()senderpanic: send on closed channel
x, ok := <-cokfalse
0job 0
// The close built-in function closes a channel, which must be either
// bidirectional or send-only. It should be executed only by the sender,
// never the receiver, and has the effect of shutting down the channel after
// the last sent value is received. After the last value has been received
// from a closed channel c, any receive from c will succeed without
// blocking, returning the zero value for the channel element. The form
//  x, ok := <-c
// will also set ok to false for a closed channel.
func close(c chan<- Type)

如果要使用close,应该怎么做

close()j:=<-jobsj0Jobs非指针数据00值0Jobsnil0nil

修改程序。新生成一个机构体Job。

type Job struct {
    JobId int
}

Jobs保存指向Job的指针。

Jobs chan *Job
func main() {
    Jobs = make(chan *Job, channelLength)
    ...
    for j := 0; j < channelLength; j++ {
        Jobs <- &Job{JobId:j}
    }
    close(Jobs)
    ...
}

在worker协程中,从管道取出Job指针以后,判断指针是否为nil。如果为nil,说明管道已经关闭,协程退出。

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                if j == nil {
                    fmt.Println("Close the worker", id)
                    return
                }
                fmt.Println("worker", id, "started  job", j.JobId)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j.JobId)
                Results <- true
            }
        }
    }()
}

运行结果达到预期。

Waiting for job...
Waiting for job...
Waiting for job...
worker 0 started  job 0
worker 1 started  job 1
worker 2 started  job 2
worker 2 finished job 2
worker 0 finished job 0
Waiting for job...
Waiting for job...
Close the worker 2
Close the worker 0
worker 1 finished job 1
Waiting for job...
Close the worker 1
Complete main

附上最终的代码。

package main

import (
    "fmt"
    "time"
)

type Job struct {
    JobId int
}

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                if j == nil {
                    fmt.Println("Close the worker", id)
                    return
                }
                fmt.Println("worker", id, "started  job", j.JobId)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j.JobId)
                Results <- true
            }
        }
    }()
}

const channelLength = 3

var (
    Jobs chan *Job
    Results chan bool
)

func main() {
    Jobs = make(chan *Job, channelLength)
    Results = make(chan bool, channelLength)

    // Start worker goroutines
    for i:= 0; i < channelLength; i++ {
        worker(i)
    }

    // Send to channel
    time.Sleep(time.Second)
    for j := 0; j < channelLength; j++ {
        Jobs <- &Job{JobId:j}
    }
    close(Jobs)

    for len(Jobs) != 0 || len(Results) != channelLength  {
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Println("Complete main")
}