与其他语言的网络IO强调异步非阻塞不同,GOLANG里的网络IO模型是:创建多个goroutine,每个goroutine的网络IO都是阻塞的,这样的代码非常直观

但低层,所有的网络IO实际上都是非阻塞的

以net.Dial为例子,其他的Read/Write机制类似

0f74b09407c7ef6e8b0a72d62e7e012b.png

Read的原理:

for {    fd.read()    if err == EAGAIN {        pollserver.WaitRead(fd)        continue    }    break}

网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。

func Listen(net, laddr string) (Listener, error) {   la, err := resolveAddr("listen", net, laddr, noDeadline)   ......   switch la := la.toAddr().(type) {   case *TCPAddr:       l, err = ListenTCP(net, la)   case *UnixAddr:       ......   }  ......} // 对于tcp协议,返回的的是TCPListenerfunc ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {   ......   fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")   ......   return &TCPListener{fd}, nil}func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {   ......   return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)} func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {   // 创建底层socket,设置属性为O_NONBLOCK   s, err := sysSocket(family, sotype, proto)   ......   setDefaultSockopts(s, family, sotype, ipv6only)   // 创建新netFD结构   fd, err = newFD(s, family, sotype, net)   ......   if laddr != nil && raddr == nil {       switch sotype {       case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:           // 调用底层listen监听创建的套接字           fd.listenStream(laddr, listenerBacklog)           return fd, nil       case syscall.SOCK_DGRAM:           ...... }   }   } // 最终调用该函数来创建一个socket// 并且将socket属性设置为O_NONBLOCKfunc sysSocket(family, sotype, proto int) (int, error) {   syscall.ForkLock.RLock()   s, err := syscall.Socket(family, sotype, proto)   if err == nil {       syscall.CloseOnExec(s)   }   syscall.ForkLock.RUnlock()   if err != nil {       return -1, err   }   if err = syscall.SetNonblock(s, true); err != nil {       syscall.Close(s)       return -1, err}   return s, nil} func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {   if err := setDefaultListenerSockopts(fd.sysfd)   if lsa, err := laddr.sockaddr(fd.family); err != nil {       return err   } else if lsa != nil {       // Bind绑定至该socket       if err := syscall.Bind(fd.sysfd, lsa); err != nil {           return os.NewSyscallError("bind", err)       }   }   // 监听该socket   if err := syscall.Listen(fd.sysfd, backlog);    // 这里非常关键:初始化socket与异步IO相关的内容   if err := fd.init(); err != nil {       return err   }   lsa, _ := syscall.Getsockname(fd.sysfd)   fd.setAddr(fd.addrFunc()(lsa), nil)      return nil}     

我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该socket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。

对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?

func (fd *netFD) init() error {   if err := fd.pd.Init(fd); err != nil {       return err   }   return nil} func (pd *pollDesc) Init(fd *netFD) error {   // 利用了Once机制,保证一个进程只会执行一次   // runtime_pollServerInit:    // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0   // JMP runtime·netpollServerInit(SB)   serverInit.Do(runtime_pollServerInit)   // runtime_pollOpen:   // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0   // JMP runtime·netpollOpen(SB)   ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))   if errno != 0 {       return syscall.Errno(errno) }   pd.runtimeCtx = ctx   return nil}

这里就是socket异步编程的关键:

netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;

func netpollinit() {    epfd = epollcreate1(_EPOLL_CLOEXEC)    if epfd >= 0 {        return    }    epfd = epollcreate(1024)    if epfd >= 0 {        closeonexec(epfd)        return    }    ......}

netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen

func netpollopen(fd uintptr, pd *pollDesc) int32 {   var ev epollevent   ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET   *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd   return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)}

OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。

Accept

既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:

func (l *TCPListener) Accept() (Conn, error) {    c, err := l.AcceptTCP()    ......} func (l *TCPListener) AcceptTCP() (*TCPConn, error) {    ......    fd, err := l.fd.accept()    ......    // 返回给调用者一个新的TCPConn    return newTCPConn(fd), nil} func (fd *netFD) accept() (netfd *netFD, err error) {    // 为什么对该函数加读锁?    if err := fd.readLock(); err != nil {        return nil, err    }    defer fd.readUnlock()     ......    for {        // 这个accept是golang包装的系统调用        // 用来处理跨平台        s, rsa, err = accept(fd.sysfd)        if err != nil {            if err == syscall.EAGAIN {                // 如果没有可用连接,WaitRead()阻塞该协程                // 后面会详细分析WaitRead.                if err = fd.pd.WaitRead(); err == nil {                    continue                }            } else if err == syscall.ECONNABORTED {                // 如果连接在Listen queue时就已经被对端关闭                continue            }        }        break    }     netfd, err = newFD(s, fd.family, fd.sotype, fd.net)......    // 这个前面已经分析,将该fd添加到epoll队列中    err = netfd.init()    ......    lsa, _ := syscall.Getsockname(netfd.sysfd)    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))    return netfd, nil}

从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。

一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。

Read

func (c *conn) Read(b []byte) (int, error) {    if !c.ok() {        return 0, syscall.EINVAL    }    return c.fd.Read(b)} func (fd *netFD) Read(p []byte) (n int, err error) {    // 为什么对函数调用加读锁    if err := fd.readLock(); err != nil {        return 0, err    }    defer fd.readUnlock()    // 这个又是干嘛?    if err := fd.pd.PrepareRead(); err != nil {        return 0, &OpError{"read", fd.net, fd.raddr, err}    }    for {        n, err = syscall.Read(int(fd.sysfd), p)        if err != nil {            n = 0            // 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒            if err == syscall.EAGAIN {                if err = fd.pd.WaitRead(); err == nil {                    continue                }            }        }        // 检查错误,封装io.EOF        err = chkReadErr(n, err, fd)        break    }    if err != nil && err != io.EOF {        err = &OpError{"read", fd.net, fd.raddr, err}    }    return}func chkReadErr(n int, err error, fd *netFD) error {    if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {        return io.EOF    }    return err}

每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程序需要能够处理这种情况。

Write

func (fd *netFD) Write(p []byte) (nn int, err error) {    // 为什么这里加写锁    if err := fd.writeLock(); err != nil {        return 0, err    }    defer fd.writeUnlock()    // 这个是干什么?    if err := fd.pd.PrepareWrite(); err != nil {        return 0, &OpError{"write", fd.net, fd.raddr, err}    }    // nn记录总共写入的数据量,每次Write可能只能写入部分数据    for {        var n int        n, err = syscall.Write(int(fd.sysfd), p[nn:])        if n > 0 {            nn += n        }        // 如果数组数据已经全部写完,函数返回        if nn == len(p) {            break        }        // 如果写入数据时被block了,阻塞当前协程        if err == syscall.EAGAIN {            if err = fd.pd.WaitWrite(); err == nil {                continue            }        }        if err != nil {            n = 0            break        }        // 如果返回值为0,代表了什么?        if n == 0 {            err = io.ErrUnexpectedEOF            break   }    }    if err != nil {        err = &OpError{"write", fd.net, fd.raddr, err}    }    return nn, err}          

注意Write语义与Read不一样的地方:

Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。

当系统调用返回EAGAIN时,会调用WaitRead/WaitWrite来阻塞当前协程,现在我们接着聊。

WaitRead/WaitWrite

func (pd *pollDesc) Wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res)
}
func (pd *pollDesc) WaitRead() error {
return pd.Wait('r')
}
func (pd *pollDesc) WaitWrite() error {
return pd.Wait('w')
}

最终runtime_pollWait走到下面去了:

TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0 
JMP runtime·netpollWait(SB)

我们仔细考虑应该明白:netpollWait的主要作用是:等待关心的socket是否有事件(其实后面我们知道只是等待一个标记位是否发生改变),如果没有事件,那么就将当前的协程挂起,直到有通知事件发生,我们接下来看看到底如何实现:

func netpollWait(pd *pollDesc, mode int) int {
// 先检查该socket是否有error发生(如关闭、超时等)
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
onM(func() {
netpollarm(pd, mode)
})
}
// 循环等待netpollblock返回值为true
// 如果返回值为false且该socket未出现任何错误
// 那该协程可能被意外唤醒,需要重新被挂起
// 还有一种可能:该socket由于超时而被唤醒
// 此时netpollcheckerr就是用来检测超时错误的
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 首先将轮询状态设置为pdWait
// 为什么要使用for呢?因为casuintptr使用了自旋锁
// 为什么使用自旋锁就要加for循环呢?
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
gothrow("netpollblock: double wait")
}
// 将socket轮询相关的状态设置为pdWait
if casuintptr(gpp, 0, pdWait) {
break
}
}
// 如果未出错将该协程挂起,解锁函数是netpollblockcommit
if waitio || netpollcheckerr(pd, mode) == 0 {
f := netpollblockcommit
gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
}
// 可能是被挂起的协程被唤醒
// 或者由于某些原因该协程压根未被挂起
// 获取其当前状态记录在old中
old := xchguintptr(gpp, 0)
if old > pdWait {
gothrow("netpollblock: corrupted state")
}
return old == pdReady
}

从上面的分析我们看到,如果无法读写,golang会将当前协程挂起,在协程被唤醒的时候,该标记位应该会被置位。 我们接下来看看这些挂起的协程何时会被唤醒。

事件通知

golang运行库在系统运行过程中存在socket事件检查点,目前,该检查点主要位于以下几个地方:

runtime·startTheWorldWithSema(void):在完成gc后;

findrunnable():这个暂时不知道何时会触发?

sysmon:golang中的监控协程,会周期性检查就绪socket

TODO: 为什么是在这些地方检查socket就绪事件呢?

接下来我们看看如何检查socket就绪事件,在socket就绪后又是如何唤醒被挂起的协程?主要调用函数runtime-netpoll()

我们只关注epoll的实现,对于epoll,上面的方法具体实现是netpoll_epoll.go中的netpoll

func netpoll(block bool) (gp *g) {
if epfd == -1 {
return
}
waitms := int32(-1)
if !block {
// 如果调用者不希望block
// 设置waitsm为0
waitms = 0
}
var events [128]epollevent
retry:
// 调用epoll_wait获取就绪事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
...
}
goto retry
}
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
// 对每个事件,调用了netpollready
// pd主要记录了与该socket关联的等待协程
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
}
}
// 如果调用者同步等待且本次未获取到就绪socket
// 继续重试
if block && gp == nil {
goto retry
}
return gp
}

这个函数主要调用epoll_wait(当然,golang封装了系统调用)来获取就绪socket fd,对每个就绪的fd,调用netpollready()作进一步处理。这个函数的最终返回值就是一个已经就绪的协程(g)链表。

netpollready主要是将该socket fd标记为IOReady,并唤醒等待在该fd上的协程g,将其添加到传入的g链表中。

// make pd ready, newly runnable goroutines (if any) are returned in rg/wg 
func netpollready(gpp **g, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
// 将就绪协程添加至链表中
if rg != nil {
rg.schedlink = *gpp
*gpp = rg
}
if wg != nil {
wg.schedlink = *gpp
*gpp = wg
}
}
// 将pollDesc的状态置为pdReady并返回就绪协程
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}

疑问:一个fd会被多个协程同时进行IO么?比如一个协程读,另外一个协程写?或者多个协程同时读?此时返回的是哪个协程就绪呢?

一个socket fd可支持并发读写,因为对于tcp协议来说,是全双工。读写操作的是不同缓冲区,但是不支持并发读和并发写,因为这样会错乱的。所以上面的netFD.RWLock()就是干这个作用的。

runtime中的epoll事件驱动抽象层其实在进入net库后,又被封装了一次,这一次封装从代码上看主要是为了方便在纯Go语言环境进行操作,net库中的这次封装实现在poll/fd_poll_runtime.go文件中,主要是通过pollDesc对象来实现的: 
(ps: 这里对应的版本是go1.9.1 的版本)

type pollDesc struct {runtimeCtx uintptr
}

注意:此处的pollDesc对象不是上文提到的runtime中的PollDesc,相反此处pollDesc对象的runtimeCtx成员才是指向的runtime的PollDesc实例。pollDesc对象主要就是将runtime的事件驱动抽象层给再封装了一次,供网络fd对象使用。

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

pollDesc对象最需要关注的就是其Init方法,这个方法通过一个sync.Once变量来调用了runtime_pollServerInit函数,也就是创建epoll实例的函数。 
意思就是runtime_pollServerInit函数在整个进程生命周期内只会被调用一次,也就是只会创建一次epoll实例。epoll实例被创建后,会调用runtime_pollOpen函数将fd添加到epoll中。

网络编程中的所有socket fd都是通过netFD对象实现的,netFD是对网络IO操作的抽象,linux的实现在文件net/fd_unix.go中。netFD对象实现有自己的init方法,还有完成基本IO操作的Read和Write方法,当然除了这三个方法以外,还有很多非常有用的方法供用户使用。

/src/net/fd_unix.go

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
}

通过netFD对象的定义可以看到每个fd都关联了一个pollDesc实例,通过上文我们知道pollDesc对象最终是对epoll的封装。

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}

func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}

netFD对象的init函数仅仅是调用了pollDesc实例的Init函数,作用就是将fd添加到epoll中,如果这个fd是第一个网络socket fd的话,这一次init还会担任创建epoll实例的任务。要知道在Go进程里,只会有一个epoll实例来管理所有的网络socket fd,这个epoll实例也就是在第一个网络socket fd被创建的时候所创建。

/src/net/fd_unix.go 
Read()函数:

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

重点关注这个for循环中的syscall.Read调用的错误处理。当有错误发生的时候,会检查这个错误是否是syscall.EAGAIN,如果是,则调用WaitRead将当前读这个fd的goroutine给park住,直到这个fd上的读事件再次发生为止。 
当这个socket上有新数据到来的时候,WaitRead调用返回,继续for循环的执行。这样的实现,就让调用netFD的Read的地方变成了同步“阻塞”方式编程,不再是异步非阻塞的编程方式了。netFD的Write方法和Read的实现原理是一样的,都是在碰到EAGAIN错误的时候将当前goroutine给park住直到socket再次可写为止。

本文只是将网络库的底层实现给大体上引导了一遍,知道底层代码大概实现在什么地方,方便结合源码深入理解。Go语言中的高并发、同步阻塞方式编程的关键其实是”goroutine和调度器”,针对网络IO的时候,我们需要知道EAGAIN这个非常关键的调度点,掌握了这个调度点,即使没有调度器,自己也可以在epoll的基础上配合协程等用户态线程实现网络IO操作的调度,达到同步阻塞编程的目的。

最后,为什么需要同步阻塞的方式编程?只有看多、写多了异步非阻塞代码的时候才能够深切体会到这个问题。真正的高大上绝对不是——“别人不会,我会;别人写不出来,我写得出来。”


EAGAIN:

ET还是LT?

LT的处理过程:
. accept一个连接,添加到epoll中监听EPOLLIN事件
. 当EPOLLIN事件到达时,read fd中的数据并处理
. 当需要写出数据时,把数据write到fd中;如果数据较大,无法一次性写出,那么在epoll中监听EPOLLOUT事件
. 当EPOLLOUT事件到达时,继续把数据write到fd中;如果数据写出完毕,那么在epoll中关闭EPOLLOUT事件

ET的处理过程:
. accept一个一个连接,添加到epoll中监听EPOLLIN|EPOLLOUT事件
. 当EPOLLIN事件到达时,read fd中的数据并处理,read需要一直读,直到返回EAGAIN为止
. 当需要写出数据时,把数据write到fd中,直到数据全部写完,或者write返回EAGAIN
. 当EPOLLOUT事件到达时,继续把数据write到fd中,直到数据全部写完,或者write返回EAGAIN

从ET的处理过程中可以看到,ET的要求是需要一直读写,直到返回EAGAIN,否则就会遗漏事件。而LT的处理过程中,直到返回EAGAIN不是硬性要求,但通常的处理过程都会读写直到返回EAGAIN,但LT比ET多了一个开关EPOLLOUT事件的步骤

LT的编程与poll/select接近,符合一直以来的习惯,不易出错
ET的编程可以做到更加简洁,某些场景下更加高效,但另一方面容易遗漏事件,容易产生bug

推荐阅读

  • Go 语言 net 包学习和实战


喜欢本文的朋友,欢迎关注“Go语言中文网”:

56227a7495895aa2e8553292271bc44d.png