epoll的功能:实现I/O复用,即多路I/O。
一、epoll的系统调用函数
epoll只有epoll_create,epoll_ctl 和 epoll_wait 3个系统调用函数
1、epoll_create(int size)
创建一个epoll的句柄,size表示内核监听的数目量。
注意:当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
函数声明:epoll_create(int size);
注释:生成一个epoll的专用描述符,其实是在内核中申请一块空间,用来监听想要监听的socket fd上发生了什么事件。size是epoll监听的最大socket fd数目。
2、epoll_ctl (int epfd, int op, int fd, struct epoll_event *event)
将被监听的描述符添加到epoll句柄或从epool句柄中删除或者对监听事件进行修改。该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。
函数声明:epoll_ctl (int epfd, int op, int fd, struct epoll_event *event);
参数:
epfd:由 epoll_create 生成的epoll专用的文件描述符;
op:要进行的操作例如注册事件,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除;
fd:关联的文件描述符;
event:指向epoll_event的指针,即在内核中需要监听什么事件。
返回值:成功返回0,失败返回1。
// structepoll_event 的结构体:
typedef union epoll_data
{
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event
{
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下几个宏的集合:
- EPOLLIN: 触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
- EPOLLOUT: 触发该事件,表示对应的文件描述符上可以写数据;
- EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
- EPOLLERR: 表示对应的文件描述符发生错误;
- EPOLLHUP: 表示对应的文件描述符被挂断;
- EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
- EPOLLONESHOT: 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
// 案例:
//定义epoll_event事件的变量
struct epoll_event ev;
//设置与要处理的事件相关的文件描述符
ev.data.fd=listenfd;
//设置要处理的事件类型
ev.events=EPOLLIN|EPOLLET;
//注册epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
3、int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
等待事件的发生。参数events用来从内核得到事件的集合,maxevents表示内核events有多大(数组成员的个数),这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。
函数声明:int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数:
epfd:由epoll_create 生成的epoll专用的文件描述符;
epoll_event:用于回传代处理事件的数组;
maxevents:每次能处理的事件数;
timeout:等待I/O事件发生的超时值(单位我也不太清楚);-1相当于阻塞,0相当于非阻塞。一般用-1即可返回发生事件数。
返回值:成功返回处理事件的数目,失败返回0,表示已超时;
二、epoll的两种模式ET和LT
假如有这样一个例子:(LT方式,即默认方式下,内核会继续通知,可以读数据,ET方式,内核不会再通知,可以读数据)
- 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符
- 这个时候从管道的另一端被写入了2KB的数据
- 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作
- 然后我们读取了1KB的数据
- 调用epoll_wait(2)......
Edge Triggered工作模式:
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候ET工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。(LT方式可以解决这种缺陷)
i 基于非阻塞文件句柄
ii 只有当read(2)或者write(2)返回EAGAIN时(认为读完)才需要挂起,等待。但这并不是说每次read()时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read()返回的读到的数据长度小于请求的数据长度时(即小于sizeof(buf)),就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。
Level Triggered工作模式(默认的工作方式)
以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。因为即使使用ET模式的epoll,在收到多个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当EPOLLONESHOT设定后,使用带有EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就成为调用者必须作的事情。
ET和LT的区别:
- LT(leveltriggered)是缺省的工作方式,并且同时支持block和no-blocksocket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。
- ET(edge-triggered)是高速工作方式,只支持no-blocksocket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了。
// epoll的常规用法
for(;;)
{
nfds = epoll_wait(kdpfd, events, maxevents, -1);
for(n = 0; n < nfds; ++n)
{
if(events[n].data.fd == listenfd) //有新的连接
{
client = accept(listener, (struct sockaddr *) &local,
&addrlen); //accept这个连接
if(client < 0)
{
perror("accept");
continue;
}
setnonblocking(client);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = client;
//将新的fd添加到epoll的监听队列中
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0)
{
fprintf(stderr, "epoll set insertion error: fd=%d\n",
client);
return -1;
}
}
else if( events[i].events&EPOLLIN ) //接收到数据,读socket
{
n = read(sockfd, line, MAXLINE)) < 0 //读
ev.data.ptr = md; //md为自定义类型,添加数据
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
}
else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
}
else
{
// 其他处理;
}
}
}
// 案例:
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
using namespace std;
#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000
void set_non_blocking(int sock) // 非阻塞
{
int opts;
opts = fcntl(sock, F_GETFL);
if (opts < 0)
{
perror("fcntl(sock, GETFL)");
exit(1);
}
opts = opts|O_NONBLOCK;
if (fcntl(sock, F_SETFL, opts) < 0)
{
perror("fcntl(sock, SETFL, opts)");
exit(1);
}
}
int main(int argc, char* argv[])
{
int i, maxi, listenfd, connfd, sockfd, epfd, nfds, portnumber;
ssize_t n;
char line[MAXLINE];
socklen_t clilen;
if ( 2 == argc )
{
if((portnumber = atoi(argv[1])) < 0 )
{
fprintf(stderr, "Usage:%s portnumber/a/n", argv[0]);
return 1;
}
}
else
{
fprintf(stderr, "Usage:%s portnumber/a/n", argv[0]);
return 1;
}
//声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
struct epoll_event ev, events[20];
//生成用于处理accept的epoll专用的文件描述符
epfd=epoll_create(256);
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
//把socket设置为非阻塞方式
//setnonblocking(listenfd);
//设置与要处理的事件相关的文件描述符
ev.data.fd = listenfd;
//设置要处理的事件类型
ev.events = EPOLLIN|EPOLLET;
//ev.events = EPOLLIN;
//注册epoll事件
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
char *local_addr = "127.0.0.1";
inet_aton(local_addr, &(serveraddr.sin_addr)); //htons(portnumber);
serveraddr.sin_port = htons(portnumber);
bind(listenfd, (sockaddr *)&serveraddr, sizeof(serveraddr));
listen(listenfd, LISTENQ);
maxi = 0;
for ( ; ; )
{
//等待epoll事件的发生
nfds = epoll_wait(epfd,events,20,500);
//处理所发生的所有事件
for(i=0; i<nfds; ++i)
{
if (events[i].data.fd == listenfd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
{
connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
if(connfd<0){
perror("connfd<0");
exit(1);
}
//setnonblocking(connfd);
char *str = inet_ntoa(clientaddr.sin_addr);
cout << "accapt a connection from " << str << endl;
//设置用于读操作的文件描述符
ev.data.fd=connfd;
//设置用于注测的读操作事件
ev.events=EPOLLIN|EPOLLET;
//ev.events=EPOLLIN;
//注册ev
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
}
//如果是已经连接的用户,并且收到数据,那么进行读入。
else if (events[i].events&EPOLLIN)
{
cout << "EPOLLIN" << endl;
if ((sockfd = events[i].data.fd) < 0)
continue;
if ( (n = read(sockfd, line, MAXLINE)) < 0)
{
if (errno == ECONNRESET)
{
close(sockfd);
events[i].data.fd = -1;
}
else
{
std::cout << "readline error" << std::endl;
}
}
else if (n == 0)
{
close(sockfd);
events[i].data.fd = -1;
}
line[n] = '/0';
std::cout << "read " << line << std::endl;
//设置用于写操作的文件描述符
ev.data.fd = sockfd;
//设置用于注测的写操作事件
ev.events = EPOLLOUT|EPOLLET;
//修改sockfd上要处理的事件为EPOLLOUT
//epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd,&ev);
}
else if (events[i].events&EPOLLOUT) //如果有数据发送
{
sockfd = events[i].data.fd;
write(sockfd, line, n);
//设置用于读操作的文件描述符
ev.data.fd=sockfd;
//设置用于注测的读操作事件
ev.events=EPOLLIN|EPOLLET;
//修改sockfd上要处理的事件为EPOLIN
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
}
}
}
return 0;
}