第7章 Go并发编程

7.1 并发与并行

  1. 并发(Concurrent)

多进程指令被CPU快速轮换执行。宏观上,多个进程同时执行。微观上,多个进程非同时执行,时间分成若干段,多个进程快速交替执行。

操作系统进程的并发:CPU划分时间片段(时间区间),进程在时间区间之间来回切换处理。CPU处理速度快,时间间隔处理得当,用户感觉是多个进程同时进行。

  1. 并行(Parallel)

同一时刻多条指令在多个处理器上同时执行。

  1. 区别
  • 并发偏重于多个任务交替执行,而多个任务间可能是串行的。并发是逻辑上的同时发生(simultaneous)。串行通讯,传输1bit信号。
  • 并行偏重于同时执行。是物理上的同时发生。并行通讯,传输多bit信号。

7.2 进程、线程和协程

  1. 进程(Process)

运行中的可执行程序。

  1. 线程(Thread)

轻量级进程(Lightweight Process,LWP),程序执行流的最小单位。由线程ID、当前指令指针(PC)、寄存器集合和堆栈组成。

线程是进程的一个实体,与同属同一进程的其他线程共享进程的全部资源。

线程拥有独立栈,共享堆。

线程具有5种状态:初始化、可运行、运行中、阻塞、销毁。

线程和进程都由操作系统管理。线程是最小的执行单元,进程是最小的资源管理单元。

  1. 协程(Coroutines)

协程是比线程更加轻量级的一种特殊函数。协程不是操作系统内核管理,而是程序控制,即在用户态执行。提升性能,不会像线程切换那样消耗资源。

一个进程可以包含多个线程,一个线程可以包含多个协程。一个线程中的多个协程始终是串行的(函数都是串行运行),无论CPU多少核。

  1. 对比
  • 协程仅是特殊函数,与线程和进程不是一个维度。
  • 一个进程可以包含多个线程,一个线程可以包含多个协程。
  • 一个线程内的多个协程可以切换,但协程始终是串行执行,无法利用CPU的多核能力。
  • 进程由操作系统自己的切换策略来切换,用户无感。切换内容包括页全局目录、内核栈和硬件上下文,切换内容被保存在内存中。采用“从用户态到内核态再到用户态”的方式,切换效率低。
  • 线程由操作系统自己的切换策略来切换,用户无感。切换内容包括内核栈和硬件上下文,切换内容被保存在内核栈中。采用“从用户态到内核态再到用户态”的方式,切换效率中等。
  • 协程的切换由用户(编程者或应用程序)决定。切换内容包括硬件上下文,切换内容被保存在用户自己的变量(用户栈或堆)中。只有“用户态”,切换效率高。

7.3 Go并发模型简介

  1. 多线程共享内存模型

访问共享数据(数组、map、结构体或对象等)时,通过锁来访问。衍生出线程安全的数据结构,Go通过sync包来实现。

  1. CSP(Communicating Sequential Processes)并发模型

CSP并发模型理念是不通过共享内存来通信,而是通过通信来共享内存。

Go通过goroutine和通道(channel)来实现。goroutine时并发执行单位,类似协程。通道是goroutine间的通信管道,类似UNIX中的管道。

package main

import "fmt"

func main() {
	message := make(chan string)

	go func() {messages <- "ping"}()
	
	msg := <-message
	fmt.Println(msg)
}

7.4 goroutine和channel实现并发

7.4.1 goroutine简介

go func(param1, param2) {
	//...
}(var1, var2)
package main

import (
	"fmt"
	"time"
)

func Echo(s string) {
	for i := 0; i < 3; i++ {
		time.Sleep(100*time.Millisecond)
		fmt.Println(s)
	}
}

func main() {
	go Echo("go")
	Echo("web program")
}

goroutine的调度方式是协同式的,没有时间片概念。
goroutine切换时刻:

  • 通道接收或发送数据且造成阻塞时。
  • 新的goroutine被创建。
  • 造成系统调用被阻塞,如操作文件时。

goroutine在多核CPU环境下可以是并行的。

package main

import (
	"fmt"
	"time"
)

func main() {
	for i := 0; i < 5; i++ {
		go Add(i, i)	//返回值被忽略
	}
	time.Sleep(1000*time.Millisecond)
}

func Add(a, b int) int {
	c := a + b
	fmt.Println(c)
	time.Sleep(100*time.Millisecond)
	return c
}

7.4.2 通道

  1. 定义

channel是用来传递数据的数据结构,可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。一种特殊类型,任何时候,同时只能有一个goroutine访问通道进行发送或接收数据。

类似队列,遵循先入先出(First In First out)规则,保证收发数据的顺序。

  1. 声明
var channel_name chan type
  1. 创建
//通道实例 := make(chan 数据类型)
strStream := make(chan string)
done := make(chan interface{})
voidStream := make(chan *struct{})
  1. 通道发送数据
//通道变量 <- 通道值
ch := make(chan interface{})
ch <- 6
ch <- "love"
package main

func main() {
	ch := make(chan interface{})
	ch <- "sleep"	//无缓冲通道,无接收方,永远阻塞,报错
}
  1. 通道接收数据
//阻塞接收
data := <-ch
//非阻塞接收
data, ok := <-ch
//忽略接收数据
 <-ch
//循环接收数据
for data := range ch {
	//
}
package main

import (
	"fmt"
)

func Sum(s []int, ch chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	ch <- sum
}

func main() {
	s := []int{6, 7, 8, -9, 1, 8}
	ch := make(chan int)
	go Sum(s[:len(s)/2], ch)
	go Sum(s[len(s)/2:], ch)
	a, b := <-ch, <-ch
	fmt.Println(a, b, a+b)
}
  1. 通道缓冲区
ch := make(chan int, 6)
package main

import (
	"fmt"
)

func main() {
	ch := make(chan int, 3)
	ch <- 6
	ch <- 7
	ch <- 8
	fmt.Println(<-ch)
	fmt.Println(<-ch)
	fmt.Println(<-ch)
}
  1. select多路复用
select {
default:
	//
case <- ch1:
	//
case v2 := <- ch2:
	//
}
timeout := make(chan interface{}, 1)

go func() {
	time.Sleep(6)
	timeout <- interface{}{}
}

select {
case <- ch:
	//
case <- timeout:
	//
}
  1. 遍历通道与关闭通道
v, ok := <-ch
package main

import (
	"fmt"
)

func fibonacci(n int, ch chan int) {
	a, b := 0, 1
	for i := 0; i < n; i++ {
		ch <- a
		a, b = b, a+b
	}
	close(ch)
}

func main() {
	ch := make(chan int, 6)
	go fibonacci(cap(ch), ch)
	for j := range ch {
		fmt.Println(j)
	}
}

7.5 sync实现并发

7.5.1 竞态

使用并发,可能产生数据争用的竞态问题。

//输出0或6
func main() {
	fmt.Println(getNumber())
}

func getNumber() int {
	var i int
	go func() {
		i = 6
	}()

	return i
}

7.5.2 互斥锁

sync.Mutex,用于实现互斥锁,用于读写不确定的场景,全局锁。

type Mutex struct {
	state int32	//当前互斥锁的状态
	sema uint32	//控制锁状态的信号量
}

func (m *Mutex) Lock()
func (m *Mutex) Unlock()

必须先Lock(),然后Unlock()。
连续Lock(),死锁。
先Unlock(),后Lock(),panic。
可以一个goroutine先Lock(),其他goroutine后Unlock()。

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mutex sync.Mutex
	wait := sync.WaitGroup{}
	fmt.Println("Locked")
	
	mutex.Lock()

	for i := 1; i <= 5; i++ {
		wait.Add(1)
		go func(i int) {
			defer wait.Done()
			fmt.Println("Not lock:", i)
			mutex.Lock()
			fmt.Println("Locked:", i)
			time.Sleep(time.Second)
			fmt.Println("Unlocked:", i)
			mutex.Unlock()
		}(i)
	}

	time.Sleep(time.Second)
	fmt.Println("Unlocked")

	mutex.Unlock()

	wait.Wait()
}

7.5.3 读写互斥锁

sync.RWMutex可以多个读锁或者一个写锁,用于读次数远远多于写次数的场景。

type RWMutex struct {
	w Mutex
	writerSem uint32
	readerSem uint32
	readerCount int32
	readerWait int32
}

//写操作
func (*RWMutex) Lock()
func (*RWMutex) Unlock()

//读操作
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()
package main

import (
	"fmt"
	"sync"
	"math/rand"
)

var count int
var rw sync.RWMutex

func main() {
	ch := make(char struct{}, 6)
	
	for i := 0; i < 3; i++ {
		go ReadCount(i, ch)
	}
	
	for i := 0; i < 3; i++ {
		go WriteCount(i, ch)
	}

	for i := 0; i < 6; i++ {
		<-ch
	}
}

func ReadCount(n int, ch chan struct{}) {
	rw.RLock()
	fmt.Printf("goroutine %d 进入读操作...\n", n)
	v := count
	fmt.Printf("goroutine %d 读取结束,值为:%d\n", n, v)
	rw.RUnlock()
	ch <- struct{}{}
}

func WriteCount(n int, ch chan struct{}) {
	rw.Lock()
	fmt.Printf("goroutine %d 进入写操作...\n", n)
	v := rand.Intn(10)
	count = v
	fmt.Printf("goroutine %d 写入结束,值为:%d\n", n, v)
	rw.Unlock()
	ch <- struct{}{}
}

7.5.4 sync.Once结构体

多次调用sync.Once.Do(f func()),只执行第一次调用的函数。

type Once struct {
	done uint32
	m Mutex
}

func (o *Once) Do(f func())
package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once
	onceBody := func() {
		fmt.Println("test only once,这里只打印一次!")//打印
	}
	done := make(chan bool)
	for i := 0; i < 6; i++ {
		go func() {
			once.Do(onceBody)//确保只执行一次
			done <- true
		}()
	}
	for i := 0; i < 6; i++ {
		<-done
	}
}
package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup
var once sync.Once

func func1(ch1 chan<- int) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		ch1 <- i
	}
	close(ch1)
}

func func2(ch1 <-chan int, ch2 chan<- int) {
	defer wg.Done()
	for {
		x, ok := <-ch1
		if !ok {
			break
		}
		ch2 <- 2 * x
	}

	once.Do(func() { close(ch2) }) // 确保某个操作只执行一次
}

func main() {
	ch1 := make(chan int, 10)
	ch2 := make(chan int, 10)

	wg.Add(3)

	go func1(ch1)
	go func2(ch1, ch2)
	go func2(ch1, ch2)

	wg.Wait()

	for ret := range ch2 {
		fmt.Println(ret)
	}
}

7.5.5 同步等待组sync.WaitGroup

用于等待一组线程的结束。

func (*WaitGroup) Add(int)
func (w *WaitGroup) Done() {
	w.Add(-1)
}
func (*WaitGroup) Wait()
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("1 goroutine sleep ...")
		time.Sleep(2)
		fmt.Println("1 goroutine exit ...")
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("2 goroutine sleep ...")
		time.Sleep(4)
		fmt.Println("2 goroutine exit ...")
	}()

	fmt.Println("Waiting for all goroutine ")
	wg.Wait()
	fmt.Println("All goroutines finished!")
}
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	testFunc := func(wg *sync.WaitGroup, id int) {
		defer wg.Done()
		fmt.Printf("%v goroutine start ...\n", id)
		time.Sleep(2)
		fmt.Printf("%v goroutine exit ...\n", id)
	}

	var wg sync.WaitGroup
	const N = 3
	wg.Add(N)
	for i := 0; i < N; i++ {
		go testFunc(&wg, i)
	}

	fmt.Println("Waiting for all goroutine")
	wg.Wait()
	fmt.Println("All goroutines finished!")
}

7.5.6 竞态检测器

竞态分析工具。

go build -race main.go
go run -race main.go
go test -race test_main.go
package main

import "fmt"

func main() {
	c := make(chan bool)
	m := make(map[string]string)
	go func() {
		m["a"] = "one" // 第一个冲突访问.
		c <- true
	}()
	m["b"] = "two" // 第一个冲突访问
	<-c
	for k, v := range m {
		fmt.Println(k, v)
	}
}

7.6 Go并发的Web应用

7.6.1 自增整数生成器

package main

import "fmt"

func IntegerGenerator() chan int {
	var ch chan int = make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func main() {
	generator := IntegerGenerator()
	for i := 0; i < 100; i++ {
		fmt.Println(<-generator)
	}
}

7.6.2 并发的消息发送器

package main

import "fmt"

func SendNotification(user string) <-chan string {
	notifications := make(char string, 1)

	go func() {
		defer close(notifications)
		notifications <- fmt.Sprintf("Hi %s, welcome!", user)
	}()

	return notifications
}

func main() {
	barry := SendNotification("barry")
	shirdon := SendNotification("shirdon")

	fmt.Println(<-barry)
	fmt.Println(<-shirdon )
}

7.6.3 多路复合计算器

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func doCompute(x int) int {
	time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
	return 1+x
}

func branch(x int) chan int {
	ch := make(chan int, 1)
	go func() {
		defer close(ch)
		ch <- doCompute(x)
	}()
	return ch
}
/*
func Recombination(chs... chan int) chan int {
	ch := make(chan int)
	for _, c := range chs {
		go func(c chan int) {ch <- <-c}(c)
	}
	return ch
}
*/

func Recombination(chs... chan int) chan int {
	num := len(chs)
	ch := make(chan int, num)
	go func() {
		defer close(ch)
		for i := 0; i < num; i++ {
			/*
			select {
			case v := <-chs[i]:
				ch <- v
			}
			*/
			ch <- <-chs[i]
		}
	}()
	return ch
}

func main() {
	result := Recombination(branch(10), branch(20), branch(30))
	
	/*
	for i := 0; i < 3; i++ {
		fmt.Println(<-result)
	}
	*/
	for v := range result {
		fmt.Println(v)
	}
}

7.6.4 select创建多通道监听器

package main

import (
	"fmt"
	"time"
)

func foo(x int) <-chan int {
	ch := make(chan int, 1)
	go func() {
		defer close(ch)
		ch <- x
	}()
	return ch
}

func main() {
	ch1, ch2, ch3 := foo(3), foo(6), foo(9)
	ch := make(chan int, 3)
	
	go func() {
		defer close(ch)
		timeout := time.After(1*time.Second)
		for isTimeout := false; !isTimeout; {
			select {
			case v1, ok := <-ch1:
				if ok {
					ch <- v1
				}
			case v2, ok := <-ch2:
				if ok {
					ch <- v2
				}
			case v3, ok := <-ch3:
				if ok {
					ch <- v3
				}
			case <-timeout:
				isTimeout = true
			}
		}
	}()
	
	for v := range ch {
		fmt.Println(v)
	}
}

7.6.5 无缓冲通道阻塞主线

package main

import (
	"fmt"
)

func main() {
	ch, quit := make(chan int), make(chan int)

	go func() {
		ch <- 8
		quit <- 1
	}()
	
	for isQuit:= false; !isQuit; {
		select {
		case v := <-ch:
			fmt.Printf("received %d from ch", v)
		case <-quit:
			isQuit= true
		}
	}
}

7.6.6 筛选法求素数

package main

import (
	"fmt"
)

func IntegerGenerator() chan int {
	var ch chan int = make(chan int)

	go func() {
		for i := 2; ; i++ {
			ch <- i
		}
	}()

	return ch
}

func Filter(in chan int, number int) chan int {
	out := make(chan int)

	go func() {
		for {
			i := <-in
			if i%number != 0 {
				out <- i
			}
		}
	}()
	
	return out
}

func main() {
	const max = 100
	//产生所有的输入流
	//2 3 4 5 6 7 8 9 10
	numbers := IntegerGenerator()
	//取第一个数2
	number := <-numbers

	for number <= max {
		fmt.Println(number)
		//过滤输入流,产生新的输入流
		
		//第一次过滤
		//3 5 7 9
		//第二次过滤
		//5 7
		//第三次过滤
		//7
		numbers = Filter(numbers, number)
		//第一次过滤后取第一个数3
		//第二次过滤后取第一个数5
		//第三次过滤后取第一个数7
		number = <-numbers
	}
}

7.6.7 随机数生成器

package main

import (
	"fmt"
)

func randGenerator() chan int {
	var ch chan int = make(chan int)

	go func() {
		for {
			select {
			case ch <- 0:
			case ch <- 1:
			}
		}
	}()

	return ch
}

func main() {
	generator := IntegerGenerator()
	
	for i := 0; i < 10; i++ {
		fmt.Println(<-generator)
	}
}

7.6.8 定时器

package main

import (
	"fmt"
	"time"
)

func Timer(duration time.Duration) chan bool {
	var ch chan bool = make(chan bool)

	go func() {
		time.Sleep(duration)
		ch <- true
	}()

	return ch
}

func main() {
	timeout := Timer(5*time.Second)
	
	for {
		select {
		case <-timeout:
			fmt.Println("already 5s!")
			return
		}
	}
}

7.6.9 并发的Web爬虫

package main

import (
	"fmt"
	"time"
)

func Get(url string) (result string, err error) {
	resp, err := http.Get(url)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	buf := make([]byte, 4*1024)
	for {
		n, err := resp.Body.Read(buf)
		if err != nil {
			if err == io.EOF {
				err = nil
				fmt.Println("文件读取完毕")
				break
			} else {
				fmt.Println("resp.Body.Read err = ", err)
				break
			}
		}
		result += string(buf[:n])
	}
	
	return
}

func SpiderPage(i int, page chan<- int) {
	url := "https://github.com/search?q=go&type=Repositories&p=1" + strconv.Itoa((i-1)*50)
	fmt.Println("正在爬取第%d个网页\n", i)
	result, err := Get(url)
	if err != nil {
		fmt.Println("http.Get err = ", err)
		return
	}
	
	filename := "page" + strconv.Itoa(i) + ".html"
	f, err := os.Create(filename)
	if err != nil {
		fmt.Println("os.Create err = ", err)
		return
	}
	f.WriteString(result)
	f.Close()

	page <- i
} 

func Run(start, end int) {
	fmt.Printf("正在爬到第%d页到第%d页\n", start, end)
	page := make(chan int)
	for i := start; i <= end; i++ {
		go SpiderPage(i, page)
	}
	for i := start; i <= end; i++ {
		fmt.Printf("第%d个页面爬取完成\n", <-page)
	}
}

func main() {
	var start, end int
	fmt.Printf("请输入起始页数字>=1:> ")
	fmt.Scan(&start)
	fmt.Printf("请输入结束页数字:> ")
	fmt.Scan(&end)
	
	Run(start, end)
}