Java多线程NIO学习

原文:https://juejin.im/post/5cb2f8d7e51d456e500f7cf3

阻塞IO 如果数据没有准备就绪,就一直等待,直到数据准备就绪;整个进程会被阻塞。

非阻塞IO 需不断询问内核是否已经准备好数据,非阻塞虽然不用等待但是一直占用CPU。

多路复用IO NIO 多路复用IO,会有一个线程不断地去轮询多个socket的状态,当socket有读写事件的时候才会调用IO读写操作。 用一个线程管理多个socket,是通过selector.select()查询每个通道是否有事件到达,如果没有事件到达,则会一直阻塞在那里,因此也会带来线程阻塞问题。

信号驱动IO模型 在信号驱动IO模型中,当用户发起一个IO请求操作时,会给对应的socket注册一个信号函数,线程会继续执行,当数据准备就绪的时候会给线程发送一个信号,线程接受到信号时,会在信号函数中进行IO操作。 非阻塞IO、多路复用IO、信号驱动IO都不会造成IO操作的第一步,查看数据是否准备就绪而带来的线程阻塞,但是在第二步,对数据进行拷贝都会使线程阻塞。

异步IO jdk7AIO 异步IO是最理想的IO模型,当线程发出一个IO请求操作时,接着就去做自己的事情了,内核去查看数据是否准备就绪和准备就绪后对数据的拷贝,拷贝完以后内核会给线程发送一个通知说整个IO操作已经完成了,数据可以直接使用了。 同步的IO操作在第二个阶段,对数据的拷贝阶段,都会造成线程的阻塞,异步IO则不会。

异步IO在IO操作的两个阶段,都不会使线程阻塞。 Java 的 I/O 依赖于操作系统的实现。

Java NIO的工作原理

由一个专门的线程(Selector)来处理所有的IO事件,并负责分发。

事件驱动机制:事件到的时候触发,而不是同步的去监视事件。

线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。

三大基本组件

Channel

FileChannel, 从文件中读写数据。

DatagramChannel,通过UDP读写网络中的数据。

SocketChannel,通过TCP读写网络中的数据。

ServerSocketChannel,可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel。

Java NIO 的通道类似流,但又有些不同:

既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。

通道可以异步地读写。

通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。

Buffer

关键的Buffer实现 ByteBuffer,CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,LongBuffer,ShortBuffer

Buffer两种模式、三个属性:

capacity

作为一个内存块,Buffer有一个固定的大小值,也叫“capacity”.你只能往里写capacity个byte、long,char等类型。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。

position

当你写数据到Buffer中时,position表示当前的位置。初始的position值为0.当一个byte、long等数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元。position最大可为capacity – 1. 当读取数据时,也是从某个特定位置读。当将Buffer从写模式切换到读模式,position会被重置为0. 当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。

limit

在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据。 写模式下,limit等于Buffer的capacity。 当切换Buffer到读模式时, limit表示你最多能读到多少数据。因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)

参考链接:Buffer原理 www.cnblogs.com/chenpi/p/64…

Selector

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

监听四种事件

SelectionKey.OP_CONNECT

SelectionKey.OP_ACCEPT

SelectionKey.OP_READ

SelectionKey.OP_WRITE

select()方法

select()阻塞到至少有一个通道在你注册的事件上就绪了。 select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。

selectedKeys()方法

调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。

参考链接:操作系统层面分析Selector原理 zhhphappy.iteye.com/blog/203289…

NIO实现

服务端

publicclassNIOServerSocket {//存储SelectionKey的队列privatestaticList writeQueue =newArrayList();privatestaticSelector selector =null;//添加SelectionKey到队列publicstaticvoidaddWriteQueue(SelectionKey key){synchronized(writeQueue) {            writeQueue.add(key);//唤醒主线程selector.wakeup();        }    }publicstaticvoidmain(String[] args)throwsIOException {// 1.创建ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 2.绑定端口serverSocketChannel.bind(newInetSocketAddress(60000));// 3.设置为非阻塞serverSocketChannel.configureBlocking(false);// 4.创建通道选择器selector = Selector.open();/*

        * 5.注册事件类型

        *

        *  sel:通道选择器

        *  ops:事件类型 ==>SelectionKey:包装类,包含事件类型和通道本身。四个常量类型表示四种事件类型

        *  SelectionKey.OP_ACCEPT 获取报文      SelectionKey.OP_CONNECT 连接

        *  SelectionKey.OP_READ 读          SelectionKey.OP_WRITE 写

        */serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while(true) {            System.out.println("服务器端:正在监听60000端口");// 6.获取可用I/O通道,获得有多少可用的通道intnum = selector.select();if(num >0) {// 判断是否存在可用的通道// 获得所有的keysSet selectedKeys = selector.selectedKeys();// 使用iterator遍历所有的keysIterator iterator = selectedKeys.iterator();// 迭代遍历当前I/O通道while(iterator.hasNext()) {// 获得当前keySelectionKey key = iterator.next();// 调用iterator的remove()方法,并不是移除当前I/O通道,标识当前I/O通道已经处理。iterator.remove();// 判断事件类型,做对应的处理if(key.isAcceptable()) {                        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();                        SocketChannel socketChannel = ssChannel.accept();                        System.out.println("处理请求:"+ socketChannel.getRemoteAddress());// 获取客户端的数据// 设置非阻塞状态socketChannel.configureBlocking(false);// 注册到selector(通道选择器)socketChannel.register(selector, SelectionKey.OP_READ);                    }elseif(key.isReadable()) {                        System.out.println("读事件");//取消读事件的监控key.cancel();//调用读操作工具类NIOHandler.read(key);                    }elseif(key.isWritable()) {                        System.out.println("写事件");//取消读事件的监控key.cancel();//调用写操作工具类NIOHandler.write(key);                    }                }            }else{synchronized(writeQueue) {while(writeQueue.size() >0){                        SelectionKey key = writeQueue.remove(0);//注册写事件SocketChannel channel = (SocketChannel) key.channel();                        Object attachment = key.attachment();                        channel.register(selector, SelectionKey.OP_WRITE,attachment);                    }                }            }        }    } }复制代码

消息处理

publicclassNIOHandler{//构造线程池privatestaticExecutorService executorService  = Executors.newFixedThreadPool(10);publicstaticvoidread(finalSelectionKey key){//获得线程并执行executorService.submit(newRunnable() {@Overridepublicvoidrun(){try{                    SocketChannel readChannel = (SocketChannel) key.channel();// I/O读数据操作ByteBuffer buffer = ByteBuffer.allocate(1024);                    ByteArrayOutputStream baos =newByteArrayOutputStream();intlen =0;while(true) {                        buffer.clear();                        len = readChannel.read(buffer);if(len == -1)break;                        buffer.flip();while(buffer.hasRemaining()) {                            baos.write(buffer.get());                        }                    }                    System.out.println("服务器端接收到的数据:"+newString(baos.toByteArray()));//将数据添加到key中key.attach(baos);//将注册写操作添加到队列中NIOServerSocket.addWriteQueue(key);                }catch(IOException e) {                    e.printStackTrace();                }            }        });    }publicstaticvoidwrite(finalSelectionKey key){//拿到线程并执行executorService.submit(newRunnable() {@Overridepublicvoidrun(){try{// 写操作SocketChannel writeChannel = (SocketChannel) key.channel();//拿到客户端传递的数据ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();                    System.out.println("客户端发送来的数据:"+newString(attachment.toByteArray()));                    ByteBuffer buffer = ByteBuffer.allocate(1024);                    String message ="你好,我是服务器!!";                    buffer.put(message.getBytes());                    buffer.flip();                    writeChannel.write(buffer);                    writeChannel.close();                }catch(IOException e) {                    e.printStackTrace();                }            }        });    }}复制代码

客户端

publicclassNIOClientSocket {publicstaticvoidmain(String[] args)throwsIOException {//使用线程模拟用户 并发访问for(inti =0; i <1; i++) {newThread(){publicvoidrun() {try{//1.创建SocketChannelSocketChannel socketChannel=SocketChannel.open();//2.连接服务器socketChannel.connect(newInetSocketAddress("localhost",60000));//写数据String msg="我是客户端"+Thread.currentThread().getId();                        ByteBuffer buffer=ByteBuffer.allocate(1024);                        buffer.put(msg.getBytes());                        buffer.flip();                        socketChannel.write(buffer);                        socketChannel.shutdownOutput();//读数据ByteArrayOutputStream bos =newByteArrayOutputStream();intlen =0;while(true) {                            buffer.clear();                            len = socketChannel.read(buffer);if(len == -1)break;                            buffer.flip();while(buffer.hasRemaining()) {                                bos.write(buffer.get());                            }                        }                        System.out.println("客户端收到:"+newString(bos.toByteArray()));                        socketChannel.close();                    }catch(IOException e) {                        e.printStackTrace();                    }                };            }.start();        }    }}复制代码

多线程NIO Tips

示例代码仅供学习参考。对于一个已经被监听到的事件,处理前先取消事件(SelectionKey .cancel())监控。否则selector.selectedKeys()会一直获取到该事件,但该方法比较粗暴,并且后续register会产生多个SelectionKey。推荐使用selectionKey.interestOps()改变感兴趣事件。

Selector.select()和Channel.register()需同步。

当Channel设置为非阻塞(Channel.configureBlocking(false))时,SocketChannel.read 没读到数据也会返回,返回参数等于0。

OP_WRITE事件,写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理现场就会一直占用着CPU资源。参考下面的第二个链接。

粘包问题。

参考链接:SocketChannel.read blog.csdn.net/cao47820824…

参考链接:NIO坑 www.jianshu.com/p/1af407c04…

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容