epoll_create
问题一:Go如何与linux网络子系统交互
runtime/sys_linux_amd64.s
在其中可以找到 runtime.epollcreate(int32 size) 和 runtime.epollcreate1(int32 flags) 两个函数。
在这个文件中还发现了其他epoll_xxx函数的交互:
- runtime.epollctl()
- runtime.epollwait()
- runtime.closeonexec(fd int32)
我们暂时只需要知道go如何完成epoll相关调用即可。第二个问题是:Go如何对网络进行抽象
问题二:Go的网络抽象——netpoll
我们很容易找到runtime目录下的netpoll.go文件。通过这个文件顶部的build tag,我们可以理解,这个文件中的代码是通用于各个平台的。
//go:build aix || darwin || dragonfly || freebsd || (js && wasm) || linux || netbsd || openbsd || solaris || windows
而不同平台的实现则分布于其它netpoll_xxx.go文件中:
接下来就容易了,我们先了解netpoll的总体情况,再选择netpoll_epoll.go作为学习的对象。
问题三:各种IO复用机制的抽象
netpoll一共抽象了6个函数,这6个函数,每种IO复用机制都要实现:
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpollclose(fd uintptr) int32
func netpoll(delta int64) gList
func netpollBreak()
func netpollIsPollDescriptor(fd uintptr) bool
其中 netpollinit() 负责初始化(epoll实现中会创建epfd),netpollopen 负责注册fd,netpoll 负责获取就绪事件,netpollclose 负责注销fd。
除了这6个函数,netpoll还提供了几个数据结构:
go:notinheap
pollCache
数据结构:
type pollCache struct {
lock mutex
first *pollDesc
}
pollCache是个单例,唯一变量 pollcache为全局变量。pollCache提供了两个操作:
- func (c *pollCache) alloc() *pollDesc:分配pollDesc
- func (c *pollCache) free(pd *pollDesc):将一个pollDesc放入链表头
alloc()
func persistentalloc(size, align uintptr, sysStat *sysMemStat) unsafe.Pointer
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
persistentalloc
pollCache不为空时,
pd := c.first
c.first = pd.link
lockInit(&pd.lock, lockRankPollDesc)
取下第一个pd返回即可。
接下来是重点要搞懂的数据结构:pollDesc
pollDesc
//go:notinheap
type pollDesc struct {
link *pollDesc
fd uintptr
atomicInfo atomic.Uint32
rg atomic.Uintptr
wg atomic.Uintptr
lock mutex
closing bool
user uint32
rseq uintptr
rt timer
rd int64
wseq uintptr
wt timer
wd int64
self *pollDesc
}
pollDesc的字段分成了3组:
- link 在 pollcache 中使用,因为 pollcache 是链表
- fd 指向一个文件描述符的地址
- atomicInfo pd的一些标志位,比如pollClosing表示pd是否正在关闭,pollEventErr表示这个pd发生了某种错误,等等。
- rg、wg:这两个字段是2个信号量,值可以是 pdReady、pdWait、一个G uintptr指针或者nil。
lock则负责保护它下面的字段。closing说明这个pd是否正在关闭;rseq、rt、rd:读相关的定时器信息;wseq、wt、wd:写相关的定时器信息;self指向自身,用于timer回调函数。
提供了4个辅助方法:
func (pd *pollDesc) info() pollInfo
func (pd *pollDesc) publishInfo()
func (pd *pollDesc) setEventErr(b bool)
func (pd *pollDesc) makeArg() (i any)
有了以上认识之后,我们看看go runtime是如何实现netpoll的
go netpoll的实现
netpoll初始化
首先 go runtime 提供了2个变量:
netpollInitLock mutex
netpollInited uint32
epfd int32 = -1 // epoll descriptor netpoll_epoll.go
netpollinit() 只需要执行一次,因此runtime使用 netpollInitLock 和 netpollInited 来保证 netpollinit() 只被调用一次:
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
当需要判断netpoll是否初始化过时,可以调用:
func netpollinited() bool {
return atomic.Load(&netpollInited) != 0
}
netpollGenericInit 由 poll_runtime_pollServerInit 调用,而 poll_runtime_pollServerInit 被链接为 internal/poll.runtime_pollServerInit,供 internal/poll 包使用。
poll_runtime_pollOpen
poll_runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != 0 && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != 0 && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.setEventErr(false)
pd.rseq++
pd.rg.Store(0)
pd.rd = 0
pd.wseq++
pd.wg.Store(0)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}
该函数先从 pollcache 分配一个pollDesc并重置其状态,再调用 netpollopen 完成注册;它被链接为 internal/poll.runtime_pollOpen 供上层使用。
poll_runtime_pollClose
poll_runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
if !pd.closing {
throw("runtime: close polldesc w/o unblock")
}
wg := pd.wg.Load()
if wg != 0 && wg != pdReady {
throw("runtime: blocked write on closing polldesc")
}
rg := pd.rg.Load()
if rg != 0 && rg != pdReady {
throw("runtime: blocked read on closing polldesc")
}
netpollclose(pd.fd)
pollcache.free(pd)
}
internal/poll.runtime_pollClose
poll_runtime_pollWait
poll,等待pd可读或可写(阻塞)
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
internal/poll.runtime_pollWait
poll_runtime_pollUnblock
internal/poll.runtime_pollUnblock
func poll_runtime_pollUnblock(pd *pollDesc) {
lock(&pd.lock)
if pd.closing {
throw("runtime: unblock on closing polldesc")
}
pd.closing = true
pd.rseq++
pd.wseq++
var rg, wg *g
pd.publishInfo()
rg = netpollunblock(pd, 'r', false)
wg = netpollunblock(pd, 'w', false)
if pd.rt.f != nil {
deltimer(&pd.rt)
pd.rt.f = nil
}
if pd.wt.f != nil {
deltimer(&pd.wt)
pd.wt.f = nil
}
unlock(&pd.lock)
if rg != nil {
netpollgoready(rg, 3)
}
if wg != nil {
netpollgoready(wg, 3)
}
}
poll_runtime_pollReset
internal/poll.runtime_pollReset
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
if mode == 'r' {
pd.rg.Store(0)
} else if mode == 'w' {
pd.wg.Store(0)
}
return pollNoError
}
poll_runtime_pollSetDeadline
netpollReadDeadlinenetpollWriteDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
lock(&pd.lock)
if pd.closing {
unlock(&pd.lock)
return
}
rd0, wd0 := pd.rd, pd.wd
combo0 := rd0 > 0 && rd0 == wd0
if d > 0 {
d += nanotime()
if d <= 0 {
// If the user has a deadline in the future, but the delay calculation
// overflows, then set the deadline to the maximum possible value.
d = 1<<63 - 1
}
}
if mode == 'r' || mode == 'r'+'w' {
pd.rd = d
}
if mode == 'w' || mode == 'r'+'w' {
pd.wd = d
}
pd.publishInfo()
combo := pd.rd > 0 && pd.rd == pd.wd
rtf := netpollReadDeadline
if combo {
rtf = netpollDeadline
}
if pd.rt.f == nil {
if pd.rd > 0 {
pd.rt.f = rtf
// Copy current seq into the timer arg.
// Timer func will check the seq against current descriptor seq,
// if they differ the descriptor was reused or timers were reset.
pd.rt.arg = pd.makeArg()
pd.rt.seq = pd.rseq
resettimer(&pd.rt, pd.rd)
}
} else if pd.rd != rd0 || combo != combo0 {
pd.rseq++ // invalidate current timers
if pd.rd > 0 {
modtimer(&pd.rt, pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
} else {
deltimer(&pd.rt)
pd.rt.f = nil
}
}
if pd.wt.f == nil {
if pd.wd > 0 && !combo {
pd.wt.f = netpollWriteDeadline
pd.wt.arg = pd.makeArg()
pd.wt.seq = pd.wseq
resettimer(&pd.wt, pd.wd)
}
} else if pd.wd != wd0 || combo != combo0 {
pd.wseq++ // invalidate current timers
if pd.wd > 0 && !combo {
modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
} else {
deltimer(&pd.wt)
pd.wt.f = nil
}
}
// If we set the new deadline in the past, unblock currently pending IO if any.
// Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd.
var rg, wg *g
if pd.rd < 0 {
rg = netpollunblock(pd, 'r', false)
}
if pd.wd < 0 {
wg = netpollunblock(pd, 'w', false)
}
unlock(&pd.lock)
if rg != nil {
netpollgoready(rg, 3)
}
if wg != nil {
netpollgoready(wg, 3)
}
}
poll_runtime_isPollServerDescriptor
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
return netpollIsPollDescriptor(fd)
}
该函数被链接为 internal/poll.runtime_isPollServerDescriptor,供 internal/poll 包使用。
至此,runtime 与 internal/poll 之间通过 go:linkname 建立的函数对应关系如下:
runtime.poll_runtime_isPollServerDescriptor <---> internal/poll.runtime_isPollServerDescriptor
runtime.poll_runtime_pollServerInit <---> internal/poll.runtime_pollServerInit
runtime.poll_runtime_pollOpen <---> internal/poll.runtime_pollOpen
runtime.poll_runtime_pollSetDeadline <---> internal/poll.runtime_pollSetDeadline
runtime.poll_runtime_pollWait <---> internal/poll.runtime_pollWait
runtime. poll_runtime_pollUnblock <---> internal/poll.runtime_pollUnblock
runtime.poll_runtime_pollReset <---> internal/poll.runtime_pollReset
runtime.poll_runtime_pollClose <---> internal/poll.runtime_pollClose
internal/poll
src/internal/poll/fd_poll_runtime.go
pollDesc
pollDesc 就是对runtime.pollDesc的封装,这时已经看不到runtime_XXX函数了。
type pollDesc struct {
runtimeCtx uintptr
}
唯一字段runtimeCtx是一个uintptr,指向的正是runtime.pollDesc。
接下来,我们简单理解一下pollDesc是如何使用runtime_XXX函数实现这些方法的。
- pollDesc的初始化init()
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
runtime_pollServerInit
2. 非阻塞模式prepare()
func (pd *pollDesc) prepare(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return nil
}
res := runtime_pollReset(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func (pd *pollDesc) prepareRead(isFile bool) error {
return pd.prepare('r', isFile)
}
func (pd *pollDesc) prepareWrite(isFile bool) error {
return pd.prepare('w', isFile)
}
3. 阻塞模式wait()
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}
4. 取消wait
func (pd *pollDesc) waitCanceled(mode int) {
if pd.runtimeCtx == 0 {
return
}
runtime_pollWaitCanceled(pd.runtimeCtx, mode)
}
5. 用完之后close()
func (pd *pollDesc) close() {
if pd.runtimeCtx == 0 {
return
}
runtime_pollClose(pd.runtimeCtx)
pd.runtimeCtx = 0
}
FD
fd可用于net包和os包,用于net包,可以表示一个网络文件描述符;用于os包可以表示一个文件描述符。
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed.
csema uint32
// Non-zero if this file has been set to blocking mode.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool
// Whether this is a file rather than a network socket.
isFile bool
}
每个字段的含义都比较清晰。
从FD的方法表中可以发现,有FD的初始化Init和关闭Close方法,主要是各种Read和Write方法;设置FD的模式SetBlocking();如果FD代表网络文件描述符,还可以通过SetsockoptXXX方法设置相关参数。如果是本地文件描述符,则可以使用Seek()、ReadDirent()等方法。如果FD代表网络文件描述符并且作为Server端的话,还可以调用Accept(),接受一个客户端连接。
有了FD,距离我们熟知的connection又近了一步了。
下面我们了解几个方法的实现
- FD的初始化Init
func (fd *FD) Init(net string, pollable bool) error {
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}
这里可以结合pd.init()理解初始化过程。
2. FD的基本Read操作
func (fd *FD) Read(p []byte) (int, error) {
// 加读锁
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
// 设置为read模式
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
// 如果是TCP,设置最大可读值,maxRW = 2^30字节
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
// 通过syscall.Read, 不断尝试
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
其他ReadXXX方法和Read()类似,只是用不同的syscall方法,这里不再分析。
3. FD的Write()操作
func (fd *FD) Write(p []byte) (int, error) {
// 加写锁
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
// 设置为写模式
if err := fd.pd.prepareWrite(fd.isFile); err != nil {
return 0, err
}
var nn int
// 不断尝试
for {
max := len(p)
if fd.IsStream && max-nn > maxRW {
max = nn + maxRW
}
// 调用 syscall.Write
n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
if n > 0 {
nn += n
}
if nn == len(p) {
return nn, err
}
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
if err != nil {
return nn, err
}
if n == 0 {
return nn, io.ErrUnexpectedEOF
}
}
}
类似ReadXXX,其他WriteXXX方法也是调用某个syscall。
4. 设置deadline:SetDeadline
func (fd *FD) SetDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, 'r'+'w')
}
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
var d int64
if !t.IsZero() {
d = int64(time.Until(t))
if d == 0 {
d = -1 // don't confuse deadline right now with no deadline
}
}
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
if fd.pd.runtimeCtx == 0 {
return ErrNoDeadline
}
runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
return nil
}
runtime_pollSetDeadline
5. 改变FD的工作模式:SetBlocking()
func (fd *FD) SetBlocking() error {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
// Atomic store so that concurrent calls to SetBlocking
// do not cause a race condition. isBlocking only ever goes
// from 0 to 1 so there is no real race here.
atomic.StoreUint32(&fd.isBlocking, 1)
return syscall.SetNonblock(fd.Sysfd, false)
}
修改fd.isBlocking为1,同时通过syscall.SetNonblock将fd.Sysfd切换为阻塞模式。也就是说,默认情况下,FD为非阻塞模式,只有显式调用SetBlocking()才会变为阻塞模式。
6.FD接受新连接 Accept()
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// 加读锁
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
// 准备读
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 调用accept(),fd.Sysfd为代表server的socket文件描述符
// s为新连接socket文件描述符,rsa为对端地址
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
case syscall.EAGAIN:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
Accept()
7. 关闭FD: Close
func (fd *FD) Close() error {
if !fd.fdmu.increfAndClose() {
return errClosing(fd.isFile)
}
// Unblock any I/O. Once it all unblocks and returns,
// so that it cannot be referring to fd.sysfd anymore,
// the final decref will close fd.sysfd. This should happen
// fairly quickly, since all the I/O is non-blocking, and any
// attempts to block in the pollDesc will return errClosing(fd.isFile).
fd.pd.evict()
// The call to decref will call destroy if there are no other
// references.
err := fd.decref()
// Wait until the descriptor is closed. If this was the only
// reference, it is already closed. Only wait if the file has
// not been set to blocking mode, as otherwise any current I/O
// may be blocking, and that would block the Close.
// No need for an atomic read of isBlocking, increfAndClose means
// we have exclusive access to fd.
if fd.isBlocking == 0 {
runtime_Semacquire(&fd.csema)
}
return err
}
func (fd *FD) decref() error {
if fd.fdmu.decref() {
return fd.destroy()
}
return nil
}
func (fd *FD) destroy() error {
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before CloseFunc.
fd.pd.close()
// We don't use ignoringEINTR here because POSIX does not define
// whether the descriptor is closed if close returns EINTR.
// If the descriptor is indeed closed, using a loop would race
// with some other goroutine opening a new descriptor.
// (The Linux kernel guarantees that it is closed on an EINTR error.)
err := CloseFunc(fd.Sysfd)
fd.Sysfd = -1
runtime_Semrelease(&fd.csema)
return err
}
runtime_pollClose(pd.runtimeCtx)
总结一下:internal/poll包将runtime.pollDesc和epoll简单封装的8个函数,再次封装成了internal/poll.pollDesc,完成了网络文件描述符的基本操作;然后再次封装出internal/poll.FD——提供读、写、Accept()等操作,既支持网络文件描述符也支持本地文件描述符,为net包和os包提供了公开对象FD和操作。
全文总结
这篇文章结合我们已知的知识——epoll、系统调用、golang的net包,通过从下往上的顺序,不断提问的方式,了解了Go是如何封装各种网络IO复用机制,并封装了FD对象,供各个网络包使用。
下一篇文章将介绍internal/poll.FD的应用。