传统Socket网络编程模式
通常有一个服务器server
循环等待客户端的连接,每接受一个连接,生成对应的socket
对象并新起一个线程,在新线程中的I/O操作使用该socket
对象。例如:
public class ClassicSocket {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8899);
while (true) {
new Thread(new Handler(serverSocket.accept())).start();
}
}
}
class Handler implements Runnable {
private Socket socket;
public Handler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream inputStream = this.socket.getInputStream();
OutputStream outputStream = this.socket.getOutputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
String line = "";
while (line != null) {
line = bufferedReader.readLine();
System.out.println(line);
outputStream.write(line.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Reactor模式
- Reactor通过分配合适的处理器
handler
来响应I/O事件。 - 将
handler
绑定到特定的I/O事件 -
handler
通过非阻塞的方式处理
上图中包含:
- Reactor
- Acceptor
- Handler
Reactor
示例代码:
public class Reactor implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws Exception {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//在"接受连接"事件的SelectionKey中添加一个Acceptor作为附件
selectionKey.attach(new Acceptor());
}
@Override
public void run() {
//死循环,等待客户端连接
while (true) {
try {
//阻塞方法,获取IO事件
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//拿到selectionKey的附件并执行
Object attachment = selectionKey.attachment();
//获取的附件可能是Acceptor,也可能是Handler
Runnable runnable = (Runnable)attachment;
runnable.run();
//处理完IO事件后,从事件集合中删除selectionKey
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- Reactor也是一个线程对象,实现了
Runnable
接口 - Reactor持有
Selector
对象和ServerSocketChannel
对象,ServerSocketChannel
注册到Selector
,返回SelectionKey
对象。 - 然后,创建了一个
Acceptor
对象,并添加到SelectionKey
对象的附件中。通过selectionKey.attachment()
方法可以取出。即Acceptor
感兴趣的I/O事件是OP_ACCEPT
接收连接事件。 -
run()
方法中,死循环等待客户端的连接。当有I/O事件产生时,取得selectionKey
对象的附件并执行。当事件是OP_ACCEPT
时,附件是Acceptor
,而当事件是OP_READ
时,附件是Handler
。接收客户端连接的动作在Acceptor
中执行,而读取IO数据的动作在Handler
中处理。
Acceptor
示例代码:
/**
* Acceptor的功能是:
* 接收客户端连接,分配合适的处理器handler来响应I/O事件
* SelectionKey代表I/O事件的状态
*/
public class Acceptor implements Runnable {
@Override
public void run() {
SocketChannel socketChannel = null;
try {
socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
new Handler(socketChannel, selector);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 在
Acceptor
中,serverSocektChannel
接收客户端连接,生成真正的连接对象SocketChannel
。 - 连接对象
SocketChannel
也需要注册I/O事件到Selector
,所以它需要访问到Selector
对象,因此常常Acceptor
是作为Reactor
的内部类。 - 接收连接后,
Acceptor
将I/O事件的处理分发给相应的Handler
。
Handler
示例代码:
/**
* Handler处理I/O读写事件,需要持有SocketChannel,并注册到Selector
*/
public class Handler {
//真正的连接对象
private SocketChannel socketChannel;
private SelectionKey selectionKey;
public Handler(SocketChannel socketChannel, Selector selector) throws IOException {
this.socketChannel = socketChannel;
this.socketChannel.configureBlocking(false);
this.selectionKey = this.socketChannel.register(selector, SelectionKey.OP_READ);
//添加 handler为附件
this.selectionKey.attach(this);
}
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = 0;
while ((read = socketChannel.read(buffer)) > 0) {
buffer.flip();
Charset charset = Charset.forName("utf-8");
CharBuffer charBuffer = charset.decode(buffer);
System.out.println(charBuffer.toString());
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
-
Handler
持有真正的连接对象socketChannel
。 - 在
Handler
中,socketChannel
对象注册I/O读事件到Selector
,并将自身this
添加到selectionKey
的附件,方便后续处理I/O事件时获得Handler
对象。
经典论文示例--日志服务器
Reactor by Douglas C. Schmidt