1 什么是Reactor模式
wki上对reactor模式的定义是
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
翻译成中文: Reactor模式是一种事件处理模式。在这种模式中,存在一个或多个并发的输入,它们将事件提交到事件处理服务,事件处理服务以多路复用的方式同步的将它们分发到请求处理器。
在Reactor模式中,有以下几个核心的模块
-
Handles
资源句柄:如文件,网络连接,同步事件分解器可以从资源句柄中等待事件。 -
Synchronous Event Demultiplexer
同步事件分解器: 以阻塞的形式从资源句柄集合中等待事件,当资源集合中的某个资源已经就绪,允许对其执行某个操作时,同步事件分解器即可返回。典型的实现有linux下的eselect
和epoll
。 -
Initiation Dispatcher
初始分发器:定义一个统一的接口用于事件处理器的注册,移除,和事件分发。 -
Event Handler
事件处理器:定义一个统一的接口用于处理事件。 -
Concrete Event Handler
具体事件处理器:实现了事件处理器,用于处理不同类型的事件。
这些模块以如下方式互动:
应用将具体事件处理器注册到事件分发器上,当同步事件处理器等待到某个事件发生时,它调用初始事件分发器,由应用注册的具体的事件处理器对事件进行处理。
通常而言,事件是指接受连接(connection accept),数据输入输出(data input and output),超时(timeout)等。
Reactor pattern的类图如下:
根据此类图的实现代码和测试代码如下:
package scaiz.pattern;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class ReactorPattern {
public static class Event {
String type;
String input;
Event(String type, String input) {
this.type = type;
this.input = input;
}
public String toString() {
return "[type: " + type + ", input: " + input + " ]";
}
}
public static class Demultiplexer {
private BlockingQueue<Handle> blockingQueue = new LinkedBlockingDeque<>();
Handle select() {
try {
return blockingQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static class Handle { // Event Producer
private Event event;
Demultiplexer demultiplexer;
Handle(Demultiplexer demultiplexer) {
this.demultiplexer = demultiplexer;
}
Event getEvent() {
Event e = this.event;
this.event = null;
return e;
}
void putEvent(Event event) {
this.event = event;
this.demultiplexer.blockingQueue.add(this);
}
}
public interface EventHandler {
void handle(Event event);
}
public static class ConcreteEventHandler implements EventHandler {
private final String type;
ConcreteEventHandler(String type) {
this.type = type;
}
@Override
public void handle(Event event) {
if (Objects.equals(this.type, event.type)) {
System.out.println("Event " + event + " handled by " + Thread.currentThread().getName());
}
}
}
public static class InitiationDispatcher {
private List<EventHandler> handlers = new LinkedList<>();
void handle(Event event) {
for (EventHandler handler : handlers) {
handler.handle(event);
}
}
void registerHandler(EventHandler handler) {
handlers.add(handler);
}
void removeHandler(EventHandler handler) {
handlers.remove(handler);
}
}
public static void main(String args[]) {
Demultiplexer demultiplexer = new Demultiplexer();
Handle handle1 = new Handle(demultiplexer);
Handle handle2 = new Handle(demultiplexer);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
handle1.putEvent(new Event("accept",
UUID.randomUUID().toString()));
}
}).start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
handle1.putEvent(new Event("input",
UUID.randomUUID().toString()));
}
}).start();
new Thread(() -> handle2.putEvent(new Event("input",
UUID.randomUUID().toString()))).start();
InitiationDispatcher dispatcher = new InitiationDispatcher();
dispatcher.registerHandler(new ConcreteEventHandler("accept"));
dispatcher.registerHandler(new ConcreteEventHandler("input"));
new Thread(() -> {
Handle handle;
do {
handle = demultiplexer.select();
if (handle != null) {
dispatcher.handle(handle.getEvent());
}
} while (handle != null);
}).start();
}
}