预备知识
Java NIO
传统的BIO模型在高并发情况下,每个连接都会新启动一个线程阻塞等待数据,而线程做为重要的资源,这是极为浪费的。
Java NIO是基于多路复用实现的IO模型,一个线程可以监听多个连接,大大节约了资源。具体NIO的Channal,Buffer,Selector这里就不做详细介绍了。
各种IO模型如下图所示
Reactor介绍
单线程使用Java NIO
当使用Java NIO时,会将Channel的相关事件(READ, WRITE, CONNECT, ACCEPT)到Selector,如果我们将所有的Channel的事件都注册到一个Selector,势必会影响整体性能,无法发挥多线程的优势。
Reactor模型使用Java NIO
Reactor模式就是充分发挥多线程的优势,将各个事件的监听解耦,各司其职。
例子
纸上得来终觉浅,下面是我自己写的一个Reactor模型的Demo,帮助大家学习和巩固,几乎都有注释,应该比较好理解,大家可以对照着上面的图进行学习。
这个Demo主要实现的功能是Base64加密,客户端发送数据到服务端,服务端接受数据之后,做Base64加密,并返回给客户端。
关于粘包拆包,因为客户端和服务端都有buffer,所以如何判定是否是一次完整的消息呢?
客户端和服务端发送消息之前,会先写入一个dataLength,一个4个字节,然后再把数据跟在后面。
下面说一下Server端的几个组件:
Accepter: 监听ACCEPT事件,并将连接分发给Reader,目前只有1个线程。
Reader: 只负责读取数据,从Channel中将数据读出,并放至callQueue中,目前是设置的2个线程。
Handler: 请求的处理器,主要是负责做真实的服务操作,目前是设置的4个线程。
Responder: 其实还应该有一个Responder组件,专门用于返回数据,我这边没有实现,直接是在Handler中就把数据返回了。
Accepter
做路由的分发
private static class Accepter implements Runnable, Closeable {
// 监听的端口
private final int port;
// reader数组
private final Reader[] readers;
private ServerSocketChannel serverSocketChannel;
private volatile boolean running = true;
private Selector selector;
// 选择Reader采用轮训的方式
private int readerIndex = 0;
public Accepter(int port, BlockingQueue<Call> callQueue) {
this.port = port;
System.out.println("start Accepter..");
this.readers = new Reader[DEFAULT_READER_SIZE];
for (int i = 0; i < this.readers.length; i++) {
this.readers[i] = new Reader(callQueue);
new Thread(this.readers[i]).start();
}
}
@Override
public void run() {
// 绑定端口,并把serverSocketChannel的ACCEPT事件注册到Accepter的Select
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("Failed to open server socket channel");
}
while (running) {
int acceptedCount;
try {
acceptedCount = selector.select(100);
} catch (IOException ignored) {
ignored.printStackTrace();
continue;
}
if (acceptedCount == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
System.out.println("interrupt when sleep!");
}
continue;
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 重要,一定记得要remove处理了的SelectionKey
iter.remove();
if (!key.isAcceptable()) {
System.out.println("WARNING: get selection key isn't a " +
"Acceptable key!");
continue;
}
try {
// 做accept事件
doAccept(key);
} catch (IOException e) {
e.printStackTrace();
}
}
}
// close
try {
if (selector != null) {
selector.close();
}
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
} catch (IOException ignored) {
ignored.printStackTrace();
}
}
private void doAccept(SelectionKey key) throws IOException {
// 轮训选择一个Reader
Reader reader = this.readers[readerIndex++ % DEFAULT_READER_SIZE];
SocketChannel socketChannel = serverSocketChannel.accept();
InetSocketAddress remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
System.out.println(String.format("accept a connect from %s:%s",
remoteAddress.getHostName(), remoteAddress.getPort()));
// 将此Channel交给被选中的Reader
reader.addChannel(socketChannel);
}
@Override
public void close() {
Arrays.stream(readers).forEach(Reader::close);
running = false;
}
}
Reader
读取数据
private static class Reader implements Runnable, Closeable {
private final BlockingQueue<Call> callQueue;
private Selector selector;
private volatile boolean running = true;
public Reader(BlockingQueue<Call> callQueue) {
System.out.println("start Reader..");
this.callQueue = callQueue;
}
@Override
public void run() {
try {
this.selector = Selector.open();
} catch (IOException e) {
System.out.println("Failed to open select in Reader");
throw new RuntimeException("Failed to open select in Reader");
}
while (running) {
int acceptedCount;
try {
acceptedCount = selector.select(100);
} catch (IOException ignored) {
ignored.printStackTrace();
continue;
}
if (acceptedCount == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
System.out.println("interrupt when sleep!");
}
continue;
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (!key.isReadable()) {
continue;
}
doRead(key);
}
}
try {
if (selector != null) {
selector.close();
}
} catch (IOException ignored) {
ignored.printStackTrace();
}
}
private void addChannel(SocketChannel channel) {
try {
System.out.println("add reading channels..");
// 因为有可能Select会在轮询中block,所以wakeUp是有必要的
selector.wakeup();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} catch (IOException ignored) {
ignored.printStackTrace();
}
}
private void doRead(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
byte[] dataBytes;
try {
// 从Channel中读取数据,并将Call推到阻塞队列中
ByteBuffer dataLengthBuf = ByteBuffer.allocate(4);
channel.read(dataLengthBuf);
dataLengthBuf.flip();
int dataLength = dataLengthBuf.getInt();
ByteBuffer dataBuf = ByteBuffer.allocate(dataLength);
channel.read(dataBuf);
dataBuf.flip();
dataBytes = dataBuf.array();
System.out.println("accept a msg, length = " + dataLength +
", content = " + new String(dataBytes));
} catch (IOException ignored) {
ignored.printStackTrace();
return;
}
callQueue.offer(new Call(channel, dataBytes));
}
@Override
public void close() {
running = false;
}
}
Handler
处理请求
private static class Handler implements Runnable, Closeable {
private final BlockingQueue<Call> callQueue;
private volatile boolean running = true;
public Handler(BlockingQueue<Call> callQueue) {
System.out.println("start Handler..");
this.callQueue = callQueue;
}
@Override
public void run() {
while (running) {
try {
Call call = callQueue.poll(100, TimeUnit.MILLISECONDS);
if (call == null) {
continue;
}
// Handler处理Call,对传过来的数据做Base64加密,其实在这里实现不同的方法
// 一个简单的rpc调用就可以实现了
byte[] encode = Base64.getEncoder().encode(call.dataBytes);
ByteBuffer resBuf = ByteBuffer.allocate(encode.length + 4);
resBuf.putInt(encode.length);
resBuf.put(encode);
System.out.println("response a msg, length = " + encode.length +
", content = " + new String(encode));
resBuf.flip();
call.channel.write(resBuf);
} catch (InterruptedException | IOException ignored) {
ignored.printStackTrace();
}
}
}
@Override
public void close() {
running = false;
}
}
Call
一次请求的封装
static final class Call {
final SocketChannel channel;
final byte[] dataBytes;
public Call(SocketChannel channel, byte[] dataBytes) {
this.channel = channel;
this.dataBytes = dataBytes;
}
}
Client
Client直接就用BIO简单实现了一下
public class Client {
private Socket socket;
public Client(String ip, int port) {
try {
this.socket = new Socket(ip, port);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public String base64(String originStr) {
try {
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
byte[] msg = originStr.getBytes();
dos.writeInt(msg.length);
dos.write(msg);
dos.flush();
} catch (IOException e) {
throw new RuntimeException("Failed to send request to server!", e);
}
try {
DataInputStream dis = new DataInputStream(socket.getInputStream());
int msgLength = dis.readInt();
byte[] msg = new byte[msgLength];
dis.read(msg);
return new String(msg);
} catch (IOException e) {
throw new RuntimeException("Failed to receive request from server!", e);
}
}
}
主程序
public static void main(String[] args) {
Server server = new Server(8080);
server.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Scanner sc = new Scanner(System.in);
String line;
while ((line = sc.nextLine()) != null) {
if (line.equals("stop")) {
server.close();
break;
}
Client client = new Client("localhost", 8080);
String base64 = client.base64(line);
System.out.println("result : " + base64);
}
}
结果展示
start Base64 Server...
start Accepter..
start Reader..
start Reader..
start Handler..
start Handler..
start Handler..
start Handler..
1
accept a connect from localhost:50476
add reading channels..
accept a msg, length = 1, content = 1
response a msg, length = 4, content = MQ==
result : MQ==
2
accept a connect from localhost:50481
add reading channels..
accept a msg, length = 1, content = 2
response a msg, length = 4, content = Mg==
result : Mg==
stop
Process finished with exit code 0
源码