7.1 并发与并行
- 并发(Concurrent)
多进程指令被CPU快速轮换执行。宏观上,多个进程同时执行。微观上,多个进程非同时执行,时间分成若干段,多个进程快速交替执行。
操作系统进程的并发:CPU划分时间片段(时间区间),进程在时间区间之间来回切换处理。CPU处理速度快,时间间隔处理得当,用户感觉是多个进程同时进行。
- 并行(Parallel)
同一时刻多条指令在多个处理器上同时执行。
- 区别
- 并发偏重于多个任务交替执行,而多个任务间可能是串行的。并发是逻辑上的同时发生(simultaneous)。串行通讯,传输1bit信号。
- 并行偏重于同时执行。是物理上的同时发生。并行通讯,传输多bit信号。
7.2 进程、线程和协程
- 进程(Process)
运行中的可执行程序。
- 线程(Thread)
轻量级进程(Lightweight Process,LWP),程序执行流的最小单位。由线程ID、当前指令指针(PC)、寄存器集合和堆栈组成。
线程是进程的一个实体,与同属同一进程的其他线程共享进程的全部资源。
线程拥有独立栈,共享堆。
线程具有5种状态:初始化、可运行、运行中、阻塞、销毁。
线程和进程都由操作系统管理。线程是最小的执行单元,进程是最小的资源管理单元。
- 协程(Coroutines)
协程是比线程更加轻量级的一种特殊函数。协程不是操作系统内核管理,而是程序控制,即在用户态执行。提升性能,不会像线程切换那样消耗资源。
一个进程可以包含多个线程,一个线程可以包含多个协程。一个线程中的多个协程始终是串行的(函数都是串行运行),无论CPU多少核。
- 对比
- 协程仅是特殊函数,与线程和进程不是一个维度。
- 一个进程可以包含多个线程,一个线程可以包含多个协程。
- 一个线程内的多个协程可以切换,但协程始终是串行执行,无法利用CPU的多核能力。
- 进程由操作系统自己的切换策略来切换,用户无感。切换内容包括页全局目录、内核栈和硬件上下文,切换内容被保存在内存中。采用“从用户态到内核态再到用户态”的方式,切换效率低。
- 线程由操作系统自己的切换策略来切换,用户无感。切换内容包括内核栈和硬件上下文,切换内容被保存在内核栈中。采用“从用户态到内核态再到用户态”的方式,切换效率中等。
- 协程的切换由用户(编程者或应用程序)决定。切换内容包括硬件上下文,切换内容被保存在用户自己的变量(用户栈或堆)中。只有“用户态”,切换效率高。
7.3 Go并发模型简介
- 多线程共享内存模型
访问共享数据(数组、map、结构体或对象等)时,通过锁来访问。衍生出线程安全的数据结构,Go通过sync包来实现。
- CSP(Communicating Sequential Processes)并发模型
CSP并发模型理念是不通过共享内存来通信,而是通过通信来共享内存。
Go通过goroutine和通道(channel)来实现。goroutine时并发执行单位,类似协程。通道是goroutine间的通信管道,类似UNIX中的管道。
package main
import "fmt"
func main() {
message := make(chan string)
go func() {messages <- "ping"}()
msg := <-message
fmt.Println(msg)
}
7.4 goroutine和channel实现并发
7.4.1 goroutine简介
go func(param1, param2) {
//...
}(var1, var2)
package main
import (
"fmt"
"time"
)
func Echo(s string) {
for i := 0; i < 3; i++ {
time.Sleep(100*time.Millisecond)
fmt.Println(s)
}
}
func main() {
go Echo("go")
Echo("web program")
}
goroutine的调度方式是协同式的,没有时间片概念。
goroutine切换时刻:
- 通道接收或发送数据且造成阻塞时。
- 新的goroutine被创建。
- 造成系统调用被阻塞,如操作文件时。
goroutine在多核CPU环境下可以是并行的。
package main
import (
"fmt"
"time"
)
func main() {
for i := 0; i < 5; i++ {
go Add(i, i) //返回值被忽略
}
time.Sleep(1000*time.Millisecond)
}
func Add(a, b int) int {
c := a + b
fmt.Println(c)
time.Sleep(100*time.Millisecond)
return c
}
7.4.2 通道
- 定义
channel是用来传递数据的数据结构,可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。一种特殊类型,任何时候,同时只能有一个goroutine访问通道进行发送或接收数据。
类似队列,遵循先入先出(First In First out)规则,保证收发数据的顺序。
- 声明
var channel_name chan type
- 创建
//通道实例 := make(chan 数据类型)
strStream := make(chan string)
done := make(chan interface{})
voidStream := make(chan *struct{})
- 通道发送数据
//通道变量 <- 通道值
ch := make(chan interface{})
ch <- 6
ch <- "love"
package main
func main() {
ch := make(chan interface{})
ch <- "sleep" //无缓冲通道,无接收方,永远阻塞,报错
}
- 通道接收数据
//阻塞接收
data := <-ch
//非阻塞接收
data, ok := <-ch
//忽略接收数据
<-ch
//循环接收数据
for data := range ch {
//
}
package main
import (
"fmt"
)
func Sum(s []int, ch chan int) {
sum := 0
for _, v := range s {
sum += v
}
ch <- sum
}
func main() {
s := []int{6, 7, 8, -9, 1, 8}
ch := make(chan int)
go Sum(s[:len(s)/2], ch)
go Sum(s[len(s)/2:], ch)
a, b := <-ch, <-ch
fmt.Println(a, b, a+b)
}
- 通道缓冲区
ch := make(chan int, 6)
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 3)
ch <- 6
ch <- 7
ch <- 8
fmt.Println(<-ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
}
- select多路复用
select {
default:
//
case <- ch1:
//
case v2 := <- ch2:
//
}
timeout := make(chan interface{}, 1)
go func() {
time.Sleep(6)
timeout <- interface{}{}
}
select {
case <- ch:
//
case <- timeout:
//
}
- 遍历通道与关闭通道
v, ok := <-ch
package main
import (
"fmt"
)
func fibonacci(n int, ch chan int) {
a, b := 0, 1
for i := 0; i < n; i++ {
ch <- a
a, b = b, a+b
}
close(ch)
}
func main() {
ch := make(chan int, 6)
go fibonacci(cap(ch), ch)
for j := range ch {
fmt.Println(j)
}
}
7.5 sync实现并发
7.5.1 竞态
使用并发,可能产生数据争用的竞态问题。
//输出0或6
func main() {
fmt.Println(getNumber())
}
func getNumber() int {
var i int
go func() {
i = 6
}()
return i
}
7.5.2 互斥锁
sync.Mutex,用于实现互斥锁,用于读写不确定的场景,全局锁。
type Mutex struct {
state int32 //当前互斥锁的状态
sema uint32 //控制锁状态的信号量
}
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
必须先Lock(),然后Unlock()。
连续Lock(),死锁。
先Unlock(),后Lock(),panic。
可以一个goroutine先Lock(),其他goroutine后Unlock()。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mutex sync.Mutex
wait := sync.WaitGroup{}
fmt.Println("Locked")
mutex.Lock()
for i := 1; i <= 5; i++ {
wait.Add(1)
go func(i int) {
defer wait.Done()
fmt.Println("Not lock:", i)
mutex.Lock()
fmt.Println("Locked:", i)
time.Sleep(time.Second)
fmt.Println("Unlocked:", i)
mutex.Unlock()
}(i)
}
time.Sleep(time.Second)
fmt.Println("Unlocked")
mutex.Unlock()
wait.Wait()
}
7.5.3 读写互斥锁
sync.RWMutex可以多个读锁或者一个写锁,用于读次数远远多于写次数的场景。
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
//写操作
func (*RWMutex) Lock()
func (*RWMutex) Unlock()
//读操作
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()
package main
import (
"fmt"
"sync"
"math/rand"
)
var count int
var rw sync.RWMutex
func main() {
ch := make(char struct{}, 6)
for i := 0; i < 3; i++ {
go ReadCount(i, ch)
}
for i := 0; i < 3; i++ {
go WriteCount(i, ch)
}
for i := 0; i < 6; i++ {
<-ch
}
}
func ReadCount(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d 进入读操作...\n", n)
v := count
fmt.Printf("goroutine %d 读取结束,值为:%d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func WriteCount(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d 进入写操作...\n", n)
v := rand.Intn(10)
count = v
fmt.Printf("goroutine %d 写入结束,值为:%d\n", n, v)
rw.Unlock()
ch <- struct{}{}
}
7.5.4 sync.Once结构体
多次调用sync.Once.Do(f func()),只执行第一次调用的函数。
type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func())
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("test only once,这里只打印一次!")//打印
}
done := make(chan bool)
for i := 0; i < 6; i++ {
go func() {
once.Do(onceBody)//确保只执行一次
done <- true
}()
}
for i := 0; i < 6; i++ {
<-done
}
}
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
var once sync.Once
func func1(ch1 chan<- int) {
defer wg.Done()
for i := 0; i < 10; i++ {
ch1 <- i
}
close(ch1)
}
func func2(ch1 <-chan int, ch2 chan<- int) {
defer wg.Done()
for {
x, ok := <-ch1
if !ok {
break
}
ch2 <- 2 * x
}
once.Do(func() { close(ch2) }) // 确保某个操作只执行一次
}
func main() {
ch1 := make(chan int, 10)
ch2 := make(chan int, 10)
wg.Add(3)
go func1(ch1)
go func2(ch1, ch2)
go func2(ch1, ch2)
wg.Wait()
for ret := range ch2 {
fmt.Println(ret)
}
}
7.5.5 同步等待组sync.WaitGroup
用于等待一组线程的结束。
func (*WaitGroup) Add(int)
func (w *WaitGroup) Done() {
w.Add(-1)
}
func (*WaitGroup) Wait()
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("1 goroutine sleep ...")
time.Sleep(2)
fmt.Println("1 goroutine exit ...")
}()
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("2 goroutine sleep ...")
time.Sleep(4)
fmt.Println("2 goroutine exit ...")
}()
fmt.Println("Waiting for all goroutine ")
wg.Wait()
fmt.Println("All goroutines finished!")
}
package main
import (
"fmt"
"sync"
"time"
)
func main() {
testFunc := func(wg *sync.WaitGroup, id int) {
defer wg.Done()
fmt.Printf("%v goroutine start ...\n", id)
time.Sleep(2)
fmt.Printf("%v goroutine exit ...\n", id)
}
var wg sync.WaitGroup
const N = 3
wg.Add(N)
for i := 0; i < N; i++ {
go testFunc(&wg, i)
}
fmt.Println("Waiting for all goroutine")
wg.Wait()
fmt.Println("All goroutines finished!")
}
7.5.6 竞态检测器
竞态分析工具。
go build -race main.go
go run -race main.go
go test -race test_main.go
package main
import "fmt"
func main() {
c := make(chan bool)
m := make(map[string]string)
go func() {
m["a"] = "one" // 第一个冲突访问.
c <- true
}()
m["b"] = "two" // 第一个冲突访问
<-c
for k, v := range m {
fmt.Println(k, v)
}
}
7.6 Go并发的Web应用
7.6.1 自增整数生成器
package main
import "fmt"
func IntegerGenerator() chan int {
var ch chan int = make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}
func main() {
generator := IntegerGenerator()
for i := 0; i < 100; i++ {
fmt.Println(<-generator)
}
}
7.6.2 并发的消息发送器
package main
import "fmt"
func SendNotification(user string) <-chan string {
notifications := make(char string, 1)
go func() {
defer close(notifications)
notifications <- fmt.Sprintf("Hi %s, welcome!", user)
}()
return notifications
}
func main() {
barry := SendNotification("barry")
shirdon := SendNotification("shirdon")
fmt.Println(<-barry)
fmt.Println(<-shirdon )
}
7.6.3 多路复合计算器
package main
import (
"fmt"
"math/rand"
"time"
)
func doCompute(x int) int {
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
return 1+x
}
func branch(x int) chan int {
ch := make(chan int, 1)
go func() {
defer close(ch)
ch <- doCompute(x)
}()
return ch
}
/*
func Recombination(chs... chan int) chan int {
ch := make(chan int)
for _, c := range chs {
go func(c chan int) {ch <- <-c}(c)
}
return ch
}
*/
func Recombination(chs... chan int) chan int {
num := len(chs)
ch := make(chan int, num)
go func() {
defer close(ch)
for i := 0; i < num; i++ {
/*
select {
case v := <-chs[i]:
ch <- v
}
*/
ch <- <-chs[i]
}
}()
return ch
}
func main() {
result := Recombination(branch(10), branch(20), branch(30))
/*
for i := 0; i < 3; i++ {
fmt.Println(<-result)
}
*/
for v := range result {
fmt.Println(v)
}
}
7.6.4 select创建多通道监听器
package main
import (
"fmt"
"time"
)
func foo(x int) <-chan int {
ch := make(chan int, 1)
go func() {
defer close(ch)
ch <- x
}()
return ch
}
func main() {
ch1, ch2, ch3 := foo(3), foo(6), foo(9)
ch := make(chan int, 3)
go func() {
defer close(ch)
timeout := time.After(1*time.Second)
for isTimeout := false; !isTimeout; {
select {
case v1, ok := <-ch1:
if ok {
ch <- v1
}
case v2, ok := <-ch2:
if ok {
ch <- v2
}
case v3, ok := <-ch3:
if ok {
ch <- v3
}
case <-timeout:
isTimeout = true
}
}
}()
for v := range ch {
fmt.Println(v)
}
}
7.6.5 无缓冲通道阻塞主线
package main
import (
"fmt"
)
func main() {
ch, quit := make(chan int), make(chan int)
go func() {
ch <- 8
quit <- 1
}()
for isQuit:= false; !isQuit; {
select {
case v := <-ch:
fmt.Printf("received %d from ch", v)
case <-quit:
isQuit= true
}
}
}
7.6.6 筛选法求素数
package main
import (
"fmt"
)
func IntegerGenerator() chan int {
var ch chan int = make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}
func Filter(in chan int, number int) chan int {
out := make(chan int)
go func() {
for {
i := <-in
if i%number != 0 {
out <- i
}
}
}()
return out
}
func main() {
const max = 100
//产生所有的输入流
//2 3 4 5 6 7 8 9 10
numbers := IntegerGenerator()
//取第一个数2
number := <-numbers
for number <= max {
fmt.Println(number)
//过滤输入流,产生新的输入流
//第一次过滤
//3 5 7 9
//第二次过滤
//5 7
//第三次过滤
//7
numbers = Filter(numbers, number)
//第一次过滤后取第一个数3
//第二次过滤后取第一个数5
//第三次过滤后取第一个数7
number = <-numbers
}
}
7.6.7 随机数生成器
package main
import (
"fmt"
)
func randGenerator() chan int {
var ch chan int = make(chan int)
go func() {
for {
select {
case ch <- 0:
case ch <- 1:
}
}
}()
return ch
}
func main() {
generator := IntegerGenerator()
for i := 0; i < 10; i++ {
fmt.Println(<-generator)
}
}
7.6.8 定时器
package main
import (
"fmt"
"time"
)
func Timer(duration time.Duration) chan bool {
var ch chan bool = make(chan bool)
go func() {
time.Sleep(duration)
ch <- true
}()
return ch
}
func main() {
timeout := Timer(5*time.Second)
for {
select {
case <-timeout:
fmt.Println("already 5s!")
return
}
}
}
7.6.9 并发的Web爬虫
package main
import (
"fmt"
"time"
)
func Get(url string) (result string, err error) {
resp, err := http.Get(url)
if err != nil {
return
}
defer resp.Body.Close()
buf := make([]byte, 4*1024)
for {
n, err := resp.Body.Read(buf)
if err != nil {
if err == io.EOF {
err = nil
fmt.Println("文件读取完毕")
break
} else {
fmt.Println("resp.Body.Read err = ", err)
break
}
}
result += string(buf[:n])
}
return
}
func SpiderPage(i int, page chan<- int) {
url := "https://github.com/search?q=go&type=Repositories&p=1" + strconv.Itoa((i-1)*50)
fmt.Println("正在爬取第%d个网页\n", i)
result, err := Get(url)
if err != nil {
fmt.Println("http.Get err = ", err)
return
}
filename := "page" + strconv.Itoa(i) + ".html"
f, err := os.Create(filename)
if err != nil {
fmt.Println("os.Create err = ", err)
return
}
f.WriteString(result)
f.Close()
page <- i
}
func Run(start, end int) {
fmt.Printf("正在爬到第%d页到第%d页\n", start, end)
page := make(chan int)
for i := start; i <= end; i++ {
go SpiderPage(i, page)
}
for i := start; i <= end; i++ {
fmt.Printf("第%d个页面爬取完成\n", <-page)
}
}
func main() {
var start, end int
fmt.Printf("请输入起始页数字>=1:> ")
fmt.Scan(&start)
fmt.Printf("请输入结束页数字:> ")
fmt.Scan(&end)
Run(start, end)
}