channel 是双向的,同时读取和写入,流是单向的
channel 和多路复用器结合之后,有多种状态位,方便多路复用器(轮询)去识别(连接状态,阻塞状态,可读状态,可写状态)
channel 分为俩大类,网络读写的SelectableChannel,文件操作的FileChannel
网络读写的SocketChannel和ServerSocketChannel是SelectableChannel的子类
SocketChannel和ServerSocketChannel 依赖于多路复用器(Selector),Selector是NIO编程的基础,提供选择已经就绪的任务的能力。简单来说,Selector会不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作
一个多路复用器可以负责成千上万个Channel通道,没有上限,这也是JDK使用了epoll代替了传统的select实现,获得连接句柄没有限制。这样意味着我们只要一个线程负责Selector的轮询,就可以接入成千上万个客户端,这是JDK NIO 的巨大进步。
Selector线程就类似一个管理者Master,管理成千上万个Channel,然后轮询哪个管道的数据已经准备好,通知CPU执行IO的读取或写入操作。
Selector模式:当IO事件(管道)注册到选择器后,Selector会分配给每个管道一个key值,相当于标签。Selector选择器是以轮询的方式进行查找注册所有的IO事件。当我们的IO事件(管道)准备就绪后,select就会识别,会通过key值找到相应的管道,进行相关的数据处理操作(从管道里读或写数据,写到我们的数据缓冲区Buffer去)
每个管道都会对选择器进行注册到不同的事件状态,以便选择器查找:SelectionKey.OP_CONNECT SelectionKey.OP_ACCEPT SelectionKey.OP_READ SelectionKey.OP_WRITE
io与nio比较:
- io当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些CPU时间
- io阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。
- NIO的本质是原始的tcp建立连接使用3次握手的操作,减少连接的开销
- IO是面向流(Stream)的,NIO是面向块(buffer)的
- Java NIO的selectors允许一条线程去监控多个channels的输入,你可以向一个selector上注册多个channel,然后调用selector的select()方法判断是否有新的连接进来或者已经在selector上注册时channel是否有数据进入。selector的机制让一个线程管理多个channel变得简单。(不再使用多线程处理连接)
- NIO允许你用一个单独的线程或几个线程管理很多个channels(网络的或者文件的),代价是程序的处理和处理IO相比更加复杂
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);
while(! bufferFull(bytesRead) ) {
bytesRead = inChannel.read(buffer);
}
注意第二行从channel中读取数据到ByteBuffer,当这个方法返回你不知道是否你需要的所有数据都被读到buffer了,你所知道的一切就是有一些数据被读到了buffer中,但是你并不知道具体有多少数据,这使程序的处理变得稍微有些困难
想象一下,调用了read(buffer)方法后,只有半行数据被读进了buffer,例如:“Name: An”,你能现在就处理数据吗?当然不能。你需要等待直到至少一整行数据被读到buffer中,在这之前确保程序不要处理buffer中的数据
你如何知道buffer中是否有足够的数据可以被处理呢?你不知道,唯一的方法就是检查buffer中的数据。可能你会进行几次无效的检查(检查了几次数据都不够进行处理),这会令程序设计变得比较混乱复杂
bufferFull方法负责检查有多少数据被读到了buffer中,根据返回值是true还是false来判断数据是否够进行处理。bufferFull方法扫描buffer但不能改变buffer的内部状态
- NIO允许你用一个单独的线程或几个线程管理很多个channels(网络的或者文件的),代价是程序的处理和处理IO相比更加复杂,如果你需要同时管理成千上万的连接,但是每个连接只发送少量数据,例如一个聊天服务器,用NIO实现会更好一些,相似的,如果你需要保持很多个到其他电脑的连接,例如P2P网络,用一个单独的线程来管理所有出口连接是比较合适的,如果你只有少量的连接但是每个连接都占有很高的带宽,同时发送很多数据,传统的IO会更适合
- NIO实现步骤
- 打开多路复用器
- 打开服务器端通道
- 设置服务器通道为非阻塞模式
- 服务端通道绑定端口和地址
- 把服务器通道注册到多路复用器上,并且监听阻塞事件
- 多路复用器轮询监听
- 返回多路复用器已经选择的SelectionKey结果集
- 结果集进行遍历,遍历时移除SelectionKey元素,防止重复处理
- 对selectionkey进行isValid()有效性校验
- 如果selectionkey是连接状态的,用socketchannel.finishConnect();
- 如果selectionkey是阻塞状态的,用socketchannel.register(seletor, SelectionKey.OP_READ)可读状态
- 如果selectionkey是可读状态的,进行读取
- 如果selectionkey是可写状态的,OP_WRITE比较特殊,表示本地的写缓冲区可用,一般只有在一次写没有把数据写完的情况下需要注册OP_WRITE,写完后要及时关闭,否则每次循环都有可能被调用,因为写缓冲区在大多数情况下是始终可用的。
- 双向通信示例
服务器端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class Server implements Runnable{
//1 多路复用器(管理所有的通道)
private Selector seletor;
//2 建立缓冲区
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//3
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//1 打开路复用器
this.seletor = Selector.open();
//2 打开服务器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 设置服务器通道为非阻塞模式
ssc.configureBlocking(false);
//4 绑定地址
ssc.bind(new InetSocketAddress(port));
//5 把服务器通道注册到多路复用器上,并且监听阻塞事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//1 必须要让多路复用器开始监听
this.seletor.select();
//2 返回多路复用器已经选择的结果集
Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
//3 进行遍历
while(keys.hasNext()){
//4 获取一个选择的元素
SelectionKey key = keys.next();
//5 直接从容器中移除就可以了
keys.remove();
//6 如果是有效的
if(key.isValid()){
//7 如果为阻塞状态
if(key.isAcceptable()){
this.accept(key);
}
//8 如果为可读状态
if(key.isReadable()){
this.read(key);
}
//9 写数据
if(key.isWritable()){
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void read(SelectionKey key) {
try {
//1 清空缓冲区旧的数据
this.readBuf.clear();
//2 获取之前注册的socket通道对象
SocketChannel sc = (SocketChannel) key.channel();
//3 读取数据
int count = sc.read(this.readBuf);
//4 如果没有数据
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
this.readBuf.flip();
//6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收缓冲区数据
this.readBuf.get(bytes);
//8 打印结果
String body = new String(bytes).trim();
System.out.println("收到客户端 : " + body);
// 9..可以写回给客户端数据
readBuf.flip();
sc.write(readBuf);
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//1 获取服务通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//2 执行阻塞方法
SocketChannel sc = ssc.accept();
//3 设置阻塞模式
sc.configureBlocking(false);
//4 注册到多路复用器上,并设置读取标识
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new Server(8765)).start();;
}
}
客户端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Client {
//需要一个Selector
private Selector selector;
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
public Client(){
try {
// 获得一个Socket通道
SocketChannel channel = SocketChannel.open();
// 设置通道为非阻塞
channel.configureBlocking(false);
// 获得一个通道管理器
this.selector = Selector.open();
// 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
//用channel.finishConnect();才能完成连接
channel.connect(new InetSocketAddress("127.0.0.1", 8765));
//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
channel.register(selector, SelectionKey.OP_CONNECT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() throws IOException {
// 轮询访问selector
while (true) {
selector.select();
// 获得selector中选中的项的迭代器
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已选的key,以防重复处理
ite.remove();
// 连接事件发生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key
.channel();
// 如果正在连接,则完成连接
if(channel.isConnectionPending()){
channel.finishConnect();
}
// 设置成非阻塞
channel.configureBlocking(false);
//在这里可以给服务端发送信息哦
channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));
//在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
channel.register(this.selector, SelectionKey.OP_READ);
// 获得了可读的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
private void read(SelectionKey key) {
try {
//1 清空缓冲区旧的数据
this.readBuf.clear();
//2 获取之前注册的socket通道对象
SocketChannel sc = (SocketChannel) key.channel();
//3 读取数据
int count = sc.read(this.readBuf);
//4 如果没有数据
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
this.readBuf.flip();
//6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收缓冲区数据
this.readBuf.get(bytes);
//8 打印结果
String body = new String(bytes).trim();
System.out.println("收到Server : " + body);
// 9..可以写回给客户端数据
// readBuf.flip();
// sc.write(readBuf);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
new Client().listen();
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行之后并不停止,服务器和客户端还在轮询