与其余语言的网络IO强调异步非阻塞不一样,GOLANG里的网络IO模型是:建立多个goroutine,每一个goroutine的网络IO都是阻塞的,这样的代码很是直观php

但低层,全部的网络IO实际上都是非阻塞的linux

以net.Dial为例子,其余的Read/Write机制相似golang

Read的原理:算法

for {编程

fd.read()swift

if err == EAGAIN {数组

pollserver.WaitRead(fd)微信

continue网络

}架构

break
}

网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,咱们选择忽略全部的错误。这样可使得代码看起来更为简单。 并且咱们只关注tcp协议实现,udp和unix socket不是咱们关心的。

func Listen(net, laddr string) (Listener, error) { la, err := resolveAddr("listen", net, laddr, noDeadline) ...... switch la := la.toAddr().(type) { case *TCPAddr: l, err = ListenTCP(net, la) case *UnixAddr: ...... } ......} // 对于tcp协议,返回的的是TCPListenerfunc ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) { ...... fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen") ...... return &TCPListener{fd}, nil}func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) { ...... return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)} func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) { // 建立底层socket,设置属性为O_NONBLOCK s, err := sysSocket(family, sotype, proto) ...... setDefaultSockopts(s, family, sotype, ipv6only) // 建立新netFD结构 fd, err = newFD(s, family, sotype, net) ...... if laddr != nil && raddr == nil { switch sotype { case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: // 调用底层listen监听建立的套接字 fd.listenStream(laddr, listenerBacklog) return fd, nil case syscall.SOCK_DGRAM: ...... } } } // 最终调用该函数来建立一个socket// 而且将socket属性设置为O_NONBLOCKfunc sysSocket(family, sotype, proto int) (int, error) { syscall.ForkLock.RLock() s, err := syscall.Socket(family, sotype, proto) if err == nil { syscall.CloseOnExec(s) } syscall.ForkLock.RUnlock() if err != nil { return -1, err } if err = syscall.SetNonblock(s, true); err != nil { syscall.Close(s) return -1, err} return s, nil} func (fd *netFD) listenStream(laddr sockaddr, backlog int) error { if err := setDefaultListenerSockopts(fd.sysfd) if lsa, err := laddr.sockaddr(fd.family); err != nil { return err } else if lsa != nil { // Bind绑定至该socket if err := syscall.Bind(fd.sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } } // 监听该socket if err := syscall.Listen(fd.sysfd, backlog);  // 这里很是关键:初始化socket与异步IO相关的内容 if err := fd.init(); err != nil { return err } lsa, _ := syscall.Getsockname(fd.sysfd) fd.setAddr(fd.addrFunc()(lsa), nil)  return nil} 

咱们这里看到了如何实现Listen。流程基本都很简单,可是由于咱们使用了异步编程,所以,咱们在Listen完该socket后,还必须将其添加到监听队列中,之后该socket有事件到来时可以及时通知到。

对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那咱们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?

func (fd *netFD) init() error { if err := fd.pd.Init(fd); err != nil { return err } return nil} func (pd *pollDesc) Init(fd *netFD) error { // 利用了Once机制,保证一个进程只会执行一次 // runtime_pollServerInit:  // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0 // JMP runtime·netpollServerInit(SB) serverInit.Do(runtime_pollServerInit) // runtime_pollOpen: // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0 // JMP runtime·netpollOpen(SB) ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) if errno != 0 { return syscall.Errno(errno) } pd.runtimeCtx = ctx return nil}

这里就是socket异步编程的关键:

netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;

func netpollinit() { epfd = epollcreate1(_EPOLL_CLOEXEC) if epfd >= 0 { return } epfd = epollcreate(1024) if epfd >= 0 { closeonexec(epfd) return } ......}

netpollOpen则在socket被建立出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen

func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)}

OK,看到这里,咱们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,而后将该socket添加到 epoll的事件监听队列中。

Accept

既然咱们描述的重点的tcp协议,所以,咱们看看TCPListener的Accept方法是怎么实现的:

func (l *TCPListener) Accept() (Conn, error) { c, err := l.AcceptTCP() ......} func (l *TCPListener) AcceptTCP() (*TCPConn, error) { ...... fd, err := l.fd.accept() ...... // 返回给调用者一个新的TCPConn return newTCPConn(fd), nil} func (fd *netFD) accept() (netfd *netFD, err error) { // 为何对该函数加读锁? if err := fd.readLock(); err != nil { return nil, err } defer fd.readUnlock() ...... for { // 这个accept是golang包装的系统调用 // 用来处理跨平台 s, rsa, err = accept(fd.sysfd) if err != nil { if err == syscall.EAGAIN { // 若是没有可用链接,WaitRead()阻塞该协程 // 后面会详细分析WaitRead. if err = fd.pd.WaitRead(); err == nil { continue } } else if err == syscall.ECONNABORTED { // 若是链接在Listen queue时就已经被对端关闭 continue } } break }  netfd, err = newFD(s, fd.family, fd.sotype, fd.net)...... // 这个前面已经分析,将该fd添加到epoll队列中 err = netfd.init() ...... lsa, _ := syscall.Getsockname(netfd.sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd, nil}

从前面的编程事例中咱们知道,通常在主协程中会accept新的connection,使用异步编程咱们知道,若是没有 新链接到来,该协程会一直被阻塞,直到新链接到来有人唤醒了该协程。

通常在主协程中调用accept,若是返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程咱们会在后面仔细分析。

Read


func (c *conn) Read(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } return c.fd.Read(b)} func (fd *netFD) Read(p []byte) (n int, err error) { // 为何对函数调用加读锁 if err := fd.readLock(); err != nil { return 0, err } defer fd.readUnlock() // 这个又是干吗? if err := fd.pd.PrepareRead(); err != nil { return 0, &OpError{"read", fd.net, fd.raddr, err} }
for { n, err = syscall.Read(int(fd.sysfd), p) if err != nil { n = 0 // 若是返回EAGIN,阻塞当前协程直到有数据可读被唤醒 if err == syscall.EAGAIN { if err = fd.pd.WaitRead(); err == nil { continue } } } // 检查错误,封装io.EOF err = chkReadErr(n, err, fd) break } if err != nil && err != io.EOF { err = &OpError{"read", fd.net, fd.raddr, err} } return}func chkReadErr(n int, err error, fd *netFD) error { if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW { return io.EOF } return err}

每次Read不能保证能够读到想读的那么多内容,好比缓冲区大小是10,而实际可能只读到5,应用程序须要可以处理这种状况。

Write

func (fd *netFD) Write(p []byte) (nn int, err error) { // 为何这里加写锁 if err := fd.writeLock(); err != nil { return 0, err } defer fd.writeUnlock() // 这个是干什么? if err := fd.pd.PrepareWrite(); err != nil { return 0, &OpError{"write", fd.net, fd.raddr, err} } // nn记录总共写入的数据量,每次Write可能只能写入部分数据 for { var n int n, err = syscall.Write(int(fd.sysfd), p[nn:]) if n > 0 { nn += n } // 若是数组数据已经所有写完,函数返回 if nn == len(p) { break } // 若是写入数据时被block了,阻塞当前协程 if err == syscall.EAGAIN { if err = fd.pd.WaitWrite(); err == nil { continue } } if err != nil { n = 0 break } // 若是返回值为0,表明了什么? if n == 0 { err = io.ErrUnexpectedEOF break } } if err != nil { err = &OpError{"write", fd.net, fd.raddr, err} } return nn, err} 

注意Write语义与Read不同的地方:

Write尽可能将用户缓冲区的内容所有写入至底层socket,若是遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时当即返回,可能会致使读取的数据量少于用户缓冲区的大小; 为何会在实现上有此不一样,我想可能read的优先级比较高吧,应用程序可能一直在等着,咱们不能等到数据一直读完才返回,会阻塞用户。 而写不同,优先级相对较低,并且用户通常也不着急写当即返回,因此能够将全部的数据所有写入,并且这样 也能简化应用程序的写法。

当系统调用返回EAGAIN时,会调用WaitRead/WaitWrite来阻塞当前协程,如今咱们接着聊。

WaitRead/WaitWrite

func (pd *pollDesc) Wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res)
}

func (pd *pollDesc) WaitRead() error {
return pd.Wait('r')
}

func (pd *pollDesc) WaitWrite() error {
return pd.Wait('w')
}

最终runtime_pollWait走到下面去了:

TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0 
JMP runtime·netpollWait(SB)

咱们仔细考虑应该明白:netpollWait的主要做用是:等待关心的socket是否有事件(其实后面咱们知道只是等待一个标记位是否发生改变),若是没有事件,那么就将当前的协程挂起,直到有通知事件发生,咱们接下来看看到底如何实现:

func netpollWait(pd *pollDesc, mode int) int {
// 先检查该socket是否有error发生(如关闭、超时等)
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}

// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
onM(func() {
netpollarm(pd, mode)
})
}
// 循环等待netpollblock返回值为true
// 若是返回值为false且该socket未出现任何错误
// 那该协程可能被意外唤醒,须要从新被挂起
// 还有一种可能:该socket因为超时而被唤醒
// 此时netpollcheckerr就是用来检测超时错误的
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// set the gpp semaphore to WAIT
// 首先将轮询状态设置为pdWait
// 为何要使用for呢?由于casuintptr使用了自旋锁
// 为何使用自旋锁就要加for循环呢?
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
gothrow("netpollblock: double wait")
}
// 将socket轮询相关的状态设置为pdWait
if casuintptr(gpp, 0, pdWait) {
break
}
}
// 若是未出错将该协程挂起,解锁函数是netpollblockcommit
if waitio || netpollcheckerr(pd, mode) == 0 {
f := netpollblockcommit
gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
}
// 多是被挂起的协程被唤醒
// 或者因为某些缘由该协程压根未被挂起
// 获取其当前状态记录在old中
old := xchguintptr(gpp, 0)
if old > pdWait {
gothrow("netpollblock: corrupted state")
}
return old == pdReady
}

从上面的分析咱们看到,若是没法读写,golang会将当前协程挂起,在协程被唤醒的时候,该标记位应该会被置位。 咱们接下来看看这些挂起的协程什么时候会被唤醒。

事件通知

golang运行库在系统运行过程当中存在socket事件检查点,目前,该检查点主要位于如下几个地方:

runtime·startTheWorldWithSema(void):在完成gc后;

findrunnable():这个暂时不知道什么时候会触发?

sysmon:golang中的监控协程,会周期性检查就绪socket

TODO: 为何是在这些地方检查socket就绪事件呢?

接下来咱们看看如何检查socket就绪事件,在socket就绪后又是如何唤醒被挂起的协程?主要调用函数runtime-netpoll()

咱们只关注epoll的实现,对于epoll,上面的方法具体实现是netpoll_epoll.go中的netpoll

func netpoll(block bool) (gp *g) {
if epfd == -1 {
return
}
waitms := int32(-1)
if !block {
// 若是调用者不但愿block
// 设置waitsm为0
waitms = 0
}

var events [128]epollevent
retry:
// 调用epoll_wait获取就绪事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
...
}
goto retry
}

for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}

// 对每一个事件,调用了netpollready
// pd主要记录了与该socket关联的等待协程
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
}
}
// 若是调用者同步等待且本次未获取到就绪socket
// 继续重试
if block && gp == nil {
goto retry
}
return gp
}

这个函数主要调用epoll_wait(固然,golang封装了系统调用)来获取就绪socket fd,对每一个就绪的fd,调用netpollready()做进一步处理。这个函数的最终返回值就是一个已经就绪的协程(g)链表。

netpollready主要是将该socket fd标记为IOReady,并唤醒等待在该fd上的协程g,将其添加到传入的g链表中。

// make pd ready, newly runnable goroutines (if any) are returned in rg/wg 
func netpollready(gpp **g, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}

if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
// 将就绪协程添加至链表中
if rg != nil {
rg.schedlink = *gpp
*gpp = rg
}
if wg != nil {
wg.schedlink = *gpp
*gpp = wg
}
}

// 将pollDesc的状态置为pdReady并返回就绪协程
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}

疑问:一个fd会被多个协程同时进行IO么?好比一个协程读,另一个协程写?或者多个协程同时读?此时返回的是哪一个协程就绪呢?

一个socket fd可支持并发读写,由于对于tcp协议来讲,是全双工。读写操做的是不一样缓冲区,可是不支持并发读和并发写,由于这样会错乱的。因此上面的netFD.RWLock()就是干这个做用的。

runtime中的epoll事件驱动抽象层其实在进入net库后,又被封装了一次,这一次封装从代码上看主要是为了方便在纯Go语言环境进行操做,net库中的此次封装实如今poll/fd_poll_runtime.go文件中,主要是经过pollDesc对象来实现的: 
(ps: 这里对应的版本是go1.9.1 的版本)

type pollDesc struct {
runtimeCtx uintptr
}

注意:此处的pollDesc对象不是上文提到的runtime中的PollDesc,相反此处pollDesc对象的runtimeCtx成员才是指向的runtime的PollDesc实例。pollDesc对象主要就是将runtime的事件驱动抽象层给再封装了一次,供网络fd对象使用。

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}

pollDesc对象最须要关注的就是其Init方法,这个方法经过一个sync.Once变量来调用了runtime_pollServerInit函数,也就是建立epoll实例的函数。 
意思就是runtime_pollServerInit函数在整个进程生命周期内只会被调用一次,也就是只会建立一次epoll实例。epoll实例被建立后,会调用runtime_pollOpen函数将fd添加到epoll中。

网络编程中的全部socket fd都是经过netFD对象实现的,netFD是对网络IO操做的抽象,linux的实如今文件net/fd_unix.go中。netFD对象实现有本身的init方法,还有完成基本IO操做的Read和Write方法,固然除了这三个方法之外,还有不少很是有用的方法供用户使用。

/src/net/fd_unix.go

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
}

经过netFD对象的定义能够看到每一个fd都关联了一个pollDesc实例,经过上文咱们知道pollDesc对象最终是对epoll的封装。

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}

func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}

netFD对象的init函数仅仅是调用了pollDesc实例的Init函数,做用就是将fd添加到epoll中,若是这个fd是第一个网络socket fd的话,这一次init还会担任建立epoll实例的任务。要知道在Go进程里,只会有一个epoll实例来管理全部的网络socket fd,这个epoll实例也就是在第一个网络socket fd被建立的时候所建立。

/src/net/fd_unix.go 
Read()函数:

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

重点关注这个for循环中的syscall.Read调用的错误处理。当有错误发生的时候,会检查这个错误是不是syscall.EAGAIN,若是是,则调用WaitRead将当前读这个fd的goroutine给park住,直到这个fd上的读事件再次发生为止。 
当这个socket上有新数据到来的时候,WaitRead调用返回,继续for循环的执行。这样的实现,就让调用netFD的Read的地方变成了同步“阻塞”方式编程,再也不是异步非阻塞的编程方式了。netFD的Write方法和Read的实现原理是同样的,都是在碰到EAGAIN错误的时候将当前goroutine给park住直到socket再次可写为止。

本文只是将网络库的底层实现给大致上引导了一遍,知道底层代码大概实如今什么地方,方便结合源码深刻理解。Go语言中的高并发、同步阻塞方式编程的关键实际上是”goroutine和调度器”,针对网络IO的时候,咱们须要知道EAGAIN这个很是关键的调度点,掌握了这个调度点,即便没有调度器,本身也能够在epoll的基础上配合协程等用户态线程实现网络IO操做的调度,达到同步阻塞编程的目的。

最后,为何须要同步阻塞的方式编程?只有看多、写多了异步非阻塞代码的时候才可以深切体会到这个问题。真正的高大上绝对不是——“别人不会,我会;别人写不出来,我写得出来。”


EAGAIN:

ET仍是LT?

LT的处理过程:
. accept一个链接,添加到epoll中监听EPOLLIN事件
. 当EPOLLIN事件到达时,read fd中的数据并处理
. 当须要写出数据时,把数据write到fd中;若是数据较大,没法一次性写出,那么在epoll中监听EPOLLOUT事件
. 当EPOLLOUT事件到达时,继续把数据write到fd中;若是数据写出完毕,那么在epoll中关闭EPOLLOUT事件

ET的处理过程:
. accept一个一个链接,添加到epoll中监听EPOLLIN|EPOLLOUT事件
. 当EPOLLIN事件到达时,read fd中的数据并处理,read须要一直读,直到返回EAGAIN为止
. 当须要写出数据时,把数据write到fd中,直到数据所有写完,或者write返回EAGAIN
. 当EPOLLOUT事件到达时,继续把数据write到fd中,直到数据所有写完,或者write返回EAGAIN

从ET的处理过程当中能够看到,ET的要求是须要一直读写,直到返回EAGAIN,不然就会遗漏事件。而LT的处理过程当中,直到返回EAGAIN不是硬性要求,但一般的处理过程都会读写直到返回EAGAIN,但LT比ET多了一个开关EPOLLOUT事件的步骤

LT的编程与poll/select接近,符合一直以来的习惯,不易出错
ET的编程能够作到更加简洁,某些场景下更加高效,但另外一方面容易遗漏事件,容易产生bug