Java IO, NIO, AIO和Netty

背景

最近在回顾一下Java IO相关的知识,顺带写一下入门级别的文章。感觉工作以后很少写文章,一直想写点高质量的文章导致最后一篇文章都很难写。所以不写原理,只写实践,随大流,有问题请留言。(后续有时间再补充原理性的东西,从硬件到操作系统到JVM到JDK)

实现案例

创建一个server,可以接受多个client端的连接,接收到信息后返回一个接收到的信息。

传统IO实现

传统的IO就是我们所说的BIO(block io),

server端源码如下

package tech.sohocoder.postman.io;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class Server {

    private ServerSocket serverSocket;


    private void start() throws IOException, ClassNotFoundException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 9000);
        serverSocket = new ServerSocket();
        serverSocket.bind(inetSocketAddress);
        ExecutorService executorService = Executors.newCachedThreadPool(new CaughtExceptionsThreadFactory());
        while (true) {
            Socket socket = serverSocket.accept();
            System.out.println("accept socket: " + socket.getRemoteSocketAddress());
            executorService.submit(new SocketHandler(socket));
        }
    }

    private static class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override public void uncaughtException(Thread t, Throwable e) {
            e.printStackTrace();
        }
    }

    private class SocketHandler implements Runnable {

        private Socket socket;

        public SocketHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                    String message = ois.readObject().toString();
                    System.out.println("Message Received: " + message);
                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    //write object to Socket
                    oos.writeObject("Hi Client " + message);
                    if (message.equals("quit")) {
                        ois.close();
                        oos.close();
                        socket.close();
                        break;
                    }
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Server server = new Server();
        server.start();
    }
}

client端源码如下

package tech.sohocoder.postman.io;

import java.io.*;
import java.net.Socket;

public class Client {

    private Socket socket;

    public void start() throws IOException, ClassNotFoundException {
        socket = new Socket("localhost", 9000);
        if(socket.isConnected()) {
            System.out.println("socket is connected");
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                final String line = input != null ? input.trim() : null;
                if (null == line || line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                oos.writeObject(line);
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                System.out.println("Message: " + ois.readObject());
                if(line.equals("quit")) {
                    oos.close();
                    ois.close();
                    socket.close();
                    break;
                }
            }
        }
        System.out.println("Bye");
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Client client = new Client();
        client.start();
    }
}

NIO的阻塞实现

NIO实际上就是面向缓存及通道的新型IO(由JSR 51定义,后面JSR 203进行了扩展,有兴趣阅读一下这两个JSR)可以支持阻塞和非阻塞方式。先实现一下阻塞方式

client

package tech.sohocoder.nio.block;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import static java.lang.System.out;

public class Client {

    private SocketChannel socketChannel;

    public void start() throws IOException {
        socketChannel = SocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
        socketChannel.connect(socketAddress);

        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (;;) {
            final String input = in.readLine();
            final String line = input != null ? input.trim() : null;
            if (null == line || line.isEmpty()) { // skip `enter` or `enter` with spaces.
                continue;
            }

            ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
            socketChannel.write(byteBuffer);

            if(line.equals("quit")) {
                out.println("quit!");
                socketChannel.close();
                break;
            }

            ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(returnByteBuffer);
            String message = new String(returnByteBuffer.array()).trim();
            out.println("Receive message: " + message);
        }
    }

    public static void main(String[] args) throws IOException {
        tech.sohocoder.nio.noblock.Client client = new tech.sohocoder.nio.noblock.Client();
        client.start();
    }
}

server

package tech.sohocoder.nio.block;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Server {

    private ServerSocketChannel serverSocketChannel;

    private void start() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress(9000);
        serverSocketChannel.bind(socketAddress);

        while (true) {
            System.out.println("listening...");
            SocketChannel socketChannel = serverSocketChannel.accept();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int readLength = socketChannel.read(byteBuffer);
            if(readLength != -1) {
                String receiveStr = new String(byteBuffer.array()).trim();
                System.out.println(receiveStr);
                socketChannel.write(byteBuffer);
            }
            socketChannel.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.start();
    }
}

NIO的非阻塞方式

NIO如果需要非阻塞,需要使用到selector。selector是在JDK1.4加入,主要是用于支持IO多路复用,Linux下jdk实现就是基于epoll。

client端代码保存一致。

server端实际上就是使用一个线程来支持多个连接

package tech.sohocoder.nio.noblock;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import static java.lang.System.out;

public class Server {

    private ServerSocketChannel serverSocketChannel;

    private Selector selector;

    private void start() throws IOException, InterruptedException {
        serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress(9000);
        serverSocketChannel.bind(socketAddress);

        serverSocketChannel.configureBlocking(false);

        int opSelectionKey = serverSocketChannel.validOps();

        selector = Selector.open();

        SelectionKey selectionKey = serverSocketChannel.register(selector, opSelectionKey);

        out.println(selector);
        out.println(selectionKey);
        while(true) {
            out.println("waiting for connected...");
            selector.select();
            Set<SelectionKey> set  = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()) {
                SelectionKey mySelectionKey = iterator.next();
                if(mySelectionKey.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    SelectionKey selectionKey1 = socketChannel.register(selector, SelectionKey.OP_READ);
                    out.println("socket channel selectionkey: " + selectionKey1);
                    out.println("connect from : " + socketChannel.getRemoteAddress());
                }else if(mySelectionKey.isReadable()){
                    SocketChannel socketChannel = (SocketChannel) mySelectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    socketChannel.read(byteBuffer);
                    String message = new String(byteBuffer.array()).trim();
                    out.println("Receive message: " + message);
                    if(message.equals("quit")) {
                        out.println("close connection: " + socketChannel.getRemoteAddress());
                        socketChannel.close();
                        mySelectionKey.cancel();
                    }else {
                        ByteBuffer returnByteBuffer = ByteBuffer.wrap(" receive your message".getBytes());
                        socketChannel.write(returnByteBuffer);
                    }
                }
                iterator.remove();
            }
        }

    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start();
    }
}

AIO实现

上面的IO,NIO的阻塞实际上是同步阻塞的方式,NIO的非阻塞是同步非阻塞方式。AIO(asynchronous I/O))是异步IO,实现是异步非阻塞方式,在jdk1.7中引入。

server端源码如下:

package tech.sohocoder.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static java.lang.System.out;

public class Server {

    private AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    private void start() throws IOException, InterruptedException {
        // worker thread pool
        AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 4);
        asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
        int port = 9000;
        InetSocketAddress socketAddress = new InetSocketAddress("localhost", port);
        asynchronousServerSocketChannel.bind(socketAddress);

        out.println("Starting listening on port " + port);
        // add handler
        asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object o) {

                try {
                    out.println("connect from : " + asynchronousSocketChannel.getRemoteAddress());
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // accept next connection
                asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, this);
                while (true) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    Future<Integer> future = asynchronousSocketChannel.read(byteBuffer);
                    try {
                        future.get();
                        String message = new String(byteBuffer.array()).trim();
                        out.println("Receive message: " + message);
                        if (message.equals("quit")) {
                            out.println("close client: " + asynchronousSocketChannel.getRemoteAddress());
                            asynchronousSocketChannel.close();
                            break;
                        }

                        ByteBuffer returnByteBuffer = ByteBuffer.wrap("receive your message".getBytes());
                        Future<Integer> returnFuture = asynchronousSocketChannel.write(returnByteBuffer);
                        returnFuture.get();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            public void failed(Throwable throwable, Object o) {
                out.println("error to accept: " + throwable.getMessage());
            }
        });
        asynchronousChannelGroup.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start();
    }
}

Netty实现

Netty是java中使用很广泛的库,既可以实现NIO也可以实现AIO,还是针对上面的例子来实现一下

server端

package tech.sohocoder.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import static java.lang.System.out;

public class Server {

    private void start() throws InterruptedException {
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // add handler into pipeline
                            socketChannel.pipeline()
                                    .addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new ServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
            out.println("listening...");
            channelFuture.channel().closeFuture().sync();
        }finally {
           bossEventLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        Server server = new Server();
        server.start();
    }

}

这里面需要使用到ServerHandler,具体代码如下

package tech.sohocoder.netty;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.net.SocketAddress;

import static java.lang.System.out;

public class ServerHandler extends ChannelDuplexHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        out.println("Receive message: " + msg);
        String message = "receive your message";
        ctx.writeAndFlush(message);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        out.println("connect from: " + ctx.channel().remoteAddress().toString());
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        out.println("close connection: " + ctx.channel().remoteAddress().toString());
        super.channelInactive(ctx);
    }

}

client端也用netty写一下

package tech.sohocoder.aio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import static java.lang.System.out;

public class Client {

    private SocketChannel socketChannel;

    public void start() throws IOException {
        socketChannel = SocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
        socketChannel.connect(socketAddress);
        if(socketChannel.isConnected()) {
            out.println("connect to " + socketChannel.getRemoteAddress());
        }

        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (;;) {
            final String input = in.readLine();
            final String line = input != null ? input.trim() : null;
            if (null == line || line.isEmpty()) { // skip `enter` or `enter` with spaces.
                continue;
            }

            ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
            socketChannel.write(byteBuffer);

            if(line.equals("quit")) {
                out.println("quit!");
                socketChannel.close();
                break;
            }

            ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(returnByteBuffer);
            String message = new String(returnByteBuffer.array()).trim();
            out.println("Receive message: " + message);
        }
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}

同样要实现一个ClientHandler

package tech.sohocoder.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import static java.lang.System.out;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        out.println("Receive message: " + s);
    }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容