一.事件总线模式的产生背景以及意义?
在说事件模式(EventBus)之前,我们可以追朔到Bus 的起源,所谓Bus ,在计算机中就是存在主板上的总线,映射到生活中,就是公交车的含义;
在计算机上,我们的输入设备,输出设备之间种类繁多,当我们从键盘输入一个字符串,cpu 处理完成之后回显给显示器;那这个过程需要怎么传输呢?最直接的方式就是为这些设备相互之间建立线路,这样可以解决问题,但是随着设备越来越多,这些设备建立的私有的线路越来越繁杂,很难以维护,于是我们暂时丢弃了这个方式,采用公共的总线方式,所有的设备将传输的数据,以及目的设备地址发送出去,总线只是为我们进行运输,这样也很方便维护,类比我们生活中,a用户要给b 用户买礼物,那么当a 自己买到了礼物后,然后坐上公交车去b 的地址,当公交车到达目的地之后,然后下车将礼物送到b 手中;
所以,软件中也借鉴了硬件中总线的背景,产生了事件总线模式,在软件中,组件模块会有很多,相互通信的协议,方式多种多样,如果每个组件都要写一套,维护成本较高,那么我们就要用这个模式来解决这个问题.
这个模式跟发布-订阅模式,观察者模式都很类似。但相比之下,EventBus解除了通知者与观察者之间的关系,是组件之间能够进一步的解耦;
二.EventBus 如何使用?
通过电商中一个案例,来说明一下使用的场景,假如我们买了一件商品,商品发货后,物流状态发生了变化,此时会给我们发通知短信,进行处理;
引入guava 依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
物流事件定义:
public class LogisticsEvent {
private String orderStatus;
public String getOrderStatus() {
return orderStatus;
}
public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
public LogisticsEvent getData() {
return this;
}
}
用户监听器定义:
/**
* 用户监听器
*/
public class UserListener {
/**
@Subscribe 标记了监听器需要处理的方法,可以存在多个方法来处理变更
*/
@Subscribe
public void orderUpdate(LogisticsEvent logisticsEvent){
System.out.println("用户监听器收到该事件"+ " start process");
System.out.println("processing");
System.out.println("status"+ logisticsEvent.getOrderStatus() +"物流状态处理完成");
}
}
客户端测试:
public class EventBusTest {
public static void main(String[] args) {
EventBus eventBus = new EventBus();
LogisticsEvent logisticsEvent = new LogisticsEvent();
logisticsEvent.setOrderStatus("3");
UserListener userListener = new UserListener();
eventBus.register(userListener);
eventBus.post(logisticsEvent);
}
}
创建一个EventBus 实例,通过将用户监听器注册到eventBus 中,然后通过eventBus 发送事件,在监听器收到该类型事件时,进行处理;也可注册多个监听器,发送多个事件,只要明确不同监听器接收不同的事件类型,这样就能进行处理;
三.EventBus 如何工作?
所涉及的类在eventbus 包下,主要包括:
EventBus: 事件总线,以及消息的广播传输
Subscriber: 订阅者,观察者
Dispatcher: 事件分发,有3种实现,分别是ImmediateDispatcher(同步分发器),LegacyAsyncDispatcher(异步分发器),PerThreadQueuedDispatcher(单线程分发)
SubscriberRegistry:订阅者注册器,通过ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> 维护了事件消息类型和 观察者之间的关系;
SubscriberExceptionContext: 订阅者异常处理器上下文;
SubscriberExceptionHandler:异常处理器;
DeadEvent:死亡事件,事件没有相应的订阅者处理时,则标记为一个死亡事件;
AsyncEventBus:异步事件总线;
/**
创建一个事件总线,
identifier 为事件的标识,
executor 为 执行器
dispatcher 分发器
exceptionHandler:异常处理器
*/
public EventBus(String identifier) {
this(
identifier,
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
LoggingHandler.INSTANCE);
}
/**
EventBus 中 观察者注册
*/
public void register(Object object) {
subscribers.register(object);
}
/**
为事件类型建立订阅者之间的处理关系,通过类的反射确定带有@Subscribe 的方法,为一个具体的观察者,方法上的参数为具体的事件类型,
*/
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
注册了订阅者之后,此时创建事件,并广播发送事件,调用EventBus 中post()方法
分发器处理,默认为PerThreadQueuedDispatcher 单线程分发器,分发的过程:
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
通过使用ThreadLocal 的方式,创建Queue,存储事件队列,首先将事件和订阅者封装为一个事件,放入队列中,若当前事件状态未被处理,则标记当前事件为已处理,接着从队列中取出当前事件,若有存在的监听器,则分发事件给具体的监听器来处理,处理完成之后,队列清空,状态移除;下来继续接收下一个事件的处理,保证了事件分发处理的顺序性;
观察者对事件的处理:
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
通过反射调用观察者的方法进行事件的响应和处理;
在事件分发器中,剩余的两个LegacyAsyncDispatcher(AsyncEventBus 使用这个异步分发器),ImmediateDispatcher(同步分发器);
LegacyAsyncDispatcher:异步分发器分发过程:
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
使用ConcurrentLinkedQueue 全局队列来存储事件和观察者处理器组成的对象,如果一个事件有多个观察者处理器,则会拆分成1*N 个对象放入队列中,然后进行分发
ImmediateDispatcher:同步分发器处理过程:
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
循环遍历当前观察者处理器,如果有下一个处理器,则进行事件分发;