事件总线:实际上就是一个简单的观察者模式的实现。包含三个基本组件:
- Event - 事件接口,其实现类就是一个个的被观察者
- Subscriber - 观察者接口,其实现类就是一个个的观察者,当 EventBus 发布一个 Event 时,对应的 Subscriber 会做相应的处理
- EventBus - 事件总线,用于组织 Event 与 Subscriber 的关系,并且为用户提供发布 Event 的接口
一、Event
/**
* 事件接口
*/
public interface Event {
}
二、Subscriber
/**
* 事件订阅器(即事件的真正处理器)
*/
public interface Subscriber {
/**
* 真正的处理事件的逻辑
* @param event 具体事件
*/
void onEvent(Event event);
}
三、EventBus
/**
* 事件总线
*/
public class EventBus {
private static final Logger LOGGER = LoggerFactory.getLogger(EventBus.class);
/**
* 事件订阅器集合
* key: Class<? extends Event> - 事件类型
* value: CopyOnWriteArrayList<Subscriber> - 事件订阅器
*/
private static final Map<Class<? extends Event>, CopyOnWriteArrayList<Subscriber>> EVENT_SUBSCRIBERS = new ConcurrentHashMap<>();
/**
* 注册事件类型和处理器
*
* @param eventClass 事件类型
* @param subscriber 事件订阅器
*/
public static void register(Class<? extends Event> eventClass, Subscriber subscriber) {
CopyOnWriteArrayList<Subscriber> subscribers = EVENT_SUBSCRIBERS.get(eventClass);
if (subscribers == null) {
subscribers = new CopyOnWriteArrayList<>();
EVENT_SUBSCRIBERS.putIfAbsent(eventClass, subscribers);
}
subscribers.addIfAbsent(subscriber);
}
/**
* 反注册事件类型和处理器
*
* @param eventClass 事件类型
* @param subscriber 事件订阅器
*/
public static void unRegister(Class<? extends Event> eventClass, Subscriber subscriber) {
CopyOnWriteArrayList<Subscriber> subscribers = EVENT_SUBSCRIBERS.get(eventClass);
if (subscribers == null) {
return;
}
subscribers.remove(subscriber);
}
/**
* 发布事件
* 事件总线调用订阅器依次对发布的事件进行处理
*
* @param event 具体事件
*/
public static void post(Event event) {
CopyOnWriteArrayList<Subscriber> subscribers = EVENT_SUBSCRIBERS.get(event.getClass());
if (subscribers == null) {
return;
}
subscribers.forEach(x -> {
try {
x.onEvent(event);
} catch (Exception e) {
LOGGER.error("handle event {} error, ", event.getClass(), e);
}
});
}
}
在 SOFARPC 中,还实现了:是否启动事件总线功能的开关 + 是否异步执行处理 Event 的逻辑。具体见以下两个文件
四、测试
具体事件
public class ConcreteEvent1 implements Event {
private String name;
public ConcreteEvent1(String name) {
this.name = name;
}
public void sayHello() {
System.out.println("hi, " + name);
}
}
具体订阅器
public class ConcreteSubscriber1 implements Subscriber {
@Override
public void onEvent(Event event) {
if (event instanceof ConcreteEvent1) {
((ConcreteEvent1) event).sayHello();
}
}
}
事件总线测试
public class EventBusTest {
@Test
public void testMainFunc() {
// 1. 注册 Event 及其 Subscriber
EventBus.register(ConcreteEvent1.class, new ConcreteSubscriber1());
// 2. 发布 Event 到 EventBus
EventBus.post(new ConcreteEvent1("lili"));
}
}