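These days, the first thing that comes to mind when networking is mentioned is epoll, yet in real-world engineering most teams prefer to build on an existing open-source network framework. Different languages have their own implementations: Java has the well-known netty; C++ has libevent, boost::asio, and muduo; and in Go the most influential open-source network frameworks are currently gnet, evio, and netpoll (open-sourced by ByteDance). More than a year after studying gnet, I recently found time to read through the netpoll source code. This article summarizes that source analysis for future reference.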

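netpoll is a high-performance network framework written in Go that ByteDance open-sourced recently. It is based on the Multi-Reactor model and is aimed at RPC scenarios; see the figure below for a detailed introduction.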

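The sections below analyze its internal implementation in detail. Other articles in this series:

  1. The evolution of network IO
  2. gnet network framework source code analysis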

1. Introduction to the Reactor Model

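Before diving into the netpoll source, and to make the code easier to follow, let us briefly review the Reactor model in network programming. Many mainstream network frameworks are built internally on the classic Reactor model, and the most frequently used variant is Multi-Reactor. The most basic Multi-Reactor architecture is shown in the figure below.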

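In the Multi-Reactor model, Reactors fall into two categories by role: mainReactor and subReactor. There is usually one mainReactor and several subReactors. The model works as follows:

  1. The mainReactor accepts client connection requests and establishes new connections. After accepting a connection, it hands it to one of the subReactors according to a load-balancing strategy.
  2. The subReactor manages the new client connection and handles that client's subsequent requests.
  3. Reactor threads are usually responsible for IO operations (reading and writing data), while business logic is executed by dedicated worker threads.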

Note: taking epoll as an example, a Reactor here can be understood simply as one epoll object handled by one thread. A Reactor thread is also called an IO thread.
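The division of labor described above can be sketched in a few lines of Go. This is only an illustration under stated assumptions: `subReactor` and `mainReactor` are hypothetical names, connections are plain integers instead of sockets, and the load-balancing strategy is fixed to round-robin.

```go
package main

import (
    "fmt"
    "sync"
)

// subReactor is a hypothetical stand-in for one IO thread that owns a set of connections.
type subReactor struct {
    conns chan int // connection IDs handed over by the main reactor
    seen  []int    // connections this reactor has taken over
}

// run takes over every connection dispatched to this reactor until conns is closed.
func (s *subReactor) run(wg *sync.WaitGroup) {
    defer wg.Done()
    for c := range s.conns {
        s.seen = append(s.seen, c)
    }
}

// mainReactor "accepts" total connections and dispatches them round-robin.
// It returns how many connections each sub reactor ended up managing.
func mainReactor(numSubs, total int) []int {
    subs := make([]*subReactor, numSubs)
    var wg sync.WaitGroup
    for i := range subs {
        subs[i] = &subReactor{conns: make(chan int, total)}
        wg.Add(1)
        go subs[i].run(&wg)
    }
    for conn := 0; conn < total; conn++ {
        subs[conn%numSubs].conns <- conn
    }
    for _, s := range subs {
        close(s.conns)
    }
    wg.Wait()
    counts := make([]int, numSubs)
    for i, s := range subs {
        counts[i] = len(s.seen)
    }
    return counts
}

func main() {
    fmt.Println(mainReactor(3, 6))
}
```

With three sub reactors and six connections, each sub reactor ends up owning two connections. A real framework replaces the channel with an epoll or kqueue registration.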

With the Multi-Reactor model reviewed, we now turn to the main topic: the source-code analysis of the netpoll network framework.

2. netpoll Overall Architecture

2.1 Interaction between the netpoll client and server

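netpoll wraps both the client and the server. With netpoll you can quickly create a server program, and you can use the client methods it provides to talk to that server. The complete interaction between client and server is shown below.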

On the server side, netpoll provides the following methods and callback interfaces:

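Serve(): starts the server and listens for client requests
OnPrepare(): performs initialization and preparation work; called back while the connection is being initialized, before it is registered with a poller
OnConnect(): called back after the connection has been created
OnRequest(): the business-logic callback; runs the business logic asynchronously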
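To make the ordering of these callbacks concrete, here is a minimal sketch. The `callbacks` struct and `serveOne` function are hypothetical and are not part of netpoll; in the real framework OnConnect and OnRequest run asynchronously, whereas this sketch calls them inline to keep the order visible.

```go
package main

import "fmt"

// callbacks groups hypothetical hooks named after the netpoll callbacks above.
type callbacks struct {
    onPrepare func(id int)
    onConnect func(id int)
    onRequest func(id int, data string)
}

// serveOne walks a single connection through its lifecycle and returns
// the order in which the hooks fired.
func serveOne(id int, requests []string) []string {
    var order []string
    cb := callbacks{
        onPrepare: func(int) { order = append(order, "OnPrepare") },
        onConnect: func(int) { order = append(order, "OnConnect") },
        onRequest: func(_ int, data string) { order = append(order, "OnRequest:"+data) },
    }
    cb.onPrepare(id) // before the connection is registered with a poller
    cb.onConnect(id) // after the connection has been set up
    for _, r := range requests {
        cb.onRequest(id, r) // once for each batch of readable data
    }
    return order
}

func main() {
    fmt.Println(serveOne(1, []string{"ping", "pong"}))
}
```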

2.2 Internal structure of the netpoll server

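The figure below focuses on the core logic of the netpoll server. Its design is essentially the same as the Multi-Reactor model described earlier. Listener, loadbalance, pollmanager, and EventLoop are the core concepts in netpoll; each is introduced below.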

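Listener: initializes the listener. Internally it calls the standard library's net.Listen() and wraps the result; the underlying work is done by the socket(), bind(), and listen() system calls.
EventLoop: the interface the framework exposes; its Serve() method starts the server program.
Poll: an abstract interface that hides the differences between operating systems. It is implemented with epoll on Linux and with kqueue on BSD.
pollmanager: the manager of Poll objects; think of it as a Poll pool, that is, a set of epoll or kqueue instances.
loadbalance: the load-balancing wrapper. It picks a Poll instance from pollmanager according to a strategy (random, round-robin, least connections, and so on). Typically, once a client connection has been initialized, the server calls it to obtain a Poll instance and registers the new connection with that Poll.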

3. netpoll Server Source Code Analysis

3.1 Server usage example

Below is a simple netpoll usage example that shows how its public API is used.

func main() {
    // 1. Create the listener. network and address are assumed to be defined
    // elsewhere, for example "tcp" and ":8080".
    var listener, _ = CreateListener(network, address)
    // 2. Initialize the EventLoop with the OnRequest handler.
    var eventLoop, _ = NewEventLoop(func(ctx context.Context, connection Connection) error {
        // Simulate business latency of 0 to 2 seconds.
        time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
        if l := connection.Reader().Len(); l > 0 {
            var data, err = connection.Reader().Next(l)
            if err != nil {
                return err
            }
            fmt.Printf("data:%+v\n", string(data))
        }
        return nil
    })
    // 3. Block and wait for client requests.
    eventLoop.Serve(listener)
}

As shown above, a netpoll server takes only three steps, which is very concise:

  1. Create the Listener
  2. Create the EventLoop
  3. Call Serve()

Let us now look inside to see how this is actually implemented.

3.2 Server startup entry point

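As the example shows, the server is started through the Serve method of EventLoop. EventLoop is an interface implemented internally by eventLoop, and one EventLoop represents one server. Its definition in the source is as follows.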

// A EventLoop is a network server.
type EventLoop interface {
    // Serve registers a listener and runs blockingly to provide services, including listening to ports,
    // accepting connections and processing trans data. When an exception occurs or Shutdown is invoked,
    // Serve will return an error which describes the specific reason.
    Serve(ln net.Listener) error

    // Shutdown is used to graceful exit.
    // It will close all idle connections on the server, but will not change the underlying pollers.
    //
    // Argument: ctx set the waiting deadline, after which an error will be returned,
    // but will not force the closing of connections in progress.
    Shutdown(ctx context.Context) error
}


// OnRequest defines the function for handling connection. When data is sent from the connection peer,
// netpoll actively reads the data in LT mode and places it in the connection's input buffer.
// Generally, OnRequest starts handling the data in the following way:
//
//  func OnRequest(ctx context, connection Connection) error {
//      input := connection.Reader().Next(n)
//      handling input data...
//      send, _ := connection.Writer().Malloc(l)
//      copy(send, output)
//      connection.Flush()
//      return nil
//  }
//
// OnRequest will run in a separate goroutine and
// it is guaranteed that there is one and only one OnRequest running at the same time.
// The underlying logic is similar to:
//
//  go func() {
//      for !connection.Reader().IsEmpty() {
//          OnRequest(ctx, connection)
//      }
//  }()
//
// PLEASE NOTE:
// OnRequest must either eventually read all the input data or actively Close the connection,
// otherwise the goroutine will fall into a dead loop.
//
// Return: error is unused which will be ignored directly.
type OnRequest func(ctx context.Context, connection Connection) error

Below are the eventLoop struct definition and its method implementations.

// NewEventLoop .
func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) {
    opts := &options{
        onRequest: onRequest,
    }
    for _, do := range ops {
        do.f(opts)
    }
    return &eventLoop{
        opts: opts,
        stop: make(chan error, 1),
    }, nil
}

type eventLoop struct {
    sync.Mutex
    svr  *server

    opts *options
    stop chan error
}

// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
    npln, err := ConvertListener(ln)
    if err != nil {
        return err
    }
    evl.Lock()
    evl.svr = newServer(npln, evl.opts, evl.quit)
    // register the listener fd with a poller; the pollers themselves wait in background goroutines
    evl.svr.Run()
    evl.Unlock()

    // block until the server quits
    err = evl.waitQuit()
    // ensure evl will not be finalized until Serve returns
    runtime.SetFinalizer(evl, nil)
    return err
}

In eventLoop, the Serve method shown above does three main things:

  1. Convert the Listener
  2. Create a server object
  3. Call the server's Run() method

So the core is the implementation of Run(), which we examine next. Run() is implemented as follows:

// newServer wrap listener into server, quit will be invoked when server exit.
func newServer(ln Listener, opts *options, onQuit func(err error)) *server {
    return &server{
        ln:     ln,
        opts:   opts,
        onQuit: onQuit,
    }
}

type server struct {
    operator    FDOperator
    ln          Listener
    opts        *options
    onQuit      func(err error)
    connections sync.Map // key=fd, value=connection
}

// Run this server.
func (s *server) Run() (err error) {
    s.operator = FDOperator{
        FD:     s.ln.Fd(),
        OnRead: s.OnRead,
        OnHup:  s.OnHup,
    }
    // pick one epoll from pollmanager to manage the server fd, i.e. set up the mainReactor
    s.operator.poll = pollmanager.Pick()
    // register the server fd for readable events
    err = s.operator.Control(PollReadable)

    if err != nil {
        s.onQuit(err)
    }
    return err
}


// OnRead implements FDOperator.
// When the server fd becomes readable, it accepts the incoming client connection.
func (s *server) OnRead(p Poll) error {
    // accept socket
    // accept the client connection
    conn, err := s.ln.Accept()
    if err != nil {
        // shut down
        if strings.Contains(err.Error(), "closed") {
            s.operator.Control(PollDetach)
            s.onQuit(err)
            return err
        }
        log.Println("accept conn failed:", err.Error())
        return err
    }
    if conn == nil {
        return nil
    }
    // store & register connection
    // build a new connection
    var connection = &connection{}
    // register it with epoll and pass in opts,
    // which carries the OnRequest business-logic handler
    connection.init(conn.(Conn), s.opts)

    if !connection.IsActive() {
        return nil
    }
    var fd = conn.(Conn).Fd()

    // store the new connection
    connection.AddCloseCallback(func(connection Connection) error {
        s.connections.Delete(fd)
        return nil
    })
    s.connections.Store(fd, connection)

    // trigger onConnect asynchronously
    connection.onConnect()
    return nil
}

From the source above, Run() creates the server's FDOperator and sets its OnRead and poll fields. Once Run() returns, the server is initialized. Two important pieces of logic appear here.

1. The first is the assignment of the mainReactor, which corresponds to these two lines:

    s.operator.poll = pollmanager.Pick()
    // register the server fd for readable events
    err = s.operator.Control(PollReadable)

2. Once the server fd has been registered with the Poll for readable events, the readable callback is OnRead(). OnRead() mainly calls the listener's Accept() method to accept the client's connection request.

This raises two questions:

Question 1. New connections accepted by the server's mainReactor must be assigned to one of the subReactors. Where is that implemented?

This logic lives inside the following line of OnRead(). Section 3.3 analyzes that method in detail.

    connection.init(conn.(Conn), s.opts)

Question 2. The server needs to initialize the mainReactor and the subReactors, but the code above does not do so. Where does that happen?

In netpoll this is done in the pollmanager module, which is covered in section 3.4.

3.3 Initializing a new client connection

Below is the source of connection's init() method; let us continue with its internal logic.


// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
    // init buffer, barrier, finalizer
    c.readTrigger = make(chan struct{}, 1)
    c.writeTrigger = make(chan error, 1)
    c.bookSize, c.maxSize = block1k/2, pagesize
    c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
    c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier)

    // initialize the fd
    c.initNetFD(conn) // conn must be *netFD{}
    // initialize the FDOperator
    c.initFDOperator()
    c.initFinalizer()

    syscall.SetNonblock(c.fd, true)
    // enable TCP_NODELAY by default
    switch c.network {
    case "tcp", "tcp4", "tcp6":
        setTCPNoDelay(c.fd, true)
    }
    // check zero-copy
    if setZeroCopy(c.fd) == nil && setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) == nil {
        c.supportZeroCopy = true
    }

    // connection initialized; apply the options, run the prepare step, and register
    return c.onPrepare(opts)
}

func (c *connection) initNetFD(conn Conn) {
    if nfd, ok := conn.(*netFD); ok {
        c.netFD = *nfd
        return
    }
    c.netFD = netFD{
        fd:         conn.Fd(),
        localAddr:  conn.LocalAddr(),
        remoteAddr: conn.RemoteAddr(),
    }
}



func (c *connection) initFDOperator() {
    // allocate an FDOperator
    op := allocop()
    op.FD = c.fd
    op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
    // set the input handler and the input acknowledgement callback
    op.Inputs, op.InputAck = c.inputs, c.inputAck
    // set the output handler and its acknowledgement callback; when outputs is empty, only readable events are monitored
    op.Outputs, op.OutputAck = c.outputs, c.outputAck

    // if connection has been registered, must reuse poll here.
    if c.pd != nil && c.pd.operator != nil {
        op.poll = c.pd.operator.poll
    }
    c.operator = op
}


// OnPrepare supports close connection, but not read/write data.
// connection will be registered by this call after preparing.
func (c *connection) onPrepare(opts *options) (err error) {
    if opts != nil {
        c.SetOnConnect(opts.onConnect)
        c.SetOnRequest(opts.onRequest)
        c.SetReadTimeout(opts.readTimeout)
        c.SetIdleTimeout(opts.idleTimeout)

        // calling prepare first and then register.
        if opts.onPrepare != nil {
            // run the onPrepare callback
            c.ctx = opts.onPrepare(c)
        }
    }

    if c.ctx == nil {
        c.ctx = context.Background()
    }
    // prepare may close the connection.
    if c.IsActive() {
        // register the client connection with the Poll (epoll)
        return c.register()
    }
    return nil
}

// register only use for connection register into poll.
// It registers the client connection with epoll.
func (c *connection) register() (err error) {
    // add this client connection to an epoll for management; this epoll is the subReactor
    if c.operator.poll != nil {
        err = c.operator.Control(PollModReadable)
    } else {
        c.operator.poll = pollmanager.Pick()
        err = c.operator.Control(PollReadable)
    }
    if err != nil {
        log.Println("connection register failed:", err.Error())
        c.Close()
        return Exception(ErrConnClosed, err.Error())
    }
    return nil
}

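As shown above, init() allocates an FDOperator for the client connection object, allocates the read and write buffers, and sets the callbacks inputs(), inputAck(), outputs(), and outputAck(). Finally, through onPrepare(), it calls register() to register the client connection with a poll, and that poll is the subReactor.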
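The difference between the listener's FDOperator (which sets OnRead) and a connection's FDOperator (which leaves OnRead nil and sets Inputs and InputAck instead) can be illustrated with a small sketch. The `operator` type and `onReadable` function below are simplified, hypothetical stand-ins; the `read` parameter plays the role of the readv system call.

```go
package main

import "fmt"

// operator is a simplified, hypothetical version of FDOperator.
type operator struct {
    OnRead   func() error  // set for the listener fd
    Inputs   func() []byte // set for connection fds: buffer to read into
    InputAck func(n int)   // set for connection fds: n bytes were read
}

// onReadable mimics the read branch of a poller: listeners get OnRead,
// connections get Inputs -> read -> InputAck.
func onReadable(op *operator, read func(buf []byte) int) error {
    if op.OnRead != nil {
        return op.OnRead()
    }
    buf := op.Inputs()
    if len(buf) > 0 {
        op.InputAck(read(buf))
    }
    return nil
}

// demoDispatch fires one readable event on a listener and one on a connection.
func demoDispatch() (accepted int, received string) {
    listener := &operator{OnRead: func() error { accepted++; return nil }}
    inbuf := make([]byte, 16)
    conn := &operator{
        Inputs:   func() []byte { return inbuf },
        InputAck: func(n int) { received = string(inbuf[:n]) },
    }
    _ = onReadable(listener, nil)
    _ = onReadable(conn, func(buf []byte) int { return copy(buf, "hello") })
    return accepted, received
}

func main() {
    fmt.Println(demoDispatch())
}
```

The same event loop can therefore serve both roles: whether an fd behaves as a mainReactor entry or a subReactor entry depends only on which callbacks its operator carries.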

3.4 The global pollmanager

Next comes the pollmanager module, which maintains all Poll objects: both the mainReactor and the subReactors live in pollmanager. Its source is as follows:


// set the number of loops
func setNumLoops(numLoops int) error {
    return pollmanager.SetNumLoops(numLoops)
}

func setLoadBalance(lb LoadBalance) error {
    return pollmanager.SetLoadBalance(lb)
}

// manage all pollers
var pollmanager *manager

// initialized when the package is loaded
func init() {
    pollmanager = &manager{}
    // set the load-balancing strategy
    pollmanager.SetLoadBalance(RoundRobin)
    // set the number of loops, i.e. the number of epoll instances
    pollmanager.SetNumLoops(defaultNumLoops())
}

func defaultNumLoops() int {
    procs := runtime.GOMAXPROCS(0)
    loops := 1
    // Loops produce events that handlers consume,
    // so the producer should be faster than consumer otherwise it will have a bottleneck.
    // But there is no universal option that could be appropriate for any use cases,
    // plz use `SetNumLoops` if you do know what you want.
    if procs > 4 {
        loops = procs
    }
    return loops
}

// LoadBalance is used to do load balancing among multiple pollers.
// a single poller may not be optimal if the number of cores is large (40C+).
type manager struct {
    NumLoops int
    balance  loadbalance // load balancing method
    // holds all Poll objects
    polls    []Poll      // all the polls
}

// SetNumLoops will return error when set numLoops < 1
func (m *manager) SetNumLoops(numLoops int) error {
    if numLoops < 1 {
        return fmt.Errorf("set invaild numLoops[%d]", numLoops)
    }
    // if less than, reset all; else new the delta.
    if numLoops < m.NumLoops {
        m.NumLoops = numLoops
        return m.Reset()
    }
    m.NumLoops = numLoops
    return m.Run()
}

// SetLoadBalance set load balance.
func (m *manager) SetLoadBalance(lb LoadBalance) error {
    if m.balance != nil && m.balance.LoadBalance() == lb {
        return nil
    }
    m.balance = newLoadbalance(lb, m.polls)
    return nil
}

// Run all pollers.
func (m *manager) Run() error {
    // new poll to fill delta.
    for idx := len(m.polls); idx < m.NumLoops; idx++ {
        // create an epoll
        var poll = openPoll()
        m.polls = append(m.polls, poll)
        // each epoll blocks in Wait() on its own goroutine
        go poll.Wait()
    }
    // LoadBalance must be set before calling Run, otherwise it will panic.
    m.balance.Rebalance(m.polls)
    return nil
}


// Pick will select the poller for use each time based on the LoadBalance.
func (m *manager) Pick() Poll {
    return m.balance.Pick()
}

From the source above, pollmanager keeps all Poll objects in a []Poll slice, and all initialization happens in init(). Two unfamiliar functions appear in this code:

  1. openPoll(): as the name suggests, it opens a Poll; presumably it calls epoll_create() internally.
  2. poll.Wait(): it is started asynchronously in a goroutine; presumably it calls epoll_wait() internally.

The Poll implementation is covered in the next section.
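The excerpt calls m.balance.Pick() and m.balance.Rebalance() but does not show the balancer itself. The following is a minimal sketch of what such a balancer could look like, not netpoll's actual code: the `balancer` interface, the `poll` placeholder type, and both strategies are assumptions made for illustration.

```go
package main

import (
    "fmt"
    "math/rand"
    "sync/atomic"
)

// poll is a placeholder for a Poll instance; only its id matters here.
type poll struct{ id int }

// balancer is a hypothetical interface modeled on the Pick/Rebalance calls above.
type balancer interface {
    Pick() *poll
    Rebalance(polls []*poll)
}

// roundRobin hands out polls in order, using an atomic counter so that
// concurrent Pick calls never return an out-of-range index.
type roundRobin struct {
    polls []*poll
    next  uint32
}

func (b *roundRobin) Pick() *poll {
    n := atomic.AddUint32(&b.next, 1) - 1
    return b.polls[n%uint32(len(b.polls))]
}

func (b *roundRobin) Rebalance(polls []*poll) { b.polls = polls }

// random picks a poll uniformly at random.
type random struct{ polls []*poll }

func (b *random) Pick() *poll             { return b.polls[rand.Intn(len(b.polls))] }
func (b *random) Rebalance(polls []*poll) { b.polls = polls }

// pickIDs calls Pick n times and records the chosen poll ids.
func pickIDs(b balancer, n int) []int {
    ids := make([]int, 0, n)
    for i := 0; i < n; i++ {
        ids = append(ids, b.Pick().id)
    }
    return ids
}

func main() {
    rr := &roundRobin{}
    rr.Rebalance([]*poll{{id: 0}, {id: 1}, {id: 2}})
    fmt.Println(pickIDs(rr, 5))
}
```

Rebalance must be called with a non-empty slice before the first Pick, which matches the comment in Run() that the load balancer has to be set up before the pollers are used.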

3.5 Poll (epoll/kqueue) implementation

Related source files: poll_default_linux.go and poll_default_bsd.go
// Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions,
// and shields underlying differences. On linux systems, poll uses epoll by default,
// and kevent by default on bsd systems.
type Poll interface {
    // Wait will poll all registered fds, and schedule processing based on the triggered event.
    // The call will block, so the usage can be like:
    //
    //  go wait()
    //
    Wait() error

    // Close the poll and shutdown Wait().
    Close() error

    // Trigger can be used to actively refresh the loop where Wait is located when no event is triggered.
    // On linux systems, eventfd is used by default, and kevent by default on bsd systems.
    Trigger() error

    // Control the event of file descriptor and the operations is defined by PollEvent.
    Control(operator *FDOperator, event PollEvent) error
}

// PollEvent defines the operation of poll.Control.
type PollEvent int

const (
    // PollReadable is used to monitor whether the FDOperator registered by
    // listener and connection is readable or closed.
    PollReadable PollEvent = 0x1

    // PollWritable is used to monitor whether the FDOperator created by the dialer is writable or closed.
    // ET mode must be used (still need to poll hup after being writable)
    PollWritable PollEvent = 0x2

    // PollDetach is used to remove the FDOperator from poll.
    PollDetach PollEvent = 0x3

    // PollModReadable is used to re-register the readable monitor for the FDOperator created by the dialer.
    // It is only used when calling the dialer's conn init.
    PollModReadable PollEvent = 0x4

    // PollR2RW is used to monitor writable for FDOperator,
    // which is only called when the socket write buffer is full.
    PollR2RW PollEvent = 0x5

    // PollRW2R is used to remove the writable monitor of FDOperator, generally used with PollR2RW.
    PollRW2R PollEvent = 0x6
)

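With the Poll interface defined, we take the epoll implementation as the example for source analysis. The kqueue wrapper is much the same as the epoll one; readers interested in kqueue can read that source on their own.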


const EPOLLET = -syscall.EPOLLET

type epollevent struct {
    events uint32
    data   [8]byte // unaligned uintptr
}

// EpollCtl implements epoll_ctl.
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
    _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
    if err == syscall.Errno(0) {
        err = nil
    }
    return err
}

// EpollWait implements epoll_wait.
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
    var r0 uintptr
    var _p0 = unsafe.Pointer(&events[0])
    if msec == 0 {
        r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
    } else {
        r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
    }
    if err == syscall.Errno(0) {
        err = nil
    }
    return int(r0), err
}



// Includes defaultPoll/multiPoll/uringPoll...
func openPoll() Poll {
    return openDefaultPoll()
}

func openDefaultPoll() *defaultPoll {
    var poll = defaultPoll{}
    poll.buf = make([]byte, 8)
    // create the epoll instance
    var p, err = syscall.EpollCreate1(0)
    if err != nil {
        panic(err)
    }
    poll.fd = p
    var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
    if e0 != 0 {
        syscall.Close(p)
        panic(err) // NOTE: err is nil on this path; the eventfd2 failure is carried by e0
    }

    poll.Reset = poll.reset
    poll.Handler = poll.handler

    poll.wop = &FDOperator{FD: int(r0)}
    poll.Control(poll.wop, PollReadable)
    return &poll
}

type defaultPoll struct {
    pollArgs
    fd      int         // epoll fd
    wop     *FDOperator // eventfd, wake epoll_wait
    buf     []byte      // read wfd trigger msg
    trigger uint32      // trigger flag
    // fns for handle events
    Reset   func(size, caps int)
    Handler func(events []epollevent) (closed bool)
}

type pollArgs struct {
    size     int
    caps     int
    events   []epollevent
    barriers []barrier
}

func (a *pollArgs) reset(size, caps int) {
    a.size, a.caps = size, caps
    a.events, a.barriers = make([]epollevent, size), make([]barrier, size)
    for i := range a.barriers {
        a.barriers[i].bs = make([][]byte, a.caps)
        a.barriers[i].ivs = make([]syscall.Iovec, a.caps)
    }
}

// Wait implements Poll.
func (p *defaultPoll) Wait() (err error) {
    // init
    var caps, msec, n = barriercap, -1, 0
    p.Reset(128, caps)
    // wait
    for {
        if n == p.size && p.size < 128*1024 {
            p.Reset(p.size<<1, caps)
        }
        // call epoll_wait()
        n, err = EpollWait(p.fd, p.events, msec)
        if err != nil && err != syscall.EINTR {
            return err
        }
        if n <= 0 {
            msec = -1
            runtime.Gosched()
            continue
        }
        msec = 0
        if p.Handler(p.events[:n]) {
            return nil
        }
    }
}

func (p *defaultPoll) handler(events []epollevent) (closed bool) {
    var hups []*FDOperator // TODO: maybe can use sync.Pool
    for i := range events {
        var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data))
        // trigger or exit gracefully
        if operator.FD == p.wop.FD {
            // must clean trigger first
            syscall.Read(p.wop.FD, p.buf)
            atomic.StoreUint32(&p.trigger, 0)
            // if closed & exit
            if p.buf[0] > 0 {
                syscall.Close(p.wop.FD)
                syscall.Close(p.fd)
                return true
            }
            continue
        }
        if !operator.do() {
            continue
        }

        evt := events[i].events
        switch {
        // check hup first
        case evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0:
            hups = append(hups, operator)
        case evt&syscall.EPOLLERR != 0:
            // Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
            // So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup.
            if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
                hups = append(hups, operator)
            }
        default:
            // read event
            if evt&syscall.EPOLLIN != 0 {
                if operator.OnRead != nil {
                    // for non-connection
                    operator.OnRead(p)
                } else {
                    // for connection
                    var bs = operator.Inputs(p.barriers[i].bs)
                    if len(bs) > 0 {
                        var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
                        // InputAck triggers the business-logic callback internally
                        operator.InputAck(n)
                        if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
                            log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
                            hups = append(hups, operator)
                            break
                        }
                    }
                }
            }
            // write event
            if evt&syscall.EPOLLOUT != 0 {
                if operator.OnWrite != nil {
                    // for non-connection
                    operator.OnWrite(p)
                } else {
                    // for connection
                    var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
                    if len(bs) > 0 {
                        // TODO: Let the upper layer pass in whether to use ZeroCopy.
                        var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
                        operator.OutputAck(n)
                        if err != nil && err != syscall.EAGAIN {
                            log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error())
                            hups = append(hups, operator)
                            break
                        }
                    }
                }
            }
        }
        operator.done()
    }
    // hup conns together to avoid blocking the poll.
    if len(hups) > 0 {
        p.detaches(hups)
    }
    return false
}

// Close will write 10000000
func (p *defaultPoll) Close() error {
    _, err := syscall.Write(p.wop.FD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
    return err
}

// Trigger implements Poll.
func (p *defaultPoll) Trigger() error {
    if atomic.AddUint32(&p.trigger, 1) > 1 {
        return nil
    }
    // MAX(eventfd) = 0xfffffffffffffffe
    _, err := syscall.Write(p.wop.FD, []byte{0, 0, 0, 0, 0, 0, 0, 1})
    return err
}

// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
    var op int
    var evt epollevent
    *(**FDOperator)(unsafe.Pointer(&evt.data)) = operator
    switch event {
    case PollReadable:
        operator.inuse()
        op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
    case PollModReadable:
        operator.inuse()
        op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
    case PollDetach:
        defer operator.unused()
        op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
    case PollWritable:
        operator.inuse()
        op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
    case PollR2RW:
        op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
    case PollRW2R:
        op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
    }
    // epoll_ctl
    return EpollCtl(p.fd, op, operator.FD, &evt)
}

func (p *defaultPoll) detaches(hups []*FDOperator) error {
    var onhups = make([]func(p Poll) error, len(hups))
    for i := range hups {
        onhups[i] = hups[i].OnHup
        p.Control(hups[i], PollDetach)
    }
    go func(onhups []func(p Poll) error) {
        for i := range onhups {
            if onhups[i] != nil {
                onhups[i](p)
            }
        }
    }(onhups)
    return nil
}
In short, defaultPoll is a wrapper around the three epoll system calls: epoll_create(), epoll_ctl(), and epoll_wait().
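For readers who have not used epoll directly, the create / ctl / wait sequence can be tried with the wrappers in Go's standard syscall package. This is a Linux-only sketch: it watches a pipe instead of a socket, and `readable` and `demoEpoll` are hypothetical helper names. netpoll issues raw system calls with its own epollevent struct rather than using these wrappers.

```go
package main

import (
    "fmt"
    "syscall"
)

// readable reports whether fd becomes readable within timeoutMs milliseconds.
func readable(fd, timeoutMs int) (bool, error) {
    epfd, err := syscall.EpollCreate1(0) // epoll_create
    if err != nil {
        return false, err
    }
    defer syscall.Close(epfd)

    ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)}
    if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &ev); err != nil { // epoll_ctl
        return false, err
    }
    events := make([]syscall.EpollEvent, 8)
    for {
        n, err := syscall.EpollWait(epfd, events, timeoutMs) // epoll_wait
        if err == syscall.EINTR {
            continue // interrupted by a signal, retry
        }
        if err != nil {
            return false, err
        }
        return n > 0 && events[0].Fd == int32(fd), nil
    }
}

// demoEpoll checks a pipe before and after one byte is written to it.
func demoEpoll() (before, after bool, err error) {
    var p [2]int
    if err = syscall.Pipe(p[:]); err != nil {
        return
    }
    defer syscall.Close(p[0])
    defer syscall.Close(p[1])
    if before, err = readable(p[0], 10); err != nil {
        return
    }
    if _, err = syscall.Write(p[1], []byte("x")); err != nil {
        return
    }
    after, err = readable(p[0], 1000)
    return
}

func main() {
    fmt.Println(demoEpoll())
}
```

The read end of the pipe is not readable before the write and becomes readable afterwards, which is the same level-triggered EPOLLIN behavior the poller relies on for connection fds.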

3.6 Handling business logic

When the connection was initialized in section 3.3, inputs and inputAck were assigned. inputAck is where the business-logic callback OnRequest() gets invoked, so we analyze inputAck() next.

// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
    if n < 0 {
        n = 0
    }
    // Auto size bookSize.
    if n == c.bookSize && c.bookSize < mallocMax {
        c.bookSize <<= 1
    }

    length, _ := c.inputBuffer.bookAck(n)
    if c.maxSize < length {
        c.maxSize = length
    }
    if c.maxSize > mallocMax {
        c.maxSize = mallocMax
    }

    var needTrigger = true
    if length == n { // first start onRequest
        // after the data has been read, invoke the processing logic
        needTrigger = c.onRequest()
    }
    if needTrigger && length >= int(atomic.LoadInt32(&c.waitReadSize)) {
        c.triggerRead()
    }
    return nil
}

// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool) {
    var onRequest, ok = c.onRequestCallback.Load().(OnRequest)
    if !ok {
        return true
    }
    processed := c.onProcess(
        // only process when conn active and have unread data
        func(c *connection) bool {
            return c.Reader().Len() > 0 && c.IsActive()
        },
        func(c *connection) {
            // run the business-logic callback
            _ = onRequest(c.ctx, c)
        },
    )
    // if not processed, should trigger read
    return !processed
}

// onProcess is responsible for executing the process function serially,
// and make sure the connection has been closed correctly if user call c.Close() in process function.
func (c *connection) onProcess(isProcessable func(c *connection) bool, process func(c *connection)) (processed bool) {
    if process == nil {
        return false
    }
    // task already exists
    if !c.lock(processing) {
        return false
    }
    // add new task
    var task = func() {
    START:
        // Single request processing, blocking allowed.
        for isProcessable(c) {
            process(c)
        }
        // Handling callback if connection has been closed.
        if !c.IsActive() {
            c.closeCallback(false)
            return
        }
        c.unlock(processing)
        // Double check when exiting.
        if isProcessable(c) && c.lock(processing) {
            goto START
        }
        // task exits
        return
    }

    runTask(c.ctx, task)
    return true
}

var runTask = gopool.CtxGo

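The code above is what runs after input data has been read. Data from the client is stored in inputBuffer, and OnRequest() is then invoked asynchronously to run the business logic. When the business logic finishes, it can call the connection's Write() and Flush() methods to send the response to the client. Internally the data is first written to an output buffer; flush() tries to send it immediately, and whatever could not be sent is written out once the client socket becomes writable.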
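The "only one OnRequest at a time" guarantee in onProcess() rests on a lock flag plus a double check on exit. The sketch below reproduces that pattern with an atomic flag and a simple queue. All names (`conn`, `onData`, `demoSerial`) are hypothetical, and the queue stands in for the connection's input buffer.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// conn is a hypothetical connection with a queue of pending requests.
type conn struct {
    mu         sync.Mutex
    pending    []string
    processing int32    // 0 = idle, 1 = a task is draining the queue
    handled    []string // touched only by the task holding the flag
    wg         sync.WaitGroup
}

func (c *conn) pop() (string, bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if len(c.pending) == 0 {
        return "", false
    }
    r := c.pending[0]
    c.pending = c.pending[1:]
    return r, true
}

func (c *conn) hasPending() bool {
    c.mu.Lock()
    defer c.mu.Unlock()
    return len(c.pending) > 0
}

// onData is called by the "poller" whenever new input arrives.
func (c *conn) onData(req string) {
    c.mu.Lock()
    c.pending = append(c.pending, req)
    c.mu.Unlock()
    if !atomic.CompareAndSwapInt32(&c.processing, 0, 1) {
        return // a task already exists and will pick this request up
    }
    c.wg.Add(1)
    go func() {
        defer c.wg.Done()
        for {
            for r, ok := c.pop(); ok; r, ok = c.pop() {
                c.handled = append(c.handled, r) // stands in for OnRequest
            }
            atomic.StoreInt32(&c.processing, 0)
            // Double check: data may have arrived after the last pop.
            if !c.hasPending() || !atomic.CompareAndSwapInt32(&c.processing, 0, 1) {
                return
            }
        }
    }()
}

// demoSerial feeds n requests and returns them in the order they were handled.
func demoSerial(n int) []string {
    c := &conn{}
    for i := 0; i < n; i++ {
        c.onData(fmt.Sprintf("req-%d", i))
    }
    c.wg.Wait()
    return c.handled
}

func main() {
    fmt.Println(len(demoSerial(100)))
}
```

Without the double check, a request that arrives between the last pop and the release of the flag would sit in the queue until the next read event, which is why onProcess() re-tests isProcessable after unlocking.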

Below is a brief look at the implementation of the connection's Write() and Flush() methods.


// Flush will send all malloc data to the peer,
// so must confirm that the allocated bytes have been correctly assigned.
//
// Flush first checks whether the out buffer is empty.
// If empty, it will call syscall.Write to send data directly,
// otherwise the buffer will be sent asynchronously by the epoll trigger.
func (c *connection) Flush() error {
    if !c.lock(flushing) {
        return Exception(ErrConnClosed, "when flush")
    }
    defer c.unlock(flushing)
    c.outputBuffer.Flush()
    return c.flush()
}

// Write will Flush soon.
func (c *connection) Write(p []byte) (n int, err error) {
    if !c.lock(flushing) {
        return 0, Exception(ErrConnClosed, "when write")
    }
    defer c.unlock(flushing)

    dst, _ := c.outputBuffer.Malloc(len(p))
    n = copy(dst, p)
    c.outputBuffer.Flush()
    err = c.flush()
    return n, err
}



// flush write data directly.
func (c *connection) flush() error {
    if c.outputBuffer.IsEmpty() {
        return nil
    }
    // TODO: Let the upper layer pass in whether to use ZeroCopy.
    var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
    var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
    if err != nil && err != syscall.EAGAIN {
        return Exception(err, "when flush")
    }
    if n > 0 {
        err = c.outputBuffer.Skip(n)
        c.outputBuffer.Release()
        if err != nil {
            return Exception(err, "when flush")
        }
    }
    // return if write all buffer.
    if c.outputBuffer.IsEmpty() {
        return nil
    }
    // switch the fd to monitor both readable and writable events
    err = c.operator.Control(PollR2RW)
    if err != nil {
        return Exception(err, "when flush")
    }

    err = <-c.writeTrigger
    return err
}
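The control flow of flush() (write directly, and fall back to the poller only for the remainder) can be modeled without real sockets. In this sketch `fakeSocket` imitates a kernel send buffer with limited room, the needWritable channel stands in for Control(PollR2RW), and the poller is assumed to signal writeTrigger once the output has been drained; that last step is not shown in the excerpt above and is an assumption here. The sketch also handles only a single writable event.

```go
package main

import "fmt"

// fakeSocket models a kernel send buffer with limited free space.
type fakeSocket struct {
    room int    // bytes the "kernel" will accept right now
    sent []byte // everything written so far
}

// write accepts at most room bytes and reports how many were taken.
func (s *fakeSocket) write(p []byte) int {
    n := len(p)
    if n > s.room {
        n = s.room
    }
    s.sent = append(s.sent, p[:n]...)
    s.room -= n
    return n
}

type conn struct {
    sock         *fakeSocket
    out          []byte        // pending output, stands in for outputBuffer
    needWritable chan struct{} // stands in for Control(PollR2RW)
    writeTrigger chan error    // signaled when the poller has drained out
}

// flush tries a direct write first and waits for the poller only
// when the socket could not take everything.
func (c *conn) flush() error {
    if len(c.out) == 0 {
        return nil
    }
    n := c.sock.write(c.out)
    c.out = c.out[n:]
    if len(c.out) == 0 {
        return nil
    }
    c.needWritable <- struct{}{}
    return <-c.writeTrigger
}

// poller plays the role of the epoll loop handling one writable event.
func (c *conn) poller(freed int) {
    if _, ok := <-c.needWritable; !ok {
        return // flush finished with a direct write; nothing to do
    }
    c.sock.room += freed // the peer consumed data, so the fd is writable again
    n := c.sock.write(c.out)
    c.out = c.out[n:]
    c.writeTrigger <- nil
}

// demoFlush sends msg through a socket that initially has room bytes free.
func demoFlush(msg string, room int) string {
    c := &conn{
        sock:         &fakeSocket{room: room},
        out:          []byte(msg),
        needWritable: make(chan struct{}),
        writeTrigger: make(chan error, 1),
    }
    go c.poller(len(msg))
    err := c.flush()
    close(c.needWritable) // lets an idle poller goroutine exit
    if err != nil {
        return ""
    }
    return string(c.sock.sent)
}

func main() {
    fmt.Println(demoFlush("hello world", 4))
}
```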

3.7 Summary

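This section analyzed the netpoll server source, starting from the Serve entry point and then covering how the server accepts client connections, how it handles client requests, the pollmanager module, and the Poll wrapper. The actual framework contains more code; this article follows a natural code-reading order and presents a trimmed-down version of the core code. Interested readers can go straight to the project source afterwards.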

4. netpoll Client Source Code Analysis

Section 3 analyzed the server-side logic in netpoll. netpoll also wraps the client, so this section takes a quick look at the client source.

The client side abstracts a Dialer interface that extends the API of net.Dialer. Its full definition is as follows.

// Dialer extends net.Dialer's API, just for interface compatibility.
// DialConnection is recommended, but of course all functions are practically the same.
// The returned net.Conn can be directly asserted as Connection if error is nil.
type Dialer interface {
    DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)

    DialTimeout(network, address string, timeout time.Duration) (conn net.Conn, err error)
}

// DialConnection is a default implementation of Dialer.
func DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {
    return defaultDialer.DialConnection(network, address, timeout)
}

// NewDialer only support TCP and unix socket now.
func NewDialer() Dialer {
    return &dialer{}
}

var defaultDialer = NewDialer()

type dialer struct{}

// DialTimeout implements Dialer.
func (d *dialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
    conn, err := d.DialConnection(network, address, timeout)
    return conn, err
}

// DialConnection implements Dialer.
func (d *dialer) DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {
    ctx := context.Background()
    if timeout > 0 {
        subCtx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()
        ctx = subCtx
    }

    switch network {
    case "tcp", "tcp4", "tcp6":
        var raddr *TCPAddr
        raddr, err = ResolveTCPAddr(network, address)
        if err != nil {
            return nil, err
        }
        connection, err = DialTCP(ctx, network, nil, raddr)
    // case "udp", "udp4", "udp6":  // TODO: unsupport now
    case "unix", "unixgram", "unixpacket":
        var raddr *UnixAddr
        raddr, err = ResolveUnixAddr(network, address)
        if err != nil {
            return nil, err
        }
        connection, err = DialUnix(network, nil, raddr)
    default:
        return nil, net.UnknownNetworkError(network)
    }
    return connection, err
}

// sysDialer contains a Dial's parameters and configuration.
type sysDialer struct {
    net.Dialer
    network, address string
}

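As the DialConnection code shows, netpoll's client mainly supports TCP and unix sockets; UDP is not supported yet. For TCP, the connection is established in DialTCP(), whose implementation is analyzed below.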
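As an aside, the two ideas in DialConnection (turning the timeout into a context deadline, and dispatching on the network name) can be mirrored with the standard library alone. The `dial` function below is a hypothetical analogue written for illustration; it delegates to net.Dialer rather than to netpoll's own DialTCP and DialUnix, and the address in main is a placeholder.

```go
package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

// dial is a hypothetical stdlib-only analogue of DialConnection.
func dial(network, address string, timeout time.Duration) (net.Conn, error) {
    ctx := context.Background()
    if timeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, timeout)
        defer cancel()
    }
    switch network {
    case "tcp", "tcp4", "tcp6", "unix", "unixgram", "unixpacket":
        var d net.Dialer
        return d.DialContext(ctx, network, address)
    default:
        // e.g. "udp": rejected, mirroring the TODO in the excerpt above
        return nil, net.UnknownNetworkError(network)
    }
}

func main() {
    _, err := dial("udp", "127.0.0.1:9000", time.Second)
    fmt.Println(err)
}
```

A timeout of zero leaves the context without a deadline, which matches the `if timeout > 0` guard in DialConnection.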


// TCPConnection implements Connection.
type TCPConnection struct {
    connection
}

// newTCPConnection wraps *TCPConnection.
func newTCPConnection(conn Conn) (connection *TCPConnection, err error) {
    connection = &TCPConnection{}
    err = connection.init(conn, nil)
    if err != nil {
        return nil, err
    }
    return connection, nil
}

// DialTCP acts like Dial for TCP networks.
//
// The network must be a TCP network name; see func Dial for details.
//
// If laddr is nil, a local address is automatically chosen.
// If the IP field of raddr is nil or an unspecified IP address, the
// local system is assumed.
func DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error) {
    switch network {
    case "tcp", "tcp4", "tcp6":
    default:
        return nil, &net.OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: net.UnknownNetworkError(network)}
    }
    if raddr == nil {
        return nil, &net.OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: nil, Err: errMissingAddress}
    }
    if ctx == nil {
        ctx = context.Background()
    }
    sd := &sysDialer{network: network, address: raddr.String()}
    c, err := sd.dialTCP(ctx, laddr, raddr)
    if err != nil {
        return nil, &net.OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: err}
    }
    return c, nil
}

func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConnection, error) {
    // internetSocket contains the logic that actually establishes the connection.
    conn, err := internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")

    // TCP has a rarely used mechanism called a 'simultaneous connection' in
    // which Dial("tcp", addr1, addr2) run on the machine at addr1 can
    // connect to a simultaneous Dial("tcp", addr2, addr1) run on the machine
    // at addr2, without either machine executing Listen. If laddr == nil,
    // it means we want the kernel to pick an appropriate originating local
    // address. Some Linux kernels cycle blindly through a fixed range of
    // local ports, regardless of destination port. If a kernel happens to
    // pick local port 50001 as the source for a Dial("tcp", "", "localhost:50001"),
    // then the Dial will succeed, having simultaneously connected to itself.
    // This can only happen when we are letting the kernel pick a port (laddr == nil)
    // and when there is no listener for the destination address.
    // It's hard to argue this is anything other than a kernel bug. If we
    // see this happen, rather than expose the buggy effect to users, we
    // close the conn and try again. If it happens twice more, we relent and
    // use the result. See also:
    //  https://golang.org/issue/2690
    //  https://stackoverflow.com/questions/4949858/
    //
    // The opposite can also happen: if we ask the kernel to pick an appropriate
    // originating local address, sometimes it picks one that is already in use.
    // So if the error is EADDRNOTAVAIL, we have to try again too, just for
    // a different reason.
    //
    // The kernel socket code is no doubt enjoying watching us squirm.
    for i := 0; i < 2 && (laddr == nil || laddr.Port == 0) && (selfConnect(conn, err) || spuriousENOTAVAIL(err)); i++ {
        if err == nil {
            conn.Close()
        }
        conn, err = internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
    }

    if err != nil {
        return nil, err
    }
    return newTCPConnection(conn)
}

dialTCP() mainly calls internetSocket() to create the connection, so the logic inside that function is what we need to focus on. It is analyzed below.

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (conn *netFD, err error) {
    if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && raddr.isWildcard() {
        raddr = raddr.toLocal(net)
    }
    family, ipv6only := favoriteAddrFamily(net, laddr, raddr)
    // Create the socket and connect it.
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
}

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (netfd *netFD, err error) {
    // syscall.Socket & set socket options
    var fd int
    // Invoke the socket system call through the sysSocket wrapper.
    fd, err = sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    err = setDefaultSockopts(fd, family, sotype, ipv6only)
    if err != nil {
        syscall.Close(fd)
        return nil, err
    }

    netfd = newNetFD(fd, family, sotype, net)
    // Establish the connection (handshake);
    // dial calls connect internally.
    err = netfd.dial(ctx, laddr, raddr)
    if err != nil {
        netfd.Close()
        return nil, err
    }
    return netfd, nil
}


// Wrapper around the socket system call that marks the returned file
// descriptor as nonblocking and close-on-exec.
func sysSocket(family, sotype, proto int) (int, error) {
    // See ../syscall/exec_unix.go for description of ForkLock.
    syscall.ForkLock.RLock()
    // Invoke the socket system call.
    s, err := syscall.Socket(family, sotype, proto)
    if err == nil {
        syscall.CloseOnExec(s)
    }
    syscall.ForkLock.RUnlock()
    if err != nil {
        return -1, os.NewSyscallError("socket", err)
    }
    // Put the socket into non-blocking mode.
    if err = syscall.SetNonblock(s, true); err != nil {
        syscall.Close(s)
        return -1, os.NewSyscallError("setnonblock", err)
    }
    return s, nil
}

func newNetFD(fd, family, sotype int, net string) *netFD {
    var ret = &netFD{}
    ret.fd = fd
    ret.network = net
    ret.family = family
    ret.sotype = sotype
    ret.isStream = sotype == syscall.SOCK_STREAM
    ret.zeroReadIsEOF = sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW
    return ret
}


// if dial connection error, you need exec netFD.Close actively
func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) {
    var lsa syscall.Sockaddr
    if laddr != nil {
        if lsa, err = laddr.sockaddr(c.family); err != nil {
            return err
        } else if lsa != nil {
            // Bind the local address with the bind system call.
            if err = syscall.Bind(c.fd, lsa); err != nil {
                return os.NewSyscallError("bind", err)
            }
        }
    }
    var rsa syscall.Sockaddr  // remote address from the user
    var crsa syscall.Sockaddr // remote address we actually connected to
    if raddr != nil {
        if rsa, err = raddr.sockaddr(c.family); err != nil {
            return err
        }
    }
    // Call connect(); it returns the remote address
    // we actually connected to.
    if crsa, err = c.connect(ctx, lsa, rsa); err != nil {
        return err
    }
    c.isConnected = true

    // Record the local and remote addresses from the actual socket.
    // Get the local address by calling Getsockname.
    // For the remote address, use
    // 1) the one returned by the connect method, if any; or
    // 2) the one from Getpeername, if it succeeds; or
    // 3) the one passed to us as the raddr parameter.
    lsa, _ = syscall.Getsockname(c.fd)
    c.localAddr = sockaddrToAddr(lsa)
    if crsa != nil {
        c.remoteAddr = sockaddrToAddr(crsa)
    } else if crsa, _ = syscall.Getpeername(c.fd); crsa != nil {
        c.remoteAddr = sockaddrToAddr(crsa)
    } else {
        c.remoteAddr = sockaddrToAddr(rsa)
    }
    return nil
}

func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) {
    // Do not need to call c.writing here,
    // because c is not yet accessible to user,
    // so no concurrent operations are possible.
    // Invoke the connect system call.
    switch err := syscall.Connect(c.fd, ra); err {
    case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
    case nil, syscall.EISCONN:
        select {
        case <-ctx.Done():
            return nil, mapErr(ctx.Err())
        default:
        }
        return nil, nil
    case syscall.EINVAL:
        // On Solaris we can see EINVAL if the socket has
        // already been accepted and closed by the server.
        // Treat this as a successful connection--writes to
        // the socket will see EOF.  For details and a test
        // case in C see https://golang.org/issue/6828.
        if runtime.GOOS == "solaris" {
            return nil, nil
        }
        fallthrough
    default:
        return nil, os.NewSyscallError("connect", err)
    }

    // TODO: can't support interrupter now.
    // Start the "interrupter" goroutine, if this context might be canceled.
    // (The background context cannot)
    //
    // The interrupter goroutine waits for the context to be done and
    // interrupts the dial (by altering the c's write deadline, which
    // wakes up waitWrite).
    if ctx != context.Background() {
        // Wait for the interrupter goroutine to exit before returning
        // from connect.
        done := make(chan struct{})
        interruptRes := make(chan error)
        defer func() {
            close(done)
            if ctxErr := <-interruptRes; ctxErr != nil && ret == nil {
                // The interrupter goroutine called SetWriteDeadline,
                // but the connect code below had returned from
                // waitWrite already and did a successful connect (ret
                // == nil). Because we've now poisoned the connection
                // by making it unwritable, don't return a successful
                // dial. This was issue 16523.
                ret = mapErr(ctxErr)
                c.Close() // prevent a leak
            }
        }()
        go func() {
            select {
            case <-ctx.Done():
                // Force the runtime's poller to immediately give up
                // waiting for writability, unblocking waitWrite
                // below.
                c.SetWriteDeadline(aLongTimeAgo)
                interruptRes <- ctx.Err()
            case <-done:
                interruptRes <- nil
            }
        }()
    }

    c.pd = newPollDesc(c.fd)
    for {
        // Performing multiple connect system calls on a
        // non-blocking socket under Unix variants does not
        // necessarily result in earlier errors being
        // returned. Instead, once runtime-integrated network
        // poller tells us that the socket is ready, get the
        // SO_ERROR socket option to see if the connection
        // succeeded or failed. See issue 7474 for further
        // details.
        if err := c.pd.WaitWrite(ctx); err != nil {
            return nil, err
        }
        nerr, err := syscall.GetsockoptInt(c.fd, syscall.SOL_SOCKET, syscall.SO_ERROR)
        if err != nil {
            return nil, os.NewSyscallError("getsockopt", err)
        }
        switch err := syscall.Errno(nerr); err {
        case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
        case syscall.EISCONN:
            return nil, nil
        case syscall.Errno(0):
            // The runtime poller can wake us up spuriously;
            // see issues 14548 and 19289. Check that we are
            // really connected; if not, wait again.
            if rsa, err := syscall.Getpeername(c.fd); err == nil {
                return rsa, nil
            }
        default:
            return nil, os.NewSyscallError("connect", err)
        }
    }
}

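The code above is the core logic for establishing a connection. It comes down to three key steps:

  1. Call sysSocket(), which creates a socket descriptor through the syscall.Socket() system call and puts it into non-blocking mode.
  2. Call the netFD's dial() method, which mainly calls syscall.Connect() to establish the connection and then waits until the socket becomes writable to confirm the result.
  3. Once the netFD is available, call newTCPConnection() to initialize the connection state. This is the same init() path the server side runs after it accepts a connection.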

5. Summary

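This article reviewed the classic Multi-Reactor model in network programming and, on that basis, analyzed the core source code of the server and client sides of netpoll, a network framework written in Go. The analysis concentrated on the core network-handling logic. Because of space constraints, it does not cover the zero-copy design or the input and output buffers used in netpoll; interested readers can read the source code themselves. Corrections are welcome wherever my understanding is inaccurate.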

6. References

  1. https://github.com/cloudwego/netpoll.git
  2. https://github.com/cloudwego/kitex.git
  3. https://github.com/panjf2000/gnet.git
  4. The Evolution of Network I/O
  5. gnet Network Framework Source Code Analysis