前言:
在查阅网络相关的资料时,一件事情引起了我的好奇,为何golang那么适合网络高并发,它的原理是什么?实际上要说明白这个问题,需要知道linux独有的epoll以及goroutine的调度,netFD的网络轮训器,本篇先关注epoll,其余的在接下来的文章中。
为什么要再次回顾netFD,原因是我在查询资料的时候,看到讲解netFD的很多都提到了它的网络轮训器,会在接下来的文章细说
type netFD struct {
....
pd pollDesc
}
PS:后面还有两篇:1.goroutine调度探索 2.惊群效应、close与shutdown区别探索等
epoll原理
为什么要先说epoll,因为golang在linux版本用的就是它。
以下内容是我结合网络查询的资料与自己的理解总结。
1.1 epoll与select、poll比较
说到epoll就不得不跟select与poll做比较了。在网络开发中,避免不了一个问题,如何做到高并发,而我们知道现在的网络服务动则上百万QPS,每台机器上的socket连接数上万绝对不在话下,服务程序与其上游建立了那么多socket,那么服务程序需要知道每个socket什么时候有数据过来了或者写成功了,然后做相应的操作。当然不仅仅是为了socket,包括所有的文件类型,只是说我们socket也当成了文件,为了实现这个需求,不同操作系统有不同的方式,也就是我们要说的select、poll、epoll,其中epoll是linux特有的。
select是怎么做的呢?
#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)
先解释下这个函数的作用,用户程序通过调用select函数来请求系统获取被激活的文件描述符,解释下每个参数的含义:
1.maxfdp1的意思就是本次要求检测的描述符的最大的值加1(因为文件描述符是从0开始的)
2.readset、writeset、exceptset这三个参数的数据类型都是struct fd_set,该类型就是存储文件描述符集合的,在这里就是存储我们要检测的文件描述符,该结构可通过以下四个宏进行设置:
//清空集合
void FD_ZERO(fd_set *fdset);
//将一个给定的文件描述符加入集合之中
void FD_SET(int fd, fd_set *fdset);
//将一个给定的文件描述符从集合中删除
void FD_CLR(int fd, fd_set *fdset);
//检查集合中指定的文件描述符是否可以读写
int FD_ISSET(int fd, fd_set *fdset);
其中FD_SET是有操作系统把准备好的
看参数名称,我们也能理解,三个参数根据的就是我们要对文件监控的事件类型来区分的,分别是读、写、异常条件,如果你只关注某一个事件类型,把其余的指针置为NULL即可。
3.timeout参数就比较有意思了,我们先看下timeval数据结构的类型
struct timeval{
long tv_sec; /*秒 */
long tv_usec; /*微秒 */
}
该参数可以使select处于三种状态,第一,若为NULL,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;第二,若将时间值设为0秒0微秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;第三,timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。
select的返回值是一个int类型的数,就绪描述符的数目:
如果为正,代表有ready的文件描述符
如果为-1,代表出错了
如果为0,代表超时,没有ready的文件描述符。
起初,我理解到这,以为就搞明白了select,但是实际上还差的很远,几个小思考帮助我们深入理解:
1).为什么都说select有最大fd限制?
我们先来看下fd_set,它是存储fd的集合的结构体,有限制的话肯定跟它有关了,fd_set其实是一个整数数组,每个整数我们可以看成是一个二进制数,这样整个数组就是一个连在一起的大的二进制,而二进制的每一位代表一个fd了。举个例子,假设我们的fd_set是一个长度为1的数组,而每个元素占用1个字节,如果我们要把fd=2放到集合中,fd_set就是0000010,我们如果要再把fd=5放进去,就fd_set就是00010010,而设置fd_set的就是我们的FD_SET(fd,&set),fd_set占用的字节长度是代码写死的。理解为什么我们的fd_set存储的fd有最大值限制了吧,个人感觉这种实现方式略坑,假如我的文件描述符太大,就没办法使用select监听了。
2).select返回后,我们怎么判断是哪些fdready?
这同样跟fd_set的结构有关,我们传入fd_set后,select会把fd_set中没有ready的fd对应的位置0,保留ready的fd。比如我们调用select的时候,传入的fd_set为00000111,代表我们监听的fd为1,2,3,当select返回后,fd_set变为00000100,这就代表,只有fd为3的文件已经ready了,而检查的方式就是FD_ISSET(int fd, fd_set *fdset),没有看源码,但估计用的&方式。这种特性带来了一个很不好的地方,每调用一次select,fd_set就被会修改,并且select返回后,如果我们不保存原来的fd集合,就无法判断有哪些fd发生了改变,所以我们需要专门创建一个数组用于保存原来的fd集合,每次调用的时候把fd_set重新填充一遍,不是一般的麻烦啊。
3).最初理解select的timeout参数的解释的时候,我曾经看到有这样的解释:
(1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。
(2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
(3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。
我就在想为什么是有一个ready就返回,如果每次最多只有一个fd被发现激活的话,返回值int直接说返回值为1不得了,还有,如果超时的时候,恰好扫描到一半,怎么办,直接返回吗?这些疑问促使想了解select是怎么实现的,这里大致说下select的实现原理(linux2.6以后的)。
linux的select主要由四个函数:
sys_select:处理时间参数,调用core_sys_select。
core_sys_select:处理三个fd_set参数,调用do_select。
do_select:做select/poll的工作。在合适的时机把自己挂起等待,调用sock_poll。
sock_poll:用函数指针分派到具体的协议层函数tcp_poll、udp_poll、datagram_poll。
我们只需要关注do_select即可,我并没有全部理解do_select的代码,而是带着解答我上面的疑惑的目的大致看下,如果有错误,还请指出。do_select首先进入一个循环,这个循环内部还有一个循环,而这个内部的循环会扫描所有的fd,来检查是否ready,等内部循环完毕后,会再次检测是否超时,如果超时则break外层循环,如果我们设置的超时时间是NULL,并且本次内部循环ready的fd不等于0,则直接break外层循环。
这也就代表select会扫描全部的fd后再判断是否超时,触发select返回的条件是至少有一个fdready,而并不是有一个就返回。
接下来,我们看看poll。
# include <poll.h>
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);
照例,我们先看下函数的参数:
1.fds参数是用来存储fd集合的,注意它是个pollfd类型指针,其实就是数组,用数组,理论上来讲监听的fd的数量就没有最大的限制了(网上有说是基于链表的,看起来不太对),来看看pollfd数据结构
struct pollfd
{
int fd;
short events;
short revents;
} ;
其中fd自然就是文件描述符了,events是要监听的类型,注意它是一个位掩码(每一位代表不同含义),也就是说每个fd我们可以监控不同的事件了,事件类型如下:
POLLIN 普通或优先级带数据可读
POLLRDNORM 普通数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级数据可读
POLLOUT 普通数据可写
POLLWRNORM 普通数据可写
POLLWRBAND 优先级带数据可写
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述字不是一个打开的文件
revents是什么呢?它是调用与告知调用方该fd发生的时间,同样是一个位掩码。
注意,如果某个fd监听了多个事件,即使只有一个事件触发,不会等待其他的事件都触发再返回。
2.nfds是nfds_t类型的参数,用于标记数组fds中的结构体元素的总数量
3.timeout代表超时时间,单位是毫秒
介绍完参数,来说下原理,其实poll的原理跟select没太大区别:
1.首先,每次都需要把fd的集合拷贝到内核空间,消耗加大,这跟select有同样的毛病
2.由于fd集合存储的方式采用了数组,不像select有太强的fd最大值限制了
3.其他就没有什么区别了,它同样是每次需要扫描所有的文件描述符,找到触发监听事件的文件描述符,然后返回。
1.2 epoll原理
终于到epoll了,epoll的出现就是为了解决select与poll的缺点,满足我们现在需要的网络高并发。
以下内容来自网络以及自己的理解,有不对的地方还请指出。
用户态下epoll的使用主要包含以下三个函数:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct
epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
我们先来文字描述下epoll的原理,然后看代码。
1.首先在使用之前调用epoll_create,创建一个epoll fd,注意epoll的使用等同于文件,所以在使用完之后要close,参数size已经没用了。这个epoll fd会存储着用户监听的描述符,并且现在较新的版本的Linux中,存储的方式是红黑树,所以目前的size没有用了。
2.创建完一个epoll fd之后,可以开始调用epoll_ctl函数,把我们想要监听的文件描述符注册到epoll fd中,它的第一个参数是epoll fd的文件描述符,第二个参数代表本次调用的操作类型:添加、删除、修改,第三个参数的含义是我们监听该fd的哪些事件,我们来看下这个结构体长啥样。
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
3.注册完之后,接着就需要查看我关注的那些文件描述符是否ready了,第三个函数epoll_wait就是提供这样的功能的,它的作用就是把我们监听文件描述符中,获取到ready的那些。
第一个参数就不说了,直接看第二个,这个参数实际上是传进去的指针,并且是分配好内存的地址,让内核把那些ready的数据放进去,第三个参数maxevents的events的大小,最后一个timeout自然就是超时时间了,它的单位是毫秒级别,如果为0则代表立即返回,如果为复制,则一直阻塞直到有文件描述符ready。
epoll的优点:
1.相比select和poll,epoll不用每次获取ready的事件时,把所有的文件描述符传给内核,大大的减少了内核的消耗,只需要用epoll_ctl进行更新即可。
2.在存储被监控的fd信息时,内核使用了slab机制,为epoll提供了快速的数据结构,这一点我没有深入理解slab机制,不便展开。
3.最后一点,也是最厉害的,如果我们调用epoll_wait从大量的文件描述符中查找ready的文件描述符,能够快速的返回。原因是epoll在create的时候,创建的是一个eventpoll结构的对象,该对象中有一个链表,专门用于存储ready的事件。调用epoll_wait的时候,只需要检查这个链表是否为空即可。
那么是谁向这个链表中插入ready的事件呢?我们再调用epoll_ctl注册我们监听的fd的时候,内核会帮我们给该描述符添加一个callback机制,当事件发生时,会通过callback机制把该文件描述符插入到我们的list中,详细过程稍后看代码。
4.有人说epoll采用了mmap技术,说实话我不太认同,此处略过。
5.由于存储fd的信息是采用红黑树的方式,所以多次添加同样的fd,会先检测是否已经存储过,是的话直接返回。
下面给出详细的代码
epoll_create时,系统为epoll fd创建了那些内容呢?
struct eventpoll {
spin_lock_t lock; //对本数据结构的访问
struct mutex mtx; //防止使用时被删除
wait_queue_head_t wq; //sys_epoll_wait() 使用的等待队列
wait_queue_head_t poll_wait;//file->poll()使用的等待队列
struct list_head rdllist; //事件满足条件的链表
struct rb_root rbr; //用于管理所有fd的红黑树
struct epitem *ovflist; //将事件到达的fd进行链接起来发送至用户空间
}
而我们在给系统添加一个监听的fd时,系统又创建了什么?
struct epitem
{
struct rb_node rbn; //用于主结构管理的红黑树
struct list_head rdllink; //事件就绪队列
struct epitem *next; //用于主结构体中的链表
struct epoll_filefd ffd; //每个fd生成的一个结构
int nwait;
struct list_head pwqlist; //poll等待队列
struct eventpoll *ep; //该项属于哪个主结构体
struct list_head fllink; //链接fd对应的file链表
struct epoll_event event; //注册的感兴趣的事件,也就是用户空间的epoll_event
}
我们前面说过,eventpoll中采用的红黑树存储信息,现在看来存储的是epitem结构,他们通过的是rb_root来链接,我们来看下他的结构
struct epitem
{
struct rb_node rbn; //用于主结构管理的红黑树
struct list_head rdllink; //事件就绪队列
struct epitem *next; //用于主结构体中的链表
struct epoll_filefd ffd; //每个fd生成的一个结构
int nwait;
struct list_head pwqlist; //poll等待队列
struct eventpoll *ep; //该项属于哪个主结构体
struct list_head fllink; //链接fd对应的file链表
struct epoll_event event; //注册的感兴趣的事件,也就是用户空间的epoll_event
}
喔,里面有左右的节点地址,那么我们怎么根据这个地址来访问到epitem的其他字段信息呢?
epoll中使用了list_first_entry用于获取链表中第一个节点所在结构体的首地址,它的参数如下:
ptr: 链表头节点
type: 链表节点struct list_head变量所在结构体的类型
member:链表节点在所求首地址结构体内的成员变量名
epoll_create的代码比较简单,直接列出,不在详细描述
//原来使用的是hash表,所以有size,现在改为红黑树,故不使用.
long sys_epoll_create(int size)
{
int error,fd = -1;
struct eventpoll *ep;
struct inode *inode;
struct file *file;
….
error = -EINVAL;
if(size <= 0 || (error = ep_alloc(&ep)!=0))
goto errror_return;
//创建一个struct file结构,由于没有任何文件系统,为匿名文件,并将主结构体放入file->private项中进行保存
error = anon_inode_getfd(&fd,&inode,&file,”[eventpoll]”,
&eventpoll_fops,ep);
if(error)
goto error_free;
return fd;
...
}
下面直接看epoll_ctl,每一个fd加入到epoll中,就会创建一个struct epitem结构,并插入至红黑树中。
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
error = -EBADF;
file = fget(epfd);
if (!file)
goto error_return;
tfile = fget(fd);
if (!tfile)
goto error_fput;
error = -EPERM;
if (!tfile->f_op || !tfile->f_op->poll)
goto error_tgt_fput;
error = -EINVAL;
if (file == tfile || !is_file_epoll(file))
goto error_tgt_fput;
ep = file->private_data;
if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
error = -ELOOP;
if (ep_loop_check(ep, tfile) != 0)
goto error_tgt_fput;
}
mutex_lock(&ep->mtx);
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
//重点在这,插入epds到ep中
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (unlikely(did_lock_epmutex))
mutex_unlock(&epmutex);
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
该函数中很多都是异常检测,最终会调用ep_insert,插入数据,我们来看下ep_insert的代码
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
//分配一个epitem
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
epq.epi = epi;
//这里给epitem安装了回调函数ep_ptable_queue_proc
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);
error = -ENOMEM;
if (epi->nwait < 0)
goto error_unregister;
spin_lock(&tfile->f_lock);
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
ep_rbtree_insert(ep, epi);
spin_lock_irqsave(&ep->lock, flags);
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
error_unregister:
ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink);
spin_unlock_irqrestore(&ep->lock, flags);
kmem_cache_free(epi_cache, epi);
return error;
}
接下来是epoll_wait
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
error = -EBADF;
file = fget(epfd);
if (!file)
goto error_return;
error = -EINVAL;
if (!is_file_epoll(file))
goto error_fput;
ep = file->private_data;
//重点在这
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
看下ep_poll的代码
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
unsigned long flags;
long slack = 0;
wait_queue_t wait;
ktime_t expires, *to = NULL;
if (timeout > 0) {
struct timespec end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec_to_ktime(end_time);
} else if (timeout == 0) {
//注意这里timeout为0直接跳转到check_events
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}
fetch_events:
spin_lock_irqsave(&ep->lock, flags);
if (!ep_events_available(ep)) {
init_waitqueue_entry(&wait, current);
//将当前进程挂在等待队列
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
//set成功才会真正睡眠,等待被唤醒
set_current_state(TASK_INTERRUPTIBLE);
//唤醒可能是有ready事件,也有可能是超时事件
if (ep_events_available(ep) || timed_out)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
check_events:
eavail = ep_events_available(ep);
spin_unlock_irqrestore(&ep->lock, flags);
//注意这里ep_send_events会把ready的事件赋值到调用时传进来的指针所指向的用户内存
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
}
我们来看看这个ep_send_events函数源码
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
//看起来是把ready的数据扫描到了esed中
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
继续看ep_scan_ready_list的源码
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
//初始化一个列表,并且创建头部节点
LIST_HEAD(txlist);
mutex_lock(&ep->mtx);
spin_lock_irqsave(&ep->lock, flags);
//把ready列表rdllist拼接到txlist上,并且清空了rdllist
list_splice_init(&ep->rdllist, &txlist);
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
//第二个是函数参数作为复制到用户指定地址的操作
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
//在扫描rdllist期间,有可能会有事件被触发,这些事件直接放到ovflist中,所以需要再次扫描一遍ovflist
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
}
ep->ovflist = EP_UNACTIVE_PTR;
list_splice(&txlist, &ep->rdllist);
if (!list_empty(&ep->rdllist)) {
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
mutex_unlock(&ep->mtx);
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}
再看下ep_send_events_proc
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
//list_first_entry 的作用就是找到epi,上面已经介绍过其作用
epi = list_first_entry(head, struct epitem, rdllink);
//从双向链表中删除该节点
list_del_init(&epi->rdllink);
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events;
if (revents) {
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
}
}
}
return eventcnt;
}
1.3 注意
关于epoll还有一个地方需要注意,它有两种工作方式:水平触发和边缘触发。
先来解释下为什么会有这两种工作方式,我们知道当调用poll_wait的时候,epoll机制会把当前已经ready的文件描述符告诉给用户。我们知道在网络通信中,socket在linux中就是一个文件,系统给每个套接字分配发送和接受缓冲区。假设当我们拿到ready的文件描述符是一个socket可读事件时,可能一次读取并不能把缓冲区中的数据全部读取,那么就带来了两种不同的工作方式:
1.我每次读取完之后(有可能还未把缓冲区的数据读取完),继续调用poll_wait,epoll机制继续把该socket作为事件触发的返回给用户,直到缓冲区中没有数据了,并且也没有新的事件触发,则不再通知用户。这种工作方式,被称为水平触发(LT)。
2.我调用poll_wait之后,epoll机制告诉我们哪个socket可读了,以后不会再通知我,我们会一直读取数据,直到把数据读取完之后,再次调用poll_wait。这种工作方式被称为边缘触发(ET)。
我们再回过头看下源码中哪个地方体现了这两种不同的模式。
看下ep_send_events_proc函数,这个函数的作用就是把内核的ready的事件复制到用户空间。我们分析下源码过程。
遍历所有的txlist列表:
注意我们看到这个for循环中使用了list_empty(head),其中head最初的值为ep的rdllist变量,也就是rdllist的头结点,
我们来看下list_empty的源码
static inline int list_empty(const struct list_head *head)
{
return head->next == head;
}
它的作用就是返回一个节点,直到为NULL。
1.拿到这个list_head的节点,调用list_first_entry读取对应的epitem。
2.把该节点删除
3.调用该文件对应的操作poll函数,返回该文件的状态掩码(注意这个地方跟边缘触发和水平触发就有关系了,拿tcpsocket为例,如果socket缓冲区中的可读数据还有的话,返回的可读状态就为true),把该状态掩码与我们的注册时关注的状态想与,就能得到我们关注的状态哪些是ready的了。
4.如果有事件ready的话,我们就进入if中,__put_user函数就是把内核的数据copy到用户空间的函数,并不是网上说的内存映射技术。我们看到两次调用分别把ready的事件与ready的具体信息copy用户在调用poll_wait的时候传过来的指针所指向的用户空间内存。
5.如果copy失败,会把当前的时间重新插入链表,然后把这些剩余的节点重新放在rdllist上,在哪放回的呢?注意看上面列出的ep_scan_ready_list源码,其中有一个地方是调用了
list_splice(&txlist, &ep->rdllist);
它其实就是把txlist重新又链到了rdllist上。
6.eventcnt++指的是增加ready事件的个数,用于epoll返回。uevent是用户空间的内存地址,加1的目的是下轮copy的地址需要是下一个item。
7.重点来了,首先判断我们关注的文件描述符事件是不是EPOLLONETSHOT类型,采用EPOLLONETSHOT事件的文件描述符上的注册事件只触发一次,要想重新注册事件则需要调用epoll_ctl重置文件描述符上的事件,如果是,就设置为EP_PRIVATE_BITS,没有具体了解这个值代表的含义,但是看里面有个private应该是把事件置为私有吧。一般不是EPOLLONETSHOT,再次判断,是否为非ET模式,如果是非ET模式,需要把本轮扫描的节点再次添加到txlist中,最终会添加到rdllist中去,如果为ET模式,则不再加。
注意:还以socket可读事件为例,再次把节点放进去的目的是为了防止该socket的缓冲区的数据还没有读取完,但是如果本次poll_wait应把数据读取完了,下次调用
revents=epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &epi->event.events
返回的revents会为0,不会进入if,也就是空跑一次。
到此,epoll原理介绍完了。