前言:
在查阅网络相关的资料时,一件事情引起了我的好奇,为何golang那么适合网络高并发,它的原理是什么?实际上要说明白这个问题,需要知道linux独有的epoll以及goroutine的调度,netFD的网络轮训器,本篇先关注epoll,其余的在接下来的文章中。
为什么要再次回顾netFD,原因是我在查询资料的时候,看到讲解netFD的很多都提到了它的网络轮训器,会在接下来的文章细说

type netFD struct {
 ....
 pd pollDesc
}


PS:后面还有两篇:1.goroutine调度探索 2.惊群效应、close与shutdown区别探索等
epoll原理
为什么要先说epoll,因为golang在linux版本用的就是它。
以下内容是我结合网络查询的资料与自己的理解总结。
1.1 epoll与select、poll比较
说到epoll就不得不跟select与poll做比较了。在网络开发中,避免不了一个问题,如何做到高并发,而我们知道现在的网络服务动则上百万QPS,每台机器上的socket连接数上万绝对不在话下,服务程序与其上游建立了那么多socket,那么服务程序需要知道每个socket什么时候有数据过来了或者写成功了,然后做相应的操作。当然不仅仅是为了socket,包括所有的文件类型,只是说我们socket也当成了文件,为了实现这个需求,不同操作系统有不同的方式,也就是我们要说的select、poll、epoll,其中epoll是linux特有的。
select是怎么做的呢?

#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)


先解释下这个函数的作用,用户程序通过调用select函数来请求系统获取被激活的文件描述符,解释下每个参数的含义:
1.maxfdp1的意思就是本次要求检测的描述符的最大的值加1(因为文件描述符是从0开始的)
2.readset、writeset、exceptset这三个参数的数据类型都是struct fd_set,该类型就是存储文件描述符集合的,在这里就是存储我们要检测的文件描述符,该结构可通过以下四个宏进行设置:

//清空集合
void FD_ZERO(fd_set *fdset);  
//将一个给定的文件描述符加入集合之中
void FD_SET(int fd, fd_set *fdset); 
//将一个给定的文件描述符从集合中删除
void FD_CLR(int fd, fd_set *fdset);   
//检查集合中指定的文件描述符是否可以读写 
int FD_ISSET(int fd, fd_set *fdset); 


其中FD_SET是有操作系统把准备好的
看参数名称,我们也能理解,三个参数根据的就是我们要对文件监控的事件类型来区分的,分别是读、写、异常条件,如果你只关注某一个事件类型,把其余的指针置为NULL即可。
3.timeout参数就比较有意思了,我们先看下timeval数据结构的类型

struct timeval{      
 long tv_sec;   /*秒 */
 long tv_usec;  /*微秒 */ 
}


该参数可以使select处于三种状态,第一,若为NULL,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;第二,若将时间值设为0秒0微秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;第三,timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。
select的返回值是一个int类型的数,就绪描述符的数目:
如果为正,代表有ready的文件描述符
如果为-1,代表出错了
如果为0,代表超时,没有ready的文件描述符。
起初,我理解到这,以为就搞明白了select,但是实际上还差的很远,几个小思考帮助我们深入理解:
1).为什么都说select有最大fd限制?
我们先来看下fd_set,它是存储fd的集合的结构体,有限制的话肯定跟它有关了,fd_set其实是一个整数数组,每个整数我们可以看成是一个二进制数,这样整个数组就是一个连在一起的大的二进制,而二进制的每一位代表一个fd了。举个例子,假设我们的fd_set是一个长度为1的数组,而每个元素占用1个字节,如果我们要把fd=2放到集合中,fd_set就是0000010,我们如果要再把fd=5放进去,就fd_set就是00010010,而设置fd_set的就是我们的FD_SET(fd,&set),fd_set占用的字节长度是代码写死的。理解为什么我们的fd_set存储的fd有最大值限制了吧,个人感觉这种实现方式略坑,假如我的文件描述符太大,就没办法使用select监听了。
2).select返回后,我们怎么判断是哪些fdready?
这同样跟fd_set的结构有关,我们传入fd_set后,select会把fd_set中没有ready的fd对应的位置0,保留ready的fd。比如我们调用select的时候,传入的fd_set为00000111,代表我们监听的fd为1,2,3,当select返回后,fd_set变为00000100,这就代表,只有fd为3的文件已经ready了,而检查的方式就是FD_ISSET(int fd, fd_set *fdset),没有看源码,但估计用的&方式。这种特性带来了一个很不好的地方,每调用一次select,fd_set就被会修改,并且select返回后,如果我们不保存原来的fd集合,就无法判断有哪些fd发生了改变,所以我们需要专门创建一个数组用于保存原来的fd集合,每次调用的时候把fd_set重新填充一遍,不是一般的麻烦啊。
3).最初理解select的timeout参数的解释的时候,我曾经看到有这样的解释:
(1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。
(2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
(3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。
我就在想为什么是有一个ready就返回,如果每次最多只有一个fd被发现激活的话,返回值int直接说返回值为1不得了,还有,如果超时的时候,恰好扫描到一半,怎么办,直接返回吗?这些疑问促使想了解select是怎么实现的,这里大致说下select的实现原理(linux2.6以后的)。
linux的select主要由四个函数:
sys_select:处理时间参数,调用core_sys_select。
core_sys_select:处理三个fd_set参数,调用do_select。
do_select:做select/poll的工作。在合适的时机把自己挂起等待,调用sock_poll。
sock_poll:用函数指针分派到具体的协议层函数tcp_poll、udp_poll、datagram_poll。
我们只需要关注do_select即可,我并没有全部理解do_select的代码,而是带着解答我上面的疑惑的目的大致看下,如果有错误,还请指出。do_select首先进入一个循环,这个循环内部还有一个循环,而这个内部的循环会扫描所有的fd,来检查是否ready,等内部循环完毕后,会再次检测是否超时,如果超时则break外层循环,如果我们设置的超时时间是NULL,并且本次内部循环ready的fd不等于0,则直接break外层循环。
这也就代表select会扫描全部的fd后再判断是否超时,触发select返回的条件是至少有一个fdready,而并不是有一个就返回。
接下来,我们看看poll。

# include <poll.h>
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);

照例,我们先看下函数的参数:
1.fds参数是用来存储fd集合的,注意它是个pollfd类型指针,其实就是数组,用数组,理论上来讲监听的fd的数量就没有最大的限制了(网上有说是基于链表的,看起来不太对),来看看pollfd数据结构

struct pollfd
{
 int fd;             
 short events;       
 short revents;      
} ; 


其中fd自然就是文件描述符了,events是要监听的类型,注意它是一个位掩码(每一位代表不同含义),也就是说每个fd我们可以监控不同的事件了,事件类型如下:
POLLIN 普通或优先级带数据可读
POLLRDNORM 普通数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级数据可读
POLLOUT 普通数据可写
POLLWRNORM 普通数据可写
POLLWRBAND 优先级带数据可写
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述字不是一个打开的文件
revents是什么呢?它是调用与告知调用方该fd发生的时间,同样是一个位掩码。
注意,如果某个fd监听了多个事件,即使只有一个事件触发,不会等待其他的事件都触发再返回。
2.nfds是nfds_t类型的参数,用于标记数组fds中的结构体元素的总数量
3.timeout代表超时时间,单位是毫秒
介绍完参数,来说下原理,其实poll的原理跟select没太大区别:
1.首先,每次都需要把fd的集合拷贝到内核空间,消耗加大,这跟select有同样的毛病
2.由于fd集合存储的方式采用了数组,不像select有太强的fd最大值限制了
3.其他就没有什么区别了,它同样是每次需要扫描所有的文件描述符,找到触发监听事件的文件描述符,然后返回。
1.2 epoll原理
终于到epoll了,epoll的出现就是为了解决select与poll的缺点,满足我们现在需要的网络高并发。
以下内容来自网络以及自己的理解,有不对的地方还请指出。
用户态下epoll的使用主要包含以下三个函数:

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct
epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);


我们先来文字描述下epoll的原理,然后看代码。
1.首先在使用之前调用epoll_create,创建一个epoll fd,注意epoll的使用等同于文件,所以在使用完之后要close,参数size已经没用了。这个epoll fd会存储着用户监听的描述符,并且现在较新的版本的Linux中,存储的方式是红黑树,所以目前的size没有用了。
2.创建完一个epoll fd之后,可以开始调用epoll_ctl函数,把我们想要监听的文件描述符注册到epoll fd中,它的第一个参数是epoll fd的文件描述符,第二个参数代表本次调用的操作类型:添加、删除、修改,第三个参数的含义是我们监听该fd的哪些事件,我们来看下这个结构体长啥样。

struct epoll_event {
 __uint32_t events; /* Epoll events */
 epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
 void *ptr;
 int fd;
 __uint32_t u32;
 __uint64_t u64;
} epoll_data_t;


3.注册完之后,接着就需要查看我关注的那些文件描述符是否ready了,第三个函数epoll_wait就是提供这样的功能的,它的作用就是把我们监听文件描述符中,获取到ready的那些。
第一个参数就不说了,直接看第二个,这个参数实际上是传进去的指针,并且是分配好内存的地址,让内核把那些ready的数据放进去,第三个参数maxevents的events的大小,最后一个timeout自然就是超时时间了,它的单位是毫秒级别,如果为0则代表立即返回,如果为复制,则一直阻塞直到有文件描述符ready。
epoll的优点:
1.相比select和poll,epoll不用每次获取ready的事件时,把所有的文件描述符传给内核,大大的减少了内核的消耗,只需要用epoll_ctl进行更新即可。
2.在存储被监控的fd信息时,内核使用了slab机制,为epoll提供了快速的数据结构,这一点我没有深入理解slab机制,不便展开。
3.最后一点,也是最厉害的,如果我们调用epoll_wait从大量的文件描述符中查找ready的文件描述符,能够快速的返回。原因是epoll在create的时候,创建的是一个eventpoll结构的对象,该对象中有一个链表,专门用于存储ready的事件。调用epoll_wait的时候,只需要检查这个链表是否为空即可。
那么是谁向这个链表中插入ready的事件呢?我们再调用epoll_ctl注册我们监听的fd的时候,内核会帮我们给该描述符添加一个callback机制,当事件发生时,会通过callback机制把该文件描述符插入到我们的list中,详细过程稍后看代码。
4.有人说epoll采用了mmap技术,说实话我不太认同,此处略过。
5.由于存储fd的信息是采用红黑树的方式,所以多次添加同样的fd,会先检测是否已经存储过,是的话直接返回。
下面给出详细的代码
epoll_create时,系统为epoll fd创建了那些内容呢?

struct eventpoll {
 spin_lock_t lock;           //对本数据结构的访问
 struct mutex mtx;           //防止使用时被删除
 wait_queue_head_t wq;       //sys_epoll_wait() 使用的等待队列
 wait_queue_head_t poll_wait;//file->poll()使用的等待队列
 struct list_head rdllist;   //事件满足条件的链表
 struct rb_root rbr;         //用于管理所有fd的红黑树
 struct epitem *ovflist;     //将事件到达的fd进行链接起来发送至用户空间
}


而我们在给系统添加一个监听的fd时,系统又创建了什么?

struct epitem
{
 struct rb_node rbn;        //用于主结构管理的红黑树
 struct list_head rdllink;  //事件就绪队列
 struct epitem *next;       //用于主结构体中的链表
 struct epoll_filefd ffd;   //每个fd生成的一个结构
 int nwait;                 
 struct list_head pwqlist;  //poll等待队列
 struct eventpoll *ep;      //该项属于哪个主结构体
 struct list_head fllink;   //链接fd对应的file链表
 struct epoll_event event;  //注册的感兴趣的事件,也就是用户空间的epoll_event
 }


我们前面说过,eventpoll中采用的红黑树存储信息,现在看来存储的是epitem结构,他们通过的是rb_root来链接,我们来看下他的结构

struct epitem
{
 struct rb_node rbn;        //用于主结构管理的红黑树
 struct list_head rdllink;  //事件就绪队列
 struct epitem *next;       //用于主结构体中的链表
 struct epoll_filefd ffd;   //每个fd生成的一个结构
 int nwait;                 
 struct list_head pwqlist;  //poll等待队列
 struct eventpoll *ep;      //该项属于哪个主结构体
 struct list_head fllink;   //链接fd对应的file链表
 struct epoll_event event;  //注册的感兴趣的事件,也就是用户空间的epoll_event
 }


喔,里面有左右的节点地址,那么我们怎么根据这个地址来访问到epitem的其他字段信息呢?
epoll中使用了list_first_entry用于获取链表中第一个节点所在结构体的首地址,它的参数如下:

ptr:   链表头节点
type:  链表节点struct list_head变量所在结构体的类型
member:链表节点在所求首地址结构体内的成员变量名
epoll_create的代码比较简单,直接列出,不在详细描述
//原来使用的是hash表,所以有size,现在改为红黑树,故不使用.
 long sys_epoll_create(int size)
 {
 int error,fd = -1;
 struct eventpoll *ep;
 struct inode *inode;
 struct file *file;
 
 ….
 error = -EINVAL;
 if(size <= 0 || (error = ep_alloc(&ep)!=0))
 goto errror_return;
 //创建一个struct file结构,由于没有任何文件系统,为匿名文件,并将主结构体放入file->private项中进行保存
 error = anon_inode_getfd(&fd,&inode,&file,”[eventpoll]”,
 &eventpoll_fops,ep);
 if(error)
 goto error_free;
 return fd;
 ...
 }


下面直接看epoll_ctl,每一个fd加入到epoll中,就会创建一个struct epitem结构,并插入至红黑树中。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
 struct epoll_event __user *, event)
{
 int error;
 int did_lock_epmutex = 0;
 struct file *file, *tfile;
 struct eventpoll *ep;
 struct epitem *epi;
 struct epoll_event epds;
 
 error = -EFAULT;
 if (ep_op_has_event(op) &&
 copy_from_user(&epds, event, sizeof(struct epoll_event)))
 goto error_return;
 error = -EBADF;
 file = fget(epfd);
 if (!file)
 goto error_return;
 tfile = fget(fd);
 if (!tfile)
 goto error_fput;
 error = -EPERM;
 if (!tfile->f_op || !tfile->f_op->poll)
 goto error_tgt_fput;
 error = -EINVAL;
 if (file == tfile || !is_file_epoll(file))
 goto error_tgt_fput;
 
 ep = file->private_data;
 
 if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) {
 mutex_lock(&epmutex);
 did_lock_epmutex = 1;
 error = -ELOOP;
 if (ep_loop_check(ep, tfile) != 0)
 goto error_tgt_fput;
 }
 mutex_lock(&ep->mtx);
 epi = ep_find(ep, tfile, fd);
 
 error = -EINVAL;
 switch (op) {
 case EPOLL_CTL_ADD:
 if (!epi) {
 epds.events |= POLLERR | POLLHUP;
 //重点在这,插入epds到ep中
 error = ep_insert(ep, &epds, tfile, fd);
 } else
 error = -EEXIST;
 break;
 case EPOLL_CTL_DEL:
 if (epi)
 error = ep_remove(ep, epi);
 else
 error = -ENOENT;
 break;
 case EPOLL_CTL_MOD:
 if (epi) {
 epds.events |= POLLERR | POLLHUP;
 error = ep_modify(ep, epi, &epds);
 } else
 error = -ENOENT;
 break;
 }
 mutex_unlock(&ep->mtx);
 
error_tgt_fput:
 if (unlikely(did_lock_epmutex))
 mutex_unlock(&epmutex);
 
 fput(tfile);
error_fput:
 fput(file);
error_return:
 
 return error;
}


该函数中很多都是异常检测,最终会调用ep_insert,插入数据,我们来看下ep_insert的代码

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
 struct file *tfile, int fd)
{
 int error, revents, pwake = 0;
 unsigned long flags;
 long user_watches;
 struct epitem *epi;
 struct ep_pqueue epq;
 
 user_watches = atomic_long_read(&ep->user->epoll_watches);
 if (unlikely(user_watches >= max_user_watches))
 return -ENOSPC;
 //分配一个epitem
 if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
 return -ENOMEM;
 
 INIT_LIST_HEAD(&epi->rdllink);
 INIT_LIST_HEAD(&epi->fllink);
 INIT_LIST_HEAD(&epi->pwqlist);
 epi->ep = ep;
 ep_set_ffd(&epi->ffd, tfile, fd);
 epi->event = *event;
 epi->nwait = 0;
 epi->next = EP_UNACTIVE_PTR;
 
 
 epq.epi = epi;
 //这里给epitem安装了回调函数ep_ptable_queue_proc
 init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
 revents = tfile->f_op->poll(tfile, &epq.pt);
 
 
 error = -ENOMEM;
 if (epi->nwait < 0)
 goto error_unregister;
 
 
 spin_lock(&tfile->f_lock);
 list_add_tail(&epi->fllink, &tfile->f_ep_links);
 spin_unlock(&tfile->f_lock);
 
 
 ep_rbtree_insert(ep, epi);
 
 spin_lock_irqsave(&ep->lock, flags);
 
 if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
 list_add_tail(&epi->rdllink, &ep->rdllist);
 
 /* Notify waiting tasks that events are available */
 if (waitqueue_active(&ep->wq))
 wake_up_locked(&ep->wq);
 if (waitqueue_active(&ep->poll_wait))
 pwake++;
 }
 
 spin_unlock_irqrestore(&ep->lock, flags);
 
 atomic_long_inc(&ep->user->epoll_watches);
 
 
 if (pwake)
 ep_poll_safewake(&ep->poll_wait);
 
 return 0;
 
error_unregister:
 ep_unregister_pollwait(ep, epi);
 
 spin_lock_irqsave(&ep->lock, flags);
 if (ep_is_linked(&epi->rdllink))
 list_del_init(&epi->rdllink);
 spin_unlock_irqrestore(&ep->lock, flags);
 
 kmem_cache_free(epi_cache, epi);
 
 return error;
}


接下来是epoll_wait

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
 int, maxevents, int, timeout)
{
 int error;
 struct file *file;
 struct eventpoll *ep;
 
 if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
 return -EINVAL;
 
 
 if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
 error = -EFAULT;
 goto error_return;
 }
 
 
 error = -EBADF;
 file = fget(epfd);
 if (!file)
 goto error_return;
 
 error = -EINVAL;
 if (!is_file_epoll(file))
 goto error_fput;
 ep = file->private_data;
 //重点在这
 error = ep_poll(ep, events, maxevents, timeout);
 
error_fput:
 fput(file);
error_return:
 
 return error;
}


看下ep_poll的代码

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
 int maxevents, long timeout)
{
 int res = 0, eavail, timed_out = 0;
 unsigned long flags;
 long slack = 0;
 wait_queue_t wait;
 ktime_t expires, *to = NULL;
 
 if (timeout > 0) {
 struct timespec end_time = ep_set_mstimeout(timeout);
 slack = select_estimate_accuracy(&end_time);
 to = &expires;
 *to = timespec_to_ktime(end_time);
 } else if (timeout == 0) {
 //注意这里timeout为0直接跳转到check_events
 timed_out = 1;
 spin_lock_irqsave(&ep->lock, flags);
 goto check_events;
 }
 
fetch_events:
 spin_lock_irqsave(&ep->lock, flags);
 if (!ep_events_available(ep)) {
 init_waitqueue_entry(&wait, current);
 //将当前进程挂在等待队列
 __add_wait_queue_exclusive(&ep->wq, &wait);
 
 for (;;) {
 //set成功才会真正睡眠,等待被唤醒
 set_current_state(TASK_INTERRUPTIBLE);
 //唤醒可能是有ready事件,也有可能是超时事件
 if (ep_events_available(ep) || timed_out)
 break;
 if (signal_pending(current)) {
 res = -EINTR;
 break;
 }
 
 spin_unlock_irqrestore(&ep->lock, flags);
 if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
 timed_out = 1;
 
 spin_lock_irqsave(&ep->lock, flags);
 }
 __remove_wait_queue(&ep->wq, &wait);
 
 set_current_state(TASK_RUNNING);
 }
check_events:
 eavail = ep_events_available(ep);
 
 spin_unlock_irqrestore(&ep->lock, flags);
 //注意这里ep_send_events会把ready的事件赋值到调用时传进来的指针所指向的用户内存
 if (!res && eavail &&
 !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
 goto fetch_events;
 
 return res;
}
我们来看看这个ep_send_events函数源码
static int ep_send_events(struct eventpoll *ep,
 struct epoll_event __user *events, int maxevents)
{
 struct ep_send_events_data esed;
 
 esed.maxevents = maxevents;
 esed.events = events;
 //看起来是把ready的数据扫描到了esed中
 return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}


继续看ep_scan_ready_list的源码

static int ep_scan_ready_list(struct eventpoll *ep,
 int (*sproc)(struct eventpoll *,
 struct list_head *, void *),
 void *priv)
{
 int error, pwake = 0;
 unsigned long flags;
 struct epitem *epi, *nepi;
 //初始化一个列表,并且创建头部节点
 LIST_HEAD(txlist);
 mutex_lock(&ep->mtx);
 spin_lock_irqsave(&ep->lock, flags);
 //把ready列表rdllist拼接到txlist上,并且清空了rdllist
 list_splice_init(&ep->rdllist, &txlist);
 ep->ovflist = NULL;
 spin_unlock_irqrestore(&ep->lock, flags);
 
 //第二个是函数参数作为复制到用户指定地址的操作
 error = (*sproc)(ep, &txlist, priv);
 
 spin_lock_irqsave(&ep->lock, flags);
 //在扫描rdllist期间,有可能会有事件被触发,这些事件直接放到ovflist中,所以需要再次扫描一遍ovflist
 for (nepi = ep->ovflist; (epi = nepi) != NULL;
 nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
 
 if (!ep_is_linked(&epi->rdllink))
 list_add_tail(&epi->rdllink, &ep->rdllist);
 }
 
 ep->ovflist = EP_UNACTIVE_PTR;
 
 list_splice(&txlist, &ep->rdllist);
 
 if (!list_empty(&ep->rdllist)) {
 
 if (waitqueue_active(&ep->wq))
 wake_up_locked(&ep->wq);
 if (waitqueue_active(&ep->poll_wait))
 pwake++;
 }
 spin_unlock_irqrestore(&ep->lock, flags);
 
 mutex_unlock(&ep->mtx);
 
 if (pwake)
 ep_poll_safewake(&ep->poll_wait);
 
 return error;
}


再看下ep_send_events_proc

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
 void *priv)
{
 struct ep_send_events_data *esed = priv;
 int eventcnt;
 unsigned int revents;
 struct epitem *epi;
 struct epoll_event __user *uevent;
 
 for (eventcnt = 0, uevent = esed->events;
 !list_empty(head) && eventcnt < esed->maxevents;) {
 //list_first_entry 的作用就是找到epi,上面已经介绍过其作用
 epi = list_first_entry(head, struct epitem, rdllink);
 //从双向链表中删除该节点
 list_del_init(&epi->rdllink);
 revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
 epi->event.events;
 
 if (revents) {
 if (__put_user(revents, &uevent->events) ||
 __put_user(epi->event.data, &uevent->data)) {
 list_add(&epi->rdllink, head);
 return eventcnt ? eventcnt : -EFAULT;
 }
 eventcnt++;
 uevent++;
 if (epi->event.events & EPOLLONESHOT)
 epi->event.events &= EP_PRIVATE_BITS;
 else if (!(epi->event.events & EPOLLET)) {
 list_add_tail(&epi->rdllink, &ep->rdllist);
 }
 }
 }
 return eventcnt;
} 



1.3 注意

关于epoll还有一个地方需要注意,它有两种工作方式:水平触发和边缘触发。
先来解释下为什么会有这两种工作方式,我们知道当调用poll_wait的时候,epoll机制会把当前已经ready的文件描述符告诉给用户。我们知道在网络通信中,socket在linux中就是一个文件,系统给每个套接字分配发送和接受缓冲区。假设当我们拿到ready的文件描述符是一个socket可读事件时,可能一次读取并不能把缓冲区中的数据全部读取,那么就带来了两种不同的工作方式:
1.我每次读取完之后(有可能还未把缓冲区的数据读取完),继续调用poll_wait,epoll机制继续把该socket作为事件触发的返回给用户,直到缓冲区中没有数据了,并且也没有新的事件触发,则不再通知用户。这种工作方式,被称为水平触发(LT)。
2.我调用poll_wait之后,epoll机制告诉我们哪个socket可读了,以后不会再通知我,我们会一直读取数据,直到把数据读取完之后,再次调用poll_wait。这种工作方式被称为边缘触发(ET)。
我们再回过头看下源码中哪个地方体现了这两种不同的模式。
看下ep_send_events_proc函数,这个函数的作用就是把内核的ready的事件复制到用户空间。我们分析下源码过程。
遍历所有的txlist列表:
注意我们看到这个for循环中使用了list_empty(head),其中head最初的值为ep的rdllist变量,也就是rdllist的头结点,
我们来看下list_empty的源码

static inline int list_empty(const struct list_head *head)  
{  
 return head->next == head;  
}  


它的作用就是返回一个节点,直到为NULL。
1.拿到这个list_head的节点,调用list_first_entry读取对应的epitem。
2.把该节点删除
3.调用该文件对应的操作poll函数,返回该文件的状态掩码(注意这个地方跟边缘触发和水平触发就有关系了,拿tcpsocket为例,如果socket缓冲区中的可读数据还有的话,返回的可读状态就为true),把该状态掩码与我们的注册时关注的状态想与,就能得到我们关注的状态哪些是ready的了。
4.如果有事件ready的话,我们就进入if中,__put_user函数就是把内核的数据copy到用户空间的函数,并不是网上说的内存映射技术。我们看到两次调用分别把ready的事件与ready的具体信息copy用户在调用poll_wait的时候传过来的指针所指向的用户空间内存。
5.如果copy失败,会把当前的时间重新插入链表,然后把这些剩余的节点重新放在rdllist上,在哪放回的呢?注意看上面列出的ep_scan_ready_list源码,其中有一个地方是调用了
list_splice(&txlist, &ep->rdllist);
它其实就是把txlist重新又链到了rdllist上。
6.eventcnt++指的是增加ready事件的个数,用于epoll返回。uevent是用户空间的内存地址,加1的目的是下轮copy的地址需要是下一个item。
7.重点来了,首先判断我们关注的文件描述符事件是不是EPOLLONETSHOT类型,采用EPOLLONETSHOT事件的文件描述符上的注册事件只触发一次,要想重新注册事件则需要调用epoll_ctl重置文件描述符上的事件,如果是,就设置为EP_PRIVATE_BITS,没有具体了解这个值代表的含义,但是看里面有个private应该是把事件置为私有吧。一般不是EPOLLONETSHOT,再次判断,是否为非ET模式,如果是非ET模式,需要把本轮扫描的节点再次添加到txlist中,最终会添加到rdllist中去,如果为ET模式,则不再加。
注意:还以socket可读事件为例,再次把节点放进去的目的是为了防止该socket的缓冲区的数据还没有读取完,但是如果本次poll_wait应把数据读取完了,下次调用
revents=epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &epi->event.events
返回的revents会为0,不会进入if,也就是空跑一次。
到此,epoll原理介绍完了。