1、惊群效应(thundering herd)
什么是惊群效应
惊群现象就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。
为了更好的理解何为惊群,举一个很简单的例子,当你往一群鸽子中间扔一粒谷子,所有的各自都被惊动前来抢夺这粒食物,但是最终注定只可能有一个鸽子满意的抢到食物,没有抢到的鸽子只好回去继续睡觉,等待下一粒谷子的到来。这里鸽子表示进程(线程),那粒谷子就是等待处理的事件。
惊群效应到底消耗了什么
我想你应该也会有跟我一样的问题,那就是惊群效应到底消耗了什么?
(1)系统对用户进程/线程频繁地做无效的调度,上下文切换系统性能大打折扣。
(2)为了确保只有一个线程得到资源,用户必须对资源操作进行加锁保护,进一步加大了系统开销。
是不是还是觉得不够深入,概念化?看下面:
(1)上下文切换(context switch)过高会导致cpu像个搬运工,频繁地在寄存器和运行队列之间奔波,更多的时间花在了进程(线程)切换,而不是在真正工作的进程(线程)上面。直接的消耗包括cpu寄存器要保存和加载(例如程序计数器)、系统调度器的代码需要执行。间接的消耗在于多核cache之间的共享数据。
(2)通过锁机制解决惊群效应是一种方法,在任意时刻只让一个进程(线程)处理等待的事件。但是锁机制也会造成cpu等资源的消耗和性能损耗。目前一些常见的服务器软件有的是通过锁机制解决的,比如nginx(它的锁机制是默认开启的,可以关闭);还有些认为惊群对系统性能影响不大,没有去处理,比如lighttpd。
2. accept的惊群问题
Google了一下:其实在linux2.6版本以后,linux内核已经解决了accept函数的“惊群”现象,大概的处理方式就是,当内核接收到一个客户连接后,只会唤醒等待队列上的第一个进程(线程),所以如果服务器采用accept阻塞调用方式,在最新的linux系统中已经没有“惊群效应”了。
看了下kernel代码,发现2.6确实已经解决了,那2.6之前的内核都有惊群问题吗?其实不是的,下载了几个内核版本,通过代码得出结论(没实际验证),情况如下:
linux-2.2.2 会唤醒等待队列所有进程
linux-2.3.13 只会唤醒第一个进程
linux-2.4.34只会唤醒第一个进程
linux-2.5.27只会唤醒第一个进程
所以应该是从linux-2.2.2后的某个内核版本修复的,但具体版本号还不太清楚。
accept惊群问题未解决时的代码流程
接下来看看linux-2.2.2版本中堵塞唤醒代码流程,了解为什么会唤醒所有进程?为什么会出现惊群现象?
用户程序调用accept时,会调到内核态中的tcp_accept,如果没有设置非堵塞,会调用wait_for_connect(sk, &prev)堵塞进程,等待事件到来。
static struct open_request * wait_for_connect(struct sock * sk,
struct open_request **pprev)
{
struct wait_queue wait = { current, NULL };
struct open_request *req;
//将wait添加到sk的等待队列头sleep中
//假如fork了10个进程,每个进程都调用accept,则会将10个进程的wait添加到sk的等待队列头 sleep上。
add_wait_queue(sk->sleep, &wait);
for (;;) {
current->state = TASK_INTERRUPTIBLE;
//释放锁。
release_sock(sk);
//进程调度,执行其他进程,直到被唤醒
schedule();
//如果事件发生后,这个10个进程都会被被唤醒,从此处开始执行,
//首先获取锁。
//即10个进程只有一个进程才能获取锁成功,继而获取新连接成功
lock_sock(sk);
//第一个获取到锁的进程也会获取到新连接,跳出循环,继续执行。
//其他进程没获取锁的继续执行调度,等待新连接的到来
//所以这10个进程可以循环接收新连接
req = tcp_find_established(&(sk->tp_pinfo.af_tcp), pprev);
if (req)
break;
if (signal_pending(current))
break;
}
//进程被唤醒并且获取连接后,将本进程从等待队列中删除。
current->state = TASK_RUNNING;
remove_wait_queue(sk->sleep, &wait);
return req;
}
extern inline void add_wait_queue(struct wait_queue ** p, struct wait_queue * wait)
{
unsigned long flags;
//添加到队列头部,并且没有exclusive标志
write_lock_irqsave(&waitqueue_lock, flags);
__add_wait_queue(p, wait);
write_unlock_irqrestore(&waitqueue_lock, flags);
}
tcp三次握手完成后,需要唤醒accept进程,流程如下:
tcp_v4_do_rcv->tcp_v4_hnd_req->tcp_check_req->syn_recv_sock(tcp_v4_syn_recv_sock)->sk->data_ready(sk, 0)->sock_def_readable->wake_up_interruptible(sk->sleep);
#define wake_up_interruptible(x) __wake_up((x),TASK_INTERRUPTIBLE)
void __wake_up(struct wait_queue **q, unsigned int mode)
{
struct wait_queue *next;
read_lock(&waitqueue_lock);
if (q && (next = *q)) {
struct wait_queue *head;
//遍历sk等待队列中的所有进程并唤醒,此例中会唤醒10个进程。
head = WAIT_QUEUE_HEAD(q);
while (next != head) {
struct task_struct *p = next->task;
next = next->next;
if (p->state & mode)
wake_up_process(p);
}
}
read_unlock(&waitqueue_lock);
}
疑问:在惊群问题未解决时,新连接到来后,只有一个后返回accept成功,其余的九个会有返回值吗?还是在内核态继续执行调度等待其他新连接的到来?
应该是其余九个进程会被唤醒,但是不会有返回值,由于第一个被唤醒进程已经取走了新连接,这九个进程即使被唤醒,发现全连接队列为空,就会继续调度睡眠,等待新连接到来。
内核解决accept惊群后的代码流程
以内核版本3.17.89为例
用户程序调用accept时,会调到内核态中的inet_csk_accept,如果全连接队列为空,并且没有设置非堵塞,则会调用inet_csk_wait_for_connect(sk, timeo)堵塞进程,等待事件到来。
static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
struct inet_connection_sock *icsk = inet_csk(sk);
//初始化等待队列节点wait
DEFINE_WAIT(wait);
int err;
/*
* True wake-one mechanism for incoming connections: only
* one process gets woken up, not the 'whole herd'.
* Since we do not 'race & poll' for established sockets
* anymore, the common case will execute the loop only once.
*
* Subtle issue: "add_wait_queue_exclusive()" will be added
* after any current non-exclusive waiters, and we know that
* it will always _stay_ after any new non-exclusive waiters
* because all non-exclusive waiters are added at the
* beginning of the wait-queue. As such, it's ok to "drop"
* our exclusiveness temporarily when we get woken up without
* having to remove and re-insert us on the wait queue.
*/
for (;;) {
//以上英文注释已经说清楚了,只有一个进程会被唤醒。
//非exclusive的节点会被加在等待队列前面,excusive节点会被加在
//所有非exclusive节点的后面。
//此例中,等待队列前面有n个非exclusive节点,后面有10个exclusive节点
prepare_to_wait_exclusive(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
release_sock(sk);
if (reqsk_queue_empty(&icsk->icsk_accept_queue))
timeo = schedule_timeout(timeo);
lock_sock(sk);
err = 0;
if (!reqsk_queue_empty(&icsk->icsk_accept_queue))
break;
err = -EINVAL;
if (sk->sk_state != TCP_LISTEN)
break;
err = sock_intr_errno(timeo);
if (signal_pending(current))
break;
err = -EAGAIN;
if (!timeo)
break;
}
//将等待节点从等待队列中删除
finish_wait(sk_sleep(sk), &wait);
return err;
}
#define DEFINE_WAIT_FUNC(name, function) \
wait_queue_t name = { \
.private = current, \
.func = function, \
.task_list = LIST_HEAD_INIT((name).task_list), \
}
//有新连接时,会调用autoremove_wake_function唤醒调用accept的进程,
//并且将等待节点从等待队列中删除
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
//唤醒堵塞的accept进程
int ret = default_wake_function(wait, mode, sync, key);
if (ret)
//如果唤醒成功了,则将等待节点从等待队列中删除,这样下次有新连接,
//就会唤醒另一个进程来处理。
list_del_init(&wait->task_list);
return ret;
}
void prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
//设置exclusive标志,表示一次只会有一个进程被唤醒。
wait->flags |= WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
//把wait添加到等待队列的尾部。
if (list_empty(&wait->task_list))
__add_wait_queue_tail(q, wait);
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
}
以上是accept的实现,继续看唤醒部分代码。
当有tcp连接完成,就会从半连接队列拷贝sock到全连接队列,这个时候就可以唤醒阻塞的accept了。代码如下:
tcp_v4_do_rcv->tcp_child_process
int tcp_child_process(struct sock *parent, struct sock *child,
struct sk_buff *skb)
{
int ret = 0;
int state = child->sk_state;
if (!sock_owned_by_user(child)) {
ret = tcp_rcv_state_process(child, skb, tcp_hdr(skb),
skb->len);
/* Wakeup parent, send SIGIO */
if (state == TCP_SYN_RECV && child->sk_state != state)
parent->sk_data_ready(parent);
} else {
/* Alas, it is possible again, because we do lookup
* in main socket hash table and lock on listening
* socket does not protect us more.
*/
__sk_add_backlog(child, skb);
}
bh_unlock_sock(child);
sock_put(child);
return ret;
}
调用sk_data_ready通知父socket,即监听socket,此函数对应sock_def_readable
static void sock_def_readable(struct sock *sk)
{
struct socket_wq *wq;
rcu_read_lock();
//判断等待队列是否为空,不为空说明有进程在堵塞等待
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
#define wake_up_interruptible_sync_poll(x, m) \
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
int wake_flags = 1; /* XXX WF_SYNC */
if (unlikely(!q))
return;
if (unlikely(nr_exclusive != 1))
wake_flags = 0;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
spin_unlock_irqrestore(&q->lock, flags);
}
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
//从头开始遍历等待队列。
//等待队列头前面可能有N个非exclusive节点,先执行curr->func,但是flags中没有WQ_FLAG_EXCLUSIVE标志,则if条件不满足,不会执行--nr_exclusive,其值仍然为1
//执行到第一个exclusive节点时,也是先执行curr->func,并且flags中有WQ_FLAG_EXCLUSIVE标志,继续执行--nr_exclusive,nr_exclusive变为0,取非后满足,if的三个条件都满足,则执行break,达到只唤醒一个进程的目的。
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
//nr_exclusive为1,执行一次就会为0,跳出循环,达到只唤醒一个进程的目的
//被唤醒后,会在inet_csk_wait_for_connect中将wait从等待队列删除。
//func为autoremove_wake_function,此函数不只唤醒进程还会将wait节点从队列删除
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
解决惊群问题后的代码测试
client端代码
root@master:~# cat client.c
#include <sys/socket.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
void main(void)
{
int fd, ret;
struct sockaddr_in addr;
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(fd < 0)
{
perror("socket create failed");
return ;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(2222);
addr.sin_addr.s_addr = inet_addr("192.168.122.20");
ret = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
if( ret != 0)
{
perror("socket connect1 failed");
return ;
}
perror("socket connect succefsully\n");
sleep(1000);
}
server端代码
root@master:~# cat server.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <string.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#define PROCESS_NUM 5
int main()
{
int fd = socket(PF_INET, SOCK_STREAM, 0);
int connfd;
int pid;
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(2222);
bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
listen(fd, 1024);
int i;
for(i = 0; i < PROCESS_NUM; ++i){
pid = fork();
if(pid == 0){
printf("I'm pid: %d\n", getpid());
while(1){
connfd = accept(fd, (struct sockaddr *)NULL, NULL);
if(connfd != -1)
{
printf("process %d accept success\n", getpid());
close(connfd);
}else{
printf("process %d accept a connection failed: %s\n", getpid(), strerror(errno));
}
}
}
}
wait(0);
return 0;
}
server端创建block阻塞型的socket后,fork出10个进程,accept同一个fd。可以使用strace跟踪server端子进程执行结果,可看到每次client连接server只有一个进程被唤醒
root@master:~# ./server
I'm pid: 10794
I'm pid: 10796
I'm pid: 10795
I'm pid: 10797
I'm pid: 10798
process 10794 accept success --->10794先调用accept,则会被先唤醒
process 10796 accept success
process 10795 accept success
process 10797 accept success
process 10798 accept success
client连接server五次,每次都有不同的进程来处理,处理事件的进程顺序由调用顺序决定。
root@master:~# ./client
socket connect succefsully
: Success
socket connect succefsully
: Success
socket connect succefsully
: Success
socket connect succefsully
: Success
3. poll/select惊群问题
先说结论,poll/select是存在惊群问题的。下面用代码验证,再看看代码流程。
client代码仍然使用accept时的代码。下面是poll模式下server端代码
root@master:~# cat poll_thunder.c
#include<stdio.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<string.h>
#include<netinet/in.h>
#include<unistd.h>
#include <errno.h>
#include <poll.h>
#define PROCESS_NUM 10
int main()
{
int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int connfd;
int pid;
int status = 1;
char sendbuff[1024];
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(2222);
bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
listen(fd, 1024);
int i, new_fd, ret=0;
struct pollfd clientfds[1];
for(i = 0; i < PROCESS_NUM; ++i){
pid = fork();
if(pid == 0){
//printf("I'm pid: %d, poll on : %d\n", getpid(), fd);
while (1) {
printf("I'm pid: %d, poll on : %d\n", getpid(), fd);
clientfds[0].fd = fd;
clientfds[0].events=POLLIN;
ret = poll(clientfds, 2, -1);
if (ret < 0)
{
perror("poll error");
}
else if(ret == 0)
{
printf("poll timeout\n");
continue;
}
if (clientfds[0].revents&POLLIN) {
new_fd = accept(fd, (struct sockaddr *)NULL, NULL);;
if(new_fd < 0)
{
printf("accept failed: %d on pid: %d\n", errno, getpid());
printf("\n");
continue;
}
printf("new read event: accept new_fd: %d on pid: %d\n", new_fd, getpid());
close(new_fd);
}
printf("\n");
if (clientfds[0].revents&POLLOUT) {
printf("new write event in pid: %d\n", getpid());
}
if (clientfds[0].revents&POLLERR) {
printf("new error event in pid: %d\n", getpid());
}
}
}
}
//int status;
wait(0);
return 0;
}
client连接server一次,可看到有四个进程被唤醒,只有一个进程接收了新连接。
root@master:~# ./poll
I'm pid: 18907, poll on : 3
I'm pid: 18908, poll on : 3
I'm pid: 18909, poll on : 3
I'm pid: 18910, poll on : 3
I'm pid: 18911, poll on : 3
I'm pid: 18912, poll on : 3
I'm pid: 18913, poll on : 3
I'm pid: 18914, poll on : 3
I'm pid: 18915, poll on : 3
I'm pid: 18916, poll on : 3
new read event: accept new_fd: 4 on pid: 18915 -->只有此进程接收新连接成功
accept failed: 11 on pid: 18916
I'm pid: 18915, poll on : 3
I'm pid: 18916, poll on : 3
accept failed: 11 on pid: 18907
I'm pid: 18907, poll on : 3
accept failed: 11 on pid: 18908
I'm pid: 18908, poll on : 3
这里有一个问题,按说应该10个进程都被唤醒了,为什么只有四个进程执行到accept处?
因为进程被唤醒后,会调用目标文件的poll函数获取发生的事件通知用户程序,用户程序调用accept后,会将发生的事件清空。如果清空事件前,被唤醒的进程调用poll还会获取到发生的事件,用户程序再调用accept会返回失败。但是清空事件后,被唤醒的进程调用poll获取不到事件,也就不会通知用户程序,而是继续睡眠,这个情况通过log是看不到的,可以通过strace观察server。
或者在进程被唤醒后,在调用accept前sleep一段时间,让所有进程都有时间调用poll获取事件就会看到如下log,10个进程都被唤醒并且都调用accept,但是仍然只有一个进程能成功。
root@master:~# ./poll
I'm pid: 20130, poll on : 3
I'm pid: 20131, poll on : 3
I'm pid: 20132, poll on : 3
I'm pid: 20133, poll on : 3
I'm pid: 20135, poll on : 3
I'm pid: 20136, poll on : 3
I'm pid: 20137, poll on : 3
I'm pid: 20138, poll on : 3
I'm pid: 20134, poll on : 3
I'm pid: 20139, poll on : 3
//10个进程都被唤醒执行accept了,但是只有一个能成功接收新连接
accept failed: 11 on pid: 20136
new read event: accept new_fd: 4 on pid: 20133
accept failed: 11 on pid: 20131
I'm pid: 20136, poll on : 3
I'm pid: 20131, poll on : 3
accept failed: 11 on pid: 20139
I'm pid: 20139, poll on : 3
accept failed: 11 on pid: 20132
I'm pid: 20133, poll on : 3
accept failed: 11 on pid: 20135
I'm pid: 20132, poll on : 3
I'm pid: 20135, poll on : 3
accept failed: 11 on pid: 20137
accept failed: 11 on pid: 20130
I'm pid: 20137, poll on : 3
I'm pid: 20130, poll on : 3
accept failed: 11 on pid: 20134
I'm pid: 20134, poll on : 3
accept failed: 11 on pid: 20138
I'm pid: 20138, poll on : 3
用户调用poll堵塞进程流程
用户调用poll后,在内核态流程调用:
do_sys_poll->do_poll->do_pollfd-> f.file->f_op->poll, 对于socket来说,poll函数为调用sock_poll
static unsigned int sock_poll(struct file *file, poll_table *wait)
return busy_flag | sock->ops->poll(file, sock, wait); //tcp_poll
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
unsigned int mask;
struct sock *sk = sock->sk;
const struct tcp_sock *tp = tcp_sk(sk);
sock_rps_record_flow(sk);
sock_poll_wait(file, sk_sleep(sk), wait);
if (sk->sk_state == TCP_LISTEN)
return inet_csk_listen_poll(sk);
}
//如果全连接队列不为空,则直接返回POLLIN事件
static inline unsigned int inet_csk_listen_poll(const struct sock *sk)
{
return !reqsk_queue_empty(&inet_csk(sk)->icsk_accept_queue) ?
(POLLIN | POLLRDNORM) : 0;
}
static inline void sock_poll_wait(struct file *filp,
wait_queue_head_t *wait_address, poll_table *p)
{
if (!poll_does_not_wait(p) && wait_address) {
poll_wait(filp, wait_address, p);
/* We need to be sure we are in sync with the
* socket flags modification.
*
* This memory barrier is paired in the wq_has_sleeper.
*/
smp_mb();
}
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
//p->_qproc为__pollwait
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}
/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
entry->filp = get_file(filp);
entry->wait_address = wait_address;
entry->key = p->_key;
init_waitqueue_func_entry(&entry->wait, pollwake);
entry->wait.private = pwq;
add_wait_queue(wait_address, &entry->wait);
}
void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
unsigned long flags;
//将exclusive标志清除掉,这样有10个进程调用select,就会将10个进程添加到等待队列中。
//因为没有exclusive,所以这10个进程都会被唤醒。
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}
唤醒进程流程
唤醒流程和accept中唤醒流程相同,只不过最后调用__wake_up_common时,因为poll添加到等待队列时,没有设置exclusive,所以所有进程都会被唤醒。被唤醒的进程再调用accept接收新连接,但是只有一个进程会成功,其余9个都返回错误EAGAIN。
4. epoll惊群问题
还是先说结论:ET模式下不存在惊群问题,LT模式下存在。
下面通过代码验证,再分析实现流程。
root@master:~# cat epoll_thunder.c
#include<stdio.h>
#include<stdlib.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<string.h>
#include<netinet/in.h>
#include<unistd.h>
#include <errno.h>
#include<sys/epoll.h>
#define MAXEVENTS 64
#define PROCESS_NUM 10
int main()
{
int fd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int connfd;
int pid;
int i, epoll_fd, new_fd, ret=0, num;
struct epoll_event event;
struct epoll_event *events;
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(2222);
bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
listen(fd, 1024);
if ((epoll_fd = epoll_create(MAXEVENTS))< 0) {
perror("epoll_create");
exit(1);
}
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
//event.events = EPOLLIN;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0){
perror("epoll_ctl");
exit(1);
}
events = calloc(MAXEVENTS, sizeof(event));
for(i = 0; i < PROCESS_NUM; ++i) {
pid = fork();
if(pid == 0) {
while (1) {
printf("I'm pid: %d, epoll on : %d\n", getpid(), fd);
num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
if (num < 0) {
printf("epoll_wait failed %d, on pid %d\n", errno, getpid());
continue;
}
for(i = 0; i < num; ++i){
if((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))){
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
}else if(fd == events[i].data.fd){
new_fd = accept(fd, (struct sockaddr *)NULL, NULL);;
if(new_fd < 0)
{
printf("accept failed: %d on pid: %d\n", errno, getpid());
printf("\n");
continue;
}
printf("new read event: accept new_fd: %d on pid: %d\n", new_fd, getpid());
close(new_fd);
}
}
}
}
}
wait(0);
return 0;
}
在epoll_thunder.c中,如果events加上标志EPOLLET就是ET模式,不加的话默认是LT模式
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event)
ET模式测试结果如下,始终只有一个进程接收新连接,并且是同一个进程。这是因为调用epoll_wait堵塞时,添加wait节点加到了ep等待队列头部,并且是exclusive的,而唤醒进程时总是从队列头部开始,由于设置了exclusive,所以只好唤醒一个进程。
为什么总是同一个进程呢?是因为第一个被唤醒的进程马上又调用epoll_wait将其再次加入等待队列头部,所以下次事件到来时仍然唤醒同一个进程。
root@ubuntu:/home/jk/socket# ./epoll
I'm pid: 119280, epoll on : 3
I'm pid: 119281, epoll on : 3
I'm pid: 119282, epoll on : 3
I'm pid: 119285, epoll on : 3
I'm pid: 119283, epoll on : 3
I'm pid: 119287, epoll on : 3
I'm pid: 119284, epoll on : 3
I'm pid: 119286, epoll on : 3
I'm pid: 119288, epoll on : 3
I'm pid: 119289, epoll on : 3
new read event: accept new_fd: 5 on pid: 119289
I'm pid: 119289, epoll on : 3
new read event: accept new_fd: 5 on pid: 119289
I'm pid: 119289, epoll on : 3
new read event: accept new_fd: 5 on pid: 119289
I'm pid: 119289, epoll on : 3
下面将进程被唤醒后sleep一段时间20s,然后用client连接server,会发现新连接总是被等待队列头部的进程处理。
root@master:~# ./epoll
I'm pid: 27656, epoll on : 3
I'm pid: 27657, epoll on : 3
I'm pid: 27658, epoll on : 3
I'm pid: 27659, epoll on : 3
I'm pid: 27660, epoll on : 3
I'm pid: 27661, epoll on : 3
I'm pid: 27662, epoll on : 3
I'm pid: 27663, epoll on : 3
I'm pid: 27664, epoll on : 3
I'm pid: 27665, epoll on : 3
new read event: accept new_fd: 5 on pid: 27665 -->第一次连接被27665进程处理,然后sleep 20s
new read event: accept new_fd: 5 on pid: 27664 -->第二次连接被27664进程处理,然后sleep 20s
new read event: accept new_fd: 5 on pid: 27663 -->第三次连接被27663进程处理,然后sleep 20s
I'm pid: 27665, epoll on : 3 -->进程27665 sleep结束,重新开始wait
new read event: accept new_fd: 5 on pid: 27665 -->第四次连接又被27663进程处理
LT模式下,测试结果如下,从结果看,好像也是只有一个进程被唤醒了,但是实际上唤醒进程不只一个
root@ubuntu:/home/jk/socket# ./epoll
I'm pid: 119318, epoll on : 3
I'm pid: 119319, epoll on : 3
I'm pid: 119320, epoll on : 3
I'm pid: 119324, epoll on : 3
I'm pid: 119321, epoll on : 3
I'm pid: 119326, epoll on : 3
I'm pid: 119322, epoll on : 3
I'm pid: 119323, epoll on : 3
I'm pid: 119325, epoll on : 3
I'm pid: 119327, epoll on : 3
new read event: accept new_fd: 5 on pid: 119327
I'm pid: 119327, epoll on : 3
new read event: accept new_fd: 5 on pid: 119327
I'm pid: 119327, epoll on : 3
new read event: accept new_fd: 5 on pid: 119327
I'm pid: 119327, epoll on : 3
LT模式下,如果添加sleep后,就会看到所有进程都被唤醒了。唤醒多少个进程和被唤醒进程处理时间长短有关系,如果处理越快,唤醒的进程越少。
num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
if (num < 0) {
printf("epoll_wait failed %d, on pid %d\n", errno, getpid());
continue;
}
sleep(2);
再次执行,可看到所有进程都被唤醒了
root@ubuntu:/home/jk/socket# ./epoll
I'm pid: 119480, epoll on : 3
I'm pid: 119481, epoll on : 3
I'm pid: 119482, epoll on : 3
I'm pid: 119483, epoll on : 3
I'm pid: 119484, epoll on : 3
I'm pid: 119485, epoll on : 3
I'm pid: 119486, epoll on : 3
I'm pid: 119487, epoll on : 3
I'm pid: 119488, epoll on : 3
I'm pid: 119489, epoll on : 3
new read event: accept new_fd: 5 on pid: 119489
accept failed: 11 on pid: 119488
I'm pid: 119488, epoll on : 3
I'm pid: 119489, epoll on : 3
accept failed: 11 on pid: 119487
I'm pid: 119487, epoll on : 3
accept failed: 11 on pid: 119486
I'm pid: 119486, epoll on : 3
accept failed: 11 on pid: 119485
I'm pid: 119485, epoll on : 3
accept failed: 11 on pid: 119484
I'm pid: 119484, epoll on : 3
accept failed: 11 on pid: 119483
I'm pid: 119483, epoll on : 3
accept failed: 11 on pid: 119482
I'm pid: 119482, epoll on : 3
accept failed: 11 on pid: 119481
I'm pid: 119481, epoll on : 3
accept failed: 11 on pid: 119480
I'm pid: 119480, epoll on : 3
代码分析
调用epll_wait时,如果需要堵塞等待,则将调用进程加入到ep等待队列中,设置exclusive,并且添加到队列头部,如果有新连接到来,也只会唤醒一个进程。
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);
这样看貌似epoll_wait已经解决了惊群问题,但在LT和ET模式下处理流程的差别导致了LT模式下惊群问题。
假如调用epoll_ctl将一个fd加入到epoll进行监听,会调用目标文件fd的poll函数,将wait节点(fd上有新事件发生时,调用ep_poll_callback唤醒监听进程)加入到目标文件fd的等待队列中,再fork十个进程调用epoll_wait等待事件到来,这样就会有10个进程堵塞在ep的等待队列中(如果有事件发生时,则调用default_wake_function唤醒堵塞进程)。
如果此时client和server完成了三次握手,则会调用socket的fd等待队列上的task,即会调用ep_poll_callback,将发生事件的fd添加到就绪链表rdlist中,如果ep的等待队列中不为空(此例不为空,有10个节点),则会唤醒第一个进程(因为添加了exclusive标志,所以只会唤醒第一个进程)。
ep_poll_callback
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
第一个进程被唤醒后,首先将自己从ep的等待队列中删除,然后调用rdlist上发生事件的fd的poll函数获取发生的事件,将其传递到用户程序,如果是用户程序感兴趣的事件,用户程序再调用accept接收新连接。
ep_send_events_proc
revents = ep_item_poll(epi, &pt);
if (revents) {
__put_user(revents, &uevent->events)
如果是LT模式,则将目标文件再次添加到就绪链表rdlist,
如果是ET模式,就不会将目标文件再次添加到就绪链表rdlist。
ep_send_events_proc
revents = ep_item_poll(epi, &pt);
if (revents) {
if (!(epi->event.events & EPOLLET))
list_add_tail(&epi->rdllink, &ep->rdllist);
后面流程还会做如下判断
ep_scan_ready_list
if (!list_empty(&ep->rdllist)) {
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
在LT模式下,rdlist不为空,并且ep->wq中还有9个进程在堵塞等待,则又会唤醒第二个进程。
第二个进程被唤醒后,后面处理方式和第一个进程相同。
如果第一个进程调用accept把事件取走了,则第二个进程调用目标文件poll函数时就得不到事件,ep->wq中的其他进程就不会被唤醒了。如果每个被唤醒进程调用accept的时间更长,则会唤醒更多的进程。
在ET模式下,rdlist为空,就不会唤醒等待队列上的其他进程了。
epoll下涉及两个等待队列:
a. 调用epoll_ctl时,将进程添加到目标文件的等待队列中,目标文件发生事件时调用ep_poll_callback,判断如果ep等待队列不为空,则唤醒ep等待队列上的进程。
b. 调用epoll_wait时,将进程添加到ep的等待队列中。
参考
https://www.cnblogs.com/Anker/p/7071849.html
https://blog.csdn.net/lyztyycode/article/details/78648798