上述实现并发的代码中为了保持主线程不挂掉,我们都会在最后写上一个死循环或者写上一个定时器来实现等待 goroutine 执行完毕

上述实现并发的代码中为了解决生产者消费者资源同步问题,我们利用加锁来解决,但是这仅仅是一对一的情况,如果是一对多或者多对多,上述代码还是会出现问题

管道(Channel)

Channel 的本质是一个队列

Channel 是线程安全的,也就是自带锁定功能

Channel 声明和初始化

var 变量名chan 数据类型mych := make(chan 数据类型, 容量)
package main
import "fmt"
func main() {
	// 1.声明一个管道
	var mych chan int
	// 2.初始化一个管道
	mych = make(chan int, 3)
	// 3.查看管道的长度和容量
	fmt.Println("长度是", len(mych), "容量是", cap(mych))
	// 4.像管道中写入数据
	mych<- 666
	fmt.Println("长度是", len(mych), "容量是", cap(mych))
	// 5.取出管道中写入的数据
	num := <-mych
	fmt.Println("num = ", num)
	fmt.Println("长度是", len(mych), "容量是", cap(mych))
}

注意点:

  • 管道中只能存放声明的数据类型,不能存放其它数据类型
  • 管道中如果已经没有数据,再取就会报错
  • 如果管道中数据已满,再写入就会报错
package main

import "fmt"

func main() {
	// 1.声明一个管道
	var mych chan int
	// 2.初始化一个管道
	mych = make(chan int, 3)

	// 注意点: 管道中只能存放声明的数据类型, 不能存放其它数据类型
	//mych<-3.14

	// 注意点: 管道中如果已经没有数据, 
	// 并且检测不到有其它协程再往管道中写入数据, 那么再取就会报错
	//num = <-mych
	//fmt.Println("num = ", num)

	// 注意点: 如果管道中数据已满, 再写入就会报错
	mych<- 666
	mych<- 777
	mych<- 888
	mych<- 999
}

管道的关闭和遍历

package main

import "fmt"

func main() {
	// 1.创建一个管道
	mych := make(chan int, 3)
	// 2.往管道中存入数据
	mych<-666
	mych<-777
	mych<-888
	// 3.遍历管道
	// 第一次遍历i等于0, len = 3,
	// 第二次遍历i等于1, len = 2
	// 第三次遍历i等于2, len = 1
	//for i:=0; i<len(mych); i++{
	//	fmt.Println(<-mych) // 输出结果不正确
	//}

	// 3.写入完数据之后先关闭管道
	// 注意点: 管道关闭之后只能读不能写
	close(mych)
	//mych<- 999 // 报错

	// 4.遍历管道
	// 利用for range遍历, 必须先关闭管道, 否则会报错
	//for value := range mych{
	//	fmt.Println(value)
	//}

	// close主要用途:
	// 在企业开发中我们可能不确定管道有还没有有数据, 所以我们可能一直获取
	// 但是我们可以通过ok-idiom模式判断管道是否关闭, 如果关闭会返回false给ok
	for{
		if num, ok:= <-mych; ok{
			fmt.Println(num)
		}else{
			break;
		}
	}
	fmt.Println("数据读取完毕")
}

Channel 阻塞现象

  • 单独在主线程中操作管道,写满了会报错,没有数据去获取也会报错
  • 只要在协程中操作管道过,写满了就会阻塞,没有就数据去获取也会阻塞
package main
import (
	"fmt"
	"time"
)
// 创建一个管道
var myCh = make(chan int, 5)
func demo()  {
	var myCh = make(chan int, 5)
	//myCh<-111
	//myCh<-222
	//myCh<-333
	//myCh<-444
	//myCh<-555
	//fmt.Println("我是第六次添加之前代码")
	//myCh<-666
	//fmt.Println("我是第六次添加之后代码")

	fmt.Println("我是第六次直接获取之前代码")
	<-myCh
	fmt.Println("我是第六次直接获取之后代码")
}
func test()  {
	//myCh<-111
	//myCh<-222
	//myCh<-333
	//myCh<-444
	//myCh<-555
	//fmt.Println("我是第六次添加之前代码")
	//myCh<-666
	//fmt.Println("我是第六次添加之后代码")

	//fmt.Println("我是第六次直接获取之前代码")
	//<-myCh
	//fmt.Println("我是第六次直接获取之后代码")
}
func example()  {
	time.Sleep(time.Second * 2)
	myCh<-666
}
func main() {
	// 1.同一个go程中操作管道
	// 写满了会报错
	//myCh<-111
	//myCh<-222
	//myCh<-333
	//myCh<-444
	//myCh<-555
	//myCh<-666

	// 没有了去取也会报错
	//<-myCh

	// 2.在协程中操作管道
	// 写满了不会报错, 但是会阻塞
	//go test()

	// 没有了去取也不会报错, 也会阻塞
	//go test()

	//go demo()
	//go demo()
	
	// 3.只要在协程中操作了管道, 就会发生阻塞现象
	go example()
	fmt.Println("myCh之前代码")
	<-myCh
	fmt.Println("myCh之后代码")

	//for{
	//	;
	//}
}

利用 Channel 实现生产者消费者

package main

import (
	"fmt"
	"math/rand"
	"time"
)
// 定义缓冲区
var myCh = make(chan int, 5)
var exitCh = make(chan bool, 1)

// 定义生产者
func producer(){
	rand.Seed(time.Now().UnixNano())
	for i:=0;i<10;i++{
		num := rand.Intn(100)
		fmt.Println("生产者生产了: ", num)
		// 往管道中写入数据
		myCh<-num
		//time.Sleep(time.Millisecond * 500)
	}
	// 生产完毕之后关闭管道
	close(myCh)
	fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer()  {
	// 不断从管道中获取数据, 直到管道关闭位置
	for{
		if num, ok := <-myCh; !ok{
			break
		}else{
			fmt.Println("---消费者消费了", num)
		}
	}
	fmt.Println("消费者停止消费")
	exitCh<-true
}

func main() {
	go producer()
	go consumer()
	fmt.Println("exitCh之前代码")
	<-exitCh
	fmt.Println("exitCh之后代码") 
}

无缓冲 Channel

package main
import "fmt"
var myCh1 = make(chan int, 5)
var myCh2 = make(chan int, 0)
func main() {
	// 有缓冲管道
	// 只写入, 不读取不会报错
	//myCh1<-1
	//myCh1<-2
	//myCh1<-3
	//myCh1<-4
	//myCh1<-5
	//fmt.Println("len =",len(myCh1), "cap =", cap(myCh1))

	// 无缓冲管道
	// 只有两端同时准备好才不会报错
	go func() {
		fmt.Println(<-myCh2)
	}()
	// 只写入, 不读取会报错
	myCh2<-1
	//fmt.Println("len =",len(myCh2), "cap =", cap(myCh2))
	// 写入之后在同一个线程读取也会报错
	//fmt.Println(<-myCh2)
	// 在主程中先写入, 在子程中后读取也会报错
	//go func() {
	//	fmt.Println(<-myCh2)
	//}()
}

无缓冲 Channel 和有缓冲 Channel

  • 有缓冲管道具备异步的能力(写几个读一个或读几个)
  • 无缓冲管道具备同步的能力(写一个读一个)
package main
import (
	"fmt"
	"math/rand"
	"time"
)
// 定义缓冲区
//var myCh = make(chan int, 0)
var myCh = make(chan int)
var exitCh = make(chan bool, 1)

// 定义生产者
func producer(){
	rand.Seed(time.Now().UnixNano())
	for i:=0;i<10;i++{
		num := rand.Intn(100)
		fmt.Println("生产者生产了: ", num)
		// 往管道中写入数据
		myCh<-num
		//time.Sleep(time.Millisecond * 500)
	}
	// 生产完毕之后关闭管道
	close(myCh)
	fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer()  {
	// 不断从管道中获取数据, 直到管道关闭位置
	for{
		if num, ok := <-myCh; !ok{
			break
		}else{
			fmt.Println("---消费者消费了", num)
		}
	}
	fmt.Println("消费者停止消费")
	exitCh<-true
}

func main() {
	go producer()
	go consumer()
	fmt.Println("exitCh之前代码")
	<-exitCh
	fmt.Println("exitCh之后代码")
}

IO 的延迟说明:看到的输出结果和我们想象的不太一样,是因为IO输出非常消耗性能,输出之后还没来得及赋值可能就跑去执行别的协程了


单向管道和双向管道

  • 默认情况下所有管道都是双向了(可读可写)
  • 但是在企业开发中, 我们经常需要用到将一个管道作为参数传递
  • 在传递的过程中希望对方只能单向使用, 要么只能写,要么只能读

双向管道

var myCh chan int = make(chan int, 0)

单向管道

var myCh chan<- int = make(chan<- int, 0)
var myCh <-chan int = make(<-chan int, 0)

注意点:

  • 双向管道可以自动转换为任意一种单向管道
  • 单向管道不能转换为双向管道
package main

import "fmt"

func main() {
	// 1.定义一个双向管道
	var myCh chan int = make(chan int, 5)

	// 2.将双向管道转换单向管道
	var myCh2 chan<- int
	myCh2 = myCh
	fmt.Println(myCh2)
	var myCh3 <-chan int
	myCh3 = myCh
	fmt.Println(myCh3)

	// 3.双向管道,可读可写
	myCh<-1
	myCh<-2
	myCh<-3
	fmt.Println(<-myCh)
	
	// 3.只写管道,只能写, 不能读
	//	myCh2<-666
	//	fmt.Println(<-myCh2)

	// 4.指读管道, 只能读,不能写
	fmt.Println(<-myCh3)
	//myCh3<-666
	
	// 注意点: 管道之间赋值是地址传递, 以上三个管道底层指向相同容器
}

单向管道作为函数参数

package main
import (
	"fmt"
	"math/rand"
	"time"
)
// 定义生产者
func producer(myCh chan<- int){
	rand.Seed(time.Now().UnixNano())
	for i:=0;i<10;i++{
		num := rand.Intn(100)
		fmt.Println("生产者生产了: ", num)
		// 往管道中写入数据
		myCh<-num
		//time.Sleep(time.Millisecond * 500)
	}
	// 生产完毕之后关闭管道
	close(myCh)
	fmt.Println("生产者停止生产")
}
// 定义消费者
func consumer(myCh <-chan int)  {
	// 不断从管道中获取数据, 直到管道关闭位置
	for{
		if num, ok := <-myCh; !ok{
			break
		}else{
			fmt.Println("---消费者消费了", num)
		}
	}
	fmt.Println("消费者停止消费")

}

func main() {
	// 定义缓冲区
	var myCh = make(chan int, 5)
	go producer(myCh)
	consumer(myCh)
}