多路复用

继承树

  • SelectionKey继承树:
java.nio.channels.SelectionKey
    -> java.nio.channels.spi.AbstractSelectionKey
        -> sun.nio.ch.SelectionKeyImpl
  • Selector继承树:
java.nio.channels.Selector
    -> java.nio.channels.spi.AbstractSelector
        -> sun.nio.ch.SelectorImpl
            -> sun.nio.ch.EPollSelectorImpl
  • SelectorProvider继承树:
java.nio.channels.spi.SelectorProvider
    -> sun.nio.ch.SelectorProviderImpl
        -> sun.nio.ch.EPollSelectorProvider

SelectionKey

SelectionKey(选择键)表示SelectableChannel(可选择通道)在Selector(选择器)注册的令牌。

通道向选择器注册时会创建一个选择键。调用选择键的cancel方法,或关闭通道,或关闭选择器,选择键会失效,可调用isValid方法探测有效状态。

选择键包含两个操作集:

  1. 兴趣集:选择器下一次选择时,检测哪些操作的准备就绪信息。
  2. 就绪集:选择器检测到哪些操作准备就绪。

操作可以是读(read)、写(write)、连接(connect)、接受(accept)。

// 选择键创建时的通道和选择器
public abstract SelectableChannel channel();
public abstract Selector selector();

// 取消注册
public abstract void cancel();

// 探测选择键有效状态
public abstract boolean isValid();


// 选择键的兴趣集
public abstract int interestOps();
// 设置兴趣集,返回自身
public abstract SelectionKey interestOps(int ops);
// 选择键的就绪集
public abstract int readyOps();

// 读操作
public static final int OP_READ = 1 << 0;
public final boolean isReadable();

// 写操作
public static final int OP_WRITE = 1 << 2;
public final boolean isWritable();

// 连接操作
public static final int OP_CONNECT = 1 << 3;
public final boolean isConnectable();

// 接受操作
public static final int OP_ACCEPT = 1 << 4;
public final boolean isAcceptable();

Selector

SelectableChannel的多路复用器(选择器)。

创建选择器:

// provider: sun.nio.ch.EPollSelectorProvider
// selector: sun.nio.ch.EPollSelectorImpl
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

Selector维护三个选择键(SelectionKey)集:

  • 注册键集:向此选择器注册的通道的选择键。不可直接修改。
  • 已选择键集:选择器上一次选择时,兴趣集中有操作就绪的选择键。为注册键集的子集。不可添加,可通过集合方法删除选择键。
  • 已取消键集:已取消但通道尚未注销的选择键。为注册键集的子集。不可直接访问。

选择器创建时,三个选择键集均为空;
通道向选择器注册时,生成的选择键加入注册键集;
选择过程中,键加入已选择键集;
取消选择键(cancel键或关闭通道),键加入已取消键集;
下次选择时,已取消键集对应的通道被注销,选择键从所有键集中删除。

// 注册键集
public abstract Set<SelectionKey> keys();
// 已选择键集
public abstract Set<SelectionKey> selectedKeys();
// 无获取已取消键集方法

选择操作:
查询底层操作系统,更新通道的准备状态,以执行选择操作开始时由兴趣集标示的任何操作。

// 阻塞
// 返回时机:有通道选中,或选择器wakeup方法被调用,或当前线程被中断
// 返回:就绪集被更新的选择键数量
public abstract int select() throws IOException;

// select()的超时版本
// timeout:正,超时阻塞;0,无限阻塞;非负
public abstract int select(long timeout) throws IOException;

// select()的非阻塞版本
public abstract int selectNow() throws IOException;

// 使尚未返回的第一个选择操作立即返回
// 若当前无选择操作,则下一次选择操作立即返回,除非是selectNow()
// 在连续两次选择操作之间多次调用只相当于一次
public abstract Selector wakeup();

Linux epoll

epoll是Linux下多路复用IO接口select/poll的增强版本,能显著提高程序在大量并发连接中只有少量活跃的情况下系统CPU的利用率。

epoll有两种工作方式:

  • LT(level triggered,水平触发):默认工作方式,同时支持阻塞和非阻塞。只要fd就绪(有数据时),内核就会通知fd就绪。传统的select/poll就是此工作方式。
  • ET(edge triggered,边缘触发):高速工作方式,仅支持非阻塞。当fd从未就绪变为就绪时(状态变化),内核会通知fd就绪,并且不会再次通知。

epoll的使用:

  1. 创建epoll描述符
int epoll_create (int size);

epoll实现由hash表改为红黑树后,size已无意义
  1. 注册监控事件
int epoll_ctl (int epfd, int op, int fd, struct epoll_event *event);

epfd:epoll描述符
op:操作类型
    EPOLL_CTL_ADD:添加注册事件
    EPOLL_CTL_MOD:修改注册事件
    EPOLL_CTL_DEL:删除注册事件
fd:要监听的描述符
event:要监听的事件
    EPOLLIN:读
    EPOLLOUT:写
    EPOLLRDHUP:关闭了连接
    EPOLLET:设置ET工作方式
    EPOLLONESHOT:设置one-shot工作方式
  1. epoll等待
int epoll_wait (int epfd, struct epoll_event* events, int maxevents, int timeout);

epfd:epoll描述符
events:就绪事件,将事件从内核复制到events数组中
maxevents:最多监听多少个事件
timeout:超时毫秒,-1为无限阻塞,0则立即返回
  1. 例子
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
int main(void) {
    int epfd = epoll_create(1); // 创建epoll描述符
    
    struct epoll_event ev; // 监听事件
    ev.data.fd = STDIN_FILENO; // 监听标准输入
    ev.events = EPOLLIN|EPOLLET; // 监听读,设置ET
    epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev); // 注册epoll事件
    
    struct epoll_event events[5]; // 就绪事件
    for (;;) {
        int nfds = epoll_wait(epfd, events, 5, -1);
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == STDIN_FILENO)
                printf("epoll read event");
        }
    }
}

Java Epoll

Java利用Linux Epoll实现多路复用。多路复用器类为sun.nio.ch.EPollSelectorImpl,epoll封装类为sun.nio.ch.EPollArrayWrapper。

class EPollSelectorImpl extends SelectorImpl {
    // epoll对象
    EPollArrayWrapper pollWrapper;
    // 通道fd->选择键
    private Map<Integer,SelectionKeyImpl> fdToKey;
    
    // 保存要监听的描述符和事件
    // 描述符值小时:eventsLow:描述符为下标,事件为值
    // 描述符值大时:eventsHigh:描述符->事件
    private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
    private Map<Integer,Byte> eventsHigh;
    // 特殊的事件值:表示忽略要更新的监听描述符和事件
    private static final byte  KILLED = (byte)-1;

    // 创建选择器实例
    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        ...
        pollWrapper = new EPollArrayWrapper();
        ...
        fdToKey = new HashMap<>();
    }
}
class EPollArrayWrapper {
    // epoll描述符
    private final int epfd;
    // epoll_wait函数返回的epoll_event数组
    private final AllocatedNativeObject pollArray;
    // epoll_event数组基址
    private final long pollArrayAddress;
    // epoll更新的
    int updated;

    // 创建epoll封装类实例
    EPollArrayWrapper() throws IOException {
        // 创建epoll描述符
        epfd = epollCreate();

        // 创建epoll_wait函数epoll_event数组
        int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
        
        ...
    }
}

向选择器注册通道:

// AbstractSelectableChannel
// sel:选择器;ops:兴趣集;att:附件
SelectionKey register(Selector sel, int ops, Object att) {
    synchronized (regLock) {
        // 若已注册,则更新兴趣集和附件
        SelectionKey k = findKey(sel);
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }

        // 若未注册,则向选择器进行注册
        if (k == null) {
            synchronized (keyLock) {
                k = ((AbstractSelector)sel).register(this, ops, att); // 
                addKey(k);
            }
        }
        return k;
    }
}

// SelectorImpl
SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
    // 创建选择键
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);
    // 注册通道
    synchronized (publicKeys) {
        implRegister(k); // 
    }
    // 设置兴趣集
    k.interestOps(ops);
    return k;
}

// EPollSelectorImpl
void implRegister(SelectionKeyImpl ski) {
    // 多路复用通道
    SelChImpl ch = ski.channel;
    // 通道描述符
    int fd = Integer.valueOf(ch.getFDVal());
    // 通道描述符->通道选择键
    fdToKey.put(fd, ski);
    // 向epoll实例添加通道描述符
    pollWrapper.add(fd); //
    // 添加到注册键集
    keys.add(ski);
}
void add(int fd) {
    synchronized (updateLock) {
        setUpdateEvents(fd, (byte)0, true);
    }
}
// fd:要监听的描述符;events:要监听的事件;
void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}

// 设置兴趣操作集
// SelectionKeyImpl
SelectionKey interestOps(int ops) {
    return nioInterestOps(ops);
}
SelectionKey nioInterestOps(int ops) {
    channel.translateAndSetInterestOps(ops, this);
    interestOps = ops;
    return this;
}
// ServerSocketChannelImpl
void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
    int newOps = 0;
    //翻译成Linux epoll监听事件
    if ((ops & SelectionKey.OP_ACCEPT) != 0)
        newOps |= PollArrayWrapper.POLLIN;
    // 注册监听事件
    sk.selector.putEventOps(sk, newOps);
}
// EPollSelectorImpl
void putEventOps(SelectionKeyImpl ski, int ops) {
    SelChImpl ch = ski.channel;
    // 向epoll注册要监听的描述符和事件
    pollWrapper.setInterest(ch.getFDVal(), ops);
}
// EPollArrayWrapper
void setInterest(int fd, int mask) {
    synchronized (updateLock) {
        // 设置监听描述符
        updateDescriptors[updateCount++] = fd;
        // 设置监听事件
        byte b = (byte)mask;
        assert (b == mask) && (b != KILLED);
        setUpdateEvents(fd, b, false);
    }
}


选择操作:

// SelectorImpl
int select() {
    return select(0);
}
int select(long timeout) {
    return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
public int selectNow() {
    return lockAndDoSelect(0);
}
int lockAndDoSelect(long timeout) {
    synchronized (this) {
        synchronized (publicKeys) {
            synchronized (publicSelectedKeys) {
                return doSelect(timeout); //
            }
        }
    }
}

// EPollSelectorImpl
// timeout:正,超时阻塞;-1,无限阻塞;0,立即返回;与epoll_wait.timeout一致
int doSelect(long timeout) {
    // 注销已取消键集的通道
    processDeregisterQueue();
    try {
        // 设置中断器,以便在阻塞时能够响应中断
        // 中断器将调用wakeup()使选择操作返回
        begin();
        // epoll_wait
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    
    // 再次处理已取消键集
    processDeregisterQueue();

    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

// EPollArrayWrapper
int poll(long timeout) {
    // 更新注册的通道和事件
    updateRegistrations();
    // epoll_wait
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}
// 更新通道注册
void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    // 注册监听描述符和事件
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}

private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        // 获取就绪描述符
        int nextFD = pollWrapper.getDescriptor(i);
        // 获取就绪描述符的选择键
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        if (ski != null) {
            // 获取就绪事件
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                // 设置选择键的就绪操作集
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}

// 对应操作系统的epoll:
// 创建epoll实例
private native int epollCreate();
// epoll事件注册
private native void epollCtl(int epfd, int opcode, int fd, int events);
// epoll等待
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd);
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352

推荐阅读更多精彩内容