Java IO 从BIO到NIO

BIO

同步并阻塞,服务器实现模式为一个连接一个线程,BIO通信模型实现通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成后,通过输出流返回应答给客户端,线程销毁。

image

服务端(线程池)

public class MultiThreadEchoServer {
    //线程池,用于处理业务
    private static ExecutorService tp = Executors.newCachedThreadPool();

    //处理请求
    static class HandleMsg implements Runnable {
        Socket clientSocket;

        public HandleMsg(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        public void run() {
            BufferedReader is = null;
            PrintWriter os = null;
            try {
                System.out.println(Thread.currentThread().getId() + ",线程开始处理。");
                is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                os = new PrintWriter(clientSocket.getOutputStream(), true);
                //从InputStream中读取客户端锁发送的数据
                String inputLine;
                long b = System.currentTimeMillis();
                while ((inputLine = is.readLine()) != null) {
                    System.out.println(Thread.currentThread().getId() + ",from client: " + inputLine + ",用时:" + (
                        System.currentTimeMillis() - b) + "ms");
                    os.println(inputLine);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (is != null)
                        is.close();
                    if (os != null)
                        os.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        ServerSocket echoServer = null;
        Socket clientSocket;
        try {
            echoServer = new ServerSocket(8000);

        } catch (IOException e) {
            System.out.println(e);
        }
        while (true) {
            try {
                clientSocket = echoServer.accept();
                System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
                tp.execute(new HandleMsg(clientSocket));

            } catch (IOException e) {
                System.out.println(e);
            }
        }
    }
}

客户端(高并发)

public class HeavySocketClient {

    private static ExecutorService tp = Executors.newCachedThreadPool();

    private static final int sleep_time = 1000 * 1000 * 1000;

    public static class EchoClient implements Runnable {
        public void run() {
            Socket client = null;
            PrintWriter writer = null;
            BufferedReader reader = null;
            try {
                client = new Socket();
                client.connect(new InetSocketAddress("localhost", 8000));
                long b = System.currentTimeMillis();
                writer = new PrintWriter(client.getOutputStream(), true);
                writer.println("He");
                LockSupport.parkNanos(sleep_time);
                writer.println("llo");
                LockSupport.parkNanos(sleep_time);
                writer.println("!");
                writer.flush();
                reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
                String readerLine;
                while ((readerLine = reader.readLine()) != null) {
                    System.out.println("from server : " + readerLine);
                }
                long e = System.currentTimeMillis();
                System.out.println("spend:" + (e - b) + "ms");

            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (writer != null) writer.close();
                    if (reader != null) reader.close();
                    if (client != null) client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        EchoClient client = new EchoClient();
        for (int i = 0; i < 3; i++) {
            tp.execute(client);
        }
    }
}

服务端处理日志

/127.0.0.1:58421 connect!
/127.0.0.1:58422 connect!
10,线程开始处理。
/127.0.0.1:58423 connect!
11,线程开始处理。
12,线程开始处理。
12,from client: He,用时:2ms
11,from client: He,用时:2ms
10,from client: He,用时:3ms
11,from client: llo,用时:1002ms
12,from client: llo,用时:1002ms
10,from client: llo,用时:1002ms
11,from client: !,用时:2006ms
12,from client: !,用时:2006ms
10,from client: !,用时:2006ms

缺点

  • 服务端的线程数和并发访问数成线性正比
  • 服务端各个线程的处理时间完全取决于客服端的处理能力,以上例子中绝大部分时间用于IO等待而非服务端本身的业务处理,造成了资源的浪费

NIO

Reactor模型

responds to IO events by dispatching
the appropriate handler
通过调度合适的执行者来相应IO事件
  • 分而治之

1、把处理过程拆分成明确的、职责单一的任务,使得每一个小的任务都可以采用非阻塞的方式才执行。

2、在任务状态是可执行时,才开始执行。
一个IO事件通常可以划分分为:read -> decode -> compute -> encode -> send


image
  • 多路复用

NIO由原来的阻塞读写(占用线程)变成了单线程轮询事件,通过记录跟踪每个I/O流(sock)的状态,来同时管理多个I/O流 ,从而将IO阻塞操作从业务线程中抽取出来。


单线程版的Reactor 模型

image

可扩展性

为了实现可扩展性,我们可以战略性的新增线程来适应多处理器的计算机。
仔细分析一下我们需要的线程,其实主要包括以下几种:

  1. 事件分发器(Reactor),用于选择就绪的事件。
  2. I/O处理器,包括accept、read、write等
  3. 业务线程,在处理完I/O后,用于处理具体的业务。
  • 单个Reactor-工作线程池

Reactor将非IO处理交给其他线程来处理。有的业务逻辑比较繁琐复杂且耗,有的还会有其他的阻塞I/O,如DB操作,RPC等。这些情况下我们就可以使用工作者线程池

  • Reactor线程池

当单一的Reactor线程池模式达到饱和时,还能扩展成多个Reactor,使CPU、IO速率达到负载均衡。


单个Reactor-多线程模式

image


多Reactor模式

image


NIO 基本概念

缓冲区 Buffer

本质上是一块可以存储数据的内存,被封装成了buffer对象而已

通道 Channel

类似于流,但是可以异步读写数据(流只能同步读写),通道是双向的(流是单向的),通道的数据总是要先读到一个buffer 或者 从一个buffer写入,即通道与buffer进行数据交互

选择器 Selectors

相当于一个观察者,用来监听通道感兴趣的事件,一个选择器可以绑定多个通道。Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作

SelectionKeys

维护事件的状态,并与事件绑定,通过SelectionKey来完成IO的读写

  • OP_ACCEPT

服务端收到一个连接请求

  • OP_CONNECT

客户端发起连接

  • OP_READ

当OS的读缓冲区中有数据可读

  • OP_WRITE
    当OS的写缓冲区中有空闲的空间

多线程-单Reactor模式

NIO服务端


/**
 * 使用NIO实现多线程的Echo服务器
 *
 * @author guolinlin
 * @version V1.0
 * @since 2017-08-14 22:48
 */
public class EchoServer {

    //用于处理所有网络连接
    private Selector selector;

    //用于统计服务器线程在一个客户端上花费的时间
    public static Map<Socket, Long> time_stat = new HashMap<Socket, Long>(10240);
    SimpleDateFormat df = new SimpleDateFormat("hh:mm:ss:SSS");

    private void startServer() throws Exception {
        //获得selector实例
        selector = SelectorProvider.provider().openSelector();
        //获得SocketChannel实例
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        //设置为非阻塞模式
        socketChannel.configureBlocking(false);

        //绑定端口
        InetSocketAddress isa = new InetSocketAddress(8000);
        socketChannel.socket().bind(isa);

        /*
        将ServerSocketChannel绑定到selector上,并注册它感兴趣的事件为accept,当selector发现
        ServerSocketChannel有新的客户端连接时,就会通知ServerSocketChannel进行处理
        */
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //循环,等待-分发网络信息
        for (; ; ) {
            //获取已经准备就绪的SelectionKey
            selector.select();
            Set readyKeys = selector.selectedKeys();
            Iterator i = readyKeys.iterator();
            long e = 0;
            while (i.hasNext()) {
                SelectionKey selectionKey = (SelectionKey) i.next();
                i.remove();
                if (selectionKey.isAcceptable()) {
                    doAccept(selectionKey);
                } else if (selectionKey.isValid() && selectionKey.isReadable()) {
                    if (!time_stat.containsKey(((SocketChannel) selectionKey.channel()).socket())) {
                        time_stat.put(((SocketChannel) selectionKey.channel()).socket(), System.currentTimeMillis());
                    }
                    doRead(selectionKey);
                } else if (selectionKey.isValid() && selectionKey.isWritable()) {
                    doWrite(selectionKey);
                    e = System.currentTimeMillis();
                    long b = time_stat.remove(((SocketChannel) selectionKey.channel()).socket());
                    System.out.println("spead:" + (e - b) + "ms");
                } else if (selectionKey.isConnectable()) {
                    System.out.println("isConnectable = true");
                }
            }
        }

    }

    private void doAccept(SelectionKey selectionKey) {

        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        //域客户端通信的通道
        SocketChannel clientChannel;
        try {
            clientChannel = server.accept();
            //设置成非阻塞模式,要求系统在准备好IO后,再通知我们的线程来读取或者写入
            clientChannel.configureBlocking(false);
            //将这个channel注册到selector,并告诉Selector,我现在对读操作感兴趣
            SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
            EchoClient echoClient = new EchoClient();
            clientKey.attach(echoClient);
            System.out.println(
                Thread.currentThread().getId() + ",do accept from port:" + clientChannel.socket().getPort() + "," + df
                    .format(new Date()));

        } catch (Exception e) {
            System.out.println("Failed to accept new client.");
            e.printStackTrace();
        }
    }

    private void doRead(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        System.out.println(
            Thread.currentThread().getId() + ",do Read from port:" + channel.socket().getPort() + "," + df
                .format(new Date()));
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len;
        try {
            len = channel.read(bb);
            if (len < 0) {
                channel.close();
                selectionKey.cancel();
                return;
            }

        } catch (Exception e) {
            System.out.println("Failed to read from client.");
            e.printStackTrace();
            selectionKey.cancel();
            return;
        }
        bb.flip();
        //业务处理
        new WorkHandle(selectionKey, bb);
    }

    public void doWrite(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        System.out.println(
            Thread.currentThread().getId() + ",do Write from port:" + channel.socket().getPort() + "," + df
                .format(new Date()));

        EchoClient echoClient = (EchoClient) selectionKey.attachment();
        LinkedList<ByteBuffer> outq = echoClient.getOutoutQueue();

        ByteBuffer bb = outq.getLast();
        try {
            int len = channel.write(bb);
            if (len == -1) {
                channel.close();
                selectionKey.cancel();
                return;
            }
            if (bb.remaining() == 0) {
                outq.remove();
            }

        } catch (Exception e) {
            System.out.println("Failed to write from client.");
            e.printStackTrace();
            try {
                channel.close();
            } catch (IOException e1) {

            }
            selectionKey.cancel();
            return;
        }
        if (outq.size() == 0) {
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    public static void main(String[] args) {
        EchoServer echoServer = new EchoServer();
        try {
            echoServer.startServer();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * 工作线程
 *
 * @author guolinlin
 * @version V1.0
 * @since 2017-08-14 23:08
 */
public class WorkHandle {

    //用于对每个客户端进行相应的处理
    private static final ExecutorService service = Executors.newCachedThreadPool();

    private SelectionKey selectionKey;

    private ByteBuffer bb;

    public WorkHandle(SelectionKey selectionKey, ByteBuffer bb) {
        this.selectionKey = selectionKey;
        this.bb = bb;
        start();
    }

    public void start() {
        service.submit(() -> {
            System.out.println(Thread.currentThread().getId() + ",HandleMsg...");
            EchoClient echoClient = (EchoClient) selectionKey.attachment();
            echoClient.enqueue(bb);
            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            //强迫selector强制返回
            selectionKey.selector().wakeup();
        });
    }
}

NIO 服务端日志

1,do accept from port:62855,05:17:33:919
1,do accept from port:62856,05:17:33:920
1,do Read from port:62855,05:17:33:921
10,HandleMsg...
1,do accept from port:62857,05:17:33:925
1,do Read from port:62856,05:17:33:925
1,do Read from port:62857,05:17:33:926
11,HandleMsg...
1,do Write from port:62855,05:17:33:926
12,HandleMsg...
spead:6ms
1,do Write from port:62857,05:17:33:928
spead:2ms
1,do Write from port:62856,05:17:33:928
spead:3ms
1,do Read from port:62856,05:17:34:917
1,do Read from port:62855,05:17:34:918
12,HandleMsg...
1,do Read from port:62857,05:17:34:918
11,HandleMsg...
12,HandleMsg...
1,do Write from port:62856,05:17:34:918
spead:1ms
1,do Write from port:62855,05:17:34:918
spead:0ms
1,do Write from port:62857,05:17:34:918
spead:0ms
1,do Read from port:62857,05:17:35:922
1,do Read from port:62856,05:17:35:922
12,HandleMsg...
11,HandleMsg...
1,do Read from port:62855,05:17:35:922
11,HandleMsg...
1,do Write from port:62857,05:17:35:923
spead:1ms
1,do Write from port:62856,05:17:35:923
spead:1ms
1,do Write from port:62855,05:17:35:923
spead:1ms

多Reactor模式

多Reactor,也即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。
并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。

/**
 * 多Reactor
 *
 * @author guolinlin
 * @version V1.0
 * @since 2017-08-25 21:06
 */
public class MultiReactorService {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8000));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //获取系统CPU核数
        int coreNum = Runtime.getRuntime().availableProcessors();
        SubReactor[] subReactors = new SubReactor[coreNum];
        for (int i = 0; i < subReactors.length; i++) {
            subReactors[i] = new SubReactor();
        }
        int index = 0;
        while (selector.select() > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            for (SelectionKey key : keys) {
                keys.remove(key);

                if (key.isAcceptable()) {
                    ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = acceptServerSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    System.out.println("Accept request from:" + socketChannel.getRemoteAddress());
                    SubReactor subReactor = subReactors[((index++) % coreNum)];
                    subReactor.addChannel(socketChannel);
                    subReactor.wakeup();
                }
            }
        }
    }
}
/**
 * 用于读写的Reactor
 *
 * @author guolinlin
 * @version V1.0
 * @since 2017-08-25 21:10
 */
public class SubReactor {
    private static final ExecutorService service = Executors
        .newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
    private Selector selector;

    public SubReactor() throws IOException {
        this.selector = SelectorProvider.provider().openSelector();
        start();
    }

    public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }

    public void wakeup() {
        this.selector.wakeup();
    }

    public void start() {
        service.submit(() -> {
            while (true) {
                if (selector.select(500) <= 0) {
                    continue;
                }
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        int count = socketChannel.read(buffer);
                        if (count < 0) {
                            socketChannel.close();
                            key.cancel();
                            System.out.println(socketChannel + "Read end");
                            continue;
                        } else if (count == 0) {
                            System.out.println(socketChannel + "Message size is 0");
                            continue;
                        } else {
                            System.out.println(socketChannel + "Read message :" + new String(buffer.array()));
                        }
                    }
                }
            }
        });
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容