并行与并发

并行:是真正意义上的多任务,在同一刻是多个任务同时执行的,比如你的电脑cpu是4核的,那么就是真正意义上的四个任务同时执行。
并发:其实是每个任务在不停的抢占cpu时间片,谁抢到谁就执行,但是由于cpu切换太快我们感知不到,所以以为是4个任务在同时执行,比如同时打开web和听音乐,微观的角度每一刻,音乐与web只运行一个,但是人是感知不到的。所以我们的并发,从微观来说其实还是串行。经典的:微观并行,宏观并发。

  • 并行是两个队列同时使用两台咖啡机
  • 并发是两个队列交替使用一台咖啡机
go 语言并发优势

它的并发,写法比较简单,并且语言层面支持协程。语言层面支持并发,这个就是Go最大的特色,天生的支持并发。Go就是基因里面支持的并发,可以充分的利用多核,很容易的使用并发。

协程goroutine

goroutine是Go并行设计的核心。goroutine说到底其实就是协程,但是它比线程更小,十几个goroutine可能体现在底层就是五六个线程,Go语言内部帮你实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(大概是4~5KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine 比 thread更易用、更高效、更轻便。

简单点理解:
goroutine就是go的协程。协程是比线程更小的单位,我们可以简单的把协程理解为一个任务,一个协程就是一个任务。

创建协程

代码中,我们通常称呼main函数为主协程。通过main函数创建的协程称之为子协程或者工作协程。
声明一个协程,go关键字,如下图所示,方法调用的时候加上go,或者for循环前加上go。
注意1:如果主协程退出,则其它协程也跟着退出。
在这里插入图片描述

主协程退出,其它协程也都退出

主协程退出,其他协程也跟着退出。以下代码中,go开启了一个新协程,然后下面的for循环也是一个for,但是我们并不知道go与下面的for谁先运行,但是下面的for不是个无线循环的,是会运行结束的,当break的时候,就跳出for了,然后主协程(main)函数下面没有其他代码也就结束运行了,主协程结束,则其他协程也都结束了

在这里插入图片描述

主协程先退出导致其它子协程未来的及调用

运行以下代码,发现什么都没有打印。
因为go开启了一个新的协程,然后cpu时间片轮询分配给子协程与主协程,然后在创建完新协程之后,主协程的到cpu时间片继续往下执行,然后就没有代码了,主协程结束,然后其它子协程也都结束,于是无打印。这点与java不同。怎么解决这个问题呢,看下节。
在这里插入图片描述

runtime包

Gosched(写在哪个协程里,哪个协程就让出cpu时间片)

runtime.Gosched()用于让出 CPU时间片,让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行。
这就像跑接力赛,A跑了一会碰到代码runtime.Gosched()就把接力棒交给B了,A歇着了,B继续跑。

看下面的代码
在这里插入图片描述

分析:刚开始主协程创建一个子协程,然后继续往下执行,主协程的代码,然后新的子协程刚创建还没有来得及执行(未获取到时间片),主协程就运行完毕了,然后就退出了,子协程骂骂咧咧的也退出了。
下面代码,让主协程让出时间片,等其他协程执行完毕后再执行,就不存在上面这么尴尬的问题了。
在这里插入图片描述

Goexit(写在哪个协程里,就终止哪个协程执行)

调用runtime.Goexit()将立即终止当前goroutine执行,调度器确保所有已注册defer延迟调用被执行。

猜猜下面代码运行结果是多少?
在这里插入图片描述

运行结果为:adcb

再猜猜如下代码运行结果是多少?
在这里插入图片描述
运行结果:acb。return是终止函数,只终止该函数,但是Goexit将整个协程都终止了。

再看如下代码,猜猜结果
在这里插入图片描述
运行结果:ac。进入协程,打印a,然后进入方法test,然后调用了Goexit函数,结束该协程,但是需要注意的是defer里的内容仍然会执行,它不会执行Goexit之后的代码,但是Goexit之前的代码还是会执行的。所以defer的代码会执行,c也会被打印。然后函数test结束后,该协程已经被终止了,于是main方法中的go协程里的test后面的代码也不会执行。

GOMAXPROCS(设置可并行计算的CPU核数的最大值,并返回之前的值)

调用runtime.GOMAXPROCS(用来设置可以并行计算的CPU核数的最大值,并返回之前的值。

package main

import (
	"fmt"
	"runtime"
)

func main() {
	n := runtime.GOMAXPROCS(1)//指定以单核(1核)计算
	fmt.Println("n = ", n)
}

如上代码运行结果为:8,因为我的电脑是8核的。

下面我们看以下代码,当我们用指定用1核去运行时

package main

import (
	"fmt"
	"runtime"
)

func main() {

	n := runtime.GOMAXPROCS(1)
	fmt.Println("n = ", n)
	for{
		go fmt.Print(1)
		fmt.Print(0)
	}
}

运行结果:因为是单核,所以谁抢到cpu时间片了,谁就运行。如果我们把核数调整到2或者更高的核数,那么多核参与计算效率提升,操作系统会把多个程序的指令分别发送给多个核心,从而使得同时完成多个程序的速度大大加快,打印的0,1交替的频率就高了。
在这里插入图片描述
多核下打印,如下图所示
在这里插入图片描述

channel

多任务竞争问题

不比比,直接看代码

package main

import (
	"fmt"
)

func Printer (str string) {
	for _, data := range str {
		fmt.Printf("%c", data);
	}
}

func main() {
	Printer("也无风雨、")
	Printer("也无晴")
}

以上代码输出:
在这里插入图片描述
代码修改

package main

import (
	"fmt"
	"sync"
	"time"
)

func Printer (str string) {
	for _, data := range str {
		fmt.Printf("%c", data);
		// 睡眠时间,为了更好的模拟多个协程抢占cpu时间片,否则打印的太快,一个协程得到cpu时间片直接打印完,就达不到乱序的效果了
		time.Sleep(time.Second)
	}
}
var waitgroup sync.WaitGroup
func Person1() {
	Printer("也无风雨、")
	waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}
func Person2() {
	Printer("也无晴")
	waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}

func main() {
	waitgroup.Add(2)// 目的:为了不让主协程执行完毕 每创建一个goroutine,就把任务队列中任务的数量+1
	go Person1()
	go Person2()
	waitgroup.Wait()//.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
}

运行结果:原因就是两个协程在不断的进行上下文切换,协程1得到了cpu时间片,然后打印了一个"也"字,此时协程1的时间片用完了,协程2获得了cpu时间片,然后协程2打印了"也"字,cpu时间片也用完了,,,反复如此,直到两个协程执行完毕。于是就是我们看到的乱序的输出。
在这里插入图片描述

那么以上问题怎么解决呢
goroutine运行在相同的地址空间,因此访问共享内存必须做好同步。goroutine奉行通过通信来共享内存,而不是共享内存来通信。,说人话:其实就是建议用通信来实现共享数据,建议用通信来实现同步。
引用类型channel是CSP模式的具体实现,用于多个goroutine通讯。其内部实现了同步,确保并发安全。
channel本质上就是一个管道,我们可以把channel当成一个水管,一端进水(放数据),一端出水(取数据),当没有水的时候它就阻塞住了。
和 map类似,channel 也一个对应make创建的底层数据结构的引用。
当我们复制一个channel或用于函数参数传递时,我们只是拷贝了一个channel引用,因此调用者和被调用者将引用同一个channel对象。和其它的引用类型一样,channel的零值也是nil。
声明语法:

	//等价于make(chan Type, 0)
    make(chan Type) 
    // 还可以定义容量大小,其实就是有缓冲区的chinnal,一会我们会讲到
    make(chan Type, capacity)
    channel <- value      //发送value到channel
    <-channel             //接收并将其丢弃
    x := <-channel        //从channel中接收数据,并赋值给x
    x, ok := <-channel    //功能同上,同时检查通道是否已关闭或者是否为空

当 capacity= 0 时,channel 是无缓冲阻塞读写的,当capacity> 0 时,channel 有缓冲、是非阻塞的,直到写满 capacity个元素才阻塞写入。

channel通过操作符<-来接收和发送数据,发送和接收数据语法:
默认情况下,channel接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得goroutine同步变的更加的简单,而不需要显式的lock。

下面我们来解决一下打印乱序的问题

package main

import (
	"fmt"
	"sync"
	"time"
)
// 全局变量,创建一个channel
var ch = make(chan int)
func Printer (str string) {
	for _, data := range str {
		fmt.Printf("%c", data);
		// 睡眠时间,为了更好的模拟多个协程抢占cpu时间片,否则打印的太快,一个协程得到cpu时间片直接打印完,就达不到乱序的效果了
		time.Sleep(time.Second)
	}
	fmt.Printf("\n");
}
var waitgroup sync.WaitGroup

// person1执行完毕后,person2再执行
func Person1() {
	Printer("也无风雨、")
	ch <- 666//给管道写数据,发送
	waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}
func Person2() {
	<-ch //从管道取数据,接收,如果通道没有数据他就会阻塞
	Printer("也无晴")
	waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}
// 这里用sync.WaitGroup的目的是为了防止主协程执行完毕,子协程还没有执行完,就退出了
func main() {
	waitgroup.Add(2)// 目的:为了不让主协程执行完毕 每创建一个goroutine,就把任务队列中任务的数量+1
	go Person1()
	go Person2()
	waitgroup.Wait()//.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
}

运行结果
在这里插入图片描述
分析:两种情况,如果先执行persion2,那么就是先从管道中取数据,此时管道中当然没有数据,于是阻塞,然后cup时间片到了persion1,如果persion1在打印了两个字符后,cpu时间片又轮到了persion2然后还是去管道中取数据,但是此时管道中还是没数据,只要persion1没有执行完这句代码“Printer(“也无风雨、”)”,那么管道中就永远无数据,于是persion2就一直被阻塞,直到persion1执行完毕后,执行了ch <- 666也就是网管道中写入数据,然后persion2执行<-ch 管道中有数据,于是向下执行,以此来保证persion1先执行,再执行persion2的目的。

既然学习了管道,那么我们优化一下上面代码里的主线程阻塞方法,因为上面代码我们为了防止主协程先运行结束,导致子协程没有运行或者没有运行完毕就退出了,我们用到了 sync.WaitGroup.Done与Add方法,但是略显繁琐。下面我们用管道来解决一下这个问题。

下面的代码有没有问题呢?

package main

import(
	"fmt"
	"time"
)

func main()  {
	// 创建一个channel
	ch := make(chan string)
	defer fmt.Println("主协程结束")
	go func ()  {
		for i := 0; i < 4; i++ {
			fmt.Println("子协程i-", i)
			time.Sleep(time.Second)
		}
	}()
	// 没有数据前,阻塞
	str := <- ch
	fmt.Println("str = ", str)
}

运行结果:在这里插入图片描述
分析:
我们可以看到发生了死锁,所谓死锁:是两个线程都在等待对方执行完毕才能往下执行,于是就陷入了无限的等待当中,就发生了死锁。因为管道永远没有数据,协程一直等待下去就没办法往下走了,于是就有了死锁。
那怎么解决呢?我们只需要在子协程执行完毕后,向管道中写入数据即可,然后主协程就能获取到数据了。

如下代码:,代码是解决了问题,大家不妨猜猜代码的打印顺序

package main

import(
	"fmt"
	"time"
)

func main()  {
	// 创建一个channel
	ch := make(chan string)
	defer fmt.Println("主协程结束")
	go func ()  {
		defer fmt.Println("子协程运行完毕")
		for i := 0; i < 4; i++ {
			fmt.Println("子协程i-", i)
			time.Sleep(time.Second)
		}
		ch <- "我是子协程,要运行完毕"
	}()
	// 没有数据前,阻塞
	str := <- ch
	fmt.Println("str = ", str)
}

打印结果:
在这里插入图片描述

无缓冲的channel

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。
这种类型的通道要求发送 goroutine 和接收 goroutine同时准备好,才能完成发送和接收操作。如果两个goroutine没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。
这种对通道进行发送接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
下图展示两个goroutine 如何利用无缓冲的通道来共享一个值:

请添加图片描述

  1. 在第1步,两个goroutine都到达通道,但哪个都没有开始执行发送或者接收。
  2. 在第2步,左侧的 goroutine将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个goroutine会在通道中被锁住,直到交换完成。
  3. 在第3步,右侧的 goroutine 将它的手放入通道,这模拟了从通道里接收数据。这个goroutine一样也会在通道中被锁住,直到交换完成。
  4. 在第4步和第5步,进行交换,并最终,在第6步,两个goroutine都将它们的手从通道里拿出来,这模拟了被锁住的 goroutine得到释放。两个goroutine现在都可以去做别的事情了。
    无缓冲的channel创建格式:
    make(chan Type) //等价于make(chan Type, 0)

如果没有指定缓冲区容量,那么该通道就是同步的,因此会阻塞到发送者准备好发送和接收者准备好接收。

上图得到一个非常重要的结论,无缓存的channel
注意当协程1向管道中写数据,但是协程2未从管道读出数据之前,协程1是会阻塞的,当协程2从管道取数据但是管道还没数据,一直到协程2将数据从管道中取出来之前协程2 也是阻塞的

下面看段代码实践一下
在这里插入图片描述

运行结果:
在这里插入图片描述
那么问题来了,按照咱们上面的分析,协程1向管道中写入数据,协程2未读取之前协程1、2都是是被阻塞的,然后协程2从管道中读取数据,然后双方都释放,再继续,按照这样上面的运行结果应该是先打印协程1,再打印协程2,然后再协程1,协程2,交替执行才对,为啥上面的结果没有交替执行呢?其实结论是没问题的,只不过情况不一样。
上面运行结果分析:协程1先打印数据,之后协程1向管道中写入数据,然后阻塞,协程2从管道中读取数据,然后读取完数据了,此时协程1,2就都释放了,协程2刚取出数据还没来得及去打印,此时cpu时间片轮到了协程1于是乎协程1就开始了打印,于是看到的就是协程1两次打印在一起。

总结:

  • 1、通道并不强制要求goroutine之间必须同时完成发送和接收,否则就会被阻塞。
  • 2、无缓存的channel,你往里管道中写数据,但是其他协程没有读取出数据之前,写数据的那个协程也会被阻塞,直到其他协程将数据从管道中取出之后,该协程才会被释放。然后取数据的协程在取数据的时候,如果管道中没有数据也是会被阻塞的,当取数据的协程将数据取出来之后,两个协程就都释放了。
  • 3、无缓存的channle的cap(缓冲区大小)和len(缓存区剩余数据个数)打印都为0。

有缓冲的channel

有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。
这种类型的通道并不强制要求goroutine之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证

如果给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含可以接收的数据,那么其通信就会无阻塞地进行。

声明一个有缓存的channel

func main() {
	// 声明一个有缓冲的channel
	ch := make(chan int, 3)
	// 输出缓冲的容量、大小
	fmt.Printf("缓存区剩余个数=%d, 缓存区容量=%d", len(ch), cap(ch))
}

在这里插入图片描述
以上channel容量为3,如果往里面存了3个数据之后,发送方再往里面写东西,它就已经满了,就会被阻塞了。

下面我們來編寫一個簡單的有緩存的channel,並分析一下它的輸出結果

package main

import (
	"fmt"
	"time"
)

func main() {
	// 声明一个有缓冲的channel
	ch := make(chan int, 3)
	// 输出缓冲的容量、大小
	fmt.Printf("缓存区已存入=[%d]个, 缓存区容量=%d\n", len(ch), cap(ch))

	go func() {
		for i := 0; i < 3; i++ {
			ch <- i
			fmt.Printf("子协程[%d],缓存区已存入=[%d]个,緩存區容量=[%d]\n", i, len(ch), cap(ch))
		}
	}()

	// 延時
	time.Sleep(6 * time.Second)
	for i := 0; i < 3; i++ {
		num := <-ch
		fmt.Println("num=", num)
	}
}

在这里插入图片描述
代码过程分析,主协程开始执行,声明一个管道,开启一个协程,并向channel里写入数据,由于下面有一个2s的睡眠时间,在这个过程里,缓存区里早已经写入了3个数据了,然后2s时间过了之后,下面开始从channel里面取数据,取出完毕代码执行完。
上面的过程可不一定是一个协程向管道里写3个数据,写入完毕,然后另一个协程就开始取数据。
我们将上面的代码的for循环最大值修改一下,如下

package main

import (
	"fmt"
	"time"
)

func main() {
	// 声明一个有缓冲的channel
	ch := make(chan int, 3)
	// 输出缓冲的容量、大小
	fmt.Printf("缓存区已存入=[%d]个, 缓存区容量=%d\n", len(ch), cap(ch))

	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
			fmt.Printf("子协程[%d],缓存区已存入=[%d]个,緩存區容量=[%d]\n", i, len(ch), cap(ch))
		}
	}()

	// 延時
	time.Sleep(2 * time.Second)
	for i := 0; i < 10; i++ {
		num := <-ch
		fmt.Println("num=", num)
	}
}

运行结果:

在这里插入图片描述
过程分析,缓存区容量为3,主协程运行,开启子协程向里面写入数据,主协程里下面的代码仍然阻塞,于是在2s中的时间里已经向里面存满了3个数据,此时go协程开始阻塞了,此时等待2s时间过了主协程的代码开始消费channel,然后此时当下面的代码只要读取一个数据,那么缓存里的数据就不足3个了,然后上面的子协程就又可以向channel里写数据了,然后子协程写了第四个数据,然后还没来的及打印,此时时间片用完了,然后轮到了下面的代码执行了,下面的代码取出数据,此次只取出了一条数据,然后并打印出来,因为子协程刚写了1条数据,此时它的时间片用完了,又轮到了上面的子协程运行,然后打印数据,这次他的时间多,又向里面写了3个数据,然后缓冲满了,该子协程被阻塞,然后下面的代码就又开始从管道里取数据,于是我们看到的就是取数据的代码打印了4次,然后接下面,子协程里连续打印了4条数据,原因是因为缓存只要不满子协程就会一直向里面写数据。还有另一个角度可以解释,就是打印区里的“缓存已存入len(ch)”个,我们看打印,每当缓存区里已经存入3个的时候,下面出现的这句打印(不一定是连着的),但是下面再次输出这句“缓存已存入len(ch)”说明消费已经从channel里读取出来数据了,然后channel就又不满了,然后子协程就又向里面写入数据了,于是就打印了。

如果给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含可以接收的数据,那么其通信就会无阻塞地进行。

总结,有缓存的channel就像发短信一样,你可以发,我可以不处理,我不一定在你发完后我就立马看,这就是异步。无缓存的channel就像是打电话一样,两者都得在线,

关闭channel:close、range

上面不管是有缓存的channel还是无缓存的channel,都是一个写5次,另一个读5次,读和写的次数是相等的,下面我们下面来看一下,如果我们确定一个协程在向channel里不会再写数据了,我们将这个channel给关闭掉。代码如下

close

package main

import "fmt"

func main() {
	// 创建一个无缓存的channel
	ch := make(chan int)

	go func() {
		for i := 0; i < 5; i++ {
			// 向管道中写数据
			ch <- i
		}
		// 不需要再写数据的时候,关闭channel
		// 如果把此处的close(ch)注释掉,程序会一直阻塞在if num, ok := <-ch; ok == true {这一行
		// 因为程序走到最后是主协程把管道中的数据全部读取出来了,管道中已经没有数据了,于是就阻塞了,
		// 但是管道只是阻塞并没有关闭。
		close(ch)
	}() // 勿忘此处括号

	for {
		// ok为true说明channel没有关闭,为false说明管道已经关闭
		if num, ok := <-ch; ok == true {
			fmt.Println("num=", num)
		} else {
			// 管道没有关闭
			break
		}
	}
}

在这里插入图片描述
代码分析:注意以上代码是没有缓存的channel
情况1:先执行主协程,主协程去管道里取出数据,但是此时管道中并没有数据,于是阻塞,管道是阻塞并没有关闭,于是if num, ok := <-ch; ok == true {此时中的ok为true。然后子协程向管道中写数据,写一个0,然后子协程阻塞,然后主协程那边的ok为true,说明你没有关闭,然后主协程就从管道中读取一个数据,然后读完了,管道中没有数据,主协程就又阻塞了,然后子协程就又向里面写数据,然后子协程又阻塞,然后主协程又判断,channel没有关闭,然后它就又读取,主协程又阻塞,直到子协程循环到4写入,然后子协程阻塞,然后主协程读取,读取完毕,主协程阻塞,然后子协程走出了循环外,关闭channel,只要子协程已关闭,主协程那边就能通过ok := <-ch检测到,然后ok值为false,然后它就知道,channel关闭了。于是就走了else逻辑。
附上:无缓冲的channel阻塞图
在这里插入图片描述

range

range,在channel关闭的时候它就自动跳出for循环了

package main

import "fmt"

func main() {
	// 创建一个无缓存的channel
	ch := make(chan int)

	go func() {
		for i := 0; i < 5; i++ {
			// 向管道中写数据
			ch <- i
		}
		// 不需要再写数据的时候,关闭channel
		// 如果把此处的close(ch)注释掉,程序会一直阻塞在if num, ok := <-ch; ok == true {这一行
		// 因为程序走到最后是主协程把管道中的数据全部读取出来了,管道中已经没有数据了,于是就阻塞了,
		// 但是管道只是阻塞并没有关闭。
		close(ch)
	}() // 勿忘此处括号
	// range,在channel关闭的时候它就自动跳出for循环了
	for num := range ch {
		fmt.Println("num=", num)
	}
}

在这里插入图片描述

注意点:

  • channel不像文件一样需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束range循环之类的,才去关闭channel;
  • 关闭channel后,无法向channel再发送数据(引发 panic错误后导致接收立即返回零值);
  • 关闭channel后,可以继续向channel接收数据;
  • 对于nil channel,无论收发都会被阻塞。

单向的channel

默认情况下,通道是双向的,也就是,既可以往里面发送数据也可以从里面接收数据。
但是,我们经常见一个通道作为参数进行传递而只希望对方是单向使用的,要么只让它发送数据(向channel里写数据),要么只让它接收数据(从channel里读取数据),这时候我们可以指定通道的方向。
单向channel变量的声明一点都不简单,如下:

声明

	// ch1是一个正常的channel,不是单向的
	var ch1 chan int 
	// ch2是一个单方向channel,只用于写float64的数据
	var ch2 chan <- float64
	// ch3是单向channel,只用于读int类型的数据
	var ch3 <- chan int

	// 1、双向channel能隐士的转换为单项channel
	// 2、单向channel不能转为双向channel
	ch := make(chan int)
	var writeCh chan <- int = ch // 只能写,不能读
	var readCh <- chan int = ch  // 只能读,不能写
  • chan<-表示数据进入管道,要把数据写进管道,对于调用者就是输出。
  • <-chan表示数据从管道出来,对于调用者就是得到管道的数据,当然就是输入。

注意:一般实际中可以做一个消费队列来使用,make一个channel,然后将其转为2个channel,一个只能写(生产者),一个只能读(消费者)。

练习:交替打印

比比了那么多,来点实际的,两个协程交替打印数字要求有序打印出0,1,2,3,4,5,6,7…

使用两个channel

package main

import (
	"fmt"
	"sync"
)


var wg sync.WaitGroup
var CA chan int
var CB chan int

func main(){
	wg = sync.WaitGroup{}
	CA = make(chan int,1)
	CB = make(chan int)
	wg.Add(2)

	go A()
	go B()
	CA<-1
	wg.Wait()

}

func A(){

	for i:=0;i<5;i++{
		<-CA
		fmt.Println(2*i)
		CB<-1
	}
	wg.Done()

}

func B(){

	for i:=0;i<5;i++{
		<-CB
		fmt.Println(2*i+1)
		CA<-1
	}
	wg.Done()

}

使用一个channel

package main

import (
   "fmt"
   "sync"
)

var wg sync.WaitGroup
var CA chan int

func main(){
   wg = sync.WaitGroup{}
   CA = make(chan int)

   wg.Add(2)

   go A()
   go B()
   wg.Wait()

}

func A(){

   for i:=0;i<10;i++{
      CA<-1
      if i%2 == 0{
         fmt.Println(i)
      }
   }
   wg.Done()

}

func B(){

   for i:=0;i<10;i++{
      <-CA
      if i%2 == 1{
         fmt.Println(i)
      }

   }
   wg.Done()

}

不使用channel

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup
func main(){
	wg.Add(2)
	i:=0
	go func(){
	defer wg.Done()
	for {
		if i>100{
			break
		}
		if i%2 == 0{

			fmt.Printf("a: %d\n",i)
			i++
		}
	}
	}()

	go func(){
		defer wg.Done()
		for {
			if i>100{
				break
			}
			if i%2 == 1{
				fmt.Printf("b: %d\n",i)
				i++
			}
		}
	}()
	wg.Wait()
}


定时器

定时器有两种

  • timer:时间到了只会执行一次
  • ticker:时间到了会按定时的周期去执行,就跟闹钟一样

与js中的定时器setInterval(执行多次),setTimeout(执行一次)有些类似

timer

Timer是一个定时器,代表未来的一个单一事件,你可以告诉timer你要等待多长时间,它提供一个channel,在将来的那个时间那个channel提供了一个时间值。

设置一个定时器

不比比,上代码

package main

import (
	"fmt"
	"time"
)

func main() {
	// 创建一个定时器,设置时间为2s,2s后向time通道中写内容
	timer := time.NewTimer(2 * time.Second)

	// 2s后向timer.C里面写数据,有数据后就可以读取
	t := <-timer.C // channel没有数据前会阻塞
	fmt.Println("t=", t)
}
	// 这个方法返回了一个 Time类型的channel
	// 本质就是一个time的管道定时器,与上述代码效果相同
	<- time.After(2 * time.Second)

源码

// After waits for the duration to elapse and then sends the current time
// on the returned channel.
// It is equivalent to NewTimer(d).C.
// The underlying Timer is not recovered by the garbage collector
// until the timer fires. If efficiency is a concern, use NewTimer
// instead and call Timer.Stop if the timer is no longer needed.
func After(d Duration) <-chan Time {
	return NewTimer(d).C
}
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}

综上所述,实现延迟功能有3种方法

  • 1、NewTimer
  • 2、time.After
  • 3、time.sleep
定时器的停止与重置
	timer := time.NewTimer(2 * time.Second)
	// 重置定时器为1秒执行1次
	timer.Reset(1 * time.Second)
	// 停止定时器
	timer.Stop()

ticker

ticker:周期触发,如下代码

package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTicker(1 * time.Second)
	for {
		t := <- timer.C
		fmt.Println("回首向来萧瑟处, t=", t)
	}
}

控制台

回首向来萧瑟处, t= 2022-07-31 21:26:09.501158 +0800 CST m=+1.000744959
回首向来萧瑟处, t= 2022-07-31 21:26:10.505467 +0800 CST m=+2.005101668
回首向来萧瑟处, t= 2022-07-31 21:26:11.500597 +0800 CST m=+3.000279834
回首向来萧瑟处, t= 2022-07-31 21:26:12.501435 +0800 CST m=+4.001166043
回首向来萧瑟处, t= 2022-07-31 21:26:13.505104 +0800 CST m=+5.004883751
回首向来萧瑟处, t= 2022-07-31 21:26:14.504973 +0800 CST m=+6.004800626
回首向来萧瑟处, t= 2022-07-31 21:26:15.511108 +0800 CST m=+7.010983751
回首向来萧瑟处, t= 2022-07-31 21:26:16.500979 +0800 CST m=+8.000902376
回首向来萧瑟处, t= 2022-07-31 21:26:17.504444 +0800 CST m=+9.004415626
回首向来萧瑟处, t= 2022-07-31 21:26:18.500856 +0800 CST m=+10.000876459
回首向来萧瑟处, t= 2022-07-31 21:26:19.503042 +0800 CST m=+11.003110001
回首向来萧瑟处, t= 2022-07-31 21:26:20.505133 +0800 CST m=+12.005250043
... ...

定时器的停止

package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTicker(1 * time.Second)

	i := 0
	for {
		t := <-timer.C
		fmt.Println("回首向来萧瑟处, t=", t)
		i ++
		if i == 4 {
			// 停止定时器
			timer.Stop()
			// 跳出循环
			break
		}
	}
}

控制台,只打印4次

回首向来萧瑟处, t= 2022-07-31 21:31:01.254588 +0800 CST m=+1.001486293
回首向来萧瑟处, t= 2022-07-31 21:31:02.257348 +0800 CST m=+2.004294876
回首向来萧瑟处, t= 2022-07-31 21:31:03.254233 +0800 CST m=+3.001227376
回首向来萧瑟处, t= 2022-07-31 21:31:04.254313 +0800 CST m=+4.001356001
DIDI-C02G51DQQ05G:test didi$ 

select

Go里面提供了一个关键字select,通过select可以监听channel上的数据流动

select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。

与switch语句可以选择任何可使用相等比较的条件相比, select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:

  select {
    case <-chan1:
        // 如果chan1成功读到数据,则进行该case处理语句
    case chan2 <- 1:
        // 如果成功向chan2写入数据,则进行该case处理语句
    default:
        // 如果上面都没有成功,则进入default处理流程
        // 但是一般都不写default,因为如果上面的case不满足条件那么就阻塞住了,
        // 但是,如果有default那么它就不会阻塞会一直执行,太耗cpu资源了
  }

注:select里的每个case是无序执行的。

在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。

如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。

如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:

  • 如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。
  • 如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去。

select 实现斐波那契数列

package main

import (
	"fmt"
)

// ch生产者:只往channel里写数据
// quit:判断程序是否执行完毕,只读取数据
func fibonacii(ch chan<- int, quit <-chan bool) {
	x, y := 1, 1

	for {
		// 监听channel数据的流动
		select {
		case ch <- x:
			// golang实现变量的数据交换竟如此简单,这波语法糖,甜到了
			x, y = y, x+y
		case <-quit:
			return
		}
	}

}

func main() {
	ch := make(chan int)    // 数字通信
	quit := make(chan bool) // 程序是否结束

	// 消费者,从协程中消费数据
	go func() {
		for i := 0; i < 8; i++ {
			num := <-ch
			fmt.Println("consumer, num=", num)
		}
		// 可以停止
		quit <- true
	}() // 别忘了()
	// 生产者,产生数据,写入channel
	fibonacii(ch, quit)
}

控制台

consumer, num= 1
consumer, num= 1
consumer, num= 2
consumer, num= 3
consumer, num= 5
consumer, num= 8
consumer, num= 13
consumer, num= 21
DIDI-C02G51DQQ05G:test didi$ 

将上面的消费者换成协程,也是没问题的,但是要注意不要让主函数结束
如下

// 生产者,产生数据,写入channel
package main

import (
	"fmt"
)

// ch生产者:只往channel里写数据
// quit:判断程序是否执行完毕,只读取数据
func fibonacii(ch chan<- int, quit <-chan bool) {
	x, y := 1, 1

	for {
		// 监听channel数据的流动
		select {
		case ch <- x:
			// golang实现变量的数据交换竟如此简单,这波语法糖,甜到了
			x, y = y, x+y
		case <-quit:
			return
		}
	}

}

func main() {
	ch := make(chan int)    // 数字通信
	quit := make(chan bool) // 程序是否结束

	// 消费者,从协程中消费数据
	go func() {
		for i := 0; i < 8; i++ {
			num := <-ch
			fmt.Println("consumer, num=", num)
		}
		// 可以停止
		quit <- true
	}()
	// 生产者,产生数据,写入channel
	go fibonacii(ch, quit)
	for {
	}
}

分析,消费者里只读取8次,因为是无缓存的channle,8次之后生产者的ch管道被阻塞(因为没人消费)。其次,消费者在8次之后会向quit管道里写数据,然后生产者的ch管道被阻塞,但是quit管道里有数据,被消费,于是return,整个程序结束。

select 实现超时机制

有时候会出现goroutine阻塞的情况,比如一个协程长时间阻塞,或者一个无缓存的channel里的数据长时间没人读,或者一个缓存区满的channel数据无人去读就会陷入阻塞,长时间阻塞不太好,比如阻塞了多长时间,我就把你处理掉,或者来个兜底逻辑,那么我们如何避免整个程序进入阻塞的情况呢?我们可以利用select来设置超时,通过如下的方式实现:

func main() {
	ch := make(chan int)
	quit := make(chan bool)

	go func() {
		for {
			select {
			case num := <-ch:
				fmt.Println("打印数据,num=", num)
			case <-time.After(3 * time.Second):
				fmt.Println("超时")
				quit <- true
			}
		}
	}()

	<- quit
	fmt.Println("程序结束")
}

控制台

超时
程序结束

分析:
如上程序,并没有向ch管道中写入数据,所以就读取不出来,所以case num := <-ch:一直在阻塞,然后select里的case是无序运行的,其次如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。所以,两种情况,
情况1:先执行到case num := <-ch然后被阻塞,然后再执行到case <-time.After(3 * time.Second):,然后定时3秒,然后结束程序运行。
情况2::先执行到case <-time.After(3 * time.Second): 然后程序设置3秒后就会向time管道写数据,然后就会触发case,所以以上代码不管有没有向管道ch里写入数据,程序都会在3s或者3s多一些(情况1)结束掉。










学习笔记,不喜勿喷

业精于勤荒于嬉,行成于思毁于随