以前大家都是用阻塞式IO来对网络IO进行数据请求,对于不同的IO都要分配一个线程来处理,如果没有数据就会进行等待,从而造成了阻塞,这种方式极大地浪费了资源(如图1)。于是,有人就提出了一个想法,使用一个线程去监控多个IO请求,如果哪一个IO数据准备完毕后就通知相应的线程来处理,这就是selector模型(如图2)。而Java中的selector就是对selector模型的一种实现,用于询问选择已就绪的IO处理任务。
image
Selector的几个核心的概念
- Channel(通道):用于进行网络传输的通道,网络传输的数据都放在通道中,可以进行写入,也可以进行读取。Channel主要有两种:ServerSocketChannel和SocketChannel,其中ServerSocketChannel是用于服务端开发的,而SocketChannel是用于客户端开发的。
- Selector(选择器):用于进行监控多个通道数据状态。
- SelectableChannel(可选择通道):可以被选择器选择的通道,继承了抽象类SelectableChannel的Channel,而FileChannel没有继承此类,所以不可以被选择器选择。
-
SelectionKey(选择键):用于表示通道可以被选择的某种就绪事件状态。选择键的事件主要有以下几种:
OP_READ
:可读事件。OP_WRITE
:可写事件。OP_CONNECT
:客户端连接服务端的事件,一般为创建SocketChannel
客户端channel。OP_ACCEPT
:服务端接收客户端连接的事件,一般为创建ServerSocketChannel
服务端channel。
Selector的使用
1.创建选择器
Selector selector = Selector.open();
2.获取通道
ServerSocketChannel channel = ServerSocketChannel.open(); // 创建通道
channel.bind(new InetSocketAddress(8080)); // 绑定端口
channel.configureBlocking(false); // 设置为非阻塞,注册到selector上的通道一定设置为非阻塞,否则会报IllegalBlockingModeException错误
3.将通道注册到选择器上
channel.register(selector, SelectionKey.OP_ACCEPT); // 将通道注册到选择器上,监听可接收事件,对于监听多个事件可以用“按位或”来操作
4.轮询已就绪的事件,并对不同的事件进行处理
while (true) {
int count = selector.select(); // 获取已就绪事件的数量
if (count == 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取已就绪键集
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if(key.isAcceptable()) {
// 接收处理
} else if (key.isConnectable()) {
// 连接处理
} else if (key.isReadable()) {
// 读取处理
} else if (key.isWritable()) {
// 写入处理
}
key.remove(); // 移除键,防止下次重复处理
}
}
// 注意:key.isWritable只要建立连接,就会触发,所以在设置可写入事件时,在写入之后要改回可监听事件,否则就会死循环
其他关于Selector的知识点
Selector内部总共维护了三组键集:
keys
:当前Channel注册在Selector上面的所有的key,可以调用keys()
获取。
selectedKeys
:当前Channel所有已就绪的事件,可以调用selectedKeys()
获取。
cancelledKeys
:当前Channel所有已取消的事件,主动调用cancel()
方法的事件会放在该集合。
其他一些常用的方法:
-
Selector#isOpen()
:判断selector是否是open状态,如果调用了close()
方法则会返回false
。 -
SelectionKey#isValid()
:判断选择键是否有效。 -
Selector#selectNow()
:获取是否有就绪的事件,该方法立即返回结果,不会阻塞。 -
Selector#select(long timeout)
:在超时时间内,有就绪事件时才会返回,其次超过时间也会返回。 -
Selector#select()
:阻塞直到有事件就绪时才会返回 -
Selector#wakeup()
:调用该方法会时,阻塞在select()
处的线程会立即返回。即使当前不存在线程阻塞在select()
处,那么下一个select()
方法也会立即返回。 -
Selector#close()
:用完Selector
后调用其close()
方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。
一个Selector的简单示例
/**
仅服务端,客户端可以使用telnet命令
**/
@Test
public void testSelector() throws IOException, InterruptedException {
ServerSocketChannel channel = ServerSocketChannel.open(); // 创建通道
channel.bind(new InetSocketAddress(8080)); // 绑定端口号
channel.configureBlocking(false); // 设置为非阻塞
Selector selector = Selector.open(); // 创建选择器
channel.register(selector, SelectionKey.OP_ACCEPT); // 注册到选择器上
Thread thread = new Thread(() -> {
try {
while (true) {
int count = selector.select(); // 获取已就绪事件数量
if (count == 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取已就绪键集
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (!key.isValid()) { // 判断选择键是否有效
continue;
} else if (key.isAcceptable()) { // 处理接收事件
SocketChannel socket = channel.accept();
socket.configureBlocking(false);
socket.register(selector, SelectionKey.OP_READ);
System.out.println("已注册" + socket);
} else if (key.isReadable()) { // 处理读取事件
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 声明buffer
socketChannel.read(byteBuffer); // 将通道中的数据读取到buffer
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String str = new String(bytes);
str = "\r\nreceive: " + str;
System.out.println(str);
socketChannel.write(ByteBuffer.wrap(str.getBytes())); // 将数据写回通道中
}
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
thread.start();
thread.join();
}
专栏部分持续更新中,请收藏或关注我哦!