基于Epoll实现的Reactor模型

基于epoll的Reactor网络模型

Epoll API

epoll_create

epoll_create 用于创建一个 epoll 实例,返回一个 epoll 句柄(文件描述符 epfd),用于后续 epoll_ctlepoll_wait 操作。

1
2
3
4
5
#include <sys/epoll.h>
//旧版需要指定 size,但它的值不会影响 epoll 句柄的行为,通常填 1 即可
int epoll_create(int size);
//新版: 0:默认行为 ;EPOLL_CLOEXEC:创建的 epoll 句柄在 exec() 后会自动关闭
int epoll_create1(int flags);

创建成功后,epfd 就可以用于管理多个 socket fd

epoll_event

epoll_event 结构体用于存储 epoll 监听的事件信息,包括触发的事件类型(events)和 关联的文件描述符(data.fd

1
2
3
4
struct epoll_event {
uint32_t events; // 事件类型(EPOLLIN, EPOLLOUT, EPOLLET 等)
epoll_data_t data; // 用户自定义数据,通常存储 fd
};

events:EPOLLIN 可读事件、 EPOLLOUT 可写事件、 EPOLLET边缘触发(默认水平)、EPOLLERR 错误事件、 EPOLLRDHUP 对端关闭连接。

1
2
3
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 监听可读事件,使用边缘触发模式
ev.data.fd = sockfd; // 绑定 socket fd
epoll_ctl

epoll_ctl 用于epoll 实例中添加、修改或删除文件描述符(fd)

1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

op(操作类型):

  • EPOLL_CTL_ADD:向 epoll 实例中添加 fd
  • EPOLL_CTL_MOD:修改 fd 监听的事件类型
  • EPOLL_CTL_DEL:从 epoll 实例中删除 fd

fd:要操作的文件描述符(socket fd)。

eventepoll_event 结构体指针,EPOLL_CTL_DEL 操作时可以传 NULL

1
2
3
4
5
6
7
8
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 监听可读事件,边缘触发
ev.data.fd = sockfd;

if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
perror("epoll_ctl failed");
exit(EXIT_FAILURE);
}
epoll_wait

等待 epoll 事件触发,获取就绪 fd 列表。

epoll监听多客户端代码模板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//抽象 epoll 套路写法
struct epoll_event ev, events[MAX_EVENTS];
epfd = epoll_create(1);
ev.events = EPOLLIN;
ev.data.fd = sock_server;
epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev);
while (1) {
event_count = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (int i = 0; i < event_count; i++) {
int fd = events[i].data.fd;
if (fd == server_fd)
// 处理新连接
else
// 处理客户端消息
}
}
close(epfd);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define MAX_EVENTS 10
#define PORT 8080

int main() {
int server_fd, client_fd, epfd, event_count;
struct sockaddr_in server_addr, client_addr;
socklen_t addr_len = sizeof(client_addr);
struct epoll_event ev, events[MAX_EVENTS];

// 创建服务器 socket
server_fd = socket(AF_INET, SOCK_STREAM, 0);
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);

bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
listen(server_fd, 10);

// 创建 epoll 实例
epfd = epoll_create(1);

// 注册服务器 socket 到 epoll,监听新连接
ev.events = EPOLLIN;
ev.data.fd = server_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev);

printf("Server listening on port %d...\n", PORT);

while (1) {
// 等待事件
event_count = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (int i = 0; i < event_count; i++) {
int fd = events[i].data.fd;
if (fd == server_fd) {
// 处理新连接
client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &addr_len);
printf("New client connected: %d\n", client_fd);

ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
} else {
// 处理客户端消息
char buffer[1024];
int bytes_read = read(fd, buffer, sizeof(buffer));
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf("Received: %s\n", buffer);
write(fd, buffer, bytes_read); // 回显
} else {
// 客户端断开连接
printf("Client %d disconnected\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
}
}
}
close(server_fd);
close(epfd);
return 0;
}

Epoll原理剖析

epoll的两个重要结构:rbtree 和 rdlist。

rbtree是红黑树结构,为了管理通过epoll_ctl注册在epoll上的文件描述符(如sock_fd),插入,删除,查找均为O(logn)。

rdlist是用来存放接收缓冲区有数据的socket。这里细分为两种状态,ET(边缘触发):数据到达,会触发事件通知,程序未完全读完缓冲区数据,下次也不会继续通知剩余部分;LT(水平触发):只要缓冲区有数据,就会一直通过epoll_wait返回这些事件,直到读完为止。

这里大家可能有点懵逼,这和rdlist有什么关系?接下来会带着大家捋一下整个流程。

rdlist是epoll的结构体eventpoll中的一个成员变量,是epoll特有的。那之前IO复用是如何通知CPU将对应socket缓冲区有数据的进程重新放回工作队列中的呢,像select,采用的是“每个sock结构中的等待队列中放入它对应的进程,当接收缓冲区有数据的时候,发送中断信号,CPU重新把对应进程放入工作队列,此操作中,①查看缓冲区是否有数据,将有数据的进程从对应socket的等待队列中取出,放入工作队列,需要完整遍历一次socket列表;②在进程被唤醒后,需要知道哪些socket中有数据,通过FD_ISSET再判断出来,需要从用户态切换到内核态,且再遍历一次socket列表。”

每次调用select都需要将进程加入到所有监视socket的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个fds列表传递给内核,有一定的开销。正是因为遍历操作开销大出于效率的考量,才会规定select的最大监视数量,默认只能监视1024个socket。

那介绍完select后,说回来epoll。为什么需要用一个rdlist存socket?

因为可以减少遍历的次数。从设计上,epoll**(struct eventpoll)会和其它socket(struct sock)**一样,内核会创建一个对应的结构体,位于内核态(kmalloc分配的内存区域),与调用进程绑定,此时。这种设计模式下,我们不会再把对应的阻塞进程加入到socket的等待队列中,而是通过epoll_ctl先将所有需要管理的socket注册到epoll中,当有进程因为epoll_wait被阻塞,将其放入eventpoll的等待队列中。当socket的接收缓冲区接收到数据后,会产生中断,中断程序会:①将接收到数据的socket的fd放入rdlist中(减少了一次查询遍历);②唤醒在等待队列中的进程,再次进入运行态(减少了一次访问socket中进程,再删除的遍历)。

1
2
3
4
5
6
7
struct eventpoll {
struct mutex mtx; // 互斥锁,保护数据结构
struct rb_root_cached rbr; // 红黑树,存储被监听的 fd
struct list_head rdlist; // 就绪事件链表
wait_queue_head_t wq; // 等待队列
struct file *file; // 关联的文件描述符
};

epoll原理详解及epoll反应堆模型

就绪列表引用着就绪的socket,所以它应能够快速的插入数据。所以就绪列表应是一种能够快速插入和删除的数据结构。

双向链表就是这样一种数据结构,epoll使用双向链表来实现就绪队列(对应上图的rdlist)。

既然epoll将“维护监视队列”和“进程阻塞”分离,也意味着需要有个数据结构来保存监视的socket。至少要方便的添加和移除,还要便于搜索,以避免重复添加。红黑树是一种自平衡二叉查找树,搜索、插入和删除时间复杂度都是O(log(N)),效率较好。epoll使用了红黑树作为索引结构(对应上图的rbr)。

ps:因为操作系统要兼顾多种功能,以及由更多需要保存的数据,rdlist并非直接引用socket,而是通过epitem间接引用,红黑树的节点也是epitem对象。同样,文件系统也并非直接引用着socket。

Reactor模型

高性能并发编程模型。应用:网络服务器、事件驱动编程。

利用I/O多路复用(epoll, select, poll)处理并发连接。

**Reactor将IO管理,转变为事件管理。**不同的IO事件,对应不同的回调函数,先register,后callback。

1
2
3
4
//io	        event	      callback
listenfd EPOLLIN accept_cb
clientfd EPOLLIN recv_cb
clientfd EPOLLOUT send_cb

Reactor 模式本质上是对 epoll(或其他 I/O 多路复用机制)的封装,它提供了一种事件驱动的异步编程模型,用于高效处理多个 I/O 连接。Reactor 是一种 编程模式,基于 epoll 组织事件循环,提高代码的扩展性和可维护性。

大部分高性能网络框架(如 Nginx、Redis、Netty)都基于 Reactor 模式,并使用 epoll 作为底层实现

一个典型的 Reactor 框架会抽象出 事件分发器(Dispatcher)、事件处理器(Handler).

优势

  • 封装 epoll,提供面向对象的 API
  • 支持回调机制handleEvent()
  • 更易扩展(可以用于不同的 I/O 事件)

直接使用epoll 缺点

  • 代码层面是面向系统调用的,难以扩展。
  • 没有事件回调机制,必须手动遍历事件列表。

Reactor完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1024

typedef int (*RCALLBACK)(int fd);

int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);

int epfd = 0;

struct conn{
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
RCALLBACK send_callback;
union{
RCALLBACK recv_callback;
RCALLBACK accept_callback;
} r_action;
};
struct conn conn_list[CONNECTION_SIZE] = {0};

int set_event(int fd, int event, int flag){
if (flag){
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
else{
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event){
if(fd < 0) return -1;
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb; // 读回调
conn_list[fd].send_callback = send_cb; // 写回调
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
set_event(fd, event, 1);
}
int accept_cb(int fd){
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);
printf("accept finished: %d\n", clientfd);
if(clientfd < 0){
printf("accept errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
event_register(clientfd, EPOLLIN);
return 0;
}
int recv_cb(int fd){
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
if(count == 0){
printf("client disconnect: %d\n", fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // 从 epoll 删除
return 0;
}
conn_list[fd].rlength = count;
printf("RECV: %s\n", conn_list[fd].rbuffer);

conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd){
int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
set_event(fd, EPOLLIN, 0);
return count;
}
int init_server(unsigned short port){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);

bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
listen(sockfd, 10);
return sockfd;
}

int main(){
epfd = epoll_create(1);
int sockfd = init_server(2000);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);
while(true){
struct epoll_event events[1024];
int nready = epoll_wait(epfd, events, 1024, -1);
for (int i = 0; i < nready; ++i){
int connfd = events[i].data.fd;
if (events[i].events & EPOLLIN) conn_list[connfd].r_action.recv_callback(connfd);
if (events[i].events & EPOLLOUT) conn_list[connfd].send_callback(connfd);
}
}
}

Rector函数说明

1. set_event(int fd, int event, int flag)

作用:配置 epoll 事件,将 fd 注册到 epoll 实例中,并指定监听的事件类型。
参数

  • fd:要监听的文件描述符(socket)。
  • event:监听的事件(如 EPOLLINEPOLLOUT)。
  • flag:操作类型(EPOLL_CTL_ADD 添加新 fdEPOLL_CTL_MOD 修改 fd 的事件)。
    实现逻辑
  • 创建 epoll_event 结构体,设置事件类型。
  • 调用 epoll_ctl()epoll 实例中注册或修改 fd 事件。
2. event_register(int fd, int event)

作用:初始化并注册一个新的 fdepoll,使其能够监听指定的事件。
参数

  • fd:需要注册的 socket 文件描述符。
  • event:监听的事件类型(如 EPOLLIN)。
    实现逻辑
  • fd 关联必要的资源(如 I/O 缓冲区、回调函数)。
  • 调用 set_event() 将其注册到 epoll,监听可读(EPOLLIN)或可写(EPOLLOUT)事件。
3. accept_cb(int fd)

作用:当服务器监听的 socket 变为可读(表示有新连接)时,调用此回调函数接受新连接。
参数

  • fd:监听的服务器 socket
    实现逻辑
  • 调用 accept() 接受新的客户端连接。
  • 为新连接创建 socket 并设置非阻塞模式。
  • 打印客户端连接信息(如 IP 地址、端口)。
  • 调用 event_register() 注册新 socket,监听 EPOLLIN 事件(可读)。
4. recv_cb(int fd)

作用:当 fd 变为可读(即有数据到达)时,调用此回调函数读取数据。
参数

  • fd:客户端的 socket 文件描述符。
    实现逻辑
  • fd 读取数据到接收缓冲区。
  • 处理收到的数据(如打印、回显)。
  • 若客户端断开连接(recv() 返回 0),关闭 fd 并移除 epoll 监听。
5. send_cb(int fd)

作用:当 fd 变为可写(即发送缓冲区有空间)时,调用此回调函数发送数据。
参数

  • fd:客户端的 socket 文件描述符。
    实现逻辑
  • 从发送缓冲区取数据,写入 fd
  • 若数据发送完毕,将 fd 事件重新设置为 EPOLLIN,继续监听可读事件。
  • 若未发送完全部数据,继续监听 EPOLLOUT,等待可写事件再次触发。
6. init_server(unsigned short port)

作用:初始化服务器,创建 socket 并监听指定端口,准备接受客户端连接。
参数

  • port:服务器监听的端口号。
    实现逻辑
  • 创建 socket,绑定 port,设置非阻塞模式。
  • 调用 listen()socket 进入监听模式。
  • 调用 event_register()server_fd 注册到 epoll,监听 EPOLLIN 事件(新连接)。

7. main()

作用:程序主入口,负责初始化 epoll 事件循环,并分发 I/O 事件。
实现逻辑

  1. 创建 epoll 实例。

  2. 调用 init_server(port) 初始化服务器 socket 并注册到 epoll

  3. 进入 epoll_wait()

    事件循环:

    • 等待 epoll 事件触发。
    • 根据 fd 状态调用相应的回调函数(accept_cbrecv_cbsend_cb)。
    • 处理完毕后继续等待新的事件。

基于Epoll实现的Reactor模型
https://kevin-aron.github.io/categories/C++/基于Epoll实现的Reactor模型/
作者
Iuk
发布于
2025年3月7日
许可协议