Go 系列教程 —— 20. 并发入门

Go 是并发式语言,而不是并行式语言。在讨论 Go 如何处理并发之前,我们必须理解何为并发,以及并发与并行的区别。

并发是什么?

并发是指立即处理多个任务的能力。一个例子就能很好地说明这一点。

我们可以想象一个人正在跑步。假如在他晨跑时,鞋带突然松了。于是他停下来,系一下鞋带,接下来继续跑。这个例子就是典型的并发。这个人能够一下搞定跑步和系鞋带两件事,即立即处理多个任务。

并行是什么?并行和并发有何区别?

并行是指同时处理多个任务。这听起来和并发差不多,但其实完全不同。

我们同样用这个跑步的例子来帮助理解。假如这个人在慢跑时,还在用他的 iPod 听着音乐。在这里,他是在跑步的同时听音乐,也就是同时处理多个任务。这称之为并行。

从技术上看并发和并行

通过现实中的例子,我们已经明白了什么是并发,以及并发与并行的区别。作为一名极客,我们接下来从技术的角度来考察并发和并行。:)

假如我们正在编写一个 web 浏览器。这个 web 浏览器有各种组件。其中两个分别是 web 页面的渲染区和从网上下载文件的下载器。假设我们已经构建好了浏览器代码,各个组件也都可以相互独立地运行(通过像 Java 里的线程,或者通过即将介绍的 Go 语言中的 Go 协程来实现)。当浏览器在单核处理器中运行时,处理器会在浏览器的两个组件间进行上下文切换。它可能在一段时间内下载文件,转而又对用户请求的 web 页面进行渲染。这就是并发。并发的进程从不同的时间点开始,分别交替运行。在这里,就是在不同的时间点开始进行下载和渲染,并相互交替运行的。

如果该浏览器在一个多核处理器上运行,此时下载文件的组件和渲染 HTML 的组件可能会在不同的核上同时运行。这称之为并行。

image

并行不一定会加快运行速度,因为并行运行的组件之间可能需要相互通信。在我们浏览器的例子里,当文件下载完成后,应当对用户进行提醒,比如弹出一个窗口。于是,在负责下载的组件和负责渲染用户界面的组件之间,就产生了通信。在并发系统上,这种通信开销很小。但在多核的并行系统上,组件间的通信开销就很高了。所以,并行不一定会加快运行速度!

Go 对并发的支持

Go 编程语言原生支持并发。Go 使用 Go 协程(Goroutine) 和信道(Channel)来处理并发。

 

Go 系列教程 —— 21. Go 协程

在前面的教程里,我们探讨了并发,以及并发与并行的区别。本教程则会介绍在 Go 语言里,如何使用 Go 协程(Goroutine)来实现并发。

Go 协程是什么?

Go 协程是与其他函数或方法一起并发运行的函数或方法。Go 协程可以看作是轻量级线程。与线程相比,创建一个 Go 协程的成本很小。因此在 Go 应用中,常常会看到有数以千计的 Go 协程并发地运行。

Go 协程相比于线程的优势

  • 相比线程而言,Go 协程的成本极低。堆栈大小只有若干 kb,并且可以根据应用的需求进行增减。而线程必须指定堆栈的大小,其堆栈是固定不变的。
  • Go 协程会复用(Multiplex)数量更少的 OS 线程。即使程序有数以千计的 Go 协程,也可能只有一个线程。如果该线程中的某一 Go 协程发生了阻塞(比如说等待用户输入),那么系统会再创建一个 OS 线程,并把其余 Go 协程都移动到这个新的 OS 线程。所有这一切都在运行时进行,作为程序员,我们没有直接面临这些复杂的细节,而是有一个简洁的 API 来处理并发。
  • Go 协程使用信道(Channel)来进行通信。信道用于防止多个协程访问共享内存时发生竞态条件(Race Condition)。信道可以看作是 Go 协程之间通信的管道。我们会在下一教程详细讨论信道。

如何启动一个 Go 协程?

go

让我们创建一个 Go 协程吧。

package main

import (
    "fmt"
)

func hello() {
    fmt.Println("Hello world goroutine")
}
func main() {
    go hello()
    fmt.Println("main function")
}
go hello()hello()main()

运行一下程序,你会很惊讶!

main function
  • 启动一个新的协程时,协程的调用会立即返回。与函数不同,程序控制不会去等待 Go 协程执行完毕。在调用 Go 协程之后,程序控制会立即返回到代码的下一行,忽略该协程的任何返回值。
  • 如果希望运行其他 Go 协程,Go 主协程必须继续运行着。如果 Go 主协程终止,则程序终止,于是其他 Go 协程也不会继续运行。
go hello()hellomain functionhello

我们现在修复这个问题。

package main

import (  
    "fmt"
    "time"
)

func hello() {  
    fmt.Println("Hello world goroutine")
}
func main() {  
    go hello()
    time.Sleep(1 * time.Second)
    fmt.Println("main function")
}
Sleepgo hello()Hello world goroutinemain function

在 Go 主协程中使用休眠,以便等待其他协程执行完毕,这种方法只是用于理解 Go 协程如何工作的技巧。信道可用于在其他协程结束执行之前,阻塞 Go 主协程。我们会在下一教程中讨论信道。

启动多个 Go 协程

为了更好地理解 Go 协程,我们再编写一个程序,启动多个 Go 协程。

package main

import (  
    "fmt"
    "time"
)

func numbers() {  
    for i := 1; i <= 5; i++ {
        time.Sleep(250 * time.Millisecond)
        fmt.Printf("%d ", i)
    }
}
func alphabets() {  
    for i := 'a'; i <= 'e'; i++ {
        time.Sleep(400 * time.Millisecond)
        fmt.Printf("%c ", i)
    }
}
func main() {  
    go numbers()
    go alphabets()
    time.Sleep(3000 * time.Millisecond)
    fmt.Println("main terminated")
}
numbers125alphabeteaenumbersalphabete

该程序会输出:

1 a 2 3 b 4 c 5 d e main terminated

程序的运作如下图所示。为了更好地观看图片,请在新标签页中打开。

image

numbersalphabets0 ms250 ms123250 ms1500 ms21 a 2 3 b 4 c 5 d e main terminated

 

 

Go 系列教程 —— 22. 信道(channel)

在上一教程里,我们探讨了如何使用 Go 协程(Goroutine)来实现并发。我们接着在本教程里学习信道(Channel),学习如何通过信道来实现 Go 协程间的通信。

什么是信道?

信道可以想像成 Go 协程之间通信的管道。如同管道中的水会从一端流到另一端,通过使用信道,数据也可以从一端发送,在另一端接收。

信道的声明

所有信道都关联了一个类型。信道只能运输这种类型的数据,而运输其他类型的数据都是非法的。

chan TT
nilmake

下面编写代码,声明一个信道。

package main

import "fmt"

func main() {  
    var a chan int
    if a == nil {
        fmt.Println("channel a is nil, going to define it")
        a = make(chan int)
        fmt.Printf("Type of a is %T", a)
    }
}
nilanilaa
channel a is nil, going to define it  
Type of a is chan int

简短声明通常也是一种定义信道的简洁有效的方法。

a := make(chan int)
a

通过信道进行发送和接收

如下所示,该语法通过信道发送和接收数据。

data := <- a // 读取信道 a  
a <- data // 写入信道 a

信道旁的箭头方向指定了是发送数据还是接收数据。

aadata
aa

发送与接收默认是阻塞的

发送与接收默认是阻塞的。这是什么意思?当把数据发送到信道时,程序控制会在发送数据的语句处发生阻塞,直到有其它 Go 协程从信道读取到数据,才会解除阻塞。与此类似,当读取信道的数据时,如果没有其它的协程把数据写入到这个信道,那么读取过程就会一直阻塞着。

信道的这种特性能够帮助 Go 协程之间进行高效的通信,不需要用到其他编程语言常见的显式锁或条件变量。

信道的代码示例

理论已经够了:)。接下来写点代码,看看协程之间通过信道是怎么通信的吧。

我们其实可以重写上章学习 Go 协程 时写的程序,现在我们在这里用上信道。

首先引用前面教程里的程序。

package main

import (  
    "fmt"
    "time"
)

func hello() {  
    fmt.Println("Hello world goroutine")
}
func main() {  
    go hello()
    time.Sleep(1 * time.Second)
    fmt.Println("main function")
}

这是上一篇的代码。我们使用到了休眠,使 Go 主协程等待 hello 协程结束。如果你看不懂,建议你阅读上一教程 Go 协程。

我们接下来使用信道来重写上面代码。

package main

import (  
    "fmt"
)

func hello(done chan bool) {  
    fmt.Println("Hello world goroutine")
    done <- true
}
func main() {  
    done := make(chan bool)
    go hello(done)
    <-done
    fmt.Println("main function")
}
donedonehellodonedonetime.Sleep
<-donedone
donehellohelloHello world goroutinedonedonemain function

该程序输出如下:

Hello world goroutine  
main function
hello
package main

import (  
    "fmt"
    "time"
)

func hello(done chan bool) {  
    fmt.Println("hello go routine is going to sleep")
    time.Sleep(4 * time.Second)
    fmt.Println("hello go routine awake and going to write to done")
    done <- true
}
func main() {  
    done := make(chan bool)
    fmt.Println("Main going to call hello go goroutine")
    go hello(done)
    <-done
    fmt.Println("Main received data")
}
hello
Main going to call hello go goroutinehellohello go routine is going to sleephello<-donedonehello go routine awake and going to write to doneMain received data

信道的另一个示例

我们再编写一个程序来更好地理解信道。该程序会计算一个数中每一位的平方和与立方和,然后把平方和与立方和相加并打印出来。

例如,如果输出是 123,该程序会如下计算输出:

squares = (1 * 1) + (2 * 2) + (3 * 3) 
cubes = (1 * 1 * 1) + (2 * 2 * 2) + (3 * 3 * 3) 
output = squares + cubes = 50

我们会这样去构建程序:在一个单独的 Go 协程计算平方和,而在另一个协程计算立方和,最后在 Go 主协程把平方和与立方和相加。

package main

import (  
    "fmt"
)

func calcSquares(number int, squareop chan int) {  
    sum := 0
    for number != 0 {
        digit := number % 10
        sum += digit * digit
        number /= 10
    }
    squareop <- sum
}

func calcCubes(number int, cubeop chan int) {  
    sum := 0 
    for number != 0 {
        digit := number % 10
        sum += digit * digit * digit
        number /= 10
    }
    cubeop <- sum
} 

func main() {  
    number := 589
    sqrch := make(chan int)
    cubech := make(chan int)
    go calcSquares(number, sqrch)
    go calcCubes(number, cubech)
    squares, cubes := <-sqrch, <-cubech
    fmt.Println("Final output", squares + cubes)
}
calcSquaressquareopcalcCubescubop
squarescubes
Final output 1536

死锁

使用信道需要考虑的一个重点是死锁。当 Go 协程给一个信道发送数据时,照理说会有其他 Go 协程来接收数据。如果没有的话,程序就会在运行时触发 panic,形成死锁。

同理,当有 Go 协程等着从一个信道接收数据时,我们期望其他的 Go 协程会向该信道写入数据,要不然程序就会触发 panic。

package main

func main() {  
    ch := make(chan int)
    ch <- 5
}
chch <- 55ch
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox249677995/main.go:6 +0x80

单向信道

我们目前讨论的信道都是双向信道,即通过信道既能发送数据,又能接收数据。其实也可以创建单向信道,这种信道只能发送或者接收数据。

package main

import "fmt"

func sendData(sendch chan<- int) {  
    sendch <- 10
}

func main() {  
    sendch := make(chan<- int)
    go sendData(sendch)
    fmt.Println(<-sendch)
}
sendchchan<- intchan
main.go:11: invalid operation: <-sendch (receive from send-only type chan<- int)

一切都很顺利,只不过一个不能读取数据的唯送信道究竟有什么意义呢?

这就需要用到信道转换(Channel Conversion)了。把一个双向信道转换成唯送信道或者唯收(Receive Only)信道都是行得通的,但是反过来就不行。

package main

import "fmt"

func sendData(sendch chan<- int) {  
    sendch <- 10
}

func main() {  
    cha1 := make(chan int)
    go sendData(cha1)
    fmt.Println(<-cha1)
}
cha1cha1sendDatasendDatasendch chan<- intcha1sendData10

关闭信道和使用 for range 遍历信道

数据发送方可以关闭信道,通知接收方这个信道不再有数据发送过来。

当从信道接收数据时,接收方可以多用一个变量来检查信道是否已经关闭。

v, ok := <- ch
okokint0
package main

import (  
    "fmt"
)

func producer(chnl chan int) {  
    for i := 0; i < 10; i++ {
        chnl <- i
    }
    close(chnl)
}
func main() {  
    ch := make(chan int)
    go producer(ch)
    for {
        v, ok := <-ch
        if ok == false {
            break
        }
        fmt.Println("Received ", v, ok)
    }
}
producerchn1okokokok
Received  0 true  
Received  1 true  
Received  2 true  
Received  3 true  
Received  4 true  
Received  5 true  
Received  6 true  
Received  7 true  
Received  8 true  
Received  9 true

for range 循环用于在一个信道关闭之前,从信道接收数据。

接下来我们使用 for range 循环重写上面的代码。

package main

import (  
    "fmt"
)

func producer(chnl chan int) {  
    for i := 0; i < 10; i++ {
        chnl <- i
    }
    close(chnl)
}
func main() {  
    ch := make(chan int)
    go producer(ch)
    for v := range ch {
        fmt.Println("Received ",v)
    }
}
chch
Received  0  
Received  1  
Received  2  
Received  3  
Received  4  
Received  5  
Received  6  
Received  7  
Received  8  
Received  9

我们可以使用 for range 循环,重写信道的另一个示例这一节里面的代码,提高代码的可重用性。

calcSquarescalcCubes
package main

import (  
    "fmt"
)

func digits(number int, dchnl chan int) {  
    for number != 0 {
        digit := number % 10
        dchnl <- digit
        number /= 10
    }
    close(dchnl)
}
func calcSquares(number int, squareop chan int) {  
    sum := 0
    dch := make(chan int)
    go digits(number, dch)
    for digit := range dch {
        sum += digit * digit
    }
    squareop <- sum
}

func calcCubes(number int, cubeop chan int) {  
    sum := 0
    dch := make(chan int)
    go digits(number, dch)
    for digit := range dch {
        sum += digit * digit * digit
    }
    cubeop <- sum
}

func main() {  
    number := 589
    sqrch := make(chan int)
    cubech := make(chan int)
    go calcSquares(number, sqrch)
    go calcCubes(number, cubech)
    squares, cubes := <-sqrch, <-cubech
    fmt.Println("Final output", squares+cubes)
}
digitscalcSquarescalcCubesdigitscalcSquarescalcCubes
Final output 1536

本教程的内容到此结束。关于信道还有一些其他的概念,比如缓冲信道(Buffered Channel)、工作池(Worker Pool)和 select。我们会在接下来的教程里专门介绍它们。

 

Go 系列教程 —— 23. 缓冲信道和工作池(Buffered Channels and Worker Pools)

 

什么是缓冲信道?

在上一教程里,我们讨论的主要是无缓冲信道。我们在信道的教程里详细讨论了,无缓冲信道的发送和接收过程是阻塞的。

我们还可以创建一个有缓冲(Buffer)的信道。只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。

make
ch := make(chan type, capacity)
capacity

我们开始编写代码,创建一个缓冲信道。

示例一

package main

import (  
    "fmt"
)


func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}

在上面程序里的第 9 行,我们创建了一个缓冲信道,其容量为 2。由于该信道的容量为 2,因此可向它写入两个字符串,而且不会发生阻塞。在第 10 行和第 11 行,我们向信道写入两个字符串,该信道并没有发生阻塞。我们又在第 12 行和第 13 行分别读取了这两个字符串。该程序输出:

naveen  
paul

示例二

我们再看一个缓冲信道的示例,其中有一个并发的 Go 协程来向信道写入数据,而 Go 主协程负责读取数据。该示例帮助我们进一步理解,在向缓冲信道写入数据时,什么时候会发生阻塞。

package main

import (  
    "fmt"
    "time"
)

func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {  
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)

    }
}
chchwritewritewritechwritechch
successfully wrote 0 to ch  
successfully wrote 1 to ch
writechchchch
read value 0 from ch  
successfully wrote 2 to ch
write
successfully wrote 0 to ch  
successfully wrote 1 to ch  
read value 0 from ch  
successfully wrote 2 to ch  
read value 1 from ch  
successfully wrote 3 to ch  
read value 2 from ch  
successfully wrote 4 to ch  
read value 3 from ch  
read value 4 from ch

死锁

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

在上面程序里,我们向容量为 2 的缓冲信道写入 3 个字符串。当在程序控制到达第 3 次写入时(第 11 行),由于它超出了信道的容量,因此这次写入发生了阻塞。现在想要这次写操作能够进行下去,必须要有其它协程来读取这个信道的数据。但在本例中,并没有并发协程来读取这个信道,因此这里会发生死锁(deadlock)。程序会在运行时触发 panic,信息如下:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox274756028/main.go:11 +0x100

长度 vs 容量

make

缓冲信道的长度是指信道中当前排队的元素个数。

代码可以把一切解释得很清楚。:)

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}

在上面的程序里,我们创建了一个容量为 3 的信道,于是它可以保存 3 个字符串。接下来,我们分别在第 9 行和第 10 行向信道写入了两个字符串。于是信道有两个字符串排队,因此其长度为 2。在第 13 行,我们又从信道读取了一个字符串。现在该信道内只有一个字符串,因此其长度变为 1。该程序会输出:

capacity is 3  
length is 2  
read value naveen  
new length is 1

WaitGroup

WaitGroupWaitGroup
WaitGroupWaitGroup

理论说完了,我们编写点儿代码吧。:)

package main

import (  
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {  
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {  
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}
WaitGroupWaitGroupWaitGroupAddintWaitGroupAddWaitGroupDone()Wait()
wg.Add(1)processwg.Wait()processwg.Donewg.Done()
wgwgWaitGroupmain

该程序输出:

started Goroutine  2  
started Goroutine  0  
started Goroutine  1  
Goroutine 0 ended  
Goroutine 2 ended  
Goroutine 1 ended  
All go routines finished executing

由于 Go 协程的执行顺序不一定,因此你的输出可能和我不一样。:)

工作池的实现

缓冲信道的重要应用之一就是实现工作池。

一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。

我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。

我们工作池的核心功能如下:

  • 创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。
  • 将作业添加到该输入型缓冲信道中。
  • 作业完成后,再将结果写入一个输出型缓冲信道。
  • 从输出型缓冲信道读取并打印结果。

我们会逐步编写这个程序,让代码易于理解。

第一步就是创建一个结构体,表示作业和结果。

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}
Jobidrandomnorandomno
Resultjobsumofdigits

第二步是分别创建用于接收作业和写入结果的缓冲信道。

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)
jobsresults
digitsdigits
func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

然后,我们写一个创建工作协程的函数。

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
jobsjobdigitsResultresultsworkerWaitGroupwgjobsDone()
createWorkerPool
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
wg.Add(1)WaitGroupworkerwgwg.Wait()resultsresults

现在我们已经有了工作池,我们继续编写一个函数,把作业分配给工作者。

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
allocateJobijobsjobjobs
results
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
resultresultsjobidresultdonedone
main()
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
mainendTimestartTime
noOfJobsallocatejobs
doneresult
createWorkerPoolmaindone

为了便于参考,下面是整个程序。我还引用了必要的包。

package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

为了更精确地计算总时间,请在你的本地机器上运行该程序。

该程序输出:

Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23  
Job id 9, input random no 150, sum of digits 6  
...
total time taken  20.01081009 seconds

程序总共会打印 100 行,对应着 100 项作业,然后最后会打印一行程序消耗的总时间。你的输出会和我的不同,因为 Go 协程的运行顺序不一定,同样总时间也会因为硬件而不同。在我的例子中,运行程序大约花费了 20 秒。

mainnoOfWorkers
...
total time taken  10.004364685 seconds
mainnoOfJobsnoOfWorkers

 

Go 系列教程 —— 24. Select

什么是 select?

selectselectselectswitchcase

示例

package main

import (  
    "fmt"
    "time"
)

func server1(ch chan string) {  
    time.Sleep(6 * time.Second)
    ch <- "from server1"
}
func server2(ch chan string) {  
    time.Sleep(3 * time.Second)
    ch <- "from server2"

}
func main() {  
    output1 := make(chan string)
    output2 := make(chan string)
    go server1(output1)
    go server2(output2)
    select {
    case s1 := <-output1:
        fmt.Println(s1)
    case s2 := <-output2:
        fmt.Println(s2)
    }
}
server1from server1chserver2from server2ch
mainserver1server2
selectselectserver1output1server2output2selectserver2output2
from server2

然后程序终止。

select 的应用

server1server2select
server1server2selectselect

默认情况

selectselect
package main

import (  
    "fmt"
    "time"
)

func process(ch chan string) {  
    time.Sleep(10500 * time.Millisecond)
    ch <- "process successful"
}

func main() {  
    ch := make(chan string)
    go process(ch)
    for {
        time.Sleep(1000 * time.Millisecond)
        select {
        case v := <-ch:
            fmt.Println("received value: ", v)
            return
        default:
            fmt.Println("no value received")
        }
    }

}
processprocess successfulch
processprocesschselectcase v := <-ch:no value received
processchprocess successfulselectreceived value: process successful
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
received value:  process successful

死锁与默认情况

package main

func main() {  
    ch := make(chan string)
    select {
    case <-ch:
    }
}
chselectchselectpanic
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:  
main.main()  
    /tmp/sandbox416567824/main.go:6 +0x80

如果存在默认情况,就不会发生死锁,因为在没有其他 case 准备就绪时,会执行默认情况。我们用默认情况重写后,程序如下:

package main

import "fmt"

func main() {  
    ch := make(chan string)
    select {
    case <-ch:
    default:
        fmt.Println("default case executed")
    }
}

以上程序会输出:

default case executed
selectnil
package main

import "fmt"

func main() {  
    var ch chan string
    select {
    case v := <-ch:
        fmt.Println("received value", v)
    default:
        fmt.Println("default case executed")

    }
}
chnilselectchselectselect
default case executed

随机选取

select
package main

import (  
    "fmt"
    "time"
)

func server1(ch chan string) {  
    ch <- "from server1"
}
func server2(ch chan string) {  
    ch <- "from server2"

}
func main() {  
    output1 := make(chan string)
    output2 := make(chan string)
    go server1(output1)
    go server2(output2)
    time.Sleep(1 * time.Second)
    select {
    case s1 := <-output1:
        fmt.Println(s1)
    case s2 := <-output2:
        fmt.Println(s2)
    }
}
server1server2selectserver1from server1output1server2from server2output2selectfrom server1from server2

请在你的本地系统上运行这个程序,获得程序的随机结果。因为如果你在 playground 上在线运行的话,它的输出总是一样的,这是由于 playground 不具有随机性所造成的。

这下我懂了:空 select

package main

func main() {  
    select {}
}

你认为上面代码会输出什么?

select
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:  
main.main()  
    /tmp/sandbox299546399/main.go:4 +0x20

 

Go 系列教程 —— 25. Mutex

本教程我们学习 Mutex。我们还会学习怎样通过 Mutex 和信道来处理竞态条件(Race Condition)。

临界区

x
x = x + 1

如果只有一个 Go 协程访问上面的代码段,那都没有任何问题。

但当有多个协程并发运行时,代码却会出错,让我们看看究竟是为什么吧。简单起见,假设在一行代码的前面,我们已经运行了两个 Go 协程。

在上一行代码的内部,系统执行程序时分为如下几个步骤(这里其实还有很多包括寄存器的技术细节,以及加法的工作原理等,但对于我们的系列教程,只需认为只有三个步骤就好了):

  1. 获得 x 的当前值
  2. 计算 x + 1
  3. 将步骤 2 计算得到的值赋值给 x

如果只有一个协程执行上面的三个步骤,不会有问题。

x = x + 1

one-scenario

xxx + 1xxx + 1xxxx

现在我们考虑另外一种可能发生的情况。

another-scenario

xxx
x

在上例中,如果在任意时刻只允许一个 Go 协程访问临界区,那么就可以避免竞态条件。而使用 Mutex 可以达到这个目的

Mutex

Mutex 用于提供一种加锁机制(Locking Mechanism),可确保在某时刻只有一个协程在临界区运行,以防止出现竞态条件。

LockUnlock
mutex.Lock()  
x = x + 1  
mutex.Unlock()
x = x + 1

如果有一个 Go 协程已经持有了锁(Lock),当其他协程试图获得该锁时,这些协程会被阻塞,直到 Mutex 解除锁定为止。

含有竞态条件的程序

在本节里,我们会编写一个含有竞态条件的程序,而在接下来一节,我们再修复竞态条件的问题。

package main  
import (  
    "fmt"
    "sync"
    )
var x  = 0  
func increment(wg *sync.WaitGroup) {  
    x = x + 1
    wg.Done()
}
func main() {  
    var w sync.WaitGroup
    for i := 0; i < 1000; i++ {
        w.Add(1)        
        go increment(&w)
    }
    w.Wait()
    fmt.Println("final value of x", x)
}
incrementxDone()
incrementxx
final value of x 941final value of x 928final value of x 922

使用 Mutex

xx
package main  
import (  
    "fmt"
    "sync"
    )
var x  = 0  
func increment(wg *sync.WaitGroup, m *sync.Mutex) {  
    m.Lock()
    x = x + 1
    m.Unlock()
    wg.Done()   
}
func main() {  
    var w sync.WaitGroup
    var m sync.Mutex
    for i := 0; i < 1000; i++ {
        w.Add(1)        
        go increment(&w, &m)
    }
    w.Wait()
    fmt.Println("final value of x", x)
}
Mutexmincrementxx = x + 1m.Lock()m.Unlock()

于是如果运行该程序,会输出:

final value of x 1000

在第 18 行,传递 Mutex 的地址很重要。如果传递的是 Mutex 的值,而非地址,那么每个协程都会得到 Mutex 的一份拷贝,竞态条件还是会发生。

使用信道处理竞态条件

我们还能用信道来处理竞态条件。看看是怎么做的。

package main  
import (  
    "fmt"
    "sync"
    )
var x  = 0  
func increment(wg *sync.WaitGroup, ch chan bool) {  
    ch <- true
    x = x + 1
    <- ch
    wg.Done()   
}
func main() {  
    var w sync.WaitGroup
    ch := make(chan bool, 1)
    for i := 0; i < 1000; i++ {
        w.Add(1)        
        go increment(&w, ch)
    }
    w.Wait()
    fmt.Println("final value of x", x)
}
incrementxxtruex

该程序也输出:

final value of x 1000

Mutex vs 信道

通过使用 Mutex 和信道,我们已经解决了竞态条件的问题。那么我们该选择使用哪一个?答案取决于你想要解决的问题。如果你想要解决的问题更适用于 Mutex,那么就用 Mutex。如果需要使用 Mutex,无须犹豫。而如果该问题更适用于信道,那就使用信道。:)

由于信道是 Go 语言很酷的特性,大多数 Go 新手处理每个并发问题时,使用的都是信道。这是不对的。Go 给了你选择 Mutex 和信道的余地,选择其中之一都可以是正确的。

总体说来,当 Go 协程需要与其他协程通信时,可以使用信道。而当只允许一个协程访问临界区时,可以使用 Mutex。

就我们上面解决的问题而言,我更倾向于使用 Mutex,因为该问题并不需要协程间的通信。所以 Mutex 是很自然的选择。

我的建议是去选择针对问题的工具,而别让问题去将就工具。:)

本教程到此结束。祝你愉快。