跳到主要内容

Linux epoll 使用示例

网络服务器传统上采用每个连接使用一个进程/线程的方式实现。但是由于资源使用和上下文切换时间等因素的影响,限制了服务器的并发能力,因此这种实现方式不适合那些需要处理并发的大量客户端请求的高性能应用。一个解决办法是在单线程上使用非阻塞 I/O,以及准备就绪通知方法,它在可以从套接字上读或写更多数据时通知你。

本文介绍 Linux 的 epoll 机制,它是 Linux 下最好的准备就绪通知机制。我们将用 C 给出一个完整的 TCP 服务器实现。我假定你有 C 编程经验,知道在 Linux 上如何编译和运行程序,并能够在 Man 手册查看用到的各种 C 函数。

epoll 机制

epoll 是在 Linux 2.6 中引入的,在其他的类 UNIX 操作系统中不可用。它提供了和 selectpoll 类似的机制:

  • select 可以最多同时监视 FD_SETSIZE 个描述符,通常是一个较小的数(比如 1024)。
  • poll 没有同时监视的描述符个数的限制,但是它在每次检查准备就绪的通知时需要扫描所有的描述符,这是 O(n) 的而且比较慢。

epoll 没有固定的限制,也不执行线性检查,因此它的效率更高,可以处理更多的事件。

epoll_createepoll_create1 创建 epoll 实例。 epoll_ctl 用来添加/删除需要观察的描述符。用 epoll_wait 等待观察集合上的事件,它阻塞直到有事件发生。更多的相关信息请见 Man 手册。

epoll 触发模式

当描述符添加到 epoll 实例中时,有两种模式:水平触发和边缘触发。

  • **LT(Level Triggered,水平触发)**是缺省的工作方式,并且同时支持 block 和 non-block socket。在这种模式下,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的 fd 进行 I/O 操作,如果你不作任何操作,内核还是会继续通知你的。所以,这种模式编程出错的可能性要小一点,传统的 select/poll 都是这种模型的代表。

    当被监控的文件描述符上有可读写事件发生时,epoll_wait() 会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait() 时,它还会通知你在上次没有读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率!!!

  • **ET(Edge Triggered,边缘触发)**是高速工作方式,只支持 non-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核将通过 epoll 通知你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态

    当被监控的文件描述符上有可读写事件发生时,epoll_wait() 会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait() 时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!!

接口函数

epoll 相关的接口函数声明在 sys/epoll.h 头文件,主要有三类函数:创建、操作 和 等待。

创建

int epoll_create(int size);
参数描述
size用来告诉内核要监听的 socket 数目一共有多少个,
但从 Linux 2.6.8 开始,size 参数就被忽略,只要大于零即可。
返回
≥0执行成功返回一个非负整数的文件描述符,作为创建好的 epoll 句柄。
-1执行失败,返回 -1,错误信息可以通过 errno 获得。

另外,系统还提供了 epoll_create1 函数,当其参数 flags 为 0 时,除了丢弃过时的 size 参数之外,它的效果与 epoll_create 一样。

int epoll_create1(int flags);
参数描述
flagsEPOLL_CLOEXEC :在新文件描述符上设置 close-on-exec (FD_CLOEXEC) 标志。
返回
≥0执行成功返回一个非负整数的文件描述符,作为创建好的 epoll 句柄。
-1执行失败,返回 -1,错误信息可以通过 errno 获得。

实现(eventpoll.c)

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
return do_epoll_create(flags);
}

SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;

return do_epoll_create(0);
}

操作

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数描述
epfd
op
fd
event
返回

等待

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);

应用示例

当你使用水平触发模式时,如果数据可读, epoll_wait 会总是返回准备好的事件。如果数据没有读完,再次调用 epoll_wait ,它会再次返回这个描述符的准备好的事件,因为数据可读。而边缘触发模式中,只能得到一次准备就绪通知。如果你没有读取全部数据,然后再次调用 epoll_wait 来查看该描述符,它将阻塞,因为准备就绪事件已经发送过了。

传递给 epoll_ctlepoll 事件结构如下。每个被观察的描述符可以关联一个整型变量或指针作为用户数据。

typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

现在让我们开始写代码。我们将实现一个微型服务器,它将打印所有发送到套接字的数据到标准输出。我们将从写一个创建并绑定 TCP 套接字的函数 create_and_bind 开始:

static int create_and_bind (char *port)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int s, sfd;

memset (&hints, 0, sizeof (struct addrinfo));
hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */
hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
hints.ai_flags = AI_PASSIVE; /* All interfaces */

s = getaddrinfo (NULL, port, &hints, &result);
if (s != 0) {
fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
return -1;
}

for (rp = result; rp != NULL; rp = rp->ai_next) {
sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1)
continue;

s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
if (s == 0) {
/* We managed to bind successfully! */
break;
}
close (sfd);
}

if (rp == NULL) {
fprintf (stderr, "Could not bind\n");
return -1;
}

freeaddrinfo (result);
return sfd;
}

create_and_bind 包括了一段可移植的获取 IPv4 或 IPv6 套接字的标准代码块。它以字符串形式接受一个端口参数,可以用 argv[1] 来传递。 getaddrinfo 函数返回一串和 hints 参数兼容的 addrinfo 结构。

addrinfo 结构如下:

struct addrinfo {
int ai_flags;
int ai_family;
int ai_socktype;
int ai_protocol;
size_t ai_addrlen;
struct sockaddr *ai_addr;
char *ai_canonname;
struct addrinfo *ai_next;
};

我们逐个遍历这些结构,尝试用它们创建套接字。如果成功, create_and_bind 函数返回套接字描述符,否则返回 -1。

接下来,我们写一个函数将套接字修改为非阻塞。 make_socket_non_blocking 函数设置描述符的 O_NONBLOCK 标志:

static int make_socket_non_blocking (int sfd)
{
int flags, s;

flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1) {
perror ("fcntl");
return -1;
}

flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -1) {
perror ("fcntl");
return -1;
}

return 0;
}

现在再来看 main 函数,它包含事件循环,是程序的主体:

#define MAXEVENTS 64

int main (int argc, char *argv[])
{
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;

if (argc != 2) {
fprintf (stderr, "Usage: %s [port]\n", argv[0]);
exit (EXIT_FAILURE);
}

sfd = create_and_bind (argv[1]);
if (sfd == -1)
abort ();

s = make_socket_non_blocking (sfd);
if (s == -1)
abort ();

s = listen (sfd, SOMAXCONN);
if (s == -1) {
perror ("listen");
abort ();
}

efd = epoll_create1 (0);
if (efd == -1) {
perror ("epoll_create");
abort ();
}

event.data.fd = sfd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1) {
perror ("epoll_ctl");
abort ();
}

/* Buffer where events are returned */
events = calloc (MAXEVENTS, sizeof event);

/* The event loop */
while (1) {
int n, i;
n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
/* An error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
}

else if (sfd == events[i].data.fd) {
/* We have a notification on the listening socket, which
means one or more incoming connections. */
while (1) {
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

in_len = sizeof in_addr;
infd = accept (sfd, &in_addr, &in_len);
if (infd == -1) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
/* We have processed all incoming connections. */
break;
} else {
perror ("accept");
break;
}
}

s = getnameinfo(&in_addr, in_len, hbuf, sizeof(hbuf),
sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0) {
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
}

/* Make the incoming socket non-blocking and add it to the
list of fds to monitor. */
s = make_socket_non_blocking (infd);
if (s == -1)
abort ();

event.data.fd = infd;
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1) {
perror ("epoll_ctl");
abort ();
}
}
continue;
} else {
/* We have data on the fd waiting to be read. Read and
display it. We must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same data. */
int done = 0;
while (1) {
ssize_t count;
char buf[512];

count = read (events[i].data.fd, buf, sizeof buf);
if (count == -1) {
/* If errno == EAGAIN, that means we have read all
data. So go back to the main loop. */
if (errno != EAGAIN) {
perror ("read");
done = 1;
}
break;
} else if (count == 0) {
/* End of file. The remote has closed the connection. */
done = 1;
break;
}

/* Write the buffer to standard output */
s = write (1, buf, count);
if (s == -1) {
perror ("write");
abort ();
}
}

if (done) {
printf ("Closed connection on descriptor %d\n", events[i].data.fd);
/* Closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close (events[i].data.fd);
}
}
}
}

free(events);
close(sfd);
return EXIT_SUCCESS;
}

main 函数首先调用 create_and_bind 函数来建立套接字。然后将套接字设置为非阻塞,然后调用 listen 函数。接下来创建一个 epoll 实例 efd ,以边缘触发模式向它添加监听套接字 sfd 来观察输入事件。

外面的 while 循环是主事件循环。调用 epoll_wait 函数阻塞线程来等待事件。当有事件发生时, epoll_wait 函数通过 events 参数返回事件。

当我们添加新的要观察的连接,以及移除已经终止的连接时, epoll 实例在事件循环中不断更新。

当有事件发生时,有三种类型:

  • 出错。当一个错误条件发生时,或者事件不是一个有关数据可读的通知,关闭关联的描述符。关闭描述符会自动将它从 epoll 实例的观察集合中移除。
  • 新连接。当监听描述符 sfd 可读时,意味着有一个或多个新的连接到达。调用 accept 函数接受这些连接,打印一条消息,然后将套接字修改为非阻塞并将它添加到 epoll 实例的观察集合中。
  • 客户端数据。当任何客户端描述符上有数据可读时,我们使用 read 函数读取数据。我们必须读取全部可读数据,因为在边缘触发模式下只产生一个事件。读取的数据用 write 函数写到标准输出。如果 read 函数返回0,代表遇到 EOF ,可以关闭客户端连接了。如果为-1并且 errnoEAGAIN ,表示这个事件的所有数据都已经读完,可以回到主循环。

就是这样。不断循环,向观察集合中添加和删除描述符。