epoll 的使用

2014-12-14 12:53:14   最后更新: 2015-05-27 22:29:30   访问数量:1700




epoll 是 linux 内核为处理大批量文件描述符而对 poll 进行的改进版本,是 linux 下多路复用 IO 接口 select/poll 的增强版本,显著提高了程序在大量并发连接中只有少量活跃的情况下的CPU利用率

在获取事件时,它无需遍历整个被侦听描述符集,只要遍历被内核 IO 事件异步唤醒而加入 ready 队列的描述符集合就行了

epoll 除了提供 select/poll 所提供的 IO 事件的电平触发,还提供了边沿触发,,这样做可以使得用户空间程序有可能缓存 IO 状态,减少 epoll_wait 或 epoll_pwait 的调用,提高程序效率

 

当某个进程调用 epoll_create 函数创建 epoll 专用的文件描述符时,Linux 内核会创建一个 eventpoll 结构体变量:

struct eventpoll { ... struct rb_root rbr; // 红黑树根节点,存储 epoll 中所有事件 struct list_head rdllist; // 双向链表,保存需要 epoll_wait 返回的事件 ... }

 

每一个 epoll 对象都拥有一个独立的 eventpoll 结构体,这个结构体会在内核空间中分配独立的内存,用于存储使用 epoll_ctl 函数向 epoll 对象中添加进来的事件

每一个事件都会挂到红黑树 rbr 上,这样重复添加的时间就可以通过红黑树结构快速识别并避免加入,保证了 epoll_ctl 函数的效率

 

所有添加到 epoll 中的时间都会与设备驱动程序建立回调关系,一旦某个事件发生,则设备驱动程序会调用相应的回调函数,这个回调函数就是 ep_poll_callback,它会把相应事件放到 rdllist 这个双向链表中

这个双向链表的元素是 epitem 结构体类型的:

struct epitem { // 红黑树节点 struct rb_node rbn; // 双向链表节点 struct list_head rdllink; // 事件 fd 等信息 struct epoll_filefd ffd; // 指向所属的 eventpoll 对象的指针 struct eventpoll *ep; // 期待时间类型 struct epoll_event event; }

 

 

epoll 的创建

int epoll_create(int size);

 

创建一个 epoll 专用的文件描述符,调用成功返回描述符,否则返回 -1

需要注意的是,该描述符使用完毕后同样需要 close 操作

 

size 参数用来告诉内核监听的数目,自从 linux 2.6.8 开始,size 参数被忽略,但是依然必须大于 0

 

事件注册函数

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

 

调用成功返回0,否则返回 -1

参数说明

  • epfd -- epoll_create 返回的 epoll 专用的文件描述符

  • op -- 表示参数,有以下取值:
epoll_ctl 动作参数取值
取值动作
EPOLL_CTL_ADD注册新的 fd 到 epfd 中
EPOLL_CTL_MOD修改已经注册的 fd 的监听事件
EPOLL_CTL_DEL从 epfd 中删除一个 fd

  • fd -- 需要监听的fd

  • event -- 监听事件类型
struct epoll_event { __uint32_t events; epoll_data_t data; }

 

可选以下几个宏的组合:

epoll_ctl event 参数可选宏
说明
EPOLLIN文件描述符可读(或对端 socket 正常关闭)
EPOLLOUT文件描述符可写
EPOLLPRI文件描述符有带外数据可读
EPOLLERR文件描述符发生错误
EPOLLHUP文件描述符被挂断
EPOLLET将 epoll 设为边缘触发
EPOLLONESHOT只监听一次

 

等待事件产生

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

 

返回需要处理的事件数目,如返回 0 表示超时,调用失败返回 -1

 

类似于 select 函数

参数说明

  • epfd -- epoll 专用文件描述符

  • events -- 事件集合

  • maxevents -- 每次能处理的最大事件数,不能大于 epoll_create 的 size 参数

  • timeout -- 超时时间,以毫秒为单位,0 表示立即返回,-1 表示永远阻塞

支持同时打开大量的文件描述符

select 函数对一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是1024,这对于一个服务器来说显然是太少了,虽然修改这个宏之后重新编译系统可以解决这个问题,但是随着 FD_SETSIZE 值的上升,select 函数的性能会显著下降

传统 Apache 服务器对此的解决方案是使用多进程的方式来打开大于 FD_SETSIEZE 的文件描述符,但是开辟进程的效率和资源都有一定的消耗,同时进程间数据同步也远没有线程间数据同步来的高效

epoll 能够打开的 FD 与系统能够持有的 FD 数目是一致的,只受限于系统的内存

 

IO效率不随 FD 数目增加而线性下降

传统的 select、poll 具有一个致命弱点,每当有数据可读或可写,都需要对整个描述符集合进行扫描,这样如果文件描述符集合很大,而同时又有大量空闲连接,则效率下降会非常明显

 

使用mmap加速内核与用户空间的消息传递

epoll是通过内核与用户空间mmap同一块内存实现的,这样就可以避免从内核空间通知用户空间的时候不必要的拷贝了

 

内核微调

内核的 TCP/IP 协议栈使用内存池管理 sk_buff 结构,通过在运行时改变 /proc/sys/net/core/hot_list_length 的值,即可动态调整整个内存池的大小,如 listen 函数所指示的3次握手数据包队列长度也可以根据平台内存动态调整

 

边缘触发(Edge Triggered)

边缘触发模式是高速工作方式,只支持 no-block socket

在这种模式下,当描述符从未就绪变为就绪时,内核通过 epoll 告知调用者

然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)

需要注意的是,如果一直不对这个 fd 作 IO 操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET 模式的加速效用仍需要更多的 benchmark 确认(这句话不理解)。

由于边缘触发模式下,epoll 产生一个EPOLLIN事件后,不会再次发出更多的通知,所以调用者需要判断 recv 返回的大小是否等于请求的大小,如果 recv 返回的大小小于请求的大小,则说明有可能数据仍然存在于缓冲区中,需要再次进行读取

while(rs) { buflen = recv(activeevents.data.fd, buf, sizeof(buf), 0); if(buflen < 0) { // 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读 // 在这里就当作是该次事件已处理处. if(errno == EAGAIN) break; else return; } else if(buflen == 0) { // 这里表示对端的socket已正常关闭. } if(buflen == sizeof(buf) rs = 1; // 需要再次读取 else rs = 0; }

 

 

水平触发(Level Triggered)

epoll 的默认工作模式

在水平触发模式下,epoll 相当于一个较快的 poll

 

SERVER

/** * date: 2015-01-02 * file: main.c * author: 龙泉居士 */ #include "function/function.h" int main(int argc, char **argv) { int listenfd, connfd, epollfd, nfds, n, curfds, accept_count = 0; struct sockaddr_in cliaddr; socklen_t socklen = sizeof(struct sockaddr_in); struct epoll_event ev; struct epoll_event events[MAXEPOLLSIZE]; char buf[MAXLINE]; listenfd = get_listenfd(); epollfd = get_epollfd(listenfd, &ev); curfds = 1; printf("epollserver startup, port %d, max connection is %d, backlog is %d\n", SERV_PORT, MAXEPOLLSIZE, LISTENQ); while (1) { /* 等待有事件发生 */ nfds = epoll_wait(epollfd, events, curfds, -1); if (nfds == -1) { perror("epoll_wait"); continue; } /* 处理所有事件 */ for (n = 0; n < nfds; ++n) { if (events[n].data.fd == listenfd) { connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &socklen); if (connfd < 0) { perror("accept"); continue; } sprintf(buf, "accept form %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port); printf("%d:%s", ++accept_count, buf); if (curfds >= MAXEPOLLSIZE) { fprintf(stderr, "too many connection, more than %d\n", MAXEPOLLSIZE); close(connfd); continue; } if (setnonblocking(connfd) < 0) { perror("setnonblocking error"); return -1; } ev.events = EPOLLIN | EPOLLET; ev.data.fd = connfd; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev) < 0) { fprintf(stderr, "add socket '%d' to epoll failed: %s\n", connfd, strerror(errno)); return -1; } curfds++; continue; } // 处理客户端请求 if (handle(events[n].data.fd) < 0) { epoll_ctl(epollfd, EPOLL_CTL_DEL, events[n].data.fd,&ev); curfds--; } } } close(listenfd); return 0; }

 

/** * date: 2015-01-02 * file: function.h * author: 龙泉居士 */ #ifndef EPOLLSERV_FUNCTION_20150102 #define EPOLLSERV_FUNCTION_20150102 #include <unistd.h> #include <sys/types.h> /* basic system data types */ #include <sys/socket.h> /* basic socket definitions */ #include <netinet/in.h> /* sockaddr_in{} and other Internet defns */ #include <arpa/inet.h> /* inet(3) functions */ #include <sys/epoll.h> /* epoll function */ #include <fcntl.h> /* nonblocking */ #include <sys/resource.h> /* setrlimit */ #include <stdlib.h> #include <errno.h> #include <stdio.h> #include <string.h> #define LISTENQ 1024 #define MAXLINE 10240 #define SERV_PORT 8888 #define MAXEPOLLSIZE 10000 int handle(int); int setnonblocking(int); int get_listenfd(); int get_epollfd(int, struct epoll_event *); #endif

 

/** * date: 2015-01-02 * file: function.c * author: 龙泉居士 */ #include "function.h" int setnonblocking(int sockfd) { if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) < 0) { perror("fcntl"); exit(-1); } return 0; } int get_listenfd() { int opt = 1; int listenfd; struct rlimit rt; struct sockaddr_in servaddr; /* 设置每个进程允许打开的最大文件数 */ rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE; if (setrlimit(RLIMIT_NOFILE, &rt) == -1) { perror("setrlimit"); exit(-1); } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl (INADDR_ANY); servaddr.sin_port = htons (SERV_PORT); listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { perror("can't create socket file"); exit(-1); } // 允许绑定已经被使用的端口 setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); // 设置连接为非阻塞状态 if (setnonblocking(listenfd) < 0) { perror("setnonblock"); exit(-1); } if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)) < 0) { perror("bind"); exit(-1); } if (listen(listenfd, LISTENQ) < 0) { perror("listen"); exit(-1); } return listenfd; } int get_epollfd(int listenfd, struct epoll_event *pev) { /* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */ int epollfd = epoll_create(MAXEPOLLSIZE); pev->events = EPOLLIN | EPOLLET; pev->data.fd = listenfd; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, pev) < 0) { perror("epoll_ctl"); exit(-1); } return epollfd; } int handle(int connfd) { int nread; char buf[MAXLINE]; nread = read(connfd, buf, MAXLINE);//读取客户端socket流 if (nread == 0) { printf("client close the connection\n"); close(connfd); return -1; } if (nread < 0) { perror("read error"); close(connfd); return -1; } write(connfd, buf, nread);//响应客户端 return 0; }

 

CLIENT

/** * date: 2015-01-02 * file: main.c * author: 龙泉居士 */ #include "function/function.h" int main(int argc, char **argv) { int sockfd = connect_serv(); printf("welcome to echoclient\n"); handle(sockfd); /* do it all */ close(sockfd); printf("exit\n"); return 0; }

 

/** * date: 2015-01-02 * file: function.h * author: 龙泉居士 */ #ifndef EPOLLCLI_FUNCTION_20150102 #define EPOLLCLI_FUNCTION_20150102 #include <netdb.h> /*gethostbyname function */ #include <errno.h> #include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <arpa/inet.h> /* inet(3) functions */ #include <sys/types.h> /* basic system data types */ #include <sys/socket.h> /* basic socket definitions */ #include <netinet/in.h> /* sockaddr_in{} and other Internet defns */ #define MAXLINE 1024 #define IPADDR "127.0.0.1" #define SERV_PORT 8888 void handle(int connfd); int connect_serv(); #endif

 

/** * date: 2015-01-02 * file: function.c * author: 龙泉居士 */ #include "function.h" void handle(int sockfd) { char sendline[MAXLINE], recvline[MAXLINE]; int n; for (;;) { if (fgets(sendline, MAXLINE, stdin) == NULL) { break;//read eof } n = write(sockfd, sendline, strlen(sendline)); n = read(sockfd, recvline, MAXLINE); if (n == 0) { printf("echoclient: server terminated prematurely\n"); break; } write(STDOUT_FILENO, recvline, n); } } int connect_serv() { int sockfd; struct sockaddr_in servaddr; if ((sockfd = socket (AF_INET, SOCK_STREAM, 0)) <= 0) { perror ("socket error"); exit(-1); } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(SERV_PORT); inet_pton(AF_INET, IPADDR, &servaddr.sin_addr.s_addr); if (connect (sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) { perror ("connect error"); exit(-1); } return sockfd; }

 

 






读书笔记      技术帖      linux      unix      network      c语言      unp      unix网络编程      网络编程      龙潭书斋      io      select      epoll      poll      io复用     


京ICP备15018585号