https://www.cnblogs.com/imstudy/p/9908791.html
什么是IO模型:
简单地说,就是用什么样的通道进行数据的发送和接收。
比如通道是阻塞的还是非阻塞的,是同步还是异步的。
>> BIO:
在用BIO进行网络通信时,服务端的实现模式为一个连接一个线程,
即客户端有连接请求时服务端就要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。
适用场景:连接数比较小且固定的架构,程序简单易于理解。
>> NIO:
同步非阻塞的IO,服务端的实现模式为一个线程处理多个请求,
即客户端发送的连接请求都会注册到多路复用器上,多路复用器(selector)轮询到连接有IO请求就进行处理。
当然server也可以启动多个线程,一个线程维护一个selector,一个selector维护多个client。
适用场景:连接数多且连接比较短,比如聊天服务器、弹幕系统、服务器之间的通讯等,编程比较复杂。
>> AIO:
异步非阻塞的IO,JDK1.7开始出现的,目前还没得到广泛的应用。适用场景:连接数多且连接比较长的重架构,编程比较复杂。
1.BIO
流是个抽象的概念,是对输入输出设备的抽象,
Java程序中,对于数据的输入/输操作都是以“流”的方式进行。
设备可以是文件,网络,内存等。
将数据从内存写入到文件或网络这种输出设备的流称为输出流,
将数据从文件或网络输入到内存的流叫中输入流,
java程序运行在内存中,所以java程序变量代表内存数据。
#按操作的数据单元类型划分
(1)字节流
表示以字节(8位)为单位从流(stream)读取数据或者往流(stream)中写入数据,
通常用来处理二进制文件,如图像和声音。系统输入输出(System.in与System.out)也是字节流
字节流的两个基类为InputStream和OutputStream,分别代表字节输入流和字节输出流,
其他具体的字节流类都是从这两个类派生而来的。
(2)字符流
表示以字符(16位)为单位从流(stream)中读取数据或者往流(stream)中写入数据,同来用来处理字符或字符串数据。
"这里的字符是Unicode为标准的字符,Unicode字符是16位的。"
字符流的两个基本类为Reader和Writer,分别代表字符输出流和字符输入流,
其他具体的字符流都是从这两个类派生而来的。
#按流的角色来划分,分为节点流与处理流
节点流是指程序可以向一个特定的节点读写数据,直接连接数据源;
这个节点最常见的是文件,类名中包含关键字File;
还可以是数组、管道、字符串,关键字分别为ByteArray/CharArray,Piped,String。
处理流并不直接连接数据源,它大多情况是对已存在的节点流进行包装,是一种典型的装饰器设计模式。
使用处理流主要是为了更方便的执行输入输出工作,
如PrintStream,输出功能很强大,推荐输出时都使用处理流包装。
其他说法
#字节流转为字符流(转换流)
java中提供两个适配器(adapter):
InputStreamReader将InputStream转换为Reader,
OutputStreamWriter将OutputStream转换为Writer。
#缓冲流
有关键字Buffered,也是一种处理流,为其包装的流增加了缓存功能,提高了输入输出的效率,
增加缓冲功能后需要使用flush()才能将缓冲区中内容写入到实际的物理节点。
但是,在现在版本的Java中,只需记得关闭输出流(调用close()方法),
就会自动执行输出流的flush()方法,可以保证将缓冲区中内容写入。
#对象流
有关键字Object,主要用于将目标对象保存到磁盘中或允许在网络中直接传输对象时使用(对象序列化)。
#推回输入流
有关键字PushBack,当程序调用推回输入流的unread()方法时,系
统回把指定数组内容的内容推回到一个推回缓冲区中,在调用read()方法读入内容时,
就先从推回缓冲区中读取,直到读完推回缓冲区中内容后才会从原输入流中读取。
RandomAccessFile
BIO的四个基类
InputStream的主要功能是将数据一个字节一个字节的读到内存
OutputStream的主要作用是将数据一个字节一个子集的写入到文件或者网络中
Reader的主要功能是将数据一个字符一个字符的读入到内存
Writer的主要功能是将数据一个字符一个字符的写入到文件或者网络中
基于流的单向操作
同步阻塞I/O,服务器实现模式为一个连接一个线程,
即客户端有连接请求时服务器就需要启动一个线程进行处理,
如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制来改善。
BIO方式适用于连接数目比较小且固定的架构,
这种方式对服务端资源要求比较高,并发局限于应用中
类图如下
2.NIO(同步非阻塞)(jdk1.4--java.nio)
是通过Channel和Buffer缓冲池双向操作
同步非阻塞I/O,服务器实现模式为一个请求一个线程,
即客户端发送的连接请求都会注册到多路复用器上,
多路复用器轮询到连接有IO请求时才启动一个线程进行处理。
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,
比如聊天服务器,并发局限于应用中,编程比较复杂,jdk1,4开始支持
Java NIO:Buffer、Channel 和 Selector
#核心类
#Buffer
Buffer本身是一块内存, 底层是数组,它有 position、limit、capacity 几个重要属性。
put() 一下数据、flip() 切换到读模式、然后用 get() 获取数据、clear() 一下清空数据、重新回到 put() 写入数据。
#Channel
基本上只和 Buffer 打交道,类似于java.io中的InputStrea, OutputStream, 但channel可读可写, 是双向的。
常用方法就是 channel.read(buffer) 和 channel.write(buffer)。
#Selector
用于实现非阻塞 IO
#Pipe:
只支持数据单向流动的channel
2.1基础概念介绍
2.1.1 Buffer
一个 Buffer 本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据。
核心是 ByteBuffer, Buffer可以理解为一个数组,
IntBuffer、CharBuffer、DoubleBuffer 等分别对应 int[]、char[]、double[] 等。
MappedByteBuffer 用于实现内存映射文件,也不是本文关注的重点。
position、limit、capacity
#比较(都不为负数, mark例外)
mark <= position <= limit <= capacity
#capacity:
代表这个缓冲区的容量,一旦设定就不可以更改。
比如 capacity 为 1024 的 IntBuffer,代表其一次可以存放 1024 个 int 类型的值。
一旦 Buffer 的容量达到 capacity,需要清空 Buffer,才能重新写入值。
#position
指代下一次读或写的位置的索引, 即将要读或写的元素的index
初始值是 0,每往 Buffer 中写入一个值,position 就自动加 1,代表下一次的写入位置。
读操作的时候也是类似的,每读一个值,position 就自动加 1。
从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了。
#Limit
指代要读或写的最后一个元素的下一个元素的位置。
写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity。
写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了。
常见方法
#rewind():
会重置 position 为 0,通常用于重新从头读写 Buffer。
#clear():
有点重置 Buffer 的意思,相当于重新实例化了一样,
但并不会将 Buffer 中的数据清空,只不过后续的写入会覆盖掉原来的数据,也就相当于清空了数据了。
#compact():
和 clear() 一样的是,它们都是在准备往 Buffer 填充新的数据之前调用。
compact() 方法会先处理还没有读取的数据,也就是 position 到 limit 之间的数据(还没有读过的数据),
先将这些数据移到左边,然后在这个基础上再开始写入。
很明显,此时 limit 还是等于 capacity,position 指向原来数据的右边。
#mark():
用于临时保存 position 的值,每次调用 mark() 方法都会将 mark 设值为当前的 position,便于后续需要的时候使用。
#reset():
考虑以下场景,我们在 position 为 5 的时候,
先 mark() 一下,然后继续往下读,读到第 10 的时候,
我想重新回到 position 为 5 的地方重新来一遍,
那只要调一下 reset() 方法,position 就回到 5 了。
#flip():
flip() 方法,可以从写入模式切换到读取模式。其实这个方法也就是设置了一下 position 和 limit 值罢了。
#put ():
put 方法用于将数据填充到 Buffer 中,
该方法需要自己控制 Buffer 大小,不能超过 capacity,超过会抛 java.nio.BufferOverflowException 异常。
#read():
对于 Buffer 来说,要将来自 Channel 的数据填充到 Buffer 中,
在系统层面上,这个操作我们称为读操作,因为数据是从外部(文件或网络等)读到内存中。
/**
* Sets this buffer's mark at its position.
*
* @return This buffer
*/
public final Buffer mark() {
mark = position;
return this;
}
/**
* Resets this buffer's position to the previously-marked position.
*
* <p> Invoking this method neither changes nor discards the mark's
* value. </p>
*
* @return This buffer
*
* @throws InvalidMarkException
* If the mark has not been set
*/
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
/**
* Clears this buffer. The position is set to zero, the limit is set to
* the capacity, and the mark is discarded.
*
* <p> Invoke this method before using a sequence of channel-read or
* <i>put</i> operations to fill this buffer. For example:
*
* <blockquote><pre>
* buf.clear(); // Prepare buffer for reading
* in.read(buf); // Read data</pre></blockquote>
*
* <p> This method does not actually erase the data in the buffer, but it
* is named as if it did because it will most often be used in situations
* in which that might as well be the case. </p>
*
* @return This buffer
*/
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
/**
* Flips this buffer. The limit is set to the current position and then
* the position is set to zero. If the mark is defined then it is
* discarded.
*
* <p> After a sequence of channel-read or <i>put</i> operations, invoke
* this method to prepare for a sequence of channel-write or relative
* <i>get</i> operations. For example:
*
* <blockquote><pre>
* buf.put(magic); // Prepend header
* in.read(buf); // Read data into rest of buffer
* buf.flip(); // Flip buffer
* out.write(buf); // Write header + data to channel</pre></blockquote>
*
* <p> This method is often used in conjunction with the {@link
* java.nio.ByteBuffer#compact compact} method when transferring data from
* one place to another. </p>
*
* @return This buffer
*/
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
/**
* Rewinds this buffer. The position is set to zero and the mark is
* discarded.
*
* <p> Invoke this method before a sequence of channel-write or <i>get</i>
* operations, assuming that the limit has already been set
* appropriately. For example:
*
* <blockquote><pre>
* out.write(buf); // Write remaining data
* buf.rewind(); // Rewind buffer
* buf.get(array); // Copy data into array</pre></blockquote>
*
* @return This buffer
*/
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
/**
* Returns the number of elements between the current position and the
* limit.
*
* @return The number of elements remaining in this buffer
*/
public final int remaining() {
return limit - position;
}
/**
* Tells whether there are any elements between the current position and
* the limit.
*
* @return <tt>true</tt> if, and only if, there is at least one element
* remaining in this buffer
*/
public final boolean hasRemaining() {
return position < limit;
}
demo
package com.zy;
import org.junit.Test;
import java.nio.IntBuffer;
import java.util.Random;
public class NioTest {
@Test
public void fn01() {
IntBuffer buffer = IntBuffer.allocate(7);
for (int i = 0; i < buffer.capacity() - 2; i ++) {
buffer.put(new Random().nextInt());
}
System.out.println("before flip, limit is: " + buffer.limit());
buffer.flip();
System.out.println("after flip, limit is: " + buffer.limit());
while (buffer.hasRemaining()) {
System.out.println("----------------------");
System.out.println("position: " + buffer.position());
System.out.println("limit: " + buffer.limit());
System.out.println("capacity: " + buffer.capacity());
System.out.println(buffer.get());
}
}
}
零拷贝 zero copy
#前置知识
#堆外内存
堆外内存是相对于堆内内存的一个概念。
堆内内存是由JVM所管控的Java进程内存,在Java中创建的对象都处于堆内内存中,
并且它们遵循JVM的内存管理机制,JVM会采用垃圾回收机制统一管理它们的内存。
那么堆外内存就是存在于JVM管控之外的一块内存区域,因此它是不受JVM的管控。不参与GC。
DirectByteBuffer是通过虚引用(Phantom Reference)来实现堆外内存的释放的。
PhantomReference 是所有“弱引用”中最弱的引用类型。
不同于软引用和弱引用,虚引用无法通过 get() 方法来取得目标对象的强引用从而使用目标对象,
观察源码可以发现 get() 被重写为永远返回 null。
那虚引用到底有什么作用?其实虚引用主要被用来跟踪对象被垃圾回收的状态,
通过查看引用队列中是否包含对象所对应的虚引用来判断它是否即将被垃圾回收,从而采取行动。
它并不被期待用来取得目标对象的引用,而目标对象被回收前,
它的引用会被放入一个 ReferenceQueue 对象中,从而达到跟踪对象垃圾回收的作用。
#关于linux的内核态和用户态
>> 内核态:
控制计算机的硬件资源,并提供上层应用程序运行的环境。
比如socket I/0操作或者文件的读写操作等
>> 用户态:
上层应用程序的活动空间,应用程序的执行必须依托于内核提供的资源。
>> 系统调用:
为了使上层应用能够访问到这些资源,内核为上层应用提供访问的接口。
当我们通过JNI调用的native方法实际上就是从用户态切换到了内核态的一种方式。
并且通过该系统调用使用操作系统所提供的功能。
Q:为什么需要用户进程(位于用户态中)要通过系统调用(Java中即使JNI)来调用内核态中的资源,或者说调用操作系统的服务了?
A:intel cpu提供Ring0-Ring3四种级别的运行模式,Ring0级别最高,Ring3最低。
Linux使用了Ring3级别运行用户态,Ring0作为内核态。
Ring3状态不能访问Ring0的地址空间,包括代码和数据。
因此用户态是没有权限去操作内核态的资源的,它只能通过系统调用外完成用户态到内核态的切换,
然后在完成相关操作后再有内核态切换回用户态。
Q:为什么操作系统不直接访问Java堆内的内存区域了?
A:这是因为JNI方法访问的内存区域是一个已经确定了的内存区域地质,
那么该内存地址指向的是Java堆内内存的话,那么如果在操作系统正在访问这个内存地址的时候,
Java在这个时候进行了GC操作,而GC操作会涉及到数据的移动操作, GC经常会进行先标志在压缩的操作。
即,将可回收的空间做标志,然后清空标志位置的内存,然后会进行一个压缩,
压缩就会涉及到对象的移动,移动的目的是为了腾出一块更加完整、连续的内存空间,
以容纳更大的新对象,数据的移动会使JNI调用的数据错乱。
所以JNI调用的内存是不能进行GC操作的。
Q:JNI调用的内存是不能进行GC操作的,那该如何解决了?
A:
①堆内内存与堆外内存之间数据拷贝的方式
(并且在将堆内内存拷贝到堆外内存的过程JVM会保证不会进行GC操作):
比如我们要完成一个从文件中读数据到堆内内存的操作,
即FileChannelImpl.read(HeapByteBuffer)。
这里实际上File I/O会将数据读到堆外内存中,然后堆外内存再讲数据拷贝到堆内内存,
这样我们就读到了文件中的内存。
写操作则反之,我们会将堆内内存的数据线写到堆外内存中,
然后操作系统会将堆外内存的数据写入到文件中。
② 直接使用堆外内存,如DirectByteBuffer:
这种方式是直接在堆外分配一个内存(即,native memory)来存储数据,
程序通过JNI直接将数据读/写到堆外内存中。
因为数据直接写入到了堆外内存中,所以这种方式就不会再在JVM管控的堆内再分配内存来存储数据了,
也就不存在堆内内存和堆外内存数据拷贝的操作了。
这样在进行I/O操作时,只需要将这个堆外内存地址传给JNI的I/O的函数就好了。
DirectByteBuffer是Java用于实现堆外内存的一个重要类,该类实现了堆外内存的创建、使用和销毁。
DirectByteBuffer该类本身还是位于Java内存模型的堆中。堆内内存是JVM可以直接管控、操纵。
而DirectByteBuffer中的unsafe.allocateMemory(size);
是个一个native方法,这个方法分配的是堆外内存,通过C的malloc来进行分配的。
分配的内存是系统本地的内存,并不在Java的内存中,也不属于JVM管控范围,
所以在DirectByteBuffer一定会存在某种方式来操纵堆外内存。
在DirectByteBuffer的父类Buffer中有个address属性:
// Used only by direct buffers
// NOTE: hoisted here for speed in JNI GetDirectBufferAddress
long address;
address只会被直接缓存给使用到。之所以将address属性升级放在Buffer中,
是为了在JNI调用GetDirectBufferAddress时提升它调用的速率。
address表示分配的堆外内存的地址。
外部设备进行IO交互时, 流程为:
OS的Memory(address变量已将堆外内存与JVM关联起来) --> 外部IO设备
也称之为'零拷贝'
# 如果是 HeapByteBuffer, 与外部设备进行IO交互时, 流程为:
# JVM的ByteBuffer --> OS的Memory --> 外部IO设备
DirectByteBuffer也位于JMM中的堆内存中, 其构造器为:
DirectByteBuffer(int cap) {// package-private
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);
long base = 0;
try {
// 调用native方法, 进行分配内存, 称之为堆外内存(OS的Memory), JVM无法直接操控
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
// 这个address来自顶层抽象类Buffer的成员变量, 负责将堆外内存(OS的Memory)与JVM内存关联起来, 提升效率
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
# jdk 中的零拷贝示例
# server
package com.zy.zerocopy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class NioServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(9999));
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(true);
int len = 0;
while (len != -1) {
try {
len = socketChannel.read(buffer);
} catch (IOException e) {
e.printStackTrace();
}
buffer.rewind();
}
}
}
}
#client
package com.zy.zerocopy;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class NioClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 9999));
socketChannel.configureBlocking(true);
String fileName = "/input.txt";
FileChannel fileChannel = new FileInputStream(fileName).getChannel();
// transferFrom 表示从...处读取... fileChannel.transferFrom()
// transferTo 表示向 ... 处写 ...
long begin = System.currentTimeMillis();
long transferByteCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
long end = System.currentTimeMillis();
System.out.println(String.format("实际传递的字节数count: %s, 耗时: %sms", transferByteCount, (end - begin)));
fileChannel.close();
socketChannel.close();
}
}
https://www.jianshu.com/p/007052ee3773
Nio中Buffer的Scattering & Gathering特性
#scattering
从channel中读到buffer中时, 可传递多个buffer
当第一个buffer读满时, 才读到第二个, 以此类推
#gathering
从buffer中向channel中写数据时, 可传递多个buffer
当第一个buffer写满时, 才开始写第二个, 以此类推
#例子: win10上, 执行 ' telnet 127.0.0.1 8090', 回车后, 输入8个字符串, 再次回车
package com.zy;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
public class NioTest {
@Test
public void fn03() throws IOException {
ServerSocketChannel open = ServerSocketChannel.open();
open.socket().bind(new InetSocketAddress(8090));
int msgLength = 2 + 3 + 4;
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.allocate(2);
buffers[1] = ByteBuffer.allocate(3);
buffers[2] = ByteBuffer.allocate(4);
SocketChannel channel = open.accept();
while (true) {
// 读取外部输入
long byteRead = 0;
while (byteRead < msgLength) {
long read = channel.read(buffers);
byteRead += read;
System.out.println("byteRead: " + byteRead);
Arrays.stream(buffers).map(buffer -> "position: " + buffer.position() + "; limit: " + buffer.limit() + "; capacity: " + buffer.capacity())
.forEach(System.out::println);
}
Arrays.stream(buffers).forEach(Buffer::flip);
// 向外部写信息
long byteWrite = 0;
while (byteWrite < msgLength) {
long write = channel.write(buffers);
byteWrite += write;
}
Arrays.stream(buffers).forEach(Buffer::clear);
}
}
}
2.1.2 Channel
所有的 NIO 操作始于通道,通道是数据来源或数据写入的目的地
Channel 类似 IO 中的流,用于读取和写入。
"读操作"的时候将 Channel 中的数据填充到 Buffer 中,
"写操作"时将 Buffer 中的数据写入到 Channel 中。
#FileChannel:
文件通道,用于文件的读和写, FileChannel 是不支持非阻塞的
#DatagramChannel:
用于 UDP 连接的接收和发送
#SocketChannel:
把它理解为 TCP 连接通道,简单理解就是 TCP 客户端
#ServerSocketChannel:
TCP 对应的服务端,用于监听某个端口进来的请求
2.1.3Selector (java的多路复用器)
NIO 三大组件就剩 Selector 了,Selector 建立在非阻塞的基础之上,
大家经常听到的 "多路复用" 在 Java 世界中指的就是它,用于实现一个线程管理多个 Channel。
开启一个 Selector
Selector selector = Selector.open();
将 Channel 注册到 Selector 上
// 将通道设置为非阻塞模式,因为默认都是阻塞模式的
channel.configureBlocking(false);
// 注册
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
// register 方法的第二个 int 型参数(使用二进制的标记位), 用于表明需要监听哪些感兴趣的事件,共以下四种事件:
# SelectionKey.OP_READ
对应 00000001,通道中有数据可以进行读取
#SelectionKey.OP_WRITE
对应 00000100,可以往通道中写入数据
#SelectionKey.OP_CONNECT
对应 00001000,成功建立 TCP 连接
#SelectionKey.OP_ACCEPT
对应 00010000,接受 TCP 连接
// 注册方法返回值是 SelectionKey 实例,
// 它包含了 Channel 和 Selector 信息,也包括了一个叫做 Interest Set 的信息,
// 即我们设置的我们感兴趣的正在监听的事件集合。
调用 select() 方法获取通道信息。用于判断是否有我们感兴趣的事件已经发生了。
小demo
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
// 判断是否有事件准备好
int readyChannels = selector.select();
if(readyChannels == 0) continue;
// 遍历
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}
其他方法
#select()
将上次 select 之后的准备好的 channel 对应的 SelectionKey 复制到 selected set 中。
如果没有任何通道准备好,这个方法会阻塞,直到至少有一个通道准备好。
#selectNow()
功能和 select 一样,区别在于如果没有准备好的通道,那么此方法会立即返回 0。
#select(long timeout)
看了前面两个,这个应该很好理解了,如果没有通道准备好,此方法会等待一会
#wakeup()
这个方法是用来唤醒等待在 select() 和 select(timeout) 上的线程的。
如果 wakeup() 先被调用,此时没有线程在 select 上阻塞,
那么之后的一个 select() 或 select(timeout) 会立即返回,而不会阻塞,当然,它只会作用一次。
3.AIO(NIO.2)(异步非阻塞)(jdk1.7--java.nio)
Channel来操作,AIO依赖操作系统的实现来将来回调
异步非阻塞I/O,服务器实现模式为一个有效请求一个线程,
客户端的IO请求都是由操作系统先完成了再通知服务器用其启动线程进行处理。
AIO方式适用于连接数目多且连接比较长(重操作)的架构,
比如相册服务器,充分调用OS参与并发操作,编程比较复杂,jdk1.7开始支持。
核心类
Path:可以指向文件或文件夹,很多情况下,可以用Path来代替File类,和Files类配合着用。
Files 工具类: readAllLines ,write ,copy,deleteIfExists
AsynchronousFileChannel: 用于文件异步读写;
AsynchronousSocketChannel: 客户端异步socket;
AsynchronousServerSocketChannel: 服务器异步socket。
如:
Path sourcePath= Paths.get(fileFrom);
Path descPath= Paths.get(fileTo);
Files.copy(sourcePath, descPath);
4.一些代码
4.1不同方式进行文件复制
try-with-resource机制
JDK1.7之后有了try-with-resource处理机制。
首先被自动关闭的资源需要实现Closeable或者AutoCloseable接口,
因为只有实现了这两个接口才可以自动调用close()方法去自动关闭资源。
写法为try(){}catch(){},将要关闭的外部资源在try()中创建,catch()捕获处理异常。
其实try-with-resource机制是一种语法糖,其底层实现原理仍然是try{}catch(){}finally{}写法,
不过在catch(){}代码块中有一个addSuppressed()方法,即异常抑制方法。
如果业务处理和关闭连接都出现了异常,业务处理的异常会抑制关闭连接的异常,
只抛出处理中的异常,仍然可以通过getSuppressed()方法获得关闭连接的异常。
package com.io;
import org.junit.Test;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
public class FileReadAndWrite {
/**
* 使用bio字节流进行文件复制
*/
@Test
public void fn01() {
try(BufferedInputStream bis = new BufferedInputStream(new FileInputStream("/a.jpg"));
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream("/a1.jpg"))) {
byte[] bytes = new byte[1024];
int len;
while ((len = bis.read(bytes)) != -1) {
bos.write(bytes, 0 , len);
}
} catch (IOException e) {
System.out.println("failed to write file through byte bio");
}
}
/**
* 使用bio字符流进行文件复制
*/
@Test
public void fn02() {
try (InputStreamReader isr = new InputStreamReader(new FileInputStream("/a.txt"), StandardCharsets.UTF_8);
OutputStreamWriter osr = new OutputStreamWriter(new FileOutputStream("/a2.txt"), StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(isr);
BufferedWriter bw = new BufferedWriter(osr)) {
char[] chars = new char[1024];
int len;
while ((len = br.read(chars)) != -1) {
bw.write(chars, 0, len);
}
} catch (IOException e) {
System.out.println("failed to write file through char bio");
}
}
/**
* 使用Nio中的fileChannel来进行文件复制
*/
@Test
public void fn03() {
try(FileInputStream fis = new FileInputStream("/input.txt");
FileOutputStream fos = new FileOutputStream("/output.txt");
FileChannel in = fis.getChannel();
FileChannel out = fos.getChannel()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len;
while ((len = in.read(buffer)) != - 1) {
System.out.println("len value is: " + len); // 若没有调用 buffer.clear() 方法, 这里的 len 将是0
buffer.flip();
out.write(buffer);
// buffer.clear(); // 此行代码若注释掉, 将从头重复向文件中写数据
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.2简单聊天程序的server与client
server
package com.zy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
public class NioServer {
/**
* 所有 client 的连接信息
*/
private static final Map<String, SocketChannel> clientMap = new ConcurrentHashMap<>();
public static void main(String[] args) throws IOException {
// 服务端 channel 并绑定端口
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8090));
serverSocketChannel.configureBlocking(false);
// 将 channel 注册到 selector
Selector selector = Selector.open();
// 注册了 serverSocketChannel, 关注连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int select = selector.select();
// 如果这里有空轮询, 这里会一直打印
// 据说 jdk1.6 中已解决, 但也有说 jdk1.7 仍有,
// 本处未发现空轮询, netty 也解决了该问题
System.out.println("select ---> " + select);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
// 清除这个 key
it.remove();
// 判断 key 的状态
if (Objects.nonNull(selectionKey)) {
if (selectionKey.isAcceptable()) {
System.out.println("----isAcceptable-----");
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverChannel.accept();
if (Objects.isNull(socketChannel)) {
continue;
}
socketChannel.configureBlocking(false);
// 注册了 socketChannel, 关注数据读取事件, 服务端一般不注册 可写事件
socketChannel.register(selector, SelectionKey.OP_READ);
// 将客户端连接信息写入 clientMap
clientMap.put(UUID.randomUUID().toString(), socketChannel);
} else if (selectionKey.isReadable()) {
System.out.println("------isReadable------");
while (true) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(buffer);
if (len > 0) {
buffer.flip();
String msg = String.valueOf(StandardCharsets.UTF_8.decode(buffer).array());
System.out.println("receive from client, msg is: " + msg);
AtomicReference<String> atomicSender = new AtomicReference<>();
clientMap.forEach((k, v) -> {
if (socketChannel == v) {
atomicSender.set(k);
return;
}
});
String sender = atomicSender.get();
clientMap.forEach((k, v) -> {
if (v != socketChannel) {
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(String.format("%s: send msg: %s.", sender, msg).getBytes());
writeBuffer.flip();
try {
v.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
});
} else {
// 解决客户端关闭或者输入结束时的问题
socketChannel.close();
break;
}
}
} else if (selectionKey.isValid()) {
System.out.println("------isValid----------");
}
}
}
}
}
}
client
package com.zy;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NioClient {
private static final ExecutorService executor = Executors.newCachedThreadPool();
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 这里要写在 注册 后面
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8090));
while (true) {
int select = selector.select();
// System.out.println("select--->" + select);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
// 清除
it.remove();
// 判断
if (Objects.nonNull(selectionKey)) {
if (selectionKey.isConnectable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
if (channel.isConnectionPending()) {
// 完成连接
channel.finishConnect();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(String.format("%s连接成功", LocalDateTime.now()).getBytes());
byteBuffer.flip();
channel.write(byteBuffer);
executor.submit(() -> {
while (true) {
byteBuffer.clear();
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
byteBuffer.put(br.readLine().getBytes());
byteBuffer.flip();
channel.write(byteBuffer);
}
});
}
// 注册读取事件
channel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
while (true) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = channel.read(byteBuffer);
if (len > 0) {
System.out.println(String.valueOf(StandardCharsets.UTF_8.decode(byteBuffer).array()));
} else {
channel.close();
break;
}
}
}
}
}
}
}
}
参考
https://www.javadoop.com/
https://blog.csdn.net/anxpp/article/details/51503329
https://blog.csdn.net/anxpp/article/details/51512200
https://www.cnblogs.com/weiqihome/p/9926490.html
https://www.jianshu.com/p/007052ee3773