epoll_create

问题一:Go如何与linux网络子系统交互

runtime/sys_linux_amd64.s
在这个文件中可以找到创建 epoll 实例的两个函数:runtime.epollcreate(int32 size) 和 runtime.epollcreate1(int32 flags)。

在这个文件中还发现了其他epoll_xxx函数的交互:

  • runtime.epollctl()
  • runtime.epollwait()
  • runtime.closeonexec(fd int32)

我们暂时只需要知道go如何完成epoll相关调用即可。第二个问题是:Go如何对网络进行抽象

问题二:Go的网络抽象——netpoll

我们很容易找到runtime目录下的netpoll.go文件。通过这个文件顶部的build tag,我们可以理解,这个文件中的代码是通用于各个平台的。

//go:build aix || darwin || dragonfly || freebsd || (js && wasm) || linux || netbsd || openbsd || solaris || windows

而不同平台的实现则分布于其它netpoll_xxx.go文件中:

接下来就容易了,我们先了解netpoll的总体情况,再选择netpoll_epoll.go作为学习的对象。

问题三:各种IO复用机制的抽象

netpoll一共抽象了6个函数,这6个函数,每种IO复用机制都要实现:

func netpollinit()                                  // 初始化 IO 复用机制(epoll 下创建 epfd)
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpollclose(fd uintptr) int32
func netpoll(delta int64) gList
func netpollBreak()
func netpollIsPollDescriptor(fd uintptr) bool
其中,netpollinit() 负责初始化 IO 复用机制(epoll 下即创建 epfd);netpollopen 把 fd 注册进去;netpoll 返回因 IO 就绪而可以运行的 goroutine 列表(gList);netpollclose 把 fd 注销。

除了这6个函数,netpoll还提供了几个数据结构:

在这些数据结构的定义中会看到 //go:notinheap 编译指令(见下文 pollDesc),它表示该类型的对象不能分配在 Go 的 GC 堆上(pollDesc 由 persistentalloc 分配)。

pollCache

数据结构:

// pollCache is a free list of pollDesc objects. The article notes there is a
// single global instance, pollcache.
type pollCache struct {
	lock  mutex     // presumably guards first; the locking is not shown here
	first *pollDesc // head of the singly linked list, chained through pollDesc.link
}

pollCache是个单例,唯一变量 pollcache为全局变量。pollCache提供了两个操作:

  • func (c *pollCache) alloc() *pollDesc:分配pollDesc
  • func (c *pollCache) free(pd *pollDesc):将一个pollDesc放入链表头
alloc():当 pollCache 为空时,会通过 persistentalloc 一次分配一批 pollDesc。下面是 persistentalloc 的签名,以及每批数量 n 的计算方式:
// persistentalloc's signature, quoted for reference. Per the article, alloc
// uses it to get memory for a new batch of pollDescs when the cache is empty.
func persistentalloc(size, align uintptr, sysStat *sysMemStat) unsafe.Pointer
// Fragment of pollCache.alloc: compute how many pollDescs fit into one
// pollBlockSize chunk. At least one is allocated even if pollDesc is
// larger than pollBlockSize.
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
    n = 1
}
随后用 persistentalloc 分配 n 个 pollDesc 大小的内存,并把它们逐个挂到链表上。

pollCache不为空时

// Fragment of pollCache.alloc for a non-empty free list: pop the head
// pollDesc, then register its lock with rank lockRankPollDesc.
pd := c.first
c.first = pd.link
lockInit(&pd.lock, lockRankPollDesc)

取下第一个pd返回即可。

接下来是重点要搞懂的数据结构:pollDesc

pollDesc

// pollDesc is the runtime's per-descriptor poller state.
//
// The fields form three groups:
//   - link and fd place the descriptor in pollcache and identify it;
//   - atomicInfo, rg and wg are accessed with atomic operations;
//   - the fields after lock are protected by lock.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // next pollDesc in the pollcache free list
	fd   uintptr   // the file descriptor this pollDesc tracks (set in poll_runtime_pollOpen)

	atomicInfo atomic.Uint32 // flag bits (e.g. pollClosing, pollEventErr); refreshed by publishInfo
	rg atomic.Uintptr // read semaphore: pdReady, pdWait, a waiting G pointer, or 0
	wg atomic.Uintptr // write semaphore: same encoding as rg

	lock    mutex     // protects the fields below
	closing bool      // set by poll_runtime_pollUnblock; cleared on reuse in poll_runtime_pollOpen
	user    uint32    // NOTE(review): not used by any code shown here
	rseq    uintptr   // read sequence; bumped to invalidate stale read timers
	rt      timer     // read deadline timer, armed when rt.f != nil
	rd      int64     // read deadline: 0 = none, >0 = absolute nanotime, <0 = already passed
	wseq    uintptr   // write sequence; bumped to invalidate stale write timers
	wt      timer     // write deadline timer, armed when wt.f != nil
	wd      int64     // write deadline: same encoding as rd
	self    *pollDesc // points back to this pd; used to build timer callback args (makeArg)
}

pollDesc的字段分成了3组:

  • link 在 pollcache 中使用,因为 pollcache 是链表
  • fd 就是该 pollDesc 所对应的文件描述符本身(以 uintptr 保存),在 poll_runtime_pollOpen 中赋值
  • atomicInfo pd的一些标志位,比如pollClosing表示pd是否正在关闭,pollEventErr表示这个pd发生了某种错误,等等。
  • rg、wg:这两个字段是2个信号量,值可以是 pdReady、pdWait、一个G uintptr指针或者nil。
  • lock 负责保护它下面的字段。closing 表示这个 pd 是否正在关闭;rseq、rt、rd 是读相关的定时器信息;wseq、wt、wd 是写相关的定时器信息;self 指向自身,用于 timer 回调函数。

提供了4个辅助方法:

func (pd *pollDesc) info() pollInfo
func (pd *pollDesc) publishInfo()
func (pd *pollDesc) setEventErr(b bool)
func (pd *pollDesc) makeArg() (i any)

有了以上认识之后,我们看看 go runtime 是如何实现 netpoll 的。

go netpoll的实现

netpoll初始化

首先 go runtime 提供了2个变量:

netpollInitLock mutex
netpollInited   uint32
epfd int32 = -1 // epoll descriptor netpoll_epoll.go
netpollinit() 只能被调用一次(epoll 下由它创建 epfd),因此 netpollGenericInit() 借助 netpollInitLock 和 netpollInited 保证 netpollinit() 只执行一次:
// netpollGenericInit runs the platform-specific netpollinit exactly once.
// The unlocked atomic load is a fast path for the already-initialized case.
// The flag is re-checked under netpollInitLock so that concurrent callers
// cannot both run netpollinit.
func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			// Set the flag only after netpollinit returns, so netpollinited()
			// cannot report true before initialization has finished.
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
} 

当需要判断 netpoll 是否初始化过时,可以调用:

// netpollinited reports whether netpollGenericInit has finished running
// netpollinit. It reads the flag atomically, so no lock is needed.
func netpollinited() bool {
	inited := atomic.Load(&netpollInited)
	return inited != 0
}
netpollGenericInit 由 poll_runtime_pollServerInit 调用;poll_runtime_pollServerInit 在 internal/poll 包中对应 internal/poll.runtime_pollServerInit,供 internal/poll 使用(完整对应关系见下文)。

poll_runtime_pollOpen

poll_runtime_pollOpen
 // poll_runtime_pollOpen takes a pollDesc from pollcache, resets it for fd,
 // and registers fd with the platform poller via netpollopen.
 // It returns (pd, 0) on success. If netpollopen fails, it returns
 // (nil, errno) and puts the pd back into the cache.
 func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	// A recycled pd must not still have a goroutine parked on it
	// (anything other than 0 or pdReady means a waiter is recorded).
	wg := pd.wg.Load()
	if wg != 0 && wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	rg := pd.rg.Load()
	if rg != 0 && rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.setEventErr(false)
	// Bumping the sequence numbers makes any timer left over from the pd's
	// previous use see a mismatched seq, which invalidates it.
	pd.rseq++
	pd.rg.Store(0)
	pd.rd = 0
	pd.wseq++
	pd.wg.Store(0)
	pd.wd = 0
	pd.self = pd
	// Refresh atomicInfo from the fields that were just reset.
	pd.publishInfo()
	unlock(&pd.lock)

	errno := netpollopen(fd, pd)
	if errno != 0 {
		// Registration failed: return the pd to the cache.
		pollcache.free(pd)
		return nil, int(errno)
	}
	return pd, 0
}
它先从 pollcache 中取出一个 pd 并初始化,再调用 netpollopen 将 fd 注册到 IO 复用机制中。该函数对应 internal/poll.runtime_pollOpen。

poll_runtime_pollClose

poll_runtime_pollClose
// poll_runtime_pollClose unregisters pd.fd from the platform poller and
// returns pd to pollcache. The caller must already have called
// poll_runtime_pollUnblock, which sets pd.closing. No goroutine may still
// be blocked on pd. Violating either rule is a fatal throw.
func poll_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("runtime: close polldesc w/o unblock")
	}
	// Any value other than 0 or pdReady means a goroutine is still waiting.
	wg := pd.wg.Load()
	if wg != 0 && wg != pdReady {
		throw("runtime: blocked write on closing polldesc")
	}
	rg := pd.rg.Load()
	if rg != 0 && rg != pdReady {
		throw("runtime: blocked read on closing polldesc")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
} 
该函数对应 internal/poll.runtime_pollClose。

poll_runtime_pollWait

poll,等待pd可读或可写(阻塞)

// poll_runtime_pollWait blocks the calling goroutine until pd is ready for
// mode ('r' or 'w', as passed by internal/poll's waitRead/waitWrite).
// It returns pollNoError on readiness. If netpollcheckerr reports a problem
// (before blocking or after being woken), it returns that error code.
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	errcode := netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
	// As for now only Solaris, illumos, and AIX use level-triggered IO.
	// On those platforms the descriptor is armed for mode before blocking.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	// netpollblock returns false when the goroutine was woken without
	// readiness; re-check for an error before waiting again.
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout has been reset.
		// Pretend it has not happened and retry.
	}
	return pollNoError
}
该函数对应 internal/poll.runtime_pollWait。

poll_runtime_pollUnblock

该函数对应 internal/poll.runtime_pollUnblock:将 pd 标记为正在关闭,唤醒阻塞在其上的读写 goroutine,并删除 deadline 定时器。
// poll_runtime_pollUnblock marks pd as closing, wakes any goroutines blocked
// reading or writing on it, and deletes its deadline timers. Calling it on
// a pd that is already closing is a fatal throw.
func poll_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("runtime: unblock on closing polldesc")
	}
	pd.closing = true
	// Bump the sequence numbers so in-flight timers are treated as stale.
	pd.rseq++
	pd.wseq++
	var rg, wg *g
	// Publish the closing state into atomicInfo before unblocking waiters.
	pd.publishInfo()
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	// The unblocked goroutines are made runnable only after pd.lock is released.
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

poll_runtime_pollReset

该函数对应 internal/poll.runtime_pollReset:在读写前清除 pd 的读或写信号量。
// poll_runtime_pollReset clears pd's read ('r') or write ('w') semaphore
// before a new I/O attempt. If netpollcheckerr already reports an error for
// mode, that code is returned and nothing is cleared. Any other mode value
// clears nothing.
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
	if errcode := netpollcheckerr(pd, int32(mode)); errcode != pollNoError {
		return errcode
	}
	switch mode {
	case 'r':
		pd.rg.Store(0)
	case 'w':
		pd.wg.Store(0)
	}
	return pollNoError
}

poll_runtime_pollSetDeadline

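设置读/写 deadline。deadline 到期时,定时器回调 netpollReadDeadline 或 netpollWriteDeadline;读写 deadline 相同时合并为一个定时器,回调 netpollDeadline。该函数对应 internal/poll.runtime_pollSetDeadline。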
// poll_runtime_pollSetDeadline sets the read and/or write deadline of pd.
//
// d is a timeout relative to now, in nanoseconds:
//   - 0 clears the deadline;
//   - a negative value means the deadline has already passed;
//   - a positive value is converted below to an absolute nanotime.
//
// mode is 'r', 'w', or 'r'+'w'. When the read and write deadlines are
// equal and positive, a single read timer (firing netpollDeadline) serves
// both directions and no write timer is kept.
//
// If the new deadline is already in the past, any goroutines blocked on
// that direction are woken. The call does nothing if pd is closing.
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	// Remember the previous deadlines so only changed timers are touched.
	rd0, wd0 := pd.rd, pd.wd
	combo0 := rd0 > 0 && rd0 == wd0
	if d > 0 {
		// Convert the relative timeout into an absolute deadline.
		d += nanotime()
		if d <= 0 {
			// If the user has a deadline in the future, but the delay calculation
			// overflows, then set the deadline to the maximum possible value.
			d = 1<<63 - 1
		}
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	pd.publishInfo()
	// Equal positive read/write deadlines share the read timer.
	combo := pd.rd > 0 && pd.rd == pd.wd
	rtf := netpollReadDeadline
	if combo {
		rtf = netpollDeadline
	}
	if pd.rt.f == nil {
		// No read timer yet: start one if a read deadline is set.
		if pd.rd > 0 {
			pd.rt.f = rtf
			// Copy current seq into the timer arg.
			// Timer func will check the seq against current descriptor seq,
			// if they differ the descriptor was reused or timers were reset.
			pd.rt.arg = pd.makeArg()
			pd.rt.seq = pd.rseq
			resettimer(&pd.rt, pd.rd)
		}
	} else if pd.rd != rd0 || combo != combo0 {
		pd.rseq++ // invalidate current timers
		if pd.rd > 0 {
			modtimer(&pd.rt, pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
		} else {
			deltimer(&pd.rt)
			pd.rt.f = nil
		}
	}
	// The write timer is only used when it is not merged into the read timer.
	if pd.wt.f == nil {
		if pd.wd > 0 && !combo {
			pd.wt.f = netpollWriteDeadline
			pd.wt.arg = pd.makeArg()
			pd.wt.seq = pd.wseq
			resettimer(&pd.wt, pd.wd)
		}
	} else if pd.wd != wd0 || combo != combo0 {
		pd.wseq++ // invalidate current timers
		if pd.wd > 0 && !combo {
			modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
		} else {
			deltimer(&pd.wt)
			pd.wt.f = nil
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	// Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd.
	var rg, wg *g
	if pd.rd < 0 {
		rg = netpollunblock(pd, 'r', false)
	}
	if pd.wd < 0 {
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	// The unblocked goroutines are made runnable only after pd.lock is released.
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

poll_runtime_isPollServerDescriptor

// poll_runtime_isPollServerDescriptor delegates to the platform-specific
// netpollIsPollDescriptor. Presumably it reports whether fd is one of the
// poller's own descriptors (for example epfd); confirm in netpoll_*.go.
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
	return netpollIsPollDescriptor(fd)
}
该函数对应 internal/poll.runtime_isPollServerDescriptor。

上面这些 runtime 函数在 internal/poll 包中都以 runtime_ 前缀的名字出现,对应关系如下:

runtime.poll_runtime_isPollServerDescriptor <---> internal/poll.runtime_isPollServerDescriptor

runtime.poll_runtime_pollServerInit <---> internal/poll.runtime_pollServerInit

runtime.poll_runtime_pollOpen <---> internal/poll.runtime_pollOpen

runtime.poll_runtime_pollSetDeadline <---> internal/poll.runtime_pollSetDeadline

runtime.poll_runtime_pollWait <---> internal/poll.runtime_pollWait

runtime.poll_runtime_pollUnblock <---> internal/poll.runtime_pollUnblock

runtime.poll_runtime_pollReset <---> internal/poll.runtime_pollReset

runtime.poll_runtime_pollClose <---> internal/poll.runtime_pollClose

internal/poll

src/internal/poll/fd_poll_runtime.go

pollDesc

internal/poll 中的 pollDesc 就是对 runtime.pollDesc 的封装,它的使用者(例如 FD)不再直接调用 runtime_XXX 函数。

// pollDesc (internal/poll) wraps the runtime's poll descriptor.
type pollDesc struct {
	runtimeCtx uintptr // handle returned by runtime_pollOpen; 0 when not registered with the poller
}

唯一字段runtimeCtx是一个uintptr,指向的正是runtime.pollDesc。

接下来,我们简单理解一下 pollDesc 是如何使用 runtime_XXX 函数实现这些方法的。

  1. pollDesc的初始化init()
// serverInit ensures runtime_pollServerInit runs only once per process.
var serverInit sync.Once

// init initializes the runtime poller on first use and registers
// fd.Sysfd with it, storing the returned handle in pd.runtimeCtx.
// A non-zero errno from runtime_pollOpen is converted with errnoErr
// and returned. pd.runtimeCtx is left unchanged in that case.
func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}
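serverInit(sync.Once)保证 runtime_pollServerInit 只执行一次;随后调用 runtime_pollOpen 注册 fd.Sysfd,并把返回的 runtime.pollDesc 保存在 runtimeCtx 中。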

2. 非阻塞模式prepare()

// prepare resets the runtime poll state for mode ('r' or 'w') before an I/O
// attempt and converts any pending poller error. Descriptors that are not
// registered with the poller (runtimeCtx == 0) need no preparation.
func (pd *pollDesc) prepare(mode int, isFile bool) error {
	if pd.runtimeCtx != 0 {
		return convertErr(runtime_pollReset(pd.runtimeCtx, mode), isFile)
	}
	return nil
}

// prepareRead prepares pd for a read.
func (pd *pollDesc) prepareRead(isFile bool) error { return pd.prepare('r', isFile) }

// prepareWrite prepares pd for a write.
func (pd *pollDesc) prepareWrite(isFile bool) error { return pd.prepare('w', isFile) }

3. 阻塞模式wait()

// wait blocks in the runtime poller until pd is ready for mode ('r' or 'w')
// and converts the poller's result into an error. Descriptors that are not
// registered with the poller cannot be waited on.
func (pd *pollDesc) wait(mode int, isFile bool) error {
	if pd.runtimeCtx != 0 {
		return convertErr(runtime_pollWait(pd.runtimeCtx, mode), isFile)
	}
	return errors.New("waiting for unsupported file type")
}

// waitRead waits until pd is readable.
func (pd *pollDesc) waitRead(isFile bool) error { return pd.wait('r', isFile) }

// waitWrite waits until pd is writable.
func (pd *pollDesc) waitWrite(isFile bool) error { return pd.wait('w', isFile) }

4. 取消wait

// waitCanceled forwards a canceled wait for mode to the runtime poller.
// It is a no-op for descriptors that are not registered with the poller.
func (pd *pollDesc) waitCanceled(mode int) {
	if pd.runtimeCtx != 0 {
		runtime_pollWaitCanceled(pd.runtimeCtx, mode)
	}
}

5. 用完之后close()

// close releases the runtime poll descriptor, if any, and clears the
// handle so that a second close does nothing.
func (pd *pollDesc) close() {
	if ctx := pd.runtimeCtx; ctx != 0 {
		runtime_pollClose(ctx)
		pd.runtimeCtx = 0
	}
}

FD

fd可用于net包和os包,用于net包,可以表示一个网络文件描述符;用于os包可以表示一个文件描述符。

// FD wraps a system file descriptor for the net and os packages. It pairs
// Sysfd with its poller registration (pd) and the locking used to serialize
// Read, Write and Close.
type FD struct {
	// Lock sysfd and serialize access to Read and Write methods.
	fdmu fdMutex

	// System file descriptor. Immutable until Close.
	Sysfd int

	// I/O poller.
	pd pollDesc

	// Writev cache.
	iovecs *[]syscall.Iovec

	// Semaphore signaled when file is closed.
	csema uint32

	// Non-zero if this file has been set to blocking mode.
	isBlocking uint32

	// Whether this is a streaming descriptor, as opposed to a
	// packet-based descriptor like a UDP socket. Immutable.
	IsStream bool

	// Whether a zero byte read indicates EOF. This is false for a
	// message based socket connection.
	ZeroReadIsEOF bool

	// Whether this is a file rather than a network socket.
	isFile bool
}

每个字段的含义都比较清晰。

从 FD 的方法集中可以发现,有 FD 的初始化 Init 和关闭 Close 方法,主要是各种 Read 和 Write 方法;设置 FD 的模式 SetBlocking();如果 FD 代表网络文件描述符,还可以通过 SetsockoptXXX 方法设置相关参数。如果是本地文件描述符,则可以使用 Seek()、ReadDirent() 等方法。如果 FD 代表网络文件描述符并且作为 Server 端的话,还可以调用 Accept() 接受一个客户端连接。

有了FD,距离我们熟知的connection又近了一步了。

下面我们了解几个方法的实现

  1. FD的初始化Init
// Init records whether fd refers to a plain file and, when pollable,
// registers it with the runtime poller. A descriptor that is not pollable,
// or whose registration fails, is marked as blocking. The registration
// error, if any, is returned.
func (fd *FD) Init(net string, pollable bool) error {
	// Only "file" matters here; the various network types are not distinguished.
	if net == "file" {
		fd.isFile = true
	}
	if pollable {
		if err := fd.pd.init(fd); err != nil {
			// The runtime poller is unavailable, so fall back to blocking mode.
			fd.isBlocking = 1
			return err
		}
		return nil
	}
	fd.isBlocking = 1
	return nil
}

这里可以结合pd.init()理解初始化过程。

2. FD的基本Read操作

// Read reads up to len(p) bytes from fd.Sysfd into p.
//
// The read is non-blocking at the syscall level. On EAGAIN with a pollable
// descriptor, Read parks in the poller until the fd is readable and retries.
// The final (n, err) pair is passed through fd.eofError.
func (fd *FD) Read(p []byte) (int, error) {
	// Take the read lock; fdmu serializes access to Read.
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()

	// Reset the poller's read state; this fails if the poller already reports an error.
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}
	// For stream descriptors (e.g. TCP), cap a single read at maxRW bytes (2^30 per the original note).
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	// Retry syscall.Read (ignoringEINTRIO presumably retries EINTR) until it completes.
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			// Report zero bytes on error.
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				// Not readable yet: wait in the poller, then try again.
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

其他ReadXXX方法和Read()类似,只是用不同的syscall方法,这里不再分析。

3. FD的Write()操作

// Write writes all of p to fd.Sysfd, looping over partial writes.
//
// Stream descriptors write at most maxRW bytes per syscall. On EAGAIN with
// a pollable descriptor, Write waits in the poller for writability and
// retries. It returns the number of bytes written so far with the first
// error. It returns io.ErrUnexpectedEOF if a write makes no progress
// without reporting an error.
func (fd *FD) Write(p []byte) (int, error) {
	// Take the write lock; fdmu serializes access to Write.
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()
	// Reset the poller's write state; this fails if the poller already reports an error.
	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, err
	}
	var nn int
	// Keep writing until all of p has been written or an error occurs.
	for {
		max := len(p)
		if fd.IsStream && max-nn > maxRW {
			max = nn + maxRW
		}
		// Write the next chunk with syscall.Write.
		n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
		if n > 0 {
			nn += n
		}
		if nn == len(p) {
			return nn, err
		}
		if err == syscall.EAGAIN && fd.pd.pollable() {
			// Not writable yet: wait in the poller, then try again.
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		if err != nil {
			return nn, err
		}
		if n == 0 {
			return nn, io.ErrUnexpectedEOF
		}
	}
}

类似ReadXXX,其他WriteXXX方法也是调用某个syscall。

4. 设置deadline:SetDeadline

// SetDeadline sets both the read and write deadlines of fd.
// A zero t clears them.
func (fd *FD) SetDeadline(t time.Time) error {
	return setDeadlineImpl(fd, t, 'r'+'w')
}

// setDeadlineImpl converts t into a timeout relative to now, in nanoseconds,
// and passes it to runtime_pollSetDeadline for mode ('r', 'w' or 'r'+'w').
// The encoding is: 0 means no deadline; a deadline of exactly now becomes
// -1; a past t gives a negative value. fd holds a reference for the duration
// of the call. ErrNoDeadline is returned if fd is not registered with the
// poller.
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
	var d int64
	if !t.IsZero() {
		d = int64(time.Until(t))
		if d == 0 {
			d = -1 // don't confuse deadline right now with no deadline
		}
	}
	if err := fd.incref(); err != nil {
		return err
	}
	defer fd.decref()
	if fd.pd.runtimeCtx == 0 {
		return ErrNoDeadline
	}
	runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
	return nil
}
最终通过 runtime_pollSetDeadline 把 deadline 交给 runtime.pollDesc 处理(见上文 poll_runtime_pollSetDeadline)。

5. 改变FD的工作模式:SetBlocking()

// SetBlocking puts fd.Sysfd into blocking mode and records this in
// fd.isBlocking. In blocking mode, Close no longer waits on csema for the
// descriptor to be destroyed.
func (fd *FD) SetBlocking() error {
	if err := fd.incref(); err != nil {
		return err
	}
	defer fd.decref()
	// Atomic store so that concurrent calls to SetBlocking
	// do not cause a race condition. isBlocking only ever goes
	// from 0 to 1 so there is no real race here.
	atomic.StoreUint32(&fd.isBlocking, 1)
	return syscall.SetNonblock(fd.Sysfd, false)
}

将 fd.isBlocking 置为 1,同时通过 syscall.SetNonblock(fd.Sysfd, false) 把底层描述符切换为阻塞模式。反过来也说明,默认情况下 FD 处于非阻塞模式。

6.FD接受新连接 Accept()

// Accept accepts a connection on the listening descriptor fd.
//
// It returns the new connection's descriptor and the peer address. On
// failure it returns -1, the name of the failing call (errcall) and the
// error. EINTR and ECONNABORTED are retried. EAGAIN on a pollable
// descriptor waits for readability and retries.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
	// Take the read lock.
	if err := fd.readLock(); err != nil {
		return -1, nil, "", err
	}
	defer fd.readUnlock()
	// Prepare the poller for reading.
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return -1, nil, "", err
	}
	for {
		// Call accept(); fd.Sysfd is the server (listening) socket.
		// s is the new connection's socket descriptor, rsa the peer address.
		s, rsa, errcall, err := accept(fd.Sysfd)
		if err == nil {
			return s, rsa, "", err
		}
		switch err {
		case syscall.EINTR:
			continue
		case syscall.EAGAIN:
			if fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		case syscall.ECONNABORTED:
			// This means that a socket on the listen
			// queue was closed before we Accept()ed it;
			// it's a silly error, so try again.
			continue
		}
		return -1, nil, errcall, err
	}
}
Accept()

7. 关闭FD: Close

// Close marks fd as closing and wakes any blocked I/O (fd.pd.evict). It then
// drops Close's own reference; the last reference runs destroy. Unless fd
// is in blocking mode, Close waits on csema until destroy has closed the
// descriptor. Closing an already-closing fd returns errClosing.
func (fd *FD) Close() error {
	if !fd.fdmu.increfAndClose() {
		return errClosing(fd.isFile)
	}

	// Unblock any I/O.  Once it all unblocks and returns,
	// so that it cannot be referring to fd.sysfd anymore,
	// the final decref will close fd.sysfd. This should happen
	// fairly quickly, since all the I/O is non-blocking, and any
	// attempts to block in the pollDesc will return errClosing(fd.isFile).
	fd.pd.evict()

	// The call to decref will call destroy if there are no other
	// references.
	err := fd.decref()

	// Wait until the descriptor is closed. If this was the only
	// reference, it is already closed. Only wait if the file has
	// not been set to blocking mode, as otherwise any current I/O
	// may be blocking, and that would block the Close.
	// No need for an atomic read of isBlocking, increfAndClose means
	// we have exclusive access to fd.
	if fd.isBlocking == 0 {
		runtime_Semacquire(&fd.csema)
	}

	return err
}

// decref drops one reference to fd. When fdmu reports that this was the
// last reference, fd is destroyed and destroy's error is returned.
func (fd *FD) decref() error {
	if !fd.fdmu.decref() {
		return nil
	}
	return fd.destroy()
}

// destroy unregisters fd from the poller and closes fd.Sysfd with CloseFunc.
// It then sets Sysfd to -1 and releases csema, waking a Close that is
// waiting for the descriptor to be closed. It returns CloseFunc's error.
func (fd *FD) destroy() error {
	// Poller may want to unregister fd in readiness notification mechanism,
	// so this must be executed before CloseFunc.
	fd.pd.close()

	// We don't use ignoringEINTR here because POSIX does not define
	// whether the descriptor is closed if close returns EINTR.
	// If the descriptor is indeed closed, using a loop would race
	// with some other goroutine opening a new descriptor.
	// (The Linux kernel guarantees that it is closed on an EINTR error.)
	err := CloseFunc(fd.Sysfd)

	fd.Sysfd = -1
	runtime_Semrelease(&fd.csema)
	return err
}
其中 fd.pd.close() 最终会调用 runtime_pollClose(pd.runtimeCtx),通过 netpollclose 把描述符从 runtime 的 poller 中注销。

总结一下:internal/poll包将runtime.pollDesc和epoll简单封装的8个函数,再次封装成了internal/poll.pollDesc,完成了网络文件描述符的基本操作;然后再次封装出internal/poll.FD——提供读、写、Accept()等操作,既支持网络文件描述符也支持本地文件描述符,为net包和os包提供了公开对象FD和操作。

全文总结

这篇文章结合我们已知的知识——epoll、系统调用、golang的net包,通过从下往上的顺序,不断提问的方式,了解了Go是如何封装各种网络IO复用机制,并封装了FD对象,供各个网络包使用。


下一篇文章将介绍internal/poll.FD的应用。