我们在上一个教程中讨论的所有通道基本上都是无缓冲的。正如我们在通道教程中详细讨论的那样,发送和接收到无缓冲的通道都是阻塞的。

可以使用缓冲区创建通道。仅当缓冲区已满时才会阻止对缓冲通道的发送。类似地,仅当缓冲区为空时才阻止从缓冲通道接收。

make
ch := make(chan type, capacity) 

对于具有缓冲区的通道,上述语法中的容量应大于0。默认情况下,无缓冲通道的容量为0,因此在上一个教程中创建通道时省略了容量参数。

让我们写一些代码并创建一个缓冲的通道。

package main

import (  
    "fmt"
)


func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}

在上面的程序中,在第9行我们创建一个容量为2的缓冲通道。由于通道的容量为2,因此可以将2个字符串写入通道而不会被阻塞。我们在第10和11写了2个字符串,通道不阻塞。我们在第12和13行读了2个字符串。这个程序打印结果:

naveen  
paul  

让我们再看一个缓冲通道的例子,其中通道的值写入并发协程并从主协程读取。这个例子将帮助我们更好地理解写入缓冲通道何时阻塞。

package main

import (  
    "fmt"
    "time"
)

func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {  
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)

    }
}

在上面程序,在16行创建一个容量为2的整型通道,并将通道传递给write协程。然后main协程等待2s,在此期间,write协程并发执行。write协程有一个for循环,从0到4往通道写数值。但是通道的容量是2,因此wirte协程只能立刻写入0和1,然后将其阻塞。知道至少一个值从ch通道读出。所以这个程序将立即打印一下2行:

successfully wrote 0 to ch  
successfully wrote 1 to ch

在打印上述两行之后,ch通道的输入流阻塞,直到有人从该ch通道读取数据为止。由于main协程在开始从通道读取之前会休眠2s,因此程序在接下来的2s内不会打印任何东西。main协程在2s后唤醒,通过for range从ch通道读取数值并打印。然后再次睡眠2是,继续循环,直到ch通道关闭。因此,此程序将在2s后打印一下行:

read value 0 from ch  
successfully wrote 2 to ch
write协程
successfully wrote 0 to ch  
successfully wrote 1 to ch  
read value 0 from ch  
successfully wrote 2 to ch  
read value 1 from ch  
successfully wrote 3 to ch  
read value 2 from ch  
successfully wrote 4 to ch  
read value 3 from ch  
read value 4 from ch  
死锁
package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

在上面的程序中,我们将3个字符串写入容量为2的缓冲通道。到程序执行到第3次写字符串时,由于通道容量已满,因此写入被阻塞。现在需要有协程从通道中去读取数据以便继续写入,但是在这种情况下,程序并没有从该通道读取的并发协程。因此会出现死锁,程序会在运行时出现以下信息,

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox274756028/main.go:11 +0x100
长度和容量

缓冲通道的容量是通道可以容纳的值的数量。这是我们使用make创建缓冲通道时指定的值。

缓冲通道的长度是当前在其中排队的元素个数。

一个程序将使事情变得清晰:

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}

在上面的程序,创建了一个容量为3的通道,也就是说该通道可以持有3个字符串。然后我们分别往通道里写入2个字符串。这样通道里就有2个字符串在队列里,因此长度是2。第13行程序,我们从通道读取一个字符串,现在通道只剩下一个字符串在队列里,因此长度变为1.所以程序的打印信息是:

capacity is 3  
length is 2  
read value naveen  
new length is 1
WaitGroup

本教程大的下一部分是关于工作池。为了更好的理解工作池,我们需要先了解

下WatiGroup,它将用来实现工作池。

WaitGroup被用来等待一批正在执行的协程。流程将被阻塞,知道所有的协程都完成执行。举个例子,我们有3个并发的从主程序产生的协程。主程序需要等待这3个协程执行完才能退出。这个可以用WaitGroup来实现。

让我们停止理论,立即编写一些代码,

package main

import (  
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {  
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {  
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

WaitGroup是一个结构体,首先声明一个WaitGroup变量,默认初始化值为0。WaitGroup的工作方式是一个计数器。当调用Add时,并传递一个int数值,计数器值增加。调用Done()方法是,计数器值递减。Wait()方法用来阻塞进程,直到计数器变为0。

上面的程序,我们在for循环中调用wg.Add(1),循环3次,计数器变为3。for循环同时生成3个执行协程。然后调用wg.Wait(),让主协程阻塞等待计数器变为0。在process函数中,计数器调用wg.Done()减少计数。一旦三个协程都完成执行任务,wg.Done()将被调用3次,计数器将变为0,主协程将重新执行。

由于需要修改wg的计数,所有传递给process的参数是wg的地址。

started Goroutine  2  
started Goroutine  0  
started Goroutine  1  
Goroutine 0 ended  
Goroutine 2 ended  
Goroutine 1 ended  
All go routines finished executing  

程序的执行结果可能和上面不通,这是因为,协程的调度顺序不通。

工作池的实现

缓冲通道的一个重要用途是工作池的实现。

通常,工作池是一组线程,它们正在等待分配给它们的任务。一旦完成分配的任务,他们就会再次为下一个任务提供服务。

我们通过缓冲通道来实现工人池.下面的例子我们会创建一个工人池来实现统计输入数值的数字和。例如输入234.输入就是2+3+4.收入工作池的是一列伪随机数。

下面是一些关于我们工人池的核心功能。

  • 创建一个协程池用来监听得到job的输入缓冲通道
  • 往输入的缓冲通道中增加任务
  • 当job完成时写入结果到输出的缓冲通道
  • 从输出缓冲通道中读取和打印结果

我们会一步步写出这个例子,以便理解。第一步先创建job和result的结构体


type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int

每一个job结构体拥有一个id和随机值randomno,result拥有一个job元素和统计各位数的和sunofdigits元素值。

下一步我们来创建接收job和输出result的缓冲通道


var jobs = make(chan Job, 10)  
var results = make(chan Result, 10) 

工人协程会监听jobs缓冲通道上到来的任务,一旦任务完成就把结果写入到results的缓冲通道中。

接下来下面digits函数是统计数字的各个位之和。


func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
接下来,我们创建一个工人协程
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

上面的函数创建了一个“工人”他负责从jobs通道中读取元素,计算结果并往results通道中写入结果。这个方法用一个wg作为参数,当所有jobs被完成时会调用wg的Done()方法。

接下来的crateWorkerPool函数会创建一个工人协程池


func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

上面的函数有一个int输入,指定要创建多少工人。函数中调用wg.Add(1)作为协程增加的计数器,之后把wg作为入参传给worker函数,当完成创建所需要的工人协程之后,就调用wg.Wait()进入阻塞状态。当所有的协程完成执行之后,它会关闭results通道。

现在我们的工人池已经准备好了让我们从头写一个分配工作给工人的函数


func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

上面的allocate函数需要一个int作为输入参数指定一共要分发给工人们的工作总数。randomno为一个最大值为998的随机数,通过循环创建Job结构体并写入到jobs通道中,当写入工作全部结束后就关闭jobs通道。

接下来创建读取和输出结果的函数

func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

result函数从result通道中读取出结果值并打印job的id、随机数和统计随机数各位数之和。函数中通过写入done通道值来表示全部结果已经打印完毕。

最后来main函数:


func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

函数开始先进行存储了开始执行的时间和最后结束的时间用来计算程序运行的时间差, noOfJobs设置为100,allocate被调用添加jobs到通道。done通道用来获取结果输出完毕的状态。noOfWorkers作为createWorkerPool函数的入参,用来指定创建多少个工人协程。下面给出完整代码:


package main
 
 
import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)
 
 
type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}
 
 
var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)
 
 
func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

程序打印

Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23  
Job id 9, input random no 150, sum of digits 6  
...
total time taken  20.01081009 seconds  

一共100行的输出记过会被打印,现在如果把main函数中的noOfWorkers值给为20,会发现total time差不多会降低一半,原因当然是工人池中的工人数量增加了一倍,从jobs协程中读取结果的效率也就多了一倍。你可以自行修改noOfJobs和boOfWorkers的值来查看不同的效果。

这一节就到这里,希望你有愉快的一天!