说明

前面的章节我们基本聊完了golang网络编程的关键API流程,但遗留了一个关键内容:当系统调用返回EAGAIN时,会调用WaitRead/WaitWrite来阻塞当前协程,现在我们接着聊。

WaitRead/WaitWrite

func (pd *pollDesc) Wait(mode int) error {
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res)
}

func (pd *pollDesc) WaitRead() error {
    return pd.Wait('r')
}

func (pd *pollDesc) WaitWrite() error {
    return pd.Wait('w')
}

最终runtime_pollWait走到下面去了:

TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0 
   JMP runtime·netpollWait(SB)

我们仔细考虑应该明白:netpollWait的主要作用是:等待关心的socket是否有事件(其实后面我们知道只是等待一个标记位是否发生改变),如果没有事件,那么就将当前的协程挂起,直到有通知事件发生,我们接下来看看到底如何实现:

func netpollWait(pd *pollDesc, mode int) int {
    // 先检查该socket是否有error发生(如关闭、超时等) 
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }

    // As for now only Solaris uses level-triggered IO. 
    if GOOS == "solaris" {
        onM(func() {
            netpollarm(pd, mode)
        })
    }
    // 循环等待netpollblock返回值为true 
    // 如果返回值为false且该socket未出现任何错误 
    // 那该协程可能被意外唤醒,需要重新被挂起 
    // 还有一种可能:该socket由于超时而被唤醒 
    // 此时netpollcheckerr就是用来检测超时错误的 
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0 
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to WAIT 
    // 首先将轮询状态设置为pdWait 
    // 为什么要使用for呢?因为casuintptr使用了自旋锁 
    // 为什么使用自旋锁就要加for循环呢? 
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0 
            return true 
        }
        if old != 0 {
            gothrow("netpollblock: double wait")
        }
        // 将socket轮询相关的状态设置为pdWait 
        if casuintptr(gpp, 0, pdWait) {
            break 
        }
    }
    // 如果未出错将该协程挂起,解锁函数是netpollblockcommit 
    if waitio || netpollcheckerr(pd, mode) == 0 {
        f := netpollblockcommit
        gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
    }
    // 可能是被挂起的协程被唤醒 
    // 或者由于某些原因该协程压根未被挂起 
    // 获取其当前状态记录在old中 
    old := xchguintptr(gpp, 0)
    if old > pdWait {
        gothrow("netpollblock: corrupted state")
    }
    return old == pdReady
}

从上面的分析我们看到,如果无法读写,golang会将当前协程挂起,在协程被唤醒的时候,该标记位应该会被置位。 我们接下来看看这些挂起的协程何时会被唤醒。

事件通知

golang运行库在系统运行过程中存在socket事件检查点,目前,该检查点主要位于以下几个地方:

runtime·startTheWorldWithSema(void):在完成gc后;
findrunnable():这个暂时不知道何时会触发?
sysmon:golang中的监控协程,会周期性检查就绪socket

TODO: 为什么是在这些地方检查socket就绪事件呢?

接下来我们看看如何检查socket就绪事件,在socket就绪后又是如何唤醒被挂起的协程?主要调用函数runtime-netpoll()

我们只关注epoll的实现,对于epoll,上面的方法具体实现是netpoll_epoll.go中的netpoll

func netpoll(block bool) (gp *g) {
    if epfd == -1 {
        return 
    }
    waitms := int32(-1)
    if !block {
        // 如果调用者不希望block 
        // 设置waitsm为0 
        waitms = 0 
    }

    var events [128]epollevent
retry:
    // 调用epoll_wait获取就绪事件 
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        ...
    }
    goto retry
   }

    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue 
        }
        var mode int32 
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r' 
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w' 
        }

        // 对每个事件,调用了netpollready 
        // pd主要记录了与该socket关联的等待协程 
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
        }
    }
    // 如果调用者同步等待且本次未获取到就绪socket 
    // 继续重试 
    if block && gp == nil {
        goto retry
    }
    return gp
}

这个函数主要调用epoll_wait(当然,golang封装了系统调用)来获取就绪socket fd,对每个就绪的fd,调用netpollready()作进一步处理。这个函数的最终返回值就是一个已经就绪的协程(g)链表。

netpollready主要是将该socket fd标记为IOReady,并唤醒等待在该fd上的协程g,将其添加到传入的g链表中。

// make pd ready, newly runnable goroutines (if any) are returned in rg/wg 
func netpollready(gpp **g, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }

    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // 将就绪协程添加至链表中 
    if rg != nil {
        rg.schedlink = *gpp
        *gpp = rg
    }
    if wg != nil {
        wg.schedlink = *gpp
        *gpp = wg
    }
}

// 将pollDesc的状态置为pdReady并返回就绪协程 
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    for {
        old := *gpp
        if old == pdReady {
            return nil 
        }
        if old == 0 && !ioready {
            return nil 
        }
        var new uintptr 
        if ioready {
            new = pdReady
        }
        if casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0 
            }
            return (*g)(unsafe.Pointer(old))
        }
    }
}

疑问:一个fd会被多个协程同时进行IO么?比如一个协程读,另外一个协程写?或者多个协程同时读?此时返回的是哪个协程就绪呢?

一个socket fd可支持并发读写,因为对于tcp协议来说,是全双工。读写操作的是不同缓冲区,但是不支持并发读和并发写,因为这样会错乱的。所以上面的netFD.RWLock()就是干这个作用的。