个人学习笔记,源码链接:https://github.com/GongXincheng/gxc-nio-netty
1:Buffer 缓冲区
1.1:Buffer 中的属性
规则:mark <= position <= limit <= capacity
capacity:容量,表示缓冲区中最大存储数据的容量,一旦声明无法更改。
limit:界限,表示缓冲区中可以操作数据的大小(limit后的数据不能读写)。
position:位置,表示缓存区中正在操作数据的位置。
mark:标记,表示当前position的位置,可以通过reset()恢复到 mark 位置
1.2:ByteBuffer中常用方法
/**
* 分配新的字节缓冲区
*/
ByteBuffer allocate(int cap) {
capacity = cap;
limit = cap;
position = 0;
mark = -1;
}
/**
* 将写模式转换为读模式
*/
Buffer flip() {
limit = position;
position = 0;
mark = -1;
}
/**
* 可重复读数据
*/
Buffer rewind() {
position = 0;
mark = -1;
}
/**
* 清空缓冲区,但是缓冲区中的数据都还在,处于"被遗忘"状态
*/
Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
}
/**
* 标记position的位置
*/
Buffer mark() {
mark = position;
}
/**
* position恢复到mark的位置
*/
Buffer reset() {
position = mark
}
/**
* 缓冲区中时候还有可读数据
*/
boolean hasRemaining() {
return position < limit
}
/**
* 可读数据的个数
*/
int remaining() {
return limit - position
}
1.3:直接缓冲区和非直接缓冲区
非直接缓冲区:通过allocate()方法分配缓冲区,将缓冲区建立在JVM的内存中
直接缓冲区:通过allocateDirect()方法分配直接缓冲区,将缓冲区建立在物理内存中
直接缓冲区
非直接缓冲区
2:Channel 通道
2.1:Channel简介
用于 源节点 与 目标节点 的连接,在 Java NIO 中负责缓冲区中的数据传输。
Channel 本身不存储数据,因此需要配合 Buffer 进行传输
Channel
2.2:Channel 的主要实现类
java.nio.channels.Channel 接口
|-- FileChanel
|-- SelectableChannel
|-- SocketChanel
|-- ServerSockerChanel
|-- DatagramChanel
2.3:获取 Channel 的方式
1:Java 针对支持通道的类提供了 getChannel() 方法
eg:
本地IO:
FileInputStream / FileOutputStream
RandomAccessFile
网络IO:
Socket
ServerSocket
DatagramSocket
2:在 JDK 1.7 中的 NIO.2 针对各个通道提供了静态方法 open()
3:在 JDK 1.7 中的 NIO.2 的 Files 工具类的 newByteChannel()
2.4:利用通道完成文件复制(非直接缓冲区)
@Test
public void testUseChannelToCopyFile() throws Exception {
FileInputStream fis = new FileInputStream("/Users/gxc/tmp/sl.mp4");
FileOutputStream fos = new FileOutputStream("/Users/gxc/tmp/sl2.mp4");
// 1:获取通道
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();
// 2:分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3:将通道中的数据存入到缓冲区中
while(inChannel.read(buf) != -1) {
// 将 buffer 的写模式切换成读模式
buf.flip();
// 4:将缓冲区中的数据写入到通道中
outChannel.write(buf);
// 清除缓冲区
buf.clear();
}
// 5:关闭资源
outChannel.close();
inChannel.close();
fos.close();
fis.close();
}
2.5:使用直接缓冲区完成文件的复制,直接缓冲区(内存映射文件)
@Test
public void testUseChannelToCopyFile2() throws Exception {
FileChannel inChannel = FileChannel.open(
Paths.get("/Users/gxc/tmp/sl.mp4"),
StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(
Paths.get("/Users/gxc/tmp/sl3.mp4"),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW);
// 内存映射文件
MappedByteBuffer inMappedBuf = inChannel.map(
FileChannel.MapMode.READ_ONLY, 0,
inChannel.size());
MappedByteBuffer outMappedBuf = outChannel.map(
FileChannel.MapMode.READ_WRITE, 0,
inChannel.size());
// 直接对缓冲区 进行数据的读写操作
byte[] dst = new byte[inMappedBuf.limit()];
inMappedBuf.get(dst);
outMappedBuf.put(dst);
inChannel.close();
outChannel.close();
}
2.6:通道之间的数据传输(直接缓冲区)
@Test
public void testUseChannelToCopyFile3() throws Exception {
FileChannel inChannel = FileChannel.open(
Paths.get("/Users/gxc/tmp/sl.mp4"),
StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(
Paths.get("/Users/gxc/tmp/sl4.mp4"),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
//inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());
inChannel.close();
outChannel.close();
}
2.7:分散(Scatter)与聚集(Gather)
分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中
聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中
@Test
public void testUseChannelToCopyFile4() throws Exception {
RandomAccessFile raf = new RandomAccessFile("1.txt", "rw");
// 1:获取通道
FileChannel channel = raf.getChannel();
// 2:分配指定大小的缓冲区
ByteBuffer buf1 = ByteBuffer.allocate(100);
ByteBuffer buf2 = ByteBuffer.allocate(1024);
// 3:分散读取
ByteBuffer[] bufs = {buf1, buf2};
channel.read(bufs);
for (ByteBuffer buf : bufs) {
buf.flip();
}
System.out.println(
new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("----------------------");
System.out.println(
new String(bufs[1].array(), 0, bufs[1].limit()));
// 4:聚集写入
RandomAccessFile raf2 = new RandomAccessFile("1-1.txt", "rw");
FileChannel channel2 = raf2.getChannel();
channel2.write(bufs);
}
2.8:字符集 Charset
编码:字符串 -> 字节数组
解码:字节数组 -> 字符串
@Test
public void test() throws CharacterCodingException {
Charset cs1 = StandardCharsets.UTF_8;
// 获取编码器和解码器
CharsetEncoder ce = cs1.newEncoder();
CharsetDecoder cd = cs1.newDecoder();
CharBuffer cb = CharBuffer.allocate(1024);
cb.put("GongXincheng测试");
cb.flip();
// 编码
ByteBuffer byteBuffer = ce.encode(cb);
for (int i = 0; i < byteBuffer.limit(); i++) {
System.out.println(byteBuffer.get());
}
byteBuffer.flip();
// 解码
System.out.println("------------");
CharBuffer charBuffer = cd.decode(byteBuffer);
System.out.println(charBuffer.toString());
}
3:Selector 选择器
是 SelectableChannel 的多路复用器。用于监控Selectable的 IO 状况
java.nio.channels.Channel 接口
|-- FileChanel
|-- SelectableChannel
|-- SocketChanel
|-- ServerSockerChanel
|-- DatagramChanel
|-- Pipe.SinkChannel
|-- Pipe.SourceChannel
3.1:阻塞式NIO网络通讯(传输图片)
/**
* 客户端
* 1:先创建Socket通道,并设置ip和端口号
* 2:分配指定大小的缓冲区
* 3:创建本地文件的通道,循环读取数据到缓冲区中
* 4:将缓冲区的数据写到Socket通道中去
* 5:关闭资源
*/
@Test
public void client() throws Exception {
// 1:获取通道
SocketChannel socketChannel = SocketChannel.open(
new InetSocketAddress("127.0.0.1", 10000));
// 2:分配指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 3:读取本地文件, 并发送到服务端
FileChannel inChannel = FileChannel.open(
Paths.get("1.png"), StandardOpenOption.READ);
while (inChannel.read(buffer) != -1) {
// 切换成读数据模式
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
// 关闭通道
inChannel.close();
socketChannel.close();
}
/**
* 服务端
* 1:获取服务端Socket通道,并绑定ip
* 2:获取客户端连接的通道
* 3:创建文件通道,并将文件保存到本地
* 4:关闭资源
*/
@Test
public void server() throws Exception {
// 1:获取服务端Socket通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 2:服务端绑定端口号
ssChannel.bind(new InetSocketAddress(10000));
// 3:获取客户端连接的通道
SocketChannel socketChannel = ssChannel.accept();
// 4:接收客户端的数据,并保存到本地
FileChannel fileChannel = FileChannel.open(
Paths.get("2.png"),
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
// 分配指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (socketChannel.read(buffer) != -1) {
buffer.flip();
fileChannel.write(buffer);
buffer.clear();
}
fileChannel.close();
socketChannel.close();
ssChannel.close();
}
3.2:阻塞式NIO网络通讯(服务端相应客户端内容)
/**
* 客户端
*/
@Test
public void client() throws Exception {
SocketChannel socketChannel = SocketChannel.open(
new InetSocketAddress("127.0.0.1", 10000));
FileChannel fileChannel = FileChannel.open(
Paths.get("1.png"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (fileChannel.read(buffer) != -1) {
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
// !告诉服务端,客户端已经发完数据,否则服务端会一直处于阻塞状态!
socketChannel.shutdownOutput();
// 接收服务端反馈
while (socketChannel.read(buffer) != -1) {
buffer.flip();
System.out.println(
new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
fileChannel.close();
socketChannel.close();
}
/**
* 服务端
*/
@Test
public void server() throws IOException {
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(10000));
SocketChannel socketChannel = serverSocketChannel.accept();
FileChannel fileChannel = FileChannel.open(
Paths.get("3.png"),
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (socketChannel.read(buffer) != -1) {
buffer.flip();
fileChannel.write(buffer);
buffer.clear();
}
// 发送反馈给客户端
buffer.put("服务端接收客户端数据成功".getBytes());
buffer.flip();
socketChannel.write(buffer);
fileChannel.close();
socketChannel.close();
serverSocketChannel.close();
}
3.3:非阻塞式NIO网络通讯
/**
* 客户端
*/
@Test
public void client() throws Exception {
// 1:获取Socket通道
SocketChannel sChannel = SocketChannel.open(
new InetSocketAddress("127.0.0.1", 10000));
// 2:切换成非阻塞模式
sChannel.configureBlocking(false);
// 3:分配指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 4:发送数据给服务端
buffer.put((LocalDateTime.now().toString()
+ "\n" + "Hello World").getBytes());
buffer.flip();
sChannel.write(buffer);
buffer.clear();
// 5:关闭通道
sChannel.close();
}
/**
* 服务端
*/
@Test
public void server() throws Exception {
// 1:获取通道
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
// 2:切换到非阻塞模式
serverSocketChannel.configureBlocking(false);
// 3:绑定连接
serverSocketChannel.bind(new InetSocketAddress(10000));
// 4:获取选择器
Selector selector = Selector.open();
// 5:将通道注册到选择器上,并且指定"监听接收事件"
serverSocketChannel.register(selector,
SelectionKey.OP_ACCEPT);
// 6:轮询式的获取选择器上已经"准备就绪"的事件
while (selector.select() > 0) {
// 7:获取当前selector中所有注册的"选择键(已就绪的监听事件)"
Iterator<SelectionKey> iterator =
selector.selectedKeys().iterator();
while (iterator.hasNext()) {
// 8:获取准备"就绪"的事件
SelectionKey selectionKey = iterator.next();
// 9:判断具体是哪种事件类型
if(selectionKey.isAcceptable()) {
// 10:若接收事件"就绪",获取客户端连接
SocketChannel sChannel =
serverSocketChannel.accept();
// 11:将客户端通道切换成非阻塞模式
sChannel.configureBlocking(false);
// 12:将该 客户端通道 注册到Selector上,监听"读就绪"状态
sChannel.register(selector,
SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 13:获取当前Selector上"读就绪"的通道
SocketChannel socketChannel =
(SocketChannel) selectionKey.channel();
// 14:读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (socketChannel.read(buffer) != -1) {
buffer.flip();
System.out.println(
new String(buffer.array(), 0,
buffer.limit()));
buffer.clear();
}
}
// 15:取消选择键 SelectionKey
iterator.remove();
}
}
}
非阻塞服务端代码
3.4:非阻塞式NIO网络通讯(UDP)
/**
* 发送端.
*/
@Test
public void send() throws Exception {
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
String message = sc.next();
buffer.put(message.getBytes());
buffer.flip();
datagramChannel.send(buffer,
new InetSocketAddress("127.0.0.1", 10001));
buffer.clear();
}
sc.close();
datagramChannel.close();
}
/**
* 接收端
*/
@Test
public void receive() throws Exception {
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
datagramChannel.bind(new InetSocketAddress(10001));
Selector selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
while (selector.select() > 0) {
Set<SelectionKey> skSet = selector.selectedKeys();
Iterator<SelectionKey> iterator = skSet.iterator();
while (iterator.hasNext()) {
SelectionKey sk = iterator.next();
if (sk.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
datagramChannel.receive(buffer);
buffer.flip();
System.out.println(new String(
buffer.array(), 0, buffer.limit()));
buffer.clear();
}
iterator.remove();
}
}
}
3.5:Pipe管道
@Test
public void testPipe() throws Exception {
// 1:获取管道
Pipe pipe = Pipe.open();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("通过单向管道发送数据".getBytes());
buffer.flip();
// 2:将缓冲区的数据接入管道
Pipe.SinkChannel sinkChannel = pipe.sink();
sinkChannel.write(buffer);
// 3:读取缓冲区中的数据
buffer.flip();
Pipe.SourceChannel sourceChannel = pipe.source();
sourceChannel.read(buffer);
System.out.println(
new String(buffer.array(), 0, buffer.limit()));
}