本文的代码参考了Tomcat的NIO实现NioEndpoint以及AIO实现Nio2Endpoint代码逻辑。
上图是一个AIO模型。
下面我们通过代码判断其①②③线程是否相同
AIO服务端AsyncServer
/**
 * Minimal AIO (NIO.2 asynchronous channel) server that answers every request with
 * {@code "<request> can do!"} and prints the name of the thread running each stage,
 * so the threads used for accepting, submitting and reading can be compared.
 *
 * <p>Listens on {@code 0.0.0.0:8088}. Each connection is answered once and then closed.
 */
public class AsyncServer {
    // Fully qualified so this class does not depend on an extra import line.
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private ThreadPoolExecutor acceptorExecutor; // runs the SocketProcessor tasks for accepted connections
    private ThreadPoolExecutor callBackExecutor; // backs the channel group, i.e. runs the completion handlers
    private AsynchronousServerSocketChannel serverSocketChannel;

    public static void main(String[] args) throws Exception {
        AsyncServer asyncServer = new AsyncServer();
        asyncServer.listen();
        TimeUnit.SECONDS.sleep(500); // every call above is asynchronous; keep the main thread from finishing at once
    }

    /**
     * Creates both thread pools, binds the server channel and issues the first asynchronous accept.
     */
    private void listen() {
        acceptorExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), namedThreadFactory("acceptorExecutor"));
        callBackExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), namedThreadFactory("callBackExecutor"));
        try {
            AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(callBackExecutor);
            // Completion handlers of channels opened in this group run on callBackExecutor.
            serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
            InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 8088);
            serverSocketChannel.bind(addr);
            serverSocketChannel.accept(this, new AcceptorHandler()); // asynchronous: returns immediately
            System.out.println("accept end"); // printed right away, unlike a blocking BIO/NIO accept
        } catch (IOException ex) {
            System.out.println(" listen failed: " + ex);
        }
    }

    /**
     * Builds a thread factory naming threads {@code prefix0, prefix1, ...}.
     * Both pools share this helper so their numbering cannot drift apart
     * (the callback pool used to count downwards).
     */
    private static java.util.concurrent.ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger sequence = new AtomicInteger(0);
        return r -> {
            Thread thread = new Thread(r);
            thread.setName(prefix + sequence.getAndIncrement());
            return thread;
        };
    }

    /** Closes a client channel, reporting (not propagating) a failure to do so. */
    private static void closeQuietly(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ex) {
            System.out.println("close failed: " + ex);
        }
    }

    /** Handles a completed accept: re-arms the accept and hands the connection to the executor. */
    class AcceptorHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServer> {
        @Override
        public void completed(AsynchronousSocketChannel channel, AsyncServer attachment) {
            // accept() is one-shot: call it again or only a single connection would ever be accepted.
            attachment.serverSocketChannel.accept(AsyncServer.this, this);
            try {
                AsyncServer.this.handleWithExecutor(channel);
                System.out.println(Thread.currentThread().getName() + " submit to executor");
            } catch (java.util.concurrent.RejectedExecutionException ex) {
                // The bounded executor is saturated; drop the connection instead of leaking it.
                System.out.println(Thread.currentThread().getName() + " executor rejected connection: " + ex);
                closeQuietly(channel);
            }
        }

        @Override
        public void failed(Throwable exc, AsyncServer attachment) {
            System.out.println(" accept failed: " + exc);
        }
    }

    private void handleWithExecutor(AsynchronousSocketChannel socketChannel) {
        acceptorExecutor.execute(new SocketProcessor(socketChannel));
    }

    /** Starts the asynchronous read for one connection and answers the first non-blank request. */
    class SocketProcessor implements Runnable {
        private AsynchronousSocketChannel socketChannel;

        /** Continues a partial write and closes the connection once the reply is fully sent. */
        private CompletionHandler<Integer, ByteBuffer> writeHandler = new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    socketChannel.write(attachment, attachment, this); // a single write may be partial
                    return;
                }
                closeQuietly(socketChannel); // one request, one reply: nothing more will be read
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("write failed: " + exc);
                closeQuietly(socketChannel);
            }
        };

        private CompletionHandler<Integer, ByteBuffer> readHandler = new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result < 0) {
                    // End of stream: the peer closed. Re-reading would complete with -1 again, forever.
                    closeQuietly(socketChannel);
                    return;
                }
                attachment.flip();
                String input = new String(attachment.array(), 0, attachment.limit(), UTF_8).trim();
                if (input.length() == 0) {
                    // Only blank/NUL bytes so far; the real payload may not have arrived yet, so read again.
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    socketChannel.read(buffer, buffer, this);
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " 收到客户端消息: " + input);
                // Size the reply buffer from the reply itself; reusing the 1024-byte read buffer
                // overflows when the request is close to 1024 bytes.
                ByteBuffer response = ByteBuffer.wrap((input + " can do!").getBytes(UTF_8));
                socketChannel.write(response, response, writeHandler);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("read failed: " + exc);
                closeQuietly(socketChannel);
            }
        };

        public SocketProcessor(AsynchronousSocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void run() {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            socketChannel.read(buffer, buffer, readHandler);
            // Prints the name of the executor thread that picked up the task after "submit to executor".
            System.out.println(Thread.currentThread().getName() + " SocketProcessor run end");
        }
    }
}
AIO客户端:
/**
 * AIO (NIO.2 asynchronous channel) client: connects to {@code 127.0.0.1:8088}, sends one
 * message, prints the server's reply together with the callback thread name, then closes.
 */
public class AsyncClient {
    // Fully qualified so this class does not depend on an extra import line.
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private final AsynchronousSocketChannel asc;
    private static ThreadPoolExecutor poolExecutor;

    public static void main(String[] args) throws Exception {
        AtomicInteger atomic = new AtomicInteger(0);
        poolExecutor = new ThreadPoolExecutor(5, 10
                , 10, TimeUnit.SECONDS
                , new LinkedBlockingQueue<>(1),
                r -> {
                    Thread thread = new Thread(r);
                    thread.setName("poolExecutor " + atomic.getAndIncrement());
                    return thread;
                });
        String[] contents = {"HONOR", "MEIZU", "HUAWEI", "VIVO"};
        Thread[] clients = new Thread[contents.length];
        for (int i = 0; i < contents.length; i++) {
            String content = contents[i];
            clients[i] = new Thread(() -> runOneClient(content));
            clients[i].start();
        }
        for (Thread client : clients) {
            client.join();
        }
        // The pool's core threads are non-daemon; without this the JVM never exits.
        poolExecutor.shutdown();
    }

    public AsyncClient() throws Exception {
        // Completion handlers of this channel run on poolExecutor.
        asc = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(poolExecutor));
    }

    private Future<Void> getSocket() {
        return asc.connect(new InetSocketAddress("127.0.0.1", 8088));
    }

    /**
     * Runs one full request/response exchange and returns once the channel has been closed.
     *
     * @param content text to send to the server
     */
    public static void runOneClient(String content) {
        AsyncClient asyncClient = null;
        try {
            asyncClient = new AsyncClient();
            // get() blocks until connected and surfaces a connect failure as ExecutionException,
            // which polling isDone() silently ignored.
            asyncClient.getSocket().get();
            asyncClient.writeToServer(content);
            // Every callback path (reply received, write failed, read failed) ends by closing the channel.
            while (asyncClient.asc.isOpen()) {
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            System.out.println("something wrong occurs when runOneClient: " + ex);
        } catch (Exception ex) {
            System.out.println("something wrong occurs when runOneClient: " + ex);
        } finally {
            if (asyncClient != null) {
                asyncClient.closeChannel("runOneClient");
            }
        }
    }

    /**
     * Sends {@code content} and, once it is completely written, starts reading the reply.
     *
     * <p>The payload is written directly. Writing an empty 1024-byte buffer first would put
     * 1024 NUL bytes on the wire, and touching the buffer while a write is pending is unsafe.
     */
    private void writeToServer(String content) {
        ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(UTF_8));
        asc.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (attachment.hasRemaining()) {
                    asc.write(attachment, attachment, this); // a single write may be partial
                    return;
                }
                readFromServer();
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("writeToServer failed: " + exc);
                closeChannel("writeToServer"); // lets runOneClient's wait loop finish
            }
        });
    }

    /** Reads one reply, prints it with the current thread name and closes the channel. */
    private void readFromServer() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        asc.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                // result is the number of bytes read, or -1 when the server closed without replying.
                String input = result > 0 ? new String(attachment.array(), 0, result, UTF_8).trim() : "";
                System.out.println(Thread.currentThread().getName() + " 收到服务端消息: " + input);
                closeChannel("readFromServer");
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.out.println("readFromServer failed: " + exc);
                closeChannel("readFromServer");
            }
        });
    }

    /** Closes the channel (idempotent), reporting a failure with the stage that requested it. */
    private void closeChannel(String stage) {
        try {
            asc.close();
        } catch (IOException ex) {
            System.out.println(" something wrong occurs when " + stage);
        }
    }
}
NIO的服务端NioSocketServer代码:
/**
 * NIO server: one pool thread accepts connections in blocking mode, the calling thread runs
 * the selector loop, and ready connections are handled on the pool. Every request is answered
 * with {@code "<request> can do!"}, after which the connection is closed.
 *
 * <p>Listens on {@code 0.0.0.0:8088}.
 */
public class NioSocketServer {
    // Fully qualified so this class does not depend on an extra import line.
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private static ThreadPoolExecutor poolExecutor;
    private Selector selector;
    // Written by the selector thread and cleaned up by pool threads, hence a concurrent map.
    private final Map<SelectionKey, SocketHandler> map = new java.util.concurrent.ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        AtomicInteger atomic = new AtomicInteger(0);
        poolExecutor = new ThreadPoolExecutor(5, 10
                , 10, TimeUnit.SECONDS
                , new LinkedBlockingQueue<>(1),
                r -> {
                    Thread thread = new Thread(r);
                    thread.setName("poolExecutor " + atomic.getAndIncrement());
                    return thread;
                });
        NioSocketServer nioSocketServer = new NioSocketServer();
        nioSocketServer.listen();
    }

    /**
     * Opens the selector and the listening channel, starts the accept loop on the pool and
     * then runs the selector loop on the calling thread.
     */
    private void listen() {
        try {
            selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 8088));
            // Blocking accept: the accept thread parks instead of spinning while no connection is pending.
            serverSocketChannel.configureBlocking(true);
            poolExecutor.execute(() -> acceptLoop(serverSocketChannel));
            selectorHandle();
        } catch (Exception ex) {
            System.out.println("something wrong occurs when listen: " + ex);
        }
    }

    /** Accepts connections until the listening channel is closed and registers each for reads. */
    private void acceptLoop(ServerSocketChannel serverSocketChannel) {
        while (serverSocketChannel.isOpen()) {
            SocketChannel socketChannel = null;
            try {
                socketChannel = serverSocketChannel.accept();
                System.out.println(Thread.currentThread().getName() + " accept ");
                if (socketChannel != null) {
                    // Only non-blocking channels can be registered with a selector.
                    socketChannel.configureBlocking(false);
                    // Register for reads only: OP_WRITE is ready almost all the time and would make
                    // select() return immediately on every iteration.
                    socketChannel.register(selector, SelectionKey.OP_READ, null);
                    selector.wakeup(); // let the selector pick up the new key without waiting for its timeout
                }
            } catch (Exception ex) {
                System.out.println("something wrong occurs when accept: " + ex);
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (Exception closeEx) {
                        System.out.println("something wrong occurs when accept: " + closeEx);
                    }
                }
            }
        }
    }

    /** Selector loop: waits for ready keys and dispatches each one to the pool. */
    private void selectorHandle() {
        try {
            while (true) {
                // The short timeout bounds how long a concurrent register() can be held up by select().
                int selectNum = selector.select(50);
                if (selectNum == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    dispatch(key);
                }
            }
        } catch (Exception ex) {
            System.out.println("something wrong occurs when selectorHandle: " + ex);
        }
    }

    /**
     * Hands a ready key to the pool. Read interest is switched off first so the key is not
     * selected (and dispatched) again while its handler is still in flight; the handler
     * switches it back on if it needs more data.
     */
    private void dispatch(SelectionKey key) {
        try {
            if (!key.isValid()) {
                return;
            }
            key.interestOps(0);
        } catch (java.nio.channels.CancelledKeyException ex) {
            return; // cancelled by a handler in the meantime; nothing left to do
        }
        SocketHandler socketHandler = map.computeIfAbsent(key, k -> new SocketHandler(k));
        try {
            poolExecutor.execute(socketHandler);
        } catch (java.util.concurrent.RejectedExecutionException ex) {
            // The bounded pool is saturated: drop this connection rather than abort the selector loop.
            System.out.println("something wrong occurs when selectorHandle: " + ex);
            cancelKey(key, (SocketChannel) key.channel());
        }
    }

    /** Forgets the handler, cancels the key and closes the channel; safe to call more than once. */
    private void cancelKey(SelectionKey selectionKey, SocketChannel channel) {
        map.remove(selectionKey);
        try {
            selectionKey.cancel();
            channel.close();
        } catch (Exception ex) {
            System.out.println(" cancelKey occurs exception");
        }
    }

    /** Reads one request from a connection, writes the reply and closes the connection. */
    class SocketHandler implements Runnable {
        private final SelectionKey selectionKey;
        private final SocketChannel channel;

        public SocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            channel = (SocketChannel) selectionKey.channel();
        }

        @Override
        public void run() {
            try {
                if (!(selectionKey.isValid() && channel.isOpen())) {
                    cancelKey(selectionKey, channel); // also drops a stale map entry
                    return;
                }
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int read = channel.read(buffer);
                if (read < 0) {
                    // End of stream: the peer closed. Leaving the key registered would select it forever.
                    cancelKey(selectionKey, channel);
                    return;
                }
                String input = new String(buffer.array(), 0, buffer.position(), UTF_8).trim();
                if (input.length() == 0) {
                    // Nothing usable yet: resume read interest. This must be the last action, because
                    // the key may be dispatched again as soon as interest is restored.
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    selector.wakeup();
                    return;
                }
                System.out.println(Thread.currentThread().getName() + "收到客户端的消息 : " + input);
                // Size the reply buffer from the reply itself; reusing the 1024-byte read buffer
                // overflows when the request is close to 1024 bytes.
                ByteBuffer response = ByteBuffer.wrap((input + " can do!").getBytes(UTF_8));
                while (response.hasRemaining()) {
                    channel.write(response); // a non-blocking write may be partial
                }
                cancelKey(selectionKey, channel);
            } catch (Exception ex) {
                System.out.println("something wrong occurs when SocketHandler: " + ex);
                cancelKey(selectionKey, channel);
            }
        }
    }
}
普通客户端socketClient代码
/**
 * Plain blocking-socket client: each call connects to {@code 127.0.0.1:8088} on its own thread,
 * sends one message and prints the first chunk of the server's reply.
 */
public class SocketClient {
    // Fully qualified so this class does not depend on an extra import line.
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) {
        runOneClient("HONOR");
        runOneClient("HUAWEI");
        runOneClient("MEIZU");
        runOneClient("XIAOMI");
    }

    /**
     * Starts a thread that performs one request/response exchange.
     *
     * @param content text to send to the server
     */
    private static void runOneClient(String content) {
        new Thread(() -> {
            // try-with-resources closes the socket (and its streams) on every path.
            try (Socket socket = new Socket()) {
                InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 8088);
                socket.connect(addr);
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();
                outputStream.write(content.getBytes(UTF_8));
                outputStream.flush();
                byte[] bytes = new byte[1024];
                int read = inputStream.read(bytes);
                // Decode only the bytes actually received; read is -1 if the server closed without replying.
                String reply = read > 0 ? new String(bytes, 0, read, UTF_8) : "";
                System.out.println("收到服务端消息 :" + reply);
            } catch (IOException ex) {
                System.out.println("something wrong occurs: " + ex);
            }
        }).start();
    }
}