APUE.07b::网络编程-多路复用

五种 I/O 模型

  1. 阻塞 IO: 调用 read, 如果内核数据未就绪, 调用 read 的进程进入阻塞状态。应用程序调用一个 IO 函数,导致应用程序阻塞并等待数据准备就绪。如果数据没有准备好,一直等待。如果数据准备好了,则从内核拷贝到用户空间拷贝数据,IO 函数返回成功指示。
  2. 非阻塞 IO: nonblocking IO 的特点是用户进程需要不断的主动询问 kernel 数据是否准备好. 当所请求的 I/O 操作无法完成时,不要将进程睡眠,而是返回一个错误。这样我们的 I/O 操作函数将不断的测试数据是否已经准备好,如果没有准备好,继续测试,直到数据准备好为止。在这个不断测试的过程中,会大量的占用 CPU 的时间。
  3. 多路复用 IO: 复用模型会用到 select 或者 poll 函数,这两个函数也会使进程阻塞,但是和阻塞 I/O 所不同的的,这两个函数可以同时阻塞多个 I/O 操作。而且可以同时对多个读操作,多个写操作的 I/O 函数进行检测,直到有数据可读或可写时,才真正调用 I/O 操作函数。从而使得系统在单线程的情况下可以同时处理多个客户端请求. 与传统的多线程/多进程模型比, I/O 多路复用的最大优势是系统开销小。
    和阻塞 IO 模型相比,selectI/O 复用模型相当于提前阻塞了。等到有数据到来时,再调用 recv 就不会因为要等数据就绪而发生阻塞。
    • select: 一般采用 select + no-block, select 返回后要遍历所有阻塞在 select 上的 IO 句柄,找到数据就绪的那一个 IO 句柄后, 应用程序调用 recvfrom 将数据从内核区拷贝至用户区;
    • epoll : 比 select 更高效,无需轮询全部句柄,epoll 只返回数据 ready 的 IO 句柄
  4. 信号驱动 IO:让内核在数据就绪时用信号 SIGIO 通知我们,将此方法称为信号驱动 I/O。首先,我们允许套接字进行信号驱动 I/O,并通过系统调用 sigaction 安装一个信号处理程序。此系统调用立即返回,进程继续工作,它是非阻塞的。当数据报准备好被读时,就为该进程生成一个 SIGIO 信号。我们随即可以在信号处理程序中调用 recvfrom 来取读数据报。
  5. 异步 IO: 我们让内核启动操作,并在整个操作完成后(包括将数据从内核拷贝到我们自己的缓冲区)通知我们。
    调用 aio_read 函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知的方式,然后立即返回。当内核将数据拷贝到缓冲区后,再通知应用程序。
    上面其它四种模型,至少都会在由 kernel copy data to appliction 时阻塞。而该模型是当 copy 完成后才通知 application,可见是纯异步的。
    很少有 *nix 系统支持,windows 的 IOCP(完成端口)则是此模型

Linux I/O 模型的发展技术是: select -> poll -> epoll -> aio -> libevent -> libuv。另外还有 Windows 的 Completion Port。

提供一致的接口,IO Design Patterns
实际上,不管是哪种模型,都可以抽象一层出来,提供一致的接口,广为人知的有 ACE,Libevent 这些,他们都是跨平台的,而且他们自动选择最优的 I/O 复用机制,用户只需调用接口即可。说到这里又得说说2个设计模式,Reactor and Proactor。有一篇经典文章http://www.artima.com/articles/io_design_patterns.html值得阅读,Libevent 是 Reactor 模型,ACE 提供 Proactor 模型。实际都是对各种 I/O 复用机制的封装。

select

int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select 函数监视的文件描述符分 3 类,分别是 writefds、readfds、和 exceptfds。调用后 select 函数会阻塞,直到有描述副就绪(有数据可读、可写、或者有 except),或者超时(timeout 指定等待时间,如果立即返回设为 null 即可),函数返回。当 select 函数返回后,可以通过遍历 fdset,来找到就绪的描述符。
select 目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select 的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 上一般为 1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

poll

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

不同与 select 使用三个位图来表示三个 fdset 的方式,poll 使用一个 pollfd 的指针实现。
pollfd 结构包含了要监视的 event 和发生的 event,不再使用 select“参数-值”传递的方式。同时,pollfd 并没有最大数量限制(但是数量过大后性能也是会下降)。和 select 函数一样,poll 返回后,需要轮询 pollfd 来获取就绪的描述符。

epoll

epoll 是在 2.6 内核中提出的,是之前的 select 和 poll 的增强版本。相对于 select 和 poll 来说,epoll 更加灵活,没有描述符限制。epoll 使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy 只需一次。

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  1. int epoll_create(int size);
    创建一个 epoll 的句柄,size 用来告诉内核这个监听的数目一共有多大,这个参数不同于 select()中的第一个参数,给出最大监听的 fd+1 的值,参数 size 并不是限制了 epoll 所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。
    当创建好 epoll 句柄后,它就会占用一个 fd 值,在 linux 下如果查看/proc/进程 id/fd/,是能够看到这个 fd 的,所以在使用完 epoll 后,必须调用 close()关闭,否则可能导致 fd 被耗尽。

  2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    函数是对指定描述符 fd 执行 op 操作。

  • epfd:是 epoll_create()的返回值。
  • op:表示 op 操作,用三个宏来表示:添加 EPOLL_CTL_ADD,删除 EPOLL_CTL_DEL,修改 EPOLL_CTL_MOD。分别添加、删除和修改对 fd 的监听事件。
  • fd:是需要监听的 fd(文件描述符)
  • epoll_event:是告诉内核需要监听什么事件
  1. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
    等待 epfd 上的 io 事件,最多返回 maxevents 个事件。
    参数 events 用来从内核得到事件的集合,maxevents 告之内核这个 events 有多大,这个 maxevents 的值不能大于创建 epoll_create()时的 size,参数 timeout 是超时时间(毫秒,0 会立即返回,-1 将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

select & poll & epoll 内部实现

select

select 的实现:

  • 当用户线程调用完 select 后开始进入阻塞状态,内核 开始轮询遍历 fd数组,查看数组里的每个 fd 对应的 Socket 的接收缓冲区否有数据。如果有数据到来,则将 fd 对应值设置为 1。如果没有数据到来,则保持值为 0

  • 内核遍历一遍 fd数组 后,如果发现有些 fd 上有 IO 数据到来,则将修改后的 fd数组 返回给用户线程。此时,会将 fd数组内核空间 拷贝到 用户空间

  • 当内核将修改后的 fd数组 返回给用户线程后,用户线程解除 阻塞,由用户线程开始工作:遍历 fd数组 然后找出 fd数组 中值为 1Socket 文件描述符…

  • 由于内核在遍历的过程中已经修改了 fd数组,所以在用户线程遍历完 fd数组,就需要重置 fd 数组,并重新调用 select 传入重置后的 fd数组,让内核发起新的一轮遍历轮询。

select 的不足:

  • 在发起 select 系统调用以及返回时,用户线程各发生了一次 用户态内核态 以及 内核态用户态 的上下文切换开销。发生 2 次上下文 切换
  • fd集合 从用户空间 拷贝 到内核空间的拷贝次数:
  • 内核 会对内核态的 fd集合 进行修改。导致每次在用户空间重新发起 select 调用时,都需要对 fd集合 进行 重置
  • fd集合 长度为固定的 1024,所以只能监听 0~1023 的文件描述符。
  • select 系统调用不是线程安全的。

很明显 select 也不能解决 C10K 问题,只适用于 1000 个左右的并发连接场景。

poll

poll 相比 select 的改进:

  • select 中使用的 fd集合 是采用的固定长度为 1024 的数组,而 poll 换成了一个 pollfd 结构没有固定长度的数组,这样就没有了最大描述符数量的限制(当然还会受到系统文件描述符限制)

所以 poll 只解决了 select 最大监听数的限制,但 poll 的 fd 组织是用的线性数据结构,遍历性能在 fd 数量很多的时候不足。依然无法解决 C10K 问题。

epoll

复习一下阻塞模式 recv:

阻塞 recv 场景下,服务端 sock 的 等待队列,队列里的 等待项(wait_queue_t)保存了两个重要数据:1)阻塞在此 sock 上的进程描述符, 2)就绪回调函数 autoremove_wake_function,这个函数的作用是当 sock 可读时,调用这个就绪函数;
那么当此 sock 的接收队列 有数据时,系统内核会从 等待队列 取出 等待项(wait_queue_t) ,通过 autoremove_wake_function 唤醒线程(注意,即使是有多个进程都阻塞在同一个 sock 上,也只唤醒 1 个进程,避免惊群。)

对于 epoll + 非阻塞 recv 的模式,上面提到的 等待项(wait_queue_t) 的回调函数变成了 epoll 的 ep_poll_callback

➤ epoll 调用过程如下:

(1)调用 epoll_create 创建 epoll 对象:

struct eventpoll {

//等待队列,阻塞在epoll上的进程会放在这里
wait_queue_head_t wq;

//就绪队列,IO就绪的socket连接会放在这里
struct list_head rdllist;

//红黑树用来管理所有监听的socket连接
struct rb_root rbr;

}

  • 等待队列:区别 sock 对象的等待队列,epoll 的等待队列里,存放是阻塞于 epoll 的进程;
  • 就绪队列:只有就绪的 sock 被放入,这也是比 select 效率高的改进;
  • 红黑树: epoll 监听的所有 sock 连接,都存于红黑树里,对于海量链接,红黑树比数组&链表的性能都更好;

epoll 的红黑树中保存的是,fd 作为查找 key ?

struct epitem {
  //指向epoll中对应的红黑树节点
  struct rb_node rbn;
  
  //指向epoll对象中的就绪队列
  struct list_head rdllink;
  
  //指向epitem所表示的socket->file结构以及对应的fd
  struct epoll_filefd ffd;
  
  //指向其所属的eventepoll对象
  struct eventpoll *ep;
  
  //期待的事件类型
  struct epoll_event event;

//下一个epitem实例
struct epitem *next;
};

(2)调用 epoll_ctl 向添加监听的 Socket:

  • sock 对象的 等待队列中,插入等待项(wait_queue_t 对象),这个等待项的回调函数是 epoll 的 ep_poll_callback,而不再是阻塞模式下的 autoremove_wake_function
  • sock 对象被包装为 epitem 类型,加入红黑树管理;

(3)调用 epoll_wait 同步阻塞:

  • 用户代码调用 epoll_wait 后,内核首先会查找 epoll 中的就绪队列(rdllist)是否有 IO 就绪的 sock;
  • 如果就绪队列(rdllist)没有就绪的 sock,那么会向 epoll 的等待队列中插入当前用户进程的信息,此等待项(wait_queue_t)的回调是 default_wake_function,然后用户进程让出 CPU,进入阻塞;

➤ sock 的接收队列有数据可读时:

  • 内核调用此 sock 等待队列上等待项的回调函数,这里是 epoll 的 ep_poll_callback,此函数中,把就绪的 sock 插入到 epoll 的就绪队列(rdllist)
  • 随后查看 epoll 中的等待队列中是否有等待项,也就是说查看是否有进程阻塞在 epoll_wait 上,如果没有等待项,则软中断处理完成。
  • 如果有等待项,则回到注册在等待项中的回调函数 default_wake_function,在回调函数中唤醒 阻塞进程,并将就绪队列 rdllist 中的 epitemIO就绪 socket 信息封装到 struct epoll_event 中返回。
  • 用户进程拿到 epoll_event,取出就绪的socket

下图是 epoll 等待数据到来的完整工作流程:

../_images/APUE.07b.网络编程-多路复用epoll-2023-05-04-2.png

➤ 为什么是红黑树?

这里我们再聊聊为啥要用红黑树,很多人说是因为效率高。其实我觉得这个解释不够全面,要说查找效率树哪能比的上 HASHTABLE。我个人认为觉得更为合理的一个解释是为了让 epoll 在查找效率、插入效率、内存开销等等多个方面比较均衡,最后发现最适合这个需求的数据结构是红黑树。

水平触发和边缘触发

再谈水平触发(LT)和边缘触发(ET)

网上有大量的关于这两种模式的讲解,大部分讲的比较模糊,感觉只是强行从概念上进行描述,看完让人难以理解。所以在这里,笔者想结合上边 epoll 的工作过程,再次对这两种模式做下自己的解读,力求清晰的解释出这两种工作模式的异同。

水平触发和边缘触发最关键的区别就在于 当 socket 中的接收缓冲区还有数据可读时。epoll_wait 是否会清空 rdllist

  • 水平触发(LT):在这种模式下,用户线程调用 epoll_wait 获取到 IO 就绪的 socket 后,对 Socket 进行系统 IO 调用读取数据,假设 socket 中的数据只读了一部分没有全部读完,用户再次调用 epoll_wait 不会直接清空 rdllist,而是 epoll_wait 会检查这些 Socket 中的接收缓冲区是否还有数据可读,如果还有数据可读,就将 socket 重新放回 rdllist。所以当 socket 上的 IO 没有被处理完时,再次调用 epoll_wait 依然可以获得这些 socket,用户进程可以接着处理 socket 上的 IO 事件。

  • 边缘触发(ET): 在这种模式下,用户再次 epoll_wait 就会直接清空 rdllist,不管 socket 上是否还有数据可读。所以在边缘触发模式下,当你没有来得及处理 socket 接收缓冲区的剩下可读数据时,再次调用 epoll_wait,因为这时 rdlist 已经被清空了,socket 不会再次从 epoll_wait 中返回,所以用户进程就不会再次获得这个 socket 了,也就无法在对它进行 IO 处理了。除非,这个 socket 上有新的 IO 数据到达,根据 epoll 的工作过程,该 socket 会被再次放入 rdllist 中。

如果你在边缘触发模式下,处理了部分 socket 上的数据,那么想要处理剩下部分的数据,就只能等到这个 socket 上再次有网络数据到达

在 Netty 中实现的 EpollSocketChannel 默认的就是边缘触发模式。JDK 的 NIO 默认是水平触发模式。

epoll 默认也是 LT https://stackoverflow.com/questions/24400941/how-do-we-know-whether-call-to-epoll-wait-is-edge-triggered-or-level-triggered

@ref: 聊聊Netty那些事儿之从内核角度看IO模型


比较 ET 和 LT 的性能:

  • 如果数据处理和 epoll 同一线程,LT 和 ET 区别不大,如果用 LT 会从 epoll_wait 醒来更频繁,对消息处理时效性更好;
  • 如果数据处理和 epoll 不在同一线程(例如更高并发的 RPC 实现中, 为了对大消息的反序列化也可以并行),这种情况下,如果数据处理的线程池被打满,那么 epoll_wait 所在的线程就会陷入疯狂的旋转(醒来-没有空闲的线程可以处理数据-再次 wait-唤醒),而 ET 是有消息来时才触发,和及时处理与否无关,频率低很多。

@ref: epoll的边沿触发模式(ET)真的比水平触发模式(LT)快吗?(当然LT模式也使用非阻塞IO,重点是要求ET模式下的代码不能造成饥饿) - 知乎


水平触发(LT)和边缘触发(ET)名字的来历?

为了弄明白 LT(Level Triggered,水平触发) 和 ET(Edge Triggered,边沿触发),我们先要了解,这个 Level 和 Edge 是什么涵义,Level 翻译成中文这里准确的涵义应该是电平; Edge 是边沿。

这两个词曾经是电子信号领域的一个专有名词。如果,用时序图来标示一个数字电信号“010”,应该是类似下图所示:

../_images/APUE.07b.网络编程-多路复用epoll-2023-05-04-1.png

  • 低电平表示0,高电平表示1
  • 0向1变化的竖线就是上升沿,1向0变化的竖线就是下降沿
  • 在0或者1的情况下触发的信号就是 LT(Level Triggered,水平触发)
  • 在0向1、1向0变化的过程中触发的信号就是 和 ET(Edge Triggered,边沿触发)

0或1都是一个状态,在 epoll 中,0表示缓冲区无数据,1表示缓冲区有数据,从无数据→有数据即上升沿,也即边缘触发(ET),我们很直观的就可以得出结论,LT 是一个持续的状态,ET 是个事件性的一次性状态。

二者的差异在于:

  • Level Triggered 模式下只要某个 socket 处于 readable/writable 状态,无论什么时候进行 epoll_wait 都会返回该 socket;

  • 而 Edge Triggered 模式下只有某个 socket 从 unreadable 变为 readable 或从 unwritable 变为 writable 时,epoll_wait 才会返回该 socket。

https://zhuanlan.zhihu.com/p/20315482

epoll example

https://github.com/onestraw/epoll-example/blob/master/epoll.c


listen_sock = socket(AF_INET, SOCK_STREAM, 0);

set_sockaddr(&srv_addr);
bind(listen_sock, (struct sockaddr *)&srv_addr, sizeof(srv_addr));

setnonblocking(listen_sock);
listen(listen_sock, MAX_CONN);

epfd = epoll_create(1);
epoll_ctl_add(epfd, listen_sock, EPOLLIN | EPOLLOUT | EPOLLET);

socklen = sizeof(cli_addr);
for (;;) {
nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (i = 0; i < nfds; i++) {
if (events[i].data.fd == listen_sock) {
/* handle new connection */
conn_sock =
accept(listen_sock,
(struct sockaddr *)&cli_addr,
&socklen);

inet_ntop(AF_INET, (char *)&(cli_addr.sin_addr),
buf, sizeof(cli_addr));
printf("[+] connected with %s:%d\n", buf,
ntohs(cli_addr.sin_port));

setnonblocking(conn_sock);
epoll_ctl_add(epfd, conn_sock,
EPOLLIN | EPOLLET | EPOLLRDHUP |
EPOLLHUP);
} else if (events[i].events & EPOLLIN) {
/* handle EPOLLIN event */
for (;;) {
bzero(buf, sizeof(buf));
n = read(events[i].data.fd, buf,
sizeof(buf));
if (n <= 0 /* || errno == EAGAIN */ ) {
break;
} else {
printf("[+] data: %s\n", buf);
write(events[i].data.fd, buf,
strlen(buf));
}
}
} else {
printf("[+] unexpected\n");
}
/* check if the connection is closing */
if (events[i].events & (EPOLLRDHUP | EPOLLHUP)) {
printf("[+] connection closed\n");
epoll_ctl(epfd, EPOLL_CTL_DEL,
events[i].data.fd, NULL);
close(events[i].data.fd);
continue;
}
}
}