初识NIO之Java小Demo

Java中的IO、NIO、AIO:

BIO:在Java1.4之前,我们建立网络连接均使用BIO,属于同步阻塞IO。默认情况下,当有一条请求接入就有一条线程专门接待。所以,在客户端向服务端请求时,会询问是否有空闲线程进行接待,如若没有则一直等待或拒接。当并发量小时还可以接受,当请求量一多起来则会有许多线程生成,在Java中,多线程的上下文切换会消耗计算机有限的资源和性能,造成资源浪费。

NIO:NIO的出现是为了解决再BIO下的大并发量问题。其特点是能用一条线程管理所有连接。如下图所示:


图片来自网络

NIO是同步非阻塞模型,通过一条线程控制选择器(Selector)来管理多个Channel,减少创建线程和上下文切换的浪费。当线程通过选择器向某条Channel请求数据但其没有数据时,是不会阻塞的,直接返回,继续干其他事。而一旦某Channel就绪,线程就能去调用请求数据等操作。当该线程对某条Channel进行写操作时同样不会被阻塞,所以该线程能够对多个Channel进行管理。

NIO是面向缓冲流的,即数据写入写出都是通过 Channel —— Buffer 这一途径。(双向流通)

AIO:与之前两个IO模型不同的是,AIO属于异步非阻塞模型。当进行读写操作时只须调用api的read方法和write方法,这两种方法均是异步。对于读方法来说,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。换言之就是当调用完api后,操作系统完成后会调用回调函数。

总结:一般IO分为同步阻塞模型(BIO),同步非阻塞模型(NIO),异步阻塞模型,异步非阻塞模型(AIO)

同步阻塞模型指的是当调用io操作时必须等到其io操作结束

同步非阻塞模型指当调用io操作时不必等待可以继续干其他事,但必须不断询问io操作是否完成。

异步阻塞模型指应用调用io操作后,由操作系统完成io操作,但应用必须等待或去询问操作系统是否完成。

异步非阻塞指应用调用io操作后,由操作系统完成io操作并调用回调函数,应用完成放手不管。

NIO的小Demo之服务端

首先,先看下服务端的大体代码

public class ServerHandle implements Runnable{
    //带参数构造函数
    public ServerHandle(int port){
        
    }
    //停止方法
    public void shop(){
        
    }
    //写方法
    private void write(SocketChannel socketChannel, String  response)throws IOException{
        
    }
    //当有连接进来时的处理方法
    private void handleInput(SelectionKey key) throws IOException{
        
    } 
    
    //服务端运行主体方法
    @Override
    public void run() {
    
    }
}

首先我们先看看该服务端的构造函数的实现:

public ServerHandle(int port){
        try {
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            serverSocketChannel = ServerSocketChannel.open();
            //设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //传入端口,并设定连接队列最大为1024
            serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
            //监听客户端请求
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //标记启动标志
            started = true;
            System.out.println("服务器已启动,端口号为:" + port);
        } catch (IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }

在这里创建了选择器和监听通道,并将该监听通道注册到选择器上并选择其感兴趣的事件(accept)。后续其他接入的连接都将通过该 监听通道 传入。

然后就是写方法的实现:

    private void doWrite(SocketChannel channel, String response) throws IOException {
        byte[] bytes = response.getBytes();
        ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
        wirteBuffer.put(bytes);
        //将写模式改为读模式
        wirteBuffer.flip();
        //写入管道
        channel.write(wirteBuffer);
    }

其次是当由事件传入时,即对连接进来的链接的处理方法

    private void handleInput(SelectionKey key) throws IOException{
        //当该键可用时
        if (key.isValid()){
            if (key.isAcceptable()){
                //返回该密钥创建的通道。
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                通过该通道获取链接进来的通道
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()){
                //返回该密钥创建的通道。
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(byteBuffer);
                if (readBytes > 0){
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String expression = new String(bytes, "UTF-8");
                    System.out.println("服务器收到的信息:" + expression);
                    //此处是为了区别打印在工作台上的数据是由客户端产生还是服务端产生
                    doWrite(socketChannel, "+++++" + expression + "+++++");
                } else if(readBytes == 0){
                    //无数据,忽略
                }else if (readBytes < 0){
                    //资源关闭
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }

这里要说明的是,只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监控这些事件是否发生。
如在构造方法中有一通道serverSocketChannel注册了accept事件。当其就绪时就可以通过调用selector的selectorKeys()方法,访问”已选择键集“中的就绪通道。

压轴方法:

    @Override
    public void run() {
        //循环遍历
        while (started) {
            try {
                //当没有就绪事件时阻塞
                selector.select();
                //返回就绪通道的键
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                SelectionKey key;
                while (iterator.hasNext()){
                    key = iterator.next();
                    //获取后必须移除,否则会陷入死循环
                    iterator.remove();
                    try {
                        //对就绪通道的处理方法,上述有描述
                        handleInput(key);
                    } catch (Exception e){
                        if (key != null){
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Throwable throwable){
                throwable.printStackTrace();
            }
        }
    }

此方法为服务端的主体方法。大致流程如下:

  1. 打开ServerSocketChannel,监听客户端连接
  2. 绑定监听端口,设置连接为非阻塞模式(阻塞模式下不能注册到选择器)
  3. 创建Reactor线程,创建选择器并启动线程
  4. 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
  5. Selector轮询准备就绪的key
  6. Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
  7. 设置客户端链路为非阻塞模式
  8. 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
    异步读取客户端消息到缓冲区
  9. 调用write将消息异步发送给客户端

NIO的小Demo之客户端

public class ClientHandle implements Runnable{
    //构造函数,构造时顺便绑定
    public ClientHandle(String ip, int port){
        
    }
    //处理就绪通道
    private void handleInput(SelectionKey key) throws IOException{
        
    }
    //写方法(与服务端的写方法一致)
    private void doWrite(SocketChannel channel,String request) throws IOException{
        
    }
    //连接到服务端
    private void doConnect() throws IOException{
        
    }
    //发送信息
    public void sendMsg(String msg) throws Exception{
        
    }
}

首先先看构造函数的实现:

    public ClientHandle(String ip,int port) {
        this.host = ip;
        this.port = port;
        try{
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            socketChannel = SocketChannel.open();
            //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }

接下来看对就绪通道的处理办法:

    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                //这里的作用将在后面的代码(doConnect方法)说明
                if(sc.finishConnect()){
                    System.out.println("已连接事件");
                }
                else{
                    System.exit(1);
                }
            }
            //读消息
            if(key.isReadable()){
                //创建ByteBuffer,并开辟一个1k的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    buffer.flip();
                    //根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客户端收到消息:" + result);
                }lse if(readBytes==0){
                    //忽略
                }else if(readBytes<0){
                    //链路已经关闭,释放资源
                    key.cancel();
                    sc.close();
                }
            }
        }
    }

在run方法之前需先看下此方法的实现:

    private void doConnect() throws IOException{
        
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            System.out.println("connect");
        }
        else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            System.out.println("register");
        }
    }

当SocketChannel工作于非阻塞模式下时,调用connect()时会立即返回:
如果连接建立成功则返回的是true(比如连接localhost时,能立即建立起连接),否则返回false。

在非阻塞模式下,返回false后,必须要在随后的某个地方调用finishConnect()方法完成连接。
当SocketChannel处于阻塞模式下时,调用connect()时会进入阻塞,直至连接建立成功或者发生IO错误时,才从阻塞状态中退出。

所以该代码在connect服务端后返回false(但还是有作用的),并在else语句将该通道注册在选择器上并选择connect事件。

客户端的run方法:

    @Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
        //循环遍历selector
        while(started){
            try{
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key ;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector关闭后会自动释放里面管理的资源
        if(selector != null){
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

发送信息到服务端的方法:

    public void sendMsg(String msg) throws Exception{
        //覆盖其之前感兴趣的事件(connect),将其更改为OP_READ
        socketChannel.register(selector, SelectionKey.OP_READ);
        doWrite(socketChannel, msg);
    }

完整代码:

服务端:

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Service {
    private static int DEFAULT_POST = 12345;
    private static ServerHandle serverHandle;
    public static void start(){
        start(DEFAULT_POST);
    }

    public static synchronized void start(int post) {
        if (serverHandle != null){
            serverHandle.shop();
        }
        serverHandle = new ServerHandle(post);
        new Thread(serverHandle,"server").start();
    }
}

服务端主体:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by innoyiya on 2018/8/20.
 */
public class ServerHandle implements Runnable{

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile boolean started;

    public ServerHandle(int port){
        try {
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            serverSocketChannel = ServerSocketChannel.open();
            //设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //判定端口,并设定连接队列最大为1024
            serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
            //监听客户端请求
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //标记启动标志
            started = true;
            System.out.println("服务器已启动,端口号为:" + port);
        } catch (IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
    public void shop(){
        started = false;
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        byte[] bytes = response.getBytes();
        ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);
        wirteBuffer.put(bytes);
        wirteBuffer.flip();
        channel.write(wirteBuffer);
    }

    private void handleInput(SelectionKey key) throws IOException{
        if (key.isValid()){
            if (key.isAcceptable()){
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()){
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(byteBuffer);
                if (readBytes > 0){
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String expression = new String(bytes, "UTF-8");
                    System.out.println("服务器收到的信息:" + expression);
                    doWrite(socketChannel, "+++++" + expression + "+++++");
                } else if (readBytes < 0){
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }

    @Override
    public void run() {
        //循环遍历
        while (started) {
            try {
                selector.select();
                //System.out.println(selector.select());
                Set<SelectionKey> keys = selector.selectedKeys();
                //System.out.println(keys.size());
                Iterator<SelectionKey> iterator = keys.iterator();
                SelectionKey key;
                while (iterator.hasNext()){
                    key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e){
                        if (key != null){
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (Throwable throwable){
                throwable.printStackTrace();
            }
        }
    }
}

客户端:

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Client {
    private static String DEFAULT_HOST = "localhost";
    private static int DEFAULT_PORT = 12345;
    private static ClientHandle clientHandle;
    private static final String EXIT = "exit";

    public static void start() {
        start(DEFAULT_HOST, DEFAULT_PORT);
    }

    public static synchronized void start(String ip, int port) {
        if (clientHandle != null){
            clientHandle.stop();
        }
        clientHandle = new ClientHandle(ip, port);
        new Thread(clientHandle, "Server").start();
    }

    //向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception {
        if (msg.equals(EXIT)){
            return false;
        }
        clientHandle.sendMsg(msg);
        return true;
    }

}

客户端主体代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by innoyiya on 2018/8/20.
 */

public class ClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean started;

    public ClientHandle(String ip,int port) {
        this.host = ip;
        this.port = port;
        try{
            //创建选择器
            selector = Selector.open();
            //打开监听通道
            socketChannel = SocketChannel.open();
            //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
            socketChannel.configureBlocking(false);
            started = true;
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }
    public void stop(){
        started = false;
    }
    
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                if(sc.finishConnect()){
                    System.out.println("已连接事件");
                }
                else{
                    System.exit(1);
                }
            }
            //读消息
            if(key.isReadable()){
                //创建ByteBuffer,并开辟一个1M的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                    buffer.flip();
                    //根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客户端收到消息:" + result);
                } else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }
        }
    }
    //异步发送消息
    private void doWrite(SocketChannel channel,String request) throws IOException{
        byte[] bytes = request.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        //flip操作
        writeBuffer.flip();
        //发送缓冲区的字节数组
        channel.write(writeBuffer);

    }
    private void doConnect() throws IOException{
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            System.out.println("connect");
        }
        else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            System.out.println("register");
        }
    }
    public void sendMsg(String msg) throws Exception{
        //覆盖其之前感兴趣的事件,将其更改为OP_READ
        socketChannel.register(selector, SelectionKey.OP_READ);
        doWrite(socketChannel, msg);
    }

    @Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
        //循环遍历selector
        while(started){
            try{
                selector.select();

                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key ;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector关闭后会自动释放里面管理的资源
        if(selector != null){
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

测试类:

import java.util.Scanner;

/**
 * Created by innoyiya on 2018/8/20.
 */
public class Test {
    public static void main(String[] args) throws Exception {
        Service.start();
        Thread.sleep(1000);
        Client.start();
        while(Client.sendMsg(new Scanner(System.in).nextLine()));
    }
}

控制台打印:

服务器已启动,端口号为:12345
register
已连接事件
1234
服务器收到的信息:1234
客户端收到消息:+++++1234+++++
5678
服务器收到的信息:5678
客户端收到消息:+++++5678+++++

如有不妥之处,请告诉我。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容