说明
前面的章节我们基本聊完了golang网络编程的关键API流程,但遗留了一个关键内容:当系统调用返回EAGAIN时,会调用WaitRead/WaitWrite来阻塞当前协程,现在我们接着聊。
WaitRead/WaitWrite
func (pd *pollDesc) Wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res) } func (pd *pollDesc) WaitRead() error { return pd.Wait('r') } func (pd *pollDesc) WaitWrite() error { return pd.Wait('w') }
最终runtime_pollWait走到下面去了:
TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0 JMP runtime·netpollWait(SB)
我们仔细考虑应该明白:netpollWait的主要作用是:等待关心的socket是否有事件(其实后面我们知道只是等待一个标记位是否发生改变),如果没有事件,那么就将当前的协程挂起,直到有通知事件发生,我们接下来看看到底如何实现:
func netpollWait(pd *pollDesc, mode int) int { // 先检查该socket是否有error发生(如关闭、超时等) err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris uses level-triggered IO. if GOOS == "solaris" { onM(func() { netpollarm(pd, mode) }) } // 循环等待netpollblock返回值为true // 如果返回值为false且该socket未出现任何错误 // 那该协程可能被意外唤醒,需要重新被挂起 // 还有一种可能:该socket由于超时而被唤醒 // 此时netpollcheckerr就是用来检测超时错误的 for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } } return 0 } func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } // set the gpp semaphore to WAIT // 首先将轮询状态设置为pdWait // 为什么要使用for呢?因为casuintptr使用了自旋锁 // 为什么使用自旋锁就要加for循环呢? for { old := *gpp if old == pdReady { *gpp = 0 return true } if old != 0 { gothrow("netpollblock: double wait") } // 将socket轮询相关的状态设置为pdWait if casuintptr(gpp, 0, pdWait) { break } } // 如果未出错将该协程挂起,解锁函数是netpollblockcommit if waitio || netpollcheckerr(pd, mode) == 0 { f := netpollblockcommit gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait") } // 可能是被挂起的协程被唤醒 // 或者由于某些原因该协程压根未被挂起 // 获取其当前状态记录在old中 old := xchguintptr(gpp, 0) if old > pdWait { gothrow("netpollblock: corrupted state") } return old == pdReady }
从上面的分析我们看到,如果无法读写,golang会将当前协程挂起,在协程被唤醒的时候,该标记位应该会被置位。 我们接下来看看这些挂起的协程何时会被唤醒。
事件通知
golang运行库在系统运行过程中存在socket事件检查点,目前,该检查点主要位于以下几个地方:
runtime·startTheWorldWithSema(void):在完成gc后;
findrunnable():这个暂时不知道何时会触发?
sysmon:golang中的监控协程,会周期性检查就绪socket
TODO: 为什么是在这些地方检查socket就绪事件呢?
接下来我们看看如何检查socket就绪事件,在socket就绪后又是如何唤醒被挂起的协程?主要调用函数runtime-netpoll()
我们只关注epoll的实现,对于epoll,上面的方法具体实现是netpoll_epoll.go中的netpoll
func netpoll(block bool) (gp *g) { if epfd == -1 { return } waitms := int32(-1) if !block { // 如果调用者不希望block // 设置waitsm为0 waitms = 0 } var events [128]epollevent retry: // 调用epoll_wait获取就绪事件 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { ... } goto retry } for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } var mode int32 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } // 对每个事件,调用了netpollready // pd主要记录了与该socket关联的等待协程 if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode) } } // 如果调用者同步等待且本次未获取到就绪socket // 继续重试 if block && gp == nil { goto retry } return gp }
这个函数主要调用epoll_wait(当然,golang封装了系统调用)来获取就绪socket fd,对每个就绪的fd,调用netpollready()作进一步处理。这个函数的最终返回值就是一个已经就绪的协程(g)链表。
netpollready主要是将该socket fd标记为IOReady,并唤醒等待在该fd上的协程g,将其添加到传入的g链表中。
// make pd ready, newly runnable goroutines (if any) are returned in rg/wg func netpollready(gpp **g, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } // 将就绪协程添加至链表中 if rg != nil { rg.schedlink = *gpp *gpp = rg } if wg != nil { wg.schedlink = *gpp *gpp = wg } } // 将pollDesc的状态置为pdReady并返回就绪协程 func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := *gpp if old == pdReady { return nil } if old == 0 && !ioready { return nil } var new uintptr if ioready { new = pdReady } if casuintptr(gpp, old, new) { if old == pdReady || old == pdWait { old = 0 } return (*g)(unsafe.Pointer(old)) } } }
疑问:一个fd会被多个协程同时进行IO么?比如一个协程读,另外一个协程写?或者多个协程同时读?此时返回的是哪个协程就绪呢?
一个socket fd可支持并发读写,因为对于tcp协议来说,是全双工。读写操作的是不同缓冲区,但是不支持并发读和并发写,因为这样会错乱的。所以上面的netFD.RWLock()就是干这个作用的。