2624e806a6fe49f03607c89ff74db645.png

上一章中对于golang的语言基础说明如下:

  • 1 函数调用
  • 2 接口
  • 3 反射

接下来我们来对golang的常用关键字进行说明,主要内容有:

  • 1. for 和 range
  • 2. select
  • 3. defer
  • 4. panic 和 recover
  • 5. make 和 new
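`select` 是操作系统中的系统调用,我们经常会使用 `select`、`poll` 和 `epoll` 等函数构建 I/O 多路复用模型来提升程序的性能。Go 语言的 `select` 与操作系统中的 `select` 比较相似,本节会介绍 Go 语言 `select` 关键字常见的现象、数据结构以及实现原理。
C 语言的 `select` 系统调用可以同时监听多个文件描述符的可读或者可写状态,Go 语言中的 `select` 也能够让 Goroutine 同时等待多个 Channel 可读或者可写,在多个文件或者 Channel 的状态改变之前,`select` 会一直阻塞当前线程或者 Goroutine。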
e3dd798a9baaf400712cdb8082b96c2e.png
图 - Select 和 Channels
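`select` 是与 `switch` 相似的控制结构,与 `switch` 不同的是,`select` 中虽然也有多个 `case`,但是这些 `case` 中的表达式必须都是 Channel 的收发操作。下面的代码就展示了一个包含 Channel 收发操作的 `select` 结构: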
// fibonacci emits successive Fibonacci numbers on c until a value can be
// received from quit; it then prints "quit" and returns.
func fibonacci(c, quit chan int) {
	x, y := 0, 1
	for {
		select {
		case <-quit:
			// Stop signal received: report it and leave the loop.
			fmt.Println("quit")
			return
		case c <- x:
			// A receiver accepted x; advance to the next pair.
			x, y = y, x+y
		}
	}
}
上述控制结构会等待 `c <- x` 或者 `<-quit` 两个表达式中任意一个返回。无论哪一个表达式返回都会立刻执行对应 `case` 中的代码,当 `select` 中的两个 `case` 同时被触发时,就会随机选择一个 `case` 执行。

1. 现象

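当我们在 Go 语言中使用 `select` 控制结构时,会遇到两个有趣的现象:
  1. `select` 能在 Channel 上进行非阻塞的收发操作;
  2. `select` 在遇到多个 Channel 同时响应时,会随机挑选一个 `case` 执行;
这两个现象是学习 `select` 时经常会遇到的,我们来深入了解具体场景并分析这两个现象背后的设计原理。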

非阻塞的收发

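在通常情况下,`select` 语句会阻塞当前 Goroutine 并等待多个 Channel 中的一个达到可以收发的状态。但是如果 `select` 控制结构中包含 `default` 语句,那么这个 `select` 语句在执行时会遇到以下两种情况:
  1. 当存在可以收发的 Channel 时,直接处理该 Channel 对应的 `case`;
  2. 当不存在可以收发的 Channel 时,执行 `default` 中的语句;
当我们运行下面的代码时就不会阻塞当前的 Goroutine,它会直接执行 `default` 中的代码: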
// main demonstrates a non-blocking receive: nothing is ever sent on ch, so
// the select cannot proceed with the receive and runs the default branch.
func main() {
	ch := make(chan int)
	select {
	default:
		println("default")
	case i := <-ch:
		println(i)
	}
}

$ go run main.go
default
只要我们稍微想一下,就会发现 Go 语言设计的这个现象很合理。`select` 的作用是同时监听多个 `case` 是否可以执行,如果多个 Channel 都不能执行,那么运行 `default` 也是理所当然的。

非阻塞的 Channel 发送和接收操作还是很有必要的,在很多场景下我们不希望向 Channel 发送消息或者从 Channel 中接收消息会阻塞当前 Goroutine,我们只是想看看 Channel 的可读或者可写状态。下面就是一个常见的例子:

// One buffer slot per task, so a failing task never blocks on its send.
errCh := make(chan error, len(tasks))
wg := sync.WaitGroup{}
wg.Add(len(tasks))
for i := range tasks {
    // Pass i as an argument: before Go 1.22 all iterations share a single
    // loop variable, so a closure capturing i directly could run the wrong
    // task (or the same task several times).
    go func(i int) {
        defer wg.Done()
        if err := tasks[i].Run(); err != nil {
            errCh <- err
        }
    }(i)
}
wg.Wait()

// Non-blocking receive: all tasks have finished, so either at least one
// error is already buffered or there is none at all.
select {
case err := <-errCh:
    return err
default:
    return nil
}
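在上面这段代码中,我们不关心到底有多少个任务执行失败了,只关心是否存在返回错误的任务,最后的 `select` 语句能很好地完成这个任务。然而使用 `select` 实现非阻塞收发并不是最初的设计,Go 语言在最初的版本中使用 `x, ok := <-c` 实现非阻塞的收发,以下是与非阻塞收发相关的提交:
  1. `select default` 提交支持了 `select` 语句中的 `default`;
  2. "gc: special case code for single-op blocking and non-blocking selects" 提交引入了基于 `select` 的非阻塞收发;
  3. "gc: remove non-blocking send, receive syntax" 提交将非阻塞用法的 `x, ok := <-c` 语法删除;
  4. "gc, runtime: replace closed(c) with x, ok := <-c" 提交使用 `x, ok := <-c` 语法替代 `closed(c)` 语法来判断 Channel 的关闭状态;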

我们可以从上面的几个提交中看到非阻塞收发从最初到现在的演变。

随机执行

另一个使用 `select` 时会遇到的情况是:同时有多个 `case` 就绪时,`select` 会选择哪个 `case` 执行。我们通过下面的代码可以简单了解一下:
// main shows that when several cases are ready at once, select picks one
// of them at random. A background goroutine sends on ch once per second and
// both cases receive from that same channel. main never returns.
func main() {
	ch := make(chan int)

	// Producer: one send per tick.
	go func() {
		ticks := time.Tick(time.Second)
		for {
			<-ticks
			ch <- 0
		}
	}()

	for {
		select {
		case <-ch:
			println("case1")
		case <-ch:
			println("case2")
		}
	}
}

$ go run main.go
case1
case2
case1
...
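从上述代码输出的结果中我们可以看到,`select` 在遇到多个 `<-ch` 同时满足可读或者可写条件时,会随机选择一个 `case` 执行其中的代码。
这个设计在很早的提交中就被引入并一直保留到现在,虽然中间经历过一些修改,但是语义一直没有改变。在上面的代码中,两个 `case` 是同时满足执行条件的,如果我们按照顺序依次判断,那么后面的条件永远都得不到执行,而随机性的引入就是为了避免饥饿问题的发生。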

2. 数据结构

`select` 在 Go 语言的源代码中不存在对应的结构体,但是 `select` 控制结构中的 `case` 却使用 `runtime.scase` 结构体来表示:
// scase is the runtime representation of one case of a select statement.
type scase struct {
	// c is the channel the case operates on.
	c           *hchan
	// elem points at the data element of the case: the receive destination
	// or the value being sent.
	elem        unsafe.Pointer
	// kind is compared against caseRecv / caseSend by selectgo.
	kind        uint16
	// NOTE(review): pc looks like a program counter kept for diagnostics —
	// confirm against the runtime source.
	pc          uintptr
	// NOTE(review): releasetime appears to be a profiling timestamp — confirm.
	releasetime int64
}
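因为非默认的 `case` 都与 Channel 的发送和接收有关,所以 `runtime.scase` 结构体中也包含一个 `runtime.hchan` 类型的字段存储 `case` 中使用的 Channel;除此之外,`elem` 是接收或者发送数据的变量地址,`kind` 表示 `runtime.scase` 的种类,总共包含以下四种: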
// Kinds of select case; the values 0..3 are assigned by iota in this order.
const (
	// caseNil is the zero value.
	// NOTE(review): presumably a case whose channel is nil — confirm.
	caseNil = iota
	// caseRecv marks a case that receives from its channel.
	caseRecv
	// caseSend marks a case that sends on its channel.
	caseSend
	// caseDefault presumably marks the default branch — confirm.
	caseDefault
)
这四种常量分别表示不同类型的 `case`,相信它们的命名已经能够充分帮助我们理解它们的作用了,所以这里不再展开介绍。

3. 实现原理

`select` 语句在编译期间会被转换成 `OSELECT` 节点。每一个 `OSELECT` 节点都会持有一组 `OCASE` 节点,如果 `OCASE` 的执行条件是空,那就意味着这是一个 `default` 节点:
bc6bc5d72499d81e7a0041f779fe686b.png
图 - OSELECT 和多个 OCASE
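上图展示的就是 `select` 语句在编译期间的结构,每一个 `OCASE` 既包含执行条件也包含满足条件后执行的代码。
编译器在中间代码生成期间会根据 `select` 中 `case` 的不同对控制语句进行优化,这一过程都发生在 `cmd/compile/internal/gc.walkselectcases` 函数中,我们在这里会分四种情况介绍处理的过程和结果:
  1. `select` 不存在任何的 `case`;
  2. `select` 只存在一个 `case`;
  3. `select` 存在两个 `case`,其中一个 `case` 是 `default`;
  4. `select` 存在多个 `case`;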

上述的四种情况不仅会涉及编译器的重写和优化,还会涉及 Go 语言的运行时机制,我们会从编译期间和运行时两方面分析上述情况。

直接阻塞

首先介绍的是最简单的情况,也就是当 `select` 结构中不包含任何 `case` 时编译器是如何进行处理的,我们截取 `cmd/compile/internal/gc.walkselectcases` 函数的前几行代码:
// walkselectcases rewrites the cases of a select statement into a list of
// replacement nodes. Only the zero-case path is shown in this excerpt; the
// remaining handling is elided.
func walkselectcases(cases *Nodes) []*Node {
	n := cases.Len()

	// A select without any case is replaced by a single call to "block".
	if n == 0 {
		return []*Node{mkcall("block", nil, nil)}
	}
	...
}
这段代码非常简单并且容易理解,它直接将类似 `select {}` 的空语句转换成对 `runtime.block` 函数的调用:
// block parks the calling goroutine through gopark, passing no unlock
// function and no lock (both nil) and waitReasonSelectNoCases as the wait
// reason.
// NOTE(review): presumably nothing can wake the goroutine up afterwards —
// confirm against the gopark documentation.
func block() {
	gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1)
}
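`runtime.block` 函数的实现非常简单,它会调用 `runtime.gopark` 让出当前 Goroutine 对处理器的使用权,并传入等待原因 `waitReasonSelectNoCases`。
简单总结一下,空的 `select` 语句会直接阻塞当前的 Goroutine,导致 Goroutine 进入无法被唤醒的永久休眠状态。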

单一管道

如果当前的 `select` 条件只包含一个 `case`,那么编译器就会将 `select` 改写成 `if` 条件语句。下面展示了原始的 `select` 语句和被改写、优化后的代码:
// Before the rewrite
select {
case v, ok := <-ch: // or: case ch <- v
    ...
}

// After the rewrite: a nil channel blocks forever, otherwise the single
// channel operation is executed directly.
if ch == nil {
    block()
}
v, ok := <-ch // or: ch <- v
...
`cmd/compile/internal/gc.walkselectcases` 在处理单操作 `select` 语句时,会根据 Channel 的收发情况生成不同的语句。当 `case` 中的 Channel 是空指针时,就会直接挂起当前 Goroutine 并永久休眠。

非阻塞操作

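当 `select` 中仅包含两个 `case`,并且其中一个是 `default` 时,Go 语言的编译器就会认为这是一次非阻塞的收发操作。`cmd/compile/internal/gc.walkselectcases` 函数会对这种情况单独处理,不过在正式优化之前,该函数会将 `case` 中的所有 Channel 都转换成指向 Channel 的地址。我们会分别介绍非阻塞发送和非阻塞接收时,编译器进行的不同优化。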

发送

首先是 Channel 的发送过程,当 `case` 中表达式的类型是 `OSEND` 时,编译器会使用 `if/else` 语句和 `runtime.selectnbsend` 函数改写代码:
// Before the rewrite: a send case paired with a default case.
select {
case ch <- i:
    ...
default:
    ...
}

// After the rewrite: the result of selectnbsend decides between the body of
// the send case and the body of the default case.
if selectnbsend(ch, i) {
    ...
} else {
    ...
}
这段代码中最重要的就是 `runtime.selectnbsend` 函数,它为我们提供了向 Channel 非阻塞地发送数据的能力。向 Channel 发送数据的 `runtime.chansend` 函数包含一个 `block` 参数,该参数会决定这一次的发送是不是阻塞的:
// selectnbsend calls chansend with its third argument set to false (the
// non-blocking mode described in the surrounding text) and reports the
// result of that call as selected.
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	selected = chansend(c, elem, false, getcallerpc())
	return selected
}
由于我们向 `runtime.chansend` 函数传入了 `false`(非阻塞),所以在不存在接收方或者缓冲区空间不足时,当前 Goroutine 都不会阻塞而是会直接返回。

接收

由于从 Channel 中接收数据可能会返回一个或者两个值,所以接收数据的情况会比发送稍显复杂,不过改写的套路是差不多的:

// Before the rewrite: a receive case paired with a default case.
select {
case v = <-ch: // or: case v, ok = <-ch:
    ......
default:
    ......
}

// After the rewrite: the one-value form calls selectnbrecv, the two-value
// form calls selectnbrecv2.
if selectnbrecv(&v, ch) { // or: if selectnbrecv2(&v, &ok, ch) {
    ...
} else {
    ...
}
返回值数量的不同会导致使用的函数不同,两个用于非阻塞接收消息的函数 `runtime.selectnbrecv` 和 `runtime.selectnbrecv2` 只是对 `runtime.chanrecv` 返回值的处理稍有不同:
// selectnbrecv calls chanrecv with its third argument set to false and
// returns only the first result; the second result is discarded.
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	ok, _ := chanrecv(c, elem, false)
	return ok
}

// selectnbrecv2 calls chanrecv with its third argument set to false. The
// first result is returned as selected; the second is stored through
// received for the caller.
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	ok, got := chanrecv(c, elem, false)
	*received = got
	return ok
}
因为接收方不需要,所以 `runtime.selectnbrecv` 会直接忽略返回的布尔值,而 `runtime.selectnbrecv2` 会将布尔值回传给调用方。与 `runtime.chansend` 一样,`runtime.chanrecv` 也提供了一个 `block` 参数用于控制这次接收是否阻塞。

常见流程

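在默认的情况下,编译器会使用如下的流程处理 `select` 语句:
  1. 将所有的 `case` 转换成包含 Channel 以及类型等信息的 `runtime.scase` 结构体;
  2. 调用运行时函数 `runtime.selectgo` 从多个准备就绪的 Channel 中选择一个可执行的 `runtime.scase` 结构体;
  3. 通过 `for` 循环生成一组 `if` 语句,在语句中判断自己是不是被选中的 `case`;
一个包含三个 `case` 的正常 `select` 语句其实会被展开成如下所示的逻辑,我们可以看到其中处理的三个部分: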
// Storage for the three cases, plus twice as many order slots; selectgo
// slices the order array into two halves of ncases entries each.
selv := [3]scase{}
order := [6]uint16{}
for i, cas := range cases {
    c := scase{}
    c.kind = ...
    c.elem = ...
    c.c = ...
    // Store the populated case so that selectgo can see it.
    selv[i] = c
}
// selectgo takes pointers to the first element of each array.
chosen, recvOK := selectgo(&selv[0], &order[0], 3)
if chosen == 0 {
    ...
    break
}
if chosen == 1 {
    ...
    break
}
if chosen == 2 {
    ...
    break
}
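展开后的代码片段中最重要的就是用于选择待执行 `case` 的运行时函数 `runtime.selectgo`,这也是我们要关注的重点。因为这个函数的实现比较复杂,所以这里分两部分分析它的执行过程:
  1. 执行一些必要的初始化操作并确定 `case` 的处理顺序;
  2. 在循环中根据 `case` 的类型做出不同的处理;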

初始化

`runtime.selectgo` 函数首先会执行必要的初始化操作并决定处理 `case` 的两个顺序 — 轮询顺序 `pollOrder` 和加锁顺序 `lockOrder`:
// selectgo (initialisation excerpt) sets up the case slice and the two order
// slices, shuffles the poll order, and locks the channels through sellock.
// cas0 points at the first of ncases scase values; order0 points at the
// first of 2*ncases uint16 slots. Parts of the body are elided.
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	// Reinterpret the raw pointers as pointers to large arrays so that they
	// can be sliced below.
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
	
	// Length and capacity are both capped at ncases. pollorder is the first
	// half of the order storage, lockorder the second half.
	scases := cas1[:ncases:ncases]
	pollorder := order1[:ncases:ncases]
	lockorder := order1[ncases:][:ncases:ncases]
	// NOTE(review): the per-case work of this loop is elided in the excerpt;
	// only the element address is taken.
	for i := range scases {
		cas := &scases[i]
	}

	// Shuffle: move the entry at a random position j to slot i and put i at
	// position j. fastrandn(i+1) presumably yields a value in [0, i] — confirm.
	for i := 1; i < ncases; i++ {
		j := fastrandn(uint32(i + 1))
		pollorder[i] = pollorder[j]
		pollorder[j] = uint16(i)
	}

	// Sort by channel address to determine the locking order.
	...
	sellock(scases, lockorder)
	...
}
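轮询顺序 `pollOrder` 和加锁顺序 `lockOrder` 分别是通过以下的方式确认的:
  • 轮询顺序:通过 `runtime.fastrandn` 函数引入随机性;
  • 加锁顺序:按照 Channel 的地址排序后确定加锁顺序;
随机的轮询顺序可以避免 Channel 的饥饿问题,保证公平性;而根据 Channel 的地址顺序确定加锁顺序能够避免死锁的发生。这段代码最后调用的 `runtime.sellock` 函数会按照之前生成的加锁顺序锁定 `select` 语句中包含的所有 Channel。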

循环

当我们为 `select` 语句锁定了所有 Channel 之后,就会进入 `runtime.selectgo` 函数的主循环,它会分三个阶段查找或者等待某个 Channel 准备就绪:
  1. 查找是否已经存在准备就绪的 Channel,即可以执行收发操作;
  2. 将当前 Goroutine 加入 Channel 对应的收发队列上并等待其他 Goroutine 的唤醒;
  3. 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理;
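`runtime.selectgo` 函数会使用 `goto` 语句跳转到函数内部的不同标签执行相应的逻辑,其中包括:
  • `bufrecv`:可以从缓冲区读取数据;
  • `bufsend`:可以向缓冲区写入数据;
  • `recv`:可以从休眠的发送方获取数据;
  • `send`:可以向休眠的接收方发送数据;
  • `rclose`:可以从关闭的 Channel 读取 EOF;
  • `sclose`:向关闭的 Channel 发送数据;
  • `retc`:结束调用并返回;
我们先来分析循环执行的第一个阶段,查找已经准备就绪的 Channel。循环会遍历所有的 `case` 并找到需要被唤起的 `runtime.sudog` 结构,在这个阶段,我们会根据 `case` 的四种类型分别处理:
  1. `caseNil` — 当前 `case` 不包含任何 Channel;
     • 这种 `case` 会被跳过;
  2. `caseRecv` — 当前 `case` 会从 Channel 中接收数据;
     • 如果当前 Channel 的 `sendq` 上有等待的 Goroutine,就会跳到 `recv` 标签,从等待的发送方获取数据;
     • 如果当前 Channel 的缓冲区不为空,就会跳到 `bufrecv` 标签处从缓冲区获取数据;
     • 如果当前 Channel 已经被关闭,就会跳到 `rclose` 做一些清除的收尾工作;
  3. `caseSend` — 当前 `case` 会向 Channel 发送数据;
     • 如果当前 Channel 已经被关闭,就会直接跳到 `sclose` 标签,触发 `panic` 尝试中止程序;
     • 如果当前 Channel 的 `recvq` 上有等待的 Goroutine,就会跳到 `send` 标签向 Channel 发送数据;
     • 如果当前 Channel 的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;
  4. `caseDefault` — 当前 `case` 为 `default` 语句;
     • 表示前面的所有 `case` 都没有被执行,这里会解锁所有 Channel 并返回,意味着当前 `select` 结构中的收发都是非阻塞的;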
3542cdea3b70032c05ce7627e1b2a09c.png
图 - 运行时 selectgo 函数
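第一阶段的主要职责是查找所有 `case` 中是否有可以立刻被处理的 Channel。无论是在等待的 Goroutine 上还是缓冲区中,只要存在数据满足条件就会立刻处理;如果不能立刻找到活跃的 Channel,就会进入循环的下一阶段,按照需要将当前的 Goroutine 加入到 Channel 的 `sendq` 或者 `recvq` 队列中: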
// selectgo (enqueue excerpt): creates one sudog per case, enqueues it on the
// channel's receive or send queue according to the case kind, and then parks
// the goroutine through gopark. Parts of the body are elided.
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	...
	gp = getg()
	// NOTE(review): nextp is not used again in the visible lines; the code
	// that links each sudog into gp.waiting is presumably elided.
	nextp = &gp.waiting
	// Walk the cases in lock order.
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c
		// Each case gets its own sudog tying gp to the case's channel.
		sg := acquireSudog()
		sg.g = gp
		sg.c = c

		// Receivers wait on recvq, senders on sendq; other kinds are not
		// enqueued.
		switch cas.kind {
		case caseRecv:
			c.recvq.enqueue(sg)
		case caseSend:
			c.sendq.enqueue(sg)
		}
	}

	// Park the goroutine with wait reason waitReasonSelect.
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	...
}
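除了将当前 Goroutine 对应的 `runtime.sudog` 结构体加入队列之外,这些 `runtime.sudog` 结构体都会被串成链表附着在 Goroutine 上。在入队之后会调用 `runtime.gopark` 挂起当前 Goroutine 等待调度器的唤醒。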
15b48a35b570d96dd763bb5a1fab4ae0.png
图 - Goroutine 上等待收发的 sudog 链表
等到 `select` 中的一些 Channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 `runtime.selectgo` 函数的第三阶段,从 `runtime.sudog` 中读取数据:
// selectgo (wake-up excerpt): after the goroutine is resumed, walks the cases
// in lock order alongside the gp.waiting sudog list, records the case whose
// sudog matches the one found in gp.param, removes every other sudog from its
// channel queue, and releases all sudogs. Parts of the body are elided.
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	...
	// gp.param is read as the sudog handed over on wake-up, then reset.
	sg = (*sudog)(gp.param)
	gp.param = nil

	casi = -1
	cas = nil
	sglist = gp.waiting
	for _, casei := range lockorder {
		k = &scases[casei]
		if sg == sglist {
			// This case's sudog is the one we were woken with.
			casi = int(casei)
			cas = k
		} else {
			// Dequeue from this case's own channel: c must be refreshed here,
			// otherwise it still refers to a channel from an earlier step.
			c = k.c
			if k.kind == caseSend {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
		// Unlink the current sudog, release it, and advance along the list.
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	c = cas.c
	goto retc
	...
}
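第三次遍历全部 `case` 时,我们会先获取当前 Goroutine 接收到的参数 `sudog` 结构,然后依次对比所有 `case` 对应的 `sudog` 结构找到被唤醒的 `case`,获取该 `case` 对应的索引并返回。
由于当前的 `select` 结构已经找到了一个 `case` 执行,剩下 `case` 中没有被用到的 `sudog` 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还需要将这些废弃的 `sudog` 从 Channel 中出队。
当我们在循环中发现缓冲区中有元素或者缓冲区未满时,就会通过 `goto` 关键字跳转到 `bufrecv` 和 `bufsend` 两个代码段,这两段代码的执行过程都很简单,它们只是向 Channel 的缓冲区写入数据或者从缓冲区中获取新数据: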
// bufrecv: receive from the channel buffer.
bufrecv:
	recvOK = true
	// qp is the buffer slot at the current receive index.
	qp = chanbuf(c, c.recvx)
	// Copy the element out only if the case supplied a destination.
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	// Clear the slot that was just consumed.
	typedmemclr(c.elemtype, qp)
	// Advance the receive index, wrapping around at the buffer size.
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

// bufsend: send into the channel buffer.
bufsend:
	// Copy the element into the slot at the current send index.
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	// Advance the send index, wrapping around at the buffer size.
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc
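这里在缓冲区进行的操作和直接调用 `runtime.chansend` 和 `runtime.chanrecv` 函数差不多,上述两个过程在执行结束之后都会直接跳到 `retc` 标签。
两个直接收发 Channel 的情况会调用运行时函数 `runtime.send` 和 `runtime.recv`,这两个函数会与处于休眠状态的 Goroutine 打交道: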
// recv: hand the operation to recv, passing the sudog sg, the case's element
// pointer and a callback that unlocks all channels of the select.
recv:
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	recvOK = true
	goto retc

// send: hand the operation to send with the same unlock callback.
send:
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	goto retc

不过如果向关闭的 Channel 发送数据或者从关闭的 Channel 中接收数据,情况就稍微有一点复杂了:

  • 从一个关闭的 Channel 中接收数据会直接清除接收变量中的相关内容;
  • 向一个关闭的 Channel 发送数据就会直接 `panic` 造成程序崩溃;
// rclose: receive on a closed channel. Unlock the channels, report
// recvOK = false, and clear the destination if the case supplied one.
rclose:
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	goto retc

// sclose: send on a closed channel. Unlock the channels, then panic.
sclose:
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
总体来看,`select` 语句中的 Channel 收发操作和直接操作 Channel 没有太多出入,只是由于 `select` 多出了 `default` 关键字,所以会支持非阻塞的收发。

4. 小结

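我们简单总结一下 `select` 结构的执行过程与实现原理。首先在编译期间,Go 语言会对 `select` 语句进行优化,它会根据 `select` 中 `case` 的不同选择不同的优化路径:
  1. 空的 `select` 语句会被转换成调用 `runtime.block` 直接挂起当前 Goroutine;
  2. 如果 `select` 语句中只包含一个 `case`,编译器会将其转换成 `if ch == nil { block }; n;` 表达式;
     • 首先判断操作的 Channel 是不是空的;
     • 然后执行 `case` 结构中的内容;
  3. 如果 `select` 语句中只包含两个 `case` 并且其中一个是 `default`,那么会使用 `runtime.selectnbrecv` 和 `runtime.selectnbsend` 非阻塞地执行收发操作;
  4. 在默认情况下会通过 `runtime.selectgo` 获取执行 `case` 的索引,并通过多个 `if` 语句执行对应 `case` 中的代码;
在编译器已经对 `select` 语句进行优化之后,Go 语言会在运行时执行编译期间展开的 `runtime.selectgo` 函数,该函数会按照以下的流程执行:
  1. 随机生成一个遍历的轮询顺序 `pollOrder` 并根据 Channel 地址生成锁定顺序 `lockOrder`;
  2. 根据 `pollOrder` 遍历所有的 `case` 查看是否有可以立刻处理的 Channel;
     • 如果存在,直接获取 `case` 对应的索引并返回;
     • 如果不存在,创建 `runtime.sudog` 结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用 `runtime.gopark` 挂起当前 Goroutine 等待调度器的唤醒;
  3. 当调度器唤醒当前 Goroutine 时,会再次按照 `lockOrder` 遍历所有的 `case`,从中查找需要被处理的 `runtime.sudog` 对应的索引;
`select` 关键字是 Go 语言特有的控制结构,它的实现原理比较复杂,需要编译器和运行时函数的通力合作。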

全套教程点击下方链接直达:

IT实战:Go语言设计与实现自学教程​zhuanlan.zhihu.com
cf09aa08c67ecfc1adfb1ac21e15af9f.png