基于epoll的Reactor网络模型
Epoll API
epoll_create
epoll_create
用于创建一个 epoll
实例,返回一个 epoll
句柄(文件描述符 epfd
),用于后续 epoll_ctl
、epoll_wait
操作。
1 2 3 4 5
| #include <sys/epoll.h>
int epoll_create(int size);
int epoll_create1(int flags);
|
创建成功后,epfd
就可以用于管理多个 socket fd
。
epoll_event
epoll_event
结构体用于存储 epoll
监听的事件信息,包括触发的事件类型(events
)和 关联的文件描述符(data.fd
)
1 2 3 4
| struct epoll_event { uint32_t events; epoll_data_t data; };
|
events:EPOLLIN 可读事件、 EPOLLOUT 可写事件、 EPOLLET边缘触发(默认水平)、EPOLLERR 错误事件、 EPOLLRDHUP 对端关闭连接。
1 2 3
| struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; ev.data.fd = sockfd;
|
epoll_ctl
epoll_ctl
用于向 epoll
实例中添加、修改或删除文件描述符(fd)。
1
| int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
|
op
(操作类型):
EPOLL_CTL_ADD
:向 epoll
实例中添加 fd
EPOLL_CTL_MOD
:修改 fd
监听的事件类型
EPOLL_CTL_DEL
:从 epoll
实例中删除 fd
fd
:要操作的文件描述符(socket fd)。
event
:epoll_event
结构体指针,EPOLL_CTL_DEL
操作时可以传 NULL
。
1 2 3 4 5 6 7 8
| struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; ev.data.fd = sockfd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) == -1) { perror("epoll_ctl failed"); exit(EXIT_FAILURE); }
|
epoll_wait
等待 epoll
事件触发,获取就绪 fd
列表。
epoll监听多客户端代码模板
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| struct epoll_event ev, events[MAX_EVENTS]; epfd = epoll_create(1); ev.events = EPOLLIN; ev.data.fd = sock_server; epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev); while (1) { event_count = epoll_wait(epfd, events, MAX_EVENTS, -1); for (int i = 0; i < event_count; i++) { int fd = events[i].data.fd; if (fd == server_fd) else } } close(epfd);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/epoll.h>
#define MAX_EVENTS 10 #define PORT 8080
int main() { int server_fd, client_fd, epfd, event_count; struct sockaddr_in server_addr, client_addr; socklen_t addr_len = sizeof(client_addr); struct epoll_event ev, events[MAX_EVENTS];
server_fd = socket(AF_INET, SOCK_STREAM, 0); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(PORT);
bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)); listen(server_fd, 10);
epfd = epoll_create(1);
ev.events = EPOLLIN; ev.data.fd = server_fd; epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev);
printf("Server listening on port %d...\n", PORT);
while (1) { event_count = epoll_wait(epfd, events, MAX_EVENTS, -1); for (int i = 0; i < event_count; i++) { int fd = events[i].data.fd; if (fd == server_fd) { client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &addr_len); printf("New client connected: %d\n", client_fd); ev.events = EPOLLIN; ev.data.fd = client_fd; epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev); } else { char buffer[1024]; int bytes_read = read(fd, buffer, sizeof(buffer)); if (bytes_read > 0) { buffer[bytes_read] = '\0'; printf("Received: %s\n", buffer); write(fd, buffer, bytes_read); } else { printf("Client %d disconnected\n", fd); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); close(fd); } } } } close(server_fd); close(epfd); return 0; }
|
Epoll原理剖析
epoll的两个重要结构:rbtree 和 rdlist。
rbtree是红黑树结构,为了管理通过epoll_ctl注册在epoll上的文件描述符(如sock_fd),插入,删除,查找均为O(logn)。
rdlist是用来存放接收缓冲区有数据的socket。这里细分为两种状态,ET(边缘触发):数据到达,会触发事件通知,程序未完全读完缓冲区数据,下次也不会继续通知剩余部分;LT(水平触发):只要缓冲区有数据,就会一直通过epoll_wait返回这些事件,直到读完为止。
这里大家可能有点懵逼,这和rdlist有什么关系?接下来会带着大家捋一下整个流程。
rdlist是epoll的结构体eventpoll中的一个成员变量,是epoll特有的。那之前IO复用是如何通知CPU将对应socket缓冲区有数据的进程重新放回工作队列中的呢,像select,采用的是“每个sock结构中的等待队列中放入它对应的进程,当接收缓冲区有数据的时候,发送中断信号,CPU重新把对应进程放入工作队列,此操作中,①查看缓冲区是否有数据,将有数据的进程从对应socket的等待队列中取出,放入工作队列,需要完整遍历一次socket列表;②在进程被唤醒后,需要知道哪些socket中有数据,通过FD_ISSET再判断出来,需要从用户态切换到内核态,且再遍历一次socket列表。”
每次调用select都需要将进程加入到所有监视socket的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个fds列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定select的最大监视数量,默认只能监视1024个socket。
那介绍完select后,说回来epoll。为什么需要用一个rdlist存socket?
因为可以减少遍历的次数。从设计上,epoll**(struct eventpoll)会和其它socket(struct sock)**一样,内核会创建一个对应的结构体,位于内核态(kmalloc分配的内存区域),与调用进程绑定,此时。这种设计模式下,我们不会再把对应的阻塞进程加入到socket的等待队列中,而是通过epoll_ctl先将所有需要管理的socket注册到epoll中,当有进程因为epoll_wait被阻塞,将其放入eventpoll的等待队列中。当socket的接收缓冲区接收到数据后,会产生中断,中断程序会:①将接收到数据的socket的fd放入rdlist中(减少了一次查询遍历);②唤醒在等待队列中的进程,再次进入运行态(减少了一次访问socket中进程,再删除的遍历)。
1 2 3 4 5 6 7
| struct eventpoll { struct mutex mtx; struct rb_root_cached rbr; struct list_head rdlist; wait_queue_head_t wq; struct file *file; };
|

就绪列表引用着就绪的socket,所以它应能够快速的插入数据。所以就绪列表应是一种能够快速插入和删除的数据结构。
双向链表就是这样一种数据结构,epoll使用双向链表来实现就绪队列(对应上图的rdlist)。
既然epoll将“维护监视队列”和“进程阻塞”分离,也意味着需要有个数据结构来保存监视的socket。至少要方便的添加和移除,还要便于搜索,以避免重复添加。红黑树是一种自平衡二叉查找树,搜索、插入和删除时间复杂度都是O(log(N)),效率较好。epoll使用了红黑树作为索引结构(对应上图的rbr)。
ps:因为操作系统要兼顾多种功能,以及由更多需要保存的数据,rdlist并非直接引用socket,而是通过epitem间接引用,红黑树的节点也是epitem对象。同样,文件系统也并非直接引用着socket。
Reactor模型
高性能并发编程模型。应用:网络服务器、事件驱动编程。
利用I/O多路复用(epoll, select, poll)处理并发连接。
**Reactor将IO管理,转变为事件管理。**不同的IO事件,对应不同的回调函数,先register,后callback。
1 2 3 4
| listenfd EPOLLIN accept_cb clientfd EPOLLIN recv_cb clientfd EPOLLOUT send_cb
|
Reactor 模式本质上是对 epoll
(或其他 I/O 多路复用机制)的封装,它提供了一种事件驱动的异步编程模型,用于高效处理多个 I/O 连接。Reactor
是一种 编程模式,基于 epoll
组织事件循环,提高代码的扩展性和可维护性。
大部分高性能网络框架(如 Nginx、Redis、Netty)都基于 Reactor 模式,并使用 epoll
作为底层实现。
一个典型的 Reactor 框架会抽象出 事件分发器(Dispatcher)、事件处理器(Handler).
优势:
- 封装
epoll
,提供面向对象的 API
- 支持回调机制(
handleEvent()
)
- 更易扩展(可以用于不同的 I/O 事件)
直接使用epoll 缺点:
- 代码层面是面向系统调用的,难以扩展。
- 没有事件回调机制,必须手动遍历事件列表。
Reactor完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| #include <errno.h> #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include <string.h> #include <unistd.h> #include <sys/epoll.h>
#define BUFFER_LENGTH 1024 #define CONNECTION_SIZE 1024
typedef int (*RCALLBACK)(int fd);
int accept_cb(int fd); int recv_cb(int fd); int send_cb(int fd);
int epfd = 0;
struct conn{ int fd; char rbuffer[BUFFER_LENGTH]; int rlength; char wbuffer[BUFFER_LENGTH]; int wlength; RCALLBACK send_callback; union{ RCALLBACK recv_callback; RCALLBACK accept_callback; } r_action; }; struct conn conn_list[CONNECTION_SIZE] = {0};
int set_event(int fd, int event, int flag){ if (flag){ struct epoll_event ev; ev.events = event; ev.data.fd = fd; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); } else{ struct epoll_event ev; ev.events = event; ev.data.fd = fd; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); } } int event_register(int fd, int event){ if(fd < 0) return -1; conn_list[fd].fd = fd; conn_list[fd].r_action.recv_callback = recv_cb; conn_list[fd].send_callback = send_cb; memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH); conn_list[fd].rlength = 0; memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH); conn_list[fd].wlength = 0; set_event(fd, event, 1); } int accept_cb(int fd){ struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len); printf("accept finished: %d\n", clientfd); if(clientfd < 0){ printf("accept errno: %d --> %s\n", errno, strerror(errno)); return -1; } event_register(clientfd, EPOLLIN); return 0; } int recv_cb(int fd){ int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0); if(count == 0){ printf("client disconnect: %d\n", fd); close(fd); epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); return 0; } conn_list[fd].rlength = count; printf("RECV: %s\n", conn_list[fd].rbuffer);
conn_list[fd].wlength = conn_list[fd].rlength; memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength); set_event(fd, EPOLLOUT, 0); return count; } int send_cb(int fd){ int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0); set_event(fd, EPOLLIN, 0); return count; } int init_server(unsigned short port){ int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(port);
bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); listen(sockfd, 10); return sockfd; }
int main(){ epfd = epoll_create(1); int sockfd = init_server(2000); conn_list[sockfd].fd = sockfd; conn_list[sockfd].r_action.recv_callback = accept_cb; set_event(sockfd, EPOLLIN, 1); while(true){ struct epoll_event events[1024]; int nready = epoll_wait(epfd, events, 1024, -1); for (int i = 0; i < nready; ++i){ int connfd = events[i].data.fd; if (events[i].events & EPOLLIN) conn_list[connfd].r_action.recv_callback(connfd); if (events[i].events & EPOLLOUT) conn_list[connfd].send_callback(connfd); } } }
|
Rector函数说明
1. set_event(int fd, int event, int flag)
作用:配置 epoll
事件,将 fd
注册到 epoll
实例中,并指定监听的事件类型。
参数:
fd
:要监听的文件描述符(socket)。
event
:监听的事件(如 EPOLLIN
、EPOLLOUT
)。
flag
:操作类型(EPOLL_CTL_ADD
添加新 fd
,EPOLL_CTL_MOD
修改 fd
的事件)。
实现逻辑:
- 创建
epoll_event
结构体,设置事件类型。
- 调用
epoll_ctl()
在 epoll
实例中注册或修改 fd
事件。
2. event_register(int fd, int event)
作用:初始化并注册一个新的 fd
到 epoll
,使其能够监听指定的事件。
参数:
fd
:需要注册的 socket
文件描述符。
event
:监听的事件类型(如 EPOLLIN
)。
实现逻辑:
- 为
fd
关联必要的资源(如 I/O 缓冲区、回调函数)。
- 调用
set_event()
将其注册到 epoll
,监听可读(EPOLLIN
)或可写(EPOLLOUT
)事件。
3. accept_cb(int fd)
作用:当服务器监听的 socket
变为可读(表示有新连接)时,调用此回调函数接受新连接。
参数:
fd
:监听的服务器 socket
。
实现逻辑:
- 调用
accept()
接受新的客户端连接。
- 为新连接创建
socket
并设置非阻塞模式。
- 打印客户端连接信息(如 IP 地址、端口)。
- 调用
event_register()
注册新 socket
,监听 EPOLLIN
事件(可读)。
4. recv_cb(int fd)
作用:当 fd
变为可读(即有数据到达)时,调用此回调函数读取数据。
参数:
fd
:客户端的 socket
文件描述符。
实现逻辑:
- 从
fd
读取数据到接收缓冲区。
- 处理收到的数据(如打印、回显)。
- 若客户端断开连接(
recv()
返回 0
),关闭 fd
并移除 epoll
监听。
5. send_cb(int fd)
作用:当 fd
变为可写(即发送缓冲区有空间)时,调用此回调函数发送数据。
参数:
fd
:客户端的 socket
文件描述符。
实现逻辑:
- 从发送缓冲区取数据,写入
fd
。
- 若数据发送完毕,将
fd
事件重新设置为 EPOLLIN
,继续监听可读事件。
- 若未发送完全部数据,继续监听
EPOLLOUT
,等待可写事件再次触发。
6. init_server(unsigned short port)
作用:初始化服务器,创建 socket
并监听指定端口,准备接受客户端连接。
参数:
port
:服务器监听的端口号。
实现逻辑:
- 创建
socket
,绑定 port
,设置非阻塞模式。
- 调用
listen()
让 socket
进入监听模式。
- 调用
event_register()
将 server_fd
注册到 epoll
,监听 EPOLLIN
事件(新连接)。
7. main()
作用:程序主入口,负责初始化 epoll
事件循环,并分发 I/O 事件。
实现逻辑:
创建 epoll
实例。
调用 init_server(port)
初始化服务器 socket
并注册到 epoll
。
进入 epoll_wait()
事件循环:
- 等待
epoll
事件触发。
- 根据
fd
状态调用相应的回调函数(accept_cb
、recv_cb
、send_cb
)。
- 处理完毕后继续等待新的事件。