想要了解Go语言的并发模型,我们先来了解一下基础知识。

什么是并发?什么是并行?

并发:指的是同一时间段内,多条指令在CPU上同时执行

并行:指的是同一时刻,多条指令在CPU上同时执行

也就是说,并发在宏观上来看是在同时执行,但是在微观上看其实是多个线程被分配一定的执行时间,在CPU上被快速的轮换执行(即同一时刻只有一个线程在CPU上跑)

注意:并发程序的执行通常是不确定的,主要是因为资源之间的相互依赖和竞态条件导致执行线程之间的相互等待。

讲GO语言的线程模型前需要先了解一下常见的线程模型,而在这之前,还需要了解一下用户态内核态是什么?

熟悉操作系统的都知道,根据资源的访问权限不同,操作系统会把内存分为内核空间用户空间内核空间的指令代码具备直接调度计算机底层资源的权限,而用户空间的指令代码没有访问计算机底层资源的能力,需要通过系统调用等方式切换至内核态从而进行计算机底层资源的申请和调度。

注:频繁的系统调用会消耗较多的系统资源

用户级线程模型

特点:一个进程对应一个内核线程

局限性:进程内的多线程无法很好的利用CPU的多核运算优势,只能通过分时复用的方法轮换执行。当进程内的任意线程阻塞,比如线程A请求I/O资源而被阻塞,会导致整个进程范围内的阻塞,因为此时进程对应的内核线程因为线程A的I/O阻塞而被剥夺CPU执行时间,导致整个进程失去了在CPU的执行代码权力。

在这里插入图片描述

内核空间的线程我们称为内核线程,其由操作系统管理和调度,能够直接操作计算机底层的资源。

用户空间的线程我们称为用户线程,其由用户空间的代码创建、管理和销毁,线程的调度由用户空间的线程库完成,无需切换至内核态,资源消耗少且高效。

内核级线程模型

特点:进程中的每个线程都会对应一个内核线程。

优点:能够充分利用CPU的多核并行计算能力。

局限性:进程内每当创建一个新的线程工作时,都会在内核空间创建一个内核线程与之对应,线程的管理和调度由操作系统负责,这将导致每次线程切换上下文时都会从用户态切换至内核态,产生不小的资源消耗。

在这里插入图片描述

两级线程模型

特点:结合了用户级线程模型和内核级线程模型,一个进程对应多个内核线程,由进程内的调度器来决定线程内的线程如何与内核空间的内核线程对应。

优点:既能够有效降低线程创建和管理的资源消耗,也能够很好地提供线程并行计算的能力。

局限性:给开发人员带来了技术挑战,因为需要在程序代码中模拟线程调度的细节,比如:线程切换时上下文信息的保存和恢复、栈空间大小的管理等。

在这里插入图片描述

MPG线程模型:(特殊的两级线程模型)

M(Machine):操作系统的主线程(可以理解为内核线程在用户进程中的映射)

P(Processor):协程执行的上下文环境(资源) P的最大数量决定了程序的并发规模。

G(Goroutine):协程(轻量级用户线程)

在这里插入图片描述
在这里插入图片描述

什么是协程goroutine?

协程:可以理解为轻量级的线程

  1. 独立的栈空间
  2. 共享程序堆空间
  3. 调度由用户控制
  4. 逻辑态,资源消耗相对小

Golang的协程机制是重要的特点,可以轻松开启上万个协程。Go具有并发上的优势相对于其他编程语言

import(
	"fmt"
    "strconv"
    "time"
)
func test(){
	for i := 0;i < 10;i++{
		fmt.Println("this is test()" + strconv.Itoa(i))
		time.Sleep(time.Second)
	}
}

func main(){
	go test()

	for i := 0;i < 10;i++{
		fmt.Println("this is main()" + strconv.Itoa(i))
		time.Sleep(time.Second)
	}
}
this is main()0
this is test()0
this is main()1
this is test()1
this is test()2
this is main()2
this is main()3
this is test()3
this is test()4
this is main()4
this is main()5
this is test()5
this is test()6
this is main()6
this is test()7
this is main()7
this is test()8
this is main()8
this is test()9
this is main()9

设置运行的CPU数目

import(
	"runtime"
)
//获取当前系统CPU的数量
num := runtime.NumCPU()
runtime.GOMAXPROCS(num)

管道channel:

当我们在使用协程时,往往需要协程间的通信,此时就需要用到channel 管道了。

比如我们想要并发执行求取200个数每个数的阶乘,并保存在map中:

func test(n int){
	ans := 1
	for i:= 1;i<=n;i++{
		ans *= i
	}
	Map[n] = ans
}

//全局变量
var (
	Map = make(map[int]int,10)
)

func main(){
	for i := 0;i < 100;i++{
		go test(i)
	}
    //等待协程完成任务
	time.Sleep(time.Second * 4)
	for i,v := range Map{
		fmt.Printf("map[%d]=%d\n",i,v)
	}
}

此时就会导致冲突的问题

解决方法:

  1. 全局变量的互斥锁
  2. 使用管道channel解决

全局变量的互斥锁:

func test(n int){
	var ans uint64 = 1
	var i uint64 = 1
	for ;i<=uint64(n);i++{
		ans *= i
	}
	//加锁
	lock.Lock()
	Map[n] = ans
	lock.Unlock()
	//解锁
}

var (
	Map = make(map[int]uint64,10)
	//lock 是一个全局互斥锁
	lock sync.Mutex
)

func main(){
	for i := 0;i < 30;i++{
		go test(i)
	}
	time.Sleep(time.Second * 5)
	lock.Lock()
	for i,v := range Map{
		fmt.Printf("map[%v]=%v\n",i,v)
	}

	lock.Unlock()
}

使用管道channel:

channel 本质是一个队列,先进先出。并且channel本身就是线程安全的。

channel是有类型的,一个string的channel只能存放string类型数据

声明方式:

var intChan chan int
var mapChan chan map[string]int
var perStructChan chan Person //Person是一个结构体类型

说明:channel 是引用类型,必须初始化才能够写入数据(即make后才能使用)

//管道的基本使用
func main(){
    //创建一个可以存储任意类型的管道
	var allChan chan interface{}
	allChan = make(chan interface{},10)
	//依次将数据放入管道
	p1 := Person{Name:"ywh",Age:10,Birthday:"1000-10-10",Salary:1231.124,Skill:"playing",}
	allChan <- p1
	allChan <- 10
	allChan <- "hello golang!"
	var m map[string]int= make(map[string]int,2)
	m["北京"] = 1
	m["上海"] = 2
	allChan <- m
    fmt.Printf("chan len :%v chan cap :%v\n",len(allChan),cap(allChan))
	//将管道中的数据依次拿出
	tmp1 := <- allChan
	tmp2 := <- allChan
	tmp3 := <- allChan
	tmp4 := <- allChan

	fmt.Println(tmp1,tmp2,tmp3,tmp4)
    fmt.Printf("chan len :%v chan cap :%v\n",len(allChan),cap(allChan))
}
chan len :4 chan cap :10
{ywh 10 1000-10-10 1231.124 playing} 10 hello golang! map[上海:2 北京:1]
chan len :0 chan cap :10
tmp1的Name成员变量
fmt.Printf("tmp1.Name :%v\n",tmp1.Name) //会报错!

必须要使用类型断言!

tmp_person := tmp1.(Person)
fmt.Printf("tmp1.Name :%v\n",tmp_person.Name)

管道的关闭

使用内置函数close可以关闭channel,当channel关闭后就不能再继续写入数据了,但仍然可以读取管道中的数据。

close(allChan)

管道的遍历

channel支持for range的方式遍历,但注意:

  1. 遍历时,如果channel没有关闭,则会出现deadlock错误。
  2. 遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。
func main(){
	var allChan chan int = make(chan int,100)
	for i := 1;i <=100;i++{
		allChan <- i * 2
	}
	close(allChan)
	//time.Sleep(time.Second * 5)
    //不能用普通的遍历方式
	for v := range allChan{
		fmt.Printf("v=%v\n",v)
	}
}

管道的应用实例:

采用goroutine和channel协同工作,实现:

  1. 开启一个Write协程,向管道intChan写入50个整数
  2. 开启一个Read协程,从管道intChan中读取Write写入的数据
  3. 主线程需要等待writeData和readData协程都完成工作才能退出
func Write(intChan chan int){
	for i := 1;i <= 50;i++{
		intChan <- i
		fmt.Printf("Write %v!\n",i)
	}
	fmt.Printf("Write over!\n")
	close(intChan)
}

func Read(intChan chan int,boolChan chan bool){
	 for {
	 	v,ok := <- intChan
	 	if !ok{
	 		break
	 	}
	 	fmt.Printf("Read %v!\n",v)
	 }
	 fmt.Printf("Read over!\n")

	 boolChan <- true
	 close(boolChan)
}

func main(){
	var intChan chan int = make(chan int,50)
	boolChan := make(chan bool,1)

	go Write(intChan)
	go Read(intChan,boolChan)

	for {
		_,ok := <- boolChan
		if !ok{
			break
		}
	}
	fmt.Printf("main over!\n")
}
Write 1!
Write 2!
Write 3!
Write 4!
Write 5!
Write 6!
Write 7!
Write 8!
Write 9!
Write 10!
Write 11!
Write 12!
Write 13!
Write 14!
Read 1!
Read 2!
Read 3!
Read 4!
Read 5!
Read 6!
Read 7!
Read 8!
Read 9!
Read 10!
Read 11!
Read 12!
Read 13!
Read 14!
Read 15!
Write 15!
Write 16!
Write 17!
Write 18!
Write 19!
Write 20!
Write 21!
Write 22!
Write 23!
Write 24!
Write 25!
Write 26!
Write 27!
Write 28!
Write 29!
Write 30!
Write 31!
Write 32!
Write 33!
Write 34!
Write 35!
Write 36!
Write 37!
Write 38!
Write 39!
Write 40!
Write 41!
Write 42!
Write 43!
Write 44!
Write 45!
Write 46!
Write 47!
Write 48!
Write 49!
Write 50!
Write over!
Read 16!
Read 17!
Read 18!
Read 19!
Read 20!
Read 21!
Read 22!
Read 23!
Read 24!
Read 25!
Read 26!
Read 27!
Read 28!
Read 29!
Read 30!
Read 31!
Read 32!
Read 33!
Read 34!
Read 35!
Read 36!
Read 37!
Read 38!
Read 39!
Read 40!
Read 41!
Read 42!
Read 43!
Read 44!
Read 45!
Read 46!
Read 47!
Read 48!
Read 49!
Read 50!
Read over!
main over!

从输出也可以看出两个协程是并发执行的!

应用实例2:

利用goroutine和channel实现统计1-200000的数字中,哪些是素数。

package main

import (
	"fmt"
	"encoding/json"
	"time"
	//"strconv"
	//"sync"
)
//向管道存放待处理数据
func putNum(intChan chan int){
	for i:= 1;i <= 1000;i++{
		intChan <- i
	}
	close(intChan)
}
//处理协程——判断是否是素数
func primeNum(intChan chan int,primeChan chan int,exitChan chan bool){
	for {
		time.Sleep(time.Millisecond * 10)
		num,ok := <- intChan
	
		if !ok {
			break
		}

		flag := true
		for i := 2;i < num;i++{
			if num % i == 0{
				flag = false
				break
			}
		}

		if flag{
			primeChan <- num
		}
	}
	fmt.Printf("No data in Chan ! over!\n")
	exitChan <- true
	if len(exitChan) == 4{//4个协程均完成了工作,关闭管道
		close(exitChan)
	}
	
	//close(primeChan) 这里不能关闭primeChan
}
func main(){
	intChan := make(chan int,1000)
	primeChan := make(chan int,2000)
	exitChan := make(chan bool, 4)
	//开启一个协程向管道中放入数据
	go putNum(intChan)
	//开启4个协程 处理是否是素数
	for i:=0;i< 4;i++{
		go primeNum(intChan,primeChan,exitChan)
	}
	//等待全部完成
	go func(){
		for i := 0;i < 4;i++{
			<- exitChan
		}
		close(primeChan)//关闭存放素数的管道
	}()

	for v := range primeChan{
		fmt.Printf("%v\n",v)
	}

	time.Sleep(time.Second * 2)
}
1
2
3
5
7
11
13
17
19
23
29
31
37
41
43
47
53
59
61
67
71
73
79
83
89
97
101
103
107
109
113
127
131
137
139
149
151
157
163
167
173
179
181
191
193
197
199
211
223
227
229
233
239
241
251
257
263
269
271
277
281
283
293
307
311
313
317
331
337
347
349
353
359
367
373
379
383
389
397
401
409
419
421
431
433
439
443
449
457
461
463
467
479
487
491
499
503
509
521
523
541
547
557
563
569
571
577
587
593
599
601
607
613
617
619
631
641
643
647
653
659
661
673
677
683
691
701
709
719
727
733
739
743
751
757
761
769
773
787
797
809
811
821
823
827
829
839
853
857
859
863
877
881
883
887
907
911
919
929
937
941
947
953
967
971
977
983
991
997
No data in Chan ! over!
No data in Chan ! over!
No data in Chan ! over!
No data in Chan ! over!

有时候我们再从管道取数据时常常会遇到阻塞问题,在传统的方法遍历管道时,如果不关闭会阻塞而导致deadlock,因此可以采用select解决。

func main(){
	intChan := make(chan int,10)
	stringChan := make(chan string,10)
	for i:=0;i<5;i++{
		intChan <- i
		stringChan <- "hello " + fmt.Sprintf("%d",i)
	}

	for {
		select{
		case v:= <- intChan :
			fmt.Printf("recv %v from intChan!\n",v)
			time.Sleep(time.Second)
			/* code */
		case v:= <- stringChan :
			fmt.Printf("recv %v from stringChan!\n",v)
			time.Sleep(time.Second)
		default:
			fmt.Printf("No data recv\n")
			close(intChan)
			close(stringChan)
			return 
			/* code */
		}
	}
}
recv 0 from intChan!
recv 1 from intChan!
recv hello 0 from stringChan!
recv 2 from intChan!
recv 3 from intChan!
recv 4 from intChan!
recv hello 1 from stringChan!
recv hello 2 from stringChan!
recv hello 3 from stringChan!
recv hello 4 from stringChan!
No data recv

我们在使用协程goroutine时可能出现panic,导致程序崩溃,这时我们可以在goroutine中使用recover来捕获panic,进行处理,这样及时此协程发生问题,主线程仍然不会受到影响,可以继续执行。

func sayHello(){
	for i := 0;i < 10;i++{
		time.Sleep(time.Second)
		fmt.Printf("hello golang!\n")
	}
}

func testRecover(){
	defer func(){
		if err := recover();err != nil{
			fmt.Println("test() error!\n",err)
		}
	}()

	var Map map[int]string
	Map[0] = "hello world!\n" //会报错! 因为没有make申请空间
}

func main(){
	go sayHello()
	go testRecover()

	for i := 0;i < 10;i++{
		time.Sleep(time.Second)
		fmt.Printf("main is running!\n")
	}
}
test() error!
 assignment to entry in nil map  //可见!程序抛出了panic恐慌可是没有结束,而是继续运行。
main is running!
hello golang!
hello golang!
main is running!
main is running!
hello golang!
hello golang!
main is running!
main is running!
hello golang!
hello golang!
main is running!
main is running!
hello golang!
hello golang!
main is running!
main is running!
hello golang!
hello golang!
main is running!