大家好,我是小道哥。
今天给大家带来的面试专题是:Golang Channel
channel的底层数据结构
channel是golang中用来实现多个goroutine通信的管道,它的底层是一个叫做hchan的结构体。在go的runtime包下。
数据结构
type hchan struct {
//channel分为无缓冲和有缓冲两种。
//对于有缓冲的channel存储数据,借助的是如下循环数组的结构
qcount uint // 循环数组中的元素数量
dataqsiz uint // 循环数组的长度
buf unsafe.Pointer // 指向底层循环数组的指针
elemsize uint16 //能够收发元素的大小
closed uint32 //channel是否关闭的标志
elemtype *_type //channel中的元素类型
//有缓冲channel内的缓冲数组会被作为一个“环型”来使用。
//当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
sendx uint // 下一次发送数据的下标位置
recvx uint // 下一次读取数据的下标位置
//当循环数组中没有数据时,收到了接收请求,那么接收数据的变量地址将会写入读等待队列
//当循环数组中数据已满时,收到了发送请求,那么发送数据的变量地址将写入写等待队列
recvq waitq // 读等待队列
sendq waitq // 写等待队列
lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}
对应图解如下:
总结hchan结构体的主要组成部分有四个:
- 用来保存goroutine之间传递数据的循环链表。=====> buf。
- 用来记录此循环链表当前发送或接收数据的下标值。=====> sendx和recvx。
- 用于保存向该chan发送和从改chan接收数据的goroutine的队列。=====> sendq 和 recvq
- 保证channel写入和读取数据时线程安全的锁。 =====> lock
举个栗子
//G1
func sendTask(taskList []Task) {
...
ch:=make(chan Task, 4) // 初始化长度为4的channel
for _,task:=range taskList {
ch <- task //发送任务到channel
}
...
}
//G2
func handleTask(ch chan Task) {
for {
task:= <-ch //接收任务
process(task) //处理任务
}
}
ch是长度为4的带缓冲的channel,G1是发送者,G2是接收者
- 初始hchan结构体重的buf为空,sendx和recvx均为0。
- 当G1向ch里发送数据时,首先会对buf加锁,然后将数据copy到buf中,然后sendx++,然后释放对buf的锁。
- 当G2消费ch的时候,会首先对buf加锁,然后将buf中的数据copy到task变量对应的内存里,然后recvx++,并释放锁。
可以发现整个过程,G1和G2没有共享的内存,底层是通过hchan结构体的buf,并使用copy内存的方式进行通信,最后达到了共享内存的目的,这里也体现了Go中的CSP并发模型。
Go中的CSP并发模型即是通过goroutine和channel实现的。
CSP并发模型:不要以共享内存的方式来通信,相反,要通过通信的方式来共享内存。
那么当channel中的缓存满了之后会发生什么呢?
首先简单了解下GMP的概念。相关的数据模型如下图:
【G】goroutine是Golang实现的用户空间的轻量级的线程
【M】代表操作系统线程
【P】处理器, 它包含了待运行goroutine。
如果线程M想运行goroutine,必须先获取P,从P中获取goroutine执行。
当G1向buf已经满了的ch发送数据的时候,检测到hchan的buf已经满了,会通知调度器,调度器会将G1的状态设置为waiting, 并移除与线程M的联系,然后从P的runqueue中选择一个goroutine在线程M中执行,此时G1就是阻塞状态,但是不是操作系统的线程阻塞,所以这个时候只用消耗少量的资源。
那么G1设置为waiting状态后去哪了?怎们去resume呢?我们再回到hchan结构体,注意到hchan有个sendq的成员,其类型是waitq,查看源码如下:
type hchan struct {
...
recvq waitq // 读等待队列
sendq waitq // 写等待队列
...
}
type waitq struct {
first *sudog
last *sudog
}
实际上,当G1变为waiting状态后,会创建一个代表自己的sudog的结构,然后放到sendq这个list中,sudog结构中保存了channel相关的变量的指针(如果该Goroutine是sender,那么保存的是待发送数据的变量的地址,如果是receiver则为接收数据的变量的地址,之所以是地址,前面我们提到在传输数据的时候使用的是copy的方式)
当G2从ch中接收一个数据时,会通知调度器,设置G1的状态为runnable,然后将加入P的runqueue里,等待线程执行.
func G2(){
t := <-ch
}
前面我们是假设G1先运行,如果G2先运行会怎么样呢?
如果G2先运行,那么G2会从一个empty的channel里取数据,这个时候G2就会阻塞,和前面介绍的G1阻塞一样,G2也会创建一个sudog结构体,保存接收数据的变量的地址,但是该sudog结构体是放到了recvq列表里。
当G1向ch发送数据的时候,为了提升效率,runtime并不会对hchan结构体题的buf进行加锁,而是直接将G1里的发送到ch的数据copy到了G2 sudog里对应的elem指向的内存地址!【不通过buf】
这时候,张三道友抛出了三个问题:
func main(){
cache:=make(chan int,3)
go func() {
for i:=0;i< 3;i++ {
cache<-i
}
}()
time.Sleep(time.Second)
//休眠1秒钟,保证channel中的数据已经写入完整
go getCache("gorouine1",cache)
go getCache("gorouine2",cache)
go getCache("gorouine3",cache)
time.Sleep(time.Second)
}
func getCache(routine string,cache <-chan int) {
for {
select {
case i:=<-cache:
fmt.Printf("%s:%d\n",routine,i)
}
}
}
gorouine1:0
gorouine2:1
gorouine3:2
$go run main.go
gorouine3:1
gorouine2:2
gorouine1:0
channel的用法
使用for range 读取channel
for i := range ch{
fmt.Println(i)
}
for-range
使用_,ok判断channel是否关闭
if v, ok := <- ch; ok {
fmt.Println(v)
}
oktruefalse
使用select处理多个channel
for{
select {
case <-ch1:
process1()
return
case <-ch2:
process2()
return
}
}
select
关闭channel的注意事项
-
一个 channel不能多次关闭,会导致painc
-
向一个已经关闭了的 channel发送数据会导致panic
面试点总结
- Go channel的底层数据结构以及工作原理?
- 向缓存满的channel写数据会发生什么?
- 向没有数据的channel读数据会发生什么?
- 简述channel的日常用法?
后语
如果大家对本文提到的面试技术点有任何问题,都可以在评论区进行回复哈,我们共同学习,一起进步!
关注公众号[简道编程],每天一个后端技术面试点