多路复用IO模型:多路复用IO模型是目前使用得比较多的模型。JavaNIO实际上就是多路复用IO。在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或线程,也不必维护这些线程和进程,并且只有在真正有socket读写事件时才会使用IO资源,所以它大大减少了资源占用。在JavaNIO中,是通过select.select()去查询每个通道是否有到达事件,如果没有事件,则一直阻塞在哪里。因此这种方式会导致用户线程的阻塞。多路复用IO模式,通过一个线程就可以管理多个socket,只有当socket真正有读写事件发生才会占用资源来进行实际的读写操作。因为,多路复用IO比较适合连接数比较多的情况------【引用来自https://blog.csdn.net/yueaini10000/article/details/108747224
】
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* NIO selector 多路复用reactor线程模型
*/
public class ZFDMultiReactorServer {
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
/**
* 业务线程池
*/
ExecutorService businessThread = Executors.newCachedThreadPool();
abstract class ReactorThread extends Thread {
volatile boolean running = false;
Selector selector;
public ReactorThread() throws IOException {
this.selector = Selector.open();
}
@Override
public void run() {
while (running) {
try {
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
int select = selector.select(1000);//最多等待1s
if (select == 0) {
continue;
}
System.out.println("-----触发了事件-----");
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keys = selectionKeys.iterator();
SelectableChannel chanel = null;
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
int readyOps = key.readyOps();//准备就绪的事件
System.out.println("readyOps: " + readyOps);
if (readyOps == SelectionKey.OP_ACCEPT || readyOps == SelectionKey.OP_READ) {
try {
chanel = (SelectableChannel) key.attachment();
/**非阻塞模式**/
chanel.configureBlocking(false);
/**调用真实的方法**/
handler(chanel);
if (!chanel.isOpen()) {
// 如果关闭了,就取消这个KEY的订阅
chanel.close();
}
} catch (Exception e) {
chanel.close();
}
}
}
selector.selectNow();
} catch (Exception e) {
}
}
}
public void doStart() {
if (!running) {
running = true;
start();
}
}
/**
* 需要实现这个方法[selector接收到事件后,真正执行的方法]
*
* @param channel
* @throws Exception
*/
abstract void handler(SelectableChannel channel) throws Exception;
/**
* 为什么register要以任务提交的形式,让reactor线程去处理?
* 因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁
* 而select()方法实在eventLoop中通过while循环调用的,争抢的可能性很高,为了让register能更快的执行,就放到同一个线程来处理
*
* @param channel
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public SelectionKey register(SelectableChannel channel) throws ExecutionException, InterruptedException {
FutureTask<SelectionKey> futureTask = new FutureTask<SelectionKey>(new Callable<SelectionKey>() {
@Override
public SelectionKey call() throws Exception {
System.out.println("-------call-------");
return channel.register(selector, 0, channel);
}
});
taskQueue.add(futureTask);
return futureTask.get();
}
}
/**
* 主线程只用于接受客户端连接事件处理----->[accept处理reactor线程 (accept线程)]
**/
private ReactorThread[] mainReactorThread = new ReactorThread[1];
/**
* 子线程只用于客户端的数据处理------>[io处理reactor线程 (I/O线程)]
**/
private ReactorThread[] subReactorThread = new ReactorThread[8];
private ServerSocketChannel serverSocketChannel;
/**
* 两个线程组初始化
*
* @throws IOException
*/
public void ThreadGroupInit() throws IOException {
AtomicInteger count = new AtomicInteger(1);
for (int i = 0; i < mainReactorThread.length; i++) {
mainReactorThread[i] = new ReactorThread() {
@Override
void handler(SelectableChannel channel) throws Exception {
// 只做请求分发,不做具体的数据读取
ServerSocketChannel socketChannel = (ServerSocketChannel) channel;
// 返回一个包含新进来的连接SocketChannel,因为前面设置的非阻塞模式,这里会立即返回。
SocketChannel clientChannel = socketChannel.accept();// mainReactor 轮询accept
if (clientChannel == null) {
return;
}
// 收到连接建立的通知之后,分发给I/O线程继续去读取数据
int index = count.getAndIncrement() % subReactorThread.length;
ReactorThread workThreadGroup = subReactorThread[index];
workThreadGroup.doStart();
// 非阻塞模式
clientChannel.configureBlocking(false);
System.out.println("----------");
SelectionKey selectionKey = workThreadGroup.register(clientChannel);
System.out.println("!!!");
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println("收到新连接 : " + clientChannel.getRemoteAddress());
}
};
}
for (int i = 0; i < subReactorThread.length; i++) {
subReactorThread[i] = new ReactorThread() {
@Override
void handler(SelectableChannel channel) throws Exception {
SocketChannel socketChannel = (SocketChannel) channel;
ByteBuffer buffer = ByteBuffer.allocate(1024);
//上面设置了这里是非阻塞的
while (socketChannel.isOpen() && socketChannel.read(buffer) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
if (buffer.position() > 0) break;
}
// 如果没数据了, 则不继续后面的处理
if (buffer.position() == 0) return;
buffer.flip();
byte[] content = new byte[buffer.limit()];
buffer.get(content);
System.out.println("收到数据,来自:" + socketChannel.getRemoteAddress() + " 的数据:" + new String(content));
// TODO 业务操作 数据库 接口调用等等
businessThread.submit(() -> {
});
// 响应结果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
while (responseBuffer.hasRemaining()) {
socketChannel.write(responseBuffer);
}
}
};
}
}
/**
* 初始化channel,并且绑定一个eventLoop线程
*
* @throws IOException IO异常
*/
private void initAndRegister() throws Exception {
// 1、 创建ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2、 将serverSocketChannel注册到selector
int index = new Random().nextInt(mainReactorThread.length);
ReactorThread reactorThread = mainReactorThread[index];
reactorThread.doStart();
SelectionKey selectionKey = reactorThread.register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
/**
* bind端口
*
* @param port
* @throws IOException
*/
public void bind(int port) throws IOException {
serverSocketChannel.socket().bind(new InetSocketAddress(port));
System.out.println("启动完成,端口8081");
}
public static void main(String[] args) throws Exception {
ZFDMultiReactorServer server = new ZFDMultiReactorServer();
server.ThreadGroupInit();
server.initAndRegister();
server.bind(8081);
}
}