IO多路复用--select,poll,epoll

背景

传统的IO模型中,OS只提供了一次对一个FD进行操作的功能,也就是BIO(阻塞IO)。

  • 1对1模型


    image.png
  • 线程池模型


    image.png

传统IO模式受限于OS提供的功能,一个线程在同一时刻只能处理一个连接。

IO多路复用

在传统IO模式的基础上,OS提供了IO多路复用模型:一个线程可以在同一时刻处理多个连接,可以一次性把多个FD交给内核进行操作。

select

select的逻辑比较简单:接收从用户态传过来的FD集合,循环遍历并调用VFS中的file->f_op->poll函数,判断是否有准备好的数据可以进行操作,如果有,则设置对应的标识位。

image.png
/*
 * Poll the registered descriptors with select(2) and record the ready ones
 * in eventLoop->fired.
 *
 * eventLoop - event loop; eventLoop->apidata holds the master fd sets
 *             (rfds/wfds) and the scratch copies (_rfds/_wfds) handed to
 *             select().
 * tvp       - timeout forwarded unchanged to select().
 *
 * Returns the number of entries written to eventLoop->fired. This is 0 when
 * select() times out or fails (retval <= 0).
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    /* select() modifies the sets it is given, so operate on copies and keep
     * the master sets intact for the next call. */
    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;

            /* A registered descriptor that select() did not flag is not a
             * fired event: reporting it would inflate the returned count and
             * make the caller iterate over entries with an empty mask. */
            if (mask == 0) continue;

            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}

/*
 * select() system call entry point.
 *
 * n            - number of descriptors to examine; negative values are
 *                rejected with -EINVAL, values above the task's max_fdset
 *                are clamped to it.
 * inp/outp/exp - user-space fd sets (read / write / exception). They are
 *                loaded with get_fd_set() and, on success, overwritten with
 *                the result sets via set_fd_set().
 * tvp          - optional user-space timeout. When NULL the timeout stays at
 *                MAX_SCHEDULE_TIMEOUT.
 *
 * Returns the value produced by do_select() on success, or a negative error
 * code (-EINVAL, -ENOMEM, -ERESTARTNOHAND, or whatever the user-access
 * helpers / do_select() returned).
 */
asmlinkage long
sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp)
{
    fd_set_bits fds;
    char *bits;
    long timeout;
    int ret, size, max_fdset;

    /* Default used when no timeout is supplied or it is too large. */
    timeout = MAX_SCHEDULE_TIMEOUT;
    if (tvp) {
        time_t sec, usec;

        /* Any non-zero result from the user-access helpers is returned
         * to the caller as the error code. */
        if ((ret = verify_area(VERIFY_READ, tvp, sizeof(*tvp)))
            || (ret = __get_user(sec, &tvp->tv_sec))
            || (ret = __get_user(usec, &tvp->tv_usec)))
            goto out_nofds;

        ret = -EINVAL;
        if (sec < 0 || usec < 0)
            goto out_nofds;

        /* Convert sec/usec into units of 1/HZ seconds; timeouts of
         * MAX_SELECT_SECONDS or more keep MAX_SCHEDULE_TIMEOUT. */
        if ((unsigned long) sec < MAX_SELECT_SECONDS) {
            timeout = ROUND_UP(usec, 1000000/HZ);
            timeout += sec * (unsigned long) HZ;
        }
    }

    ret = -EINVAL;
    if (n < 0)
        goto out_nofds;

    /* max_fdset can increase, so grab it once to avoid race */
    max_fdset = current->files->max_fdset;
    if (n > max_fdset)
        n = max_fdset;

    /*
     * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
     * since we used fdset we need to allocate memory in units of
     * long-words. 
     */
    ret = -ENOMEM;
    size = FDS_BYTES(n);
    /* NOTE(review): the six pointers below are spaced `size` bytes apart,
     * so select_bits_alloc() presumably returns 6*size bytes - confirm. */
    bits = select_bits_alloc(size);
    if (!bits)
        goto out_nofds;
    fds.in      = (unsigned long *)  bits;
    fds.out     = (unsigned long *) (bits +   size);
    fds.ex      = (unsigned long *) (bits + 2*size);
    fds.res_in  = (unsigned long *) (bits + 3*size);
    fds.res_out = (unsigned long *) (bits + 4*size);
    fds.res_ex  = (unsigned long *) (bits + 5*size);

    /* Load the three input sets; the first failure aborts and frees bits. */
    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    /* Result sets start empty; do_select() only ORs ready bits into them. */
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);

    /* do_select() updates `timeout` with the time that is left. */
    ret = do_select(n, &fds, &timeout);

    /* Unless the STICKY_TIMEOUTS personality bit is set, write the remaining
     * time back to the user's timeval (converted from 1/HZ units).
     * The put_user() results are not checked here. */
    if (tvp && !(current->personality & STICKY_TIMEOUTS)) {
        time_t sec = 0, usec = 0;
        if (timeout) {
            sec = timeout / HZ;
            usec = timeout % HZ;
            usec *= (1000000/HZ);
        }
        put_user(sec, &tvp->tv_sec);
        put_user(usec, &tvp->tv_usec);
    }

    if (ret < 0)
        goto out;
    if (!ret) {
        /* Nothing ready: if a signal is pending, return -ERESTARTNOHAND
         * without touching the user's fd sets; otherwise report 0. */
        ret = -ERESTARTNOHAND;
        if (signal_pending(current))
            goto out;
        ret = 0;
    }

    /* Hand the result bitmaps back through the caller's fd sets. */
    set_fd_set(n, inp, fds.res_in);
    set_fd_set(n, outp, fds.res_out);
    set_fd_set(n, exp, fds.res_ex);

out:
    select_bits_free(bits, size);
out_nofds:
    return ret;
}
/*
 * Core of select(): repeatedly scan the requested descriptors until at least
 * one is ready, the timeout runs out, a signal is pending, or the poll table
 * reports an error.
 *
 * n       - upper bound on descriptor numbers to scan; replaced by the value
 *           returned from max_select_fd().
 * fds     - input bitmaps (in/out/ex) and result bitmaps (res_in/res_out/
 *           res_ex). Result words are only written when non-zero, so the
 *           caller must have cleared them beforehand.
 * timeout - in: how long to wait (0 means a single non-blocking scan);
 *           out: the value left over after schedule_timeout().
 *
 * Returns the number of ready (descriptor, condition) pairs counted, or a
 * negative value from max_select_fd() / table.error.
 */
int do_select(int n, fd_set_bits *fds, long *timeout)
{
    struct poll_wqueues table;
    poll_table *wait;
    int retval, i;
    long __timeout = *timeout;

    spin_lock(&current->files->file_lock);
    retval = max_select_fd(n, fds);
    spin_unlock(&current->files->file_lock);

    if (retval < 0)
        return retval;
    n = retval;

    poll_initwait(&table);
    wait = &table.pt;
    /* With a zero timeout no poll table is passed to the ->poll methods. */
    if (!__timeout)
        wait = NULL;
    retval = 0;
    for (;;) {
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

        /* Mark the task interruptible before scanning.
         * NOTE(review): presumably so a wake-up arriving during the scan
         * is not lost before schedule_timeout() - confirm. */
        set_current_state(TASK_INTERRUPTIBLE);

        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

        /* Outer loop: one iteration per bitmap word (__NFDBITS fds). */
        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            struct file_operations *f_op = NULL;
            struct file *file = NULL;

            in = *inp++; out = *outp++; ex = *exp++;
            all_bits = in | out | ex;
            /* No descriptor requested in this word: skip it entirely. */
            if (all_bits == 0) {
                i += __NFDBITS;
                continue;
            }

            /* Inner loop: one iteration per bit; i is the fd number. */
            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
                if (i >= n)
                    break;
                if (!(bit & all_bits))
                    continue;
                file = fget(i);
                /* Descriptors for which fget() returns NULL are skipped. */
                if (file) {
                    f_op = file->f_op;
                    mask = DEFAULT_POLLMASK;
                    /* Once something is ready (retval != 0) the poll
                     * table is withheld from further ->poll calls. */
                    if (f_op && f_op->poll)
                        mask = (*f_op->poll)(file, retval ? NULL : wait);
                    fput(file);
                    /* Each matching condition counts separately. */
                    if ((mask & POLLIN_SET) && (in & bit)) {
                        res_in |= bit;
                        retval++;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {
                        res_out |= bit;
                        retval++;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {
                        res_ex |= bit;
                        retval++;
                    }
                }
            }
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
        }
        /* The poll table is only offered during the first full scan. */
        wait = NULL;
        if (retval || !__timeout || signal_pending(current))
            break;
        if(table.error) {
            retval = table.error;
            break;
        }
        /* Sleep; the return value becomes the remaining timeout. */
        __timeout = schedule_timeout(__timeout);
    }
    __set_current_state(TASK_RUNNING);

    poll_freewait(&table);

    /*
     * Up-to-date the caller timeout.
     */
    *timeout = __timeout;
    return retval;
}

poll

sys_poll
/*
 * poll() system call entry point.
 *
 * ufds    - user-space array of nfds pollfd entries; only the revents field
 *           of each entry is written back.
 * nfds    - number of entries in ufds.
 * timeout - wait time; 0 means do not wait, values that are negative or too
 *           large become MAX_SCHEDULE_TIMEOUT.
 *
 * Returns the count from do_poll(), or -EINVAL / -ENOMEM / -EFAULT / -EINTR.
 */
asmlinkage long sys_poll(struct pollfd __user * ufds, unsigned int nfds, long timeout)
{
    struct poll_wqueues table;
    int fdcount, err;
    unsigned int i;
    struct poll_list *head;
    struct poll_list *walk;

    /* Do a sanity check on nfds ... */
    if (nfds > current->files->max_fdset && nfds > OPEN_MAX)
        return -EINVAL;

    if (timeout) {
        /* Careful about overflow in the intermediate values */
        /* Scale by HZ/1000, rounding up and adding one.
         * NOTE(review): i.e. milliseconds -> 1/HZ units, assuming the
         * poll(2) convention that timeout is in milliseconds. */
        if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
            timeout = (unsigned long)(timeout*HZ+999)/1000+1;
        else /* Negative or overflow */
            timeout = MAX_SCHEDULE_TIMEOUT;
    }

    poll_initwait(&table);

    head = NULL;
    walk = NULL;
    i = nfds;               /* i = entries still to be copied in */
    err = -ENOMEM;
    /* Copy the user array into a linked list of chunks, each holding at
     * most POLLFD_PER_PAGE entries, instead of one contiguous buffer. */
    while(i!=0) {
        struct poll_list *pp;
        pp = kmalloc(sizeof(struct poll_list)+
                sizeof(struct pollfd)*
                (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i),
                    GFP_KERNEL);
        if(pp==NULL)
            goto out_fds;
        pp->next=NULL;
        pp->len = (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i);
        /* Append the chunk to the list before the copy so that the
         * out_fds path frees it if copy_from_user() fails. */
        if (head == NULL)
            head = pp;
        else
            walk->next = pp;

        walk = pp;
        /* nfds-i is the number of entries already consumed. */
        if (copy_from_user(pp->entries, ufds + nfds-i, 
                sizeof(struct pollfd)*pp->len)) {
            err = -EFAULT;
            goto out_fds;
        }
        i -= pp->len;
    }
    fdcount = do_poll(nfds, head, &table, timeout);

    /* OK, now copy the revents fields back to user space. */
    walk = head;
    err = -EFAULT;
    while(walk != NULL) {
        struct pollfd *fds = walk->entries;
        int j;

        /* ufds advances across chunks, so it tracks the user entry that
         * corresponds to fds[j]. */
        for (j=0; j < walk->len; j++, ufds++) {
            if(__put_user(fds[j].revents, &ufds->revents))
                goto out_fds;
        }
        walk = walk->next;
    }
    err = fdcount;
    /* Nothing ready but a signal is pending: report interruption. */
    if (!fdcount && signal_pending(current))
        err = -EINTR;
out_fds:
    /* Common exit: release every chunk and tear down the poll table. */
    walk = head;
    while(walk!=NULL) {
        struct poll_list *pp = walk->next;
        kfree(walk);
        walk = pp;
    }
    poll_freewait(&table);
    return err;
}


do_poll
/*
 * Scan every chunk of the pollfd list until something is ready, the timeout
 * is exhausted, a signal is pending, or the poll table reports an error.
 *
 * nfds    - total number of entries (not used by the scan itself)
 * list    - head of the chunk list to scan
 * wait    - poll table wrapper; its ->pt is offered to the first scan only,
 *           and its ->error is returned if it becomes non-zero
 * timeout - how long to wait; 0 means a single scan without a poll table
 *
 * Returns the count accumulated by do_pollfd(), or wait->error.
 */
static int do_poll(unsigned int nfds,  struct poll_list *list,
            struct poll_wqueues *wait, long timeout)
{
    /* A zero timeout never sleeps, so no poll table is handed out. */
    poll_table *pt = timeout ? &wait->pt : NULL;
    int count = 0;

    while (1) {
        struct poll_list *chunk;

        set_current_state(TASK_INTERRUPTIBLE);
        for (chunk = list; chunk != NULL; chunk = chunk->next)
            do_pollfd(chunk->len, chunk->entries, &pt, &count);

        /* Only the first pass gets the poll table. */
        pt = NULL;
        if (count != 0 || timeout == 0 || signal_pending(current))
            break;

        count = wait->error;
        if (count != 0)
            break;

        /* Sleep; the return value is the time still remaining. */
        timeout = schedule_timeout(timeout);
    }
    __set_current_state(TASK_RUNNING);
    return count;
}

do_pollfd
/*
 * Poll one chunk of pollfd entries and fill in each entry's revents.
 *
 * num    - number of entries in fdpage
 * fdpage - entries to poll
 * pwait  - in/out poll table pointer; reset to NULL as soon as one entry
 *          reports a non-zero event mask
 * count  - incremented once for every entry with a non-zero event mask
 *
 * Entries with a negative fd get revents = 0; entries whose fd cannot be
 * resolved by fget() get POLLNVAL.
 */
static void do_pollfd(unsigned int num, struct pollfd * fdpage,
    poll_table ** pwait, int *count)
{
    int idx;

    for (idx = 0; idx < num; idx++) {
        struct pollfd *entry = &fdpage[idx];
        int fd = entry->fd;
        unsigned int revents = 0;

        if (fd >= 0) {
            struct file *filp = fget(fd);

            if (filp == NULL) {
                revents = POLLNVAL;
            } else {
                revents = DEFAULT_POLLMASK;
                if (filp->f_op && filp->f_op->poll)
                    revents = filp->f_op->poll(filp, *pwait);
                /* Errors and hang-ups are reported even if not asked for. */
                revents &= entry->events | POLLERR | POLLHUP;
                fput(filp);
            }

            if (revents) {
                *pwait = NULL;
                (*count)++;
            }
        }
        entry->revents = revents;
    }
}
  • select 函数的声明比较复杂,有read、write、except等参数,而poll直接封装到一个参数中了,比较优雅。
  • select 中内核操作使用数组,poll使用链表,因为数组需要连续的空间,而内核中的连续内存比较昂贵。
  • select和poll都需要自己遍历所有的FD集合,就算没有事件到来也需要遍历,造成资源的浪费。

epoll

image.png
关键函数
  • epoll_create:创建epfd,eventpoll,并设置一些VFS相关的操作,例如:file的f_op的函数指针。
  • epoll_ctl: fd的CRUD操作,函数比较复杂,涉及比较多的结构转化、回调函数的操作等。
  • epoll_wait: 等待eventpoll的rdllist集合中出现已准备好事件的FD。

tip: epoll_ctl函数部分结构图,可以自行进行分析。

epoll_ctl.png

总结

从传统IO到多路复用是操作系统IO模型的变化,像JAVA提供的NIO不过是对操作系统提供多路复用进行封装而已,所以一个很重要的观念是操作系统提供什么,上层就只能使用什么。

tip: 上面源码基于 redis-2.6,linux-2.6

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。