EventBus基本使用
EventBus基于观察者模式的Android事件分发总线。
从这个图可以看出,EventBus使得 事件发送 和 事件接收 很好的解耦。另外使得Android的组件例如Activity和Fragment之间的通信变得简单。最重要的一点是可以使得代码变得整洁。
- 首先添加依赖:
compile 'org.greenrobot:eventbus:3.0.0'
-
使用分为三步:
1) 定义消息事件MessageEvent,也就是创建事件类型public class MessageEvent { public final String message; public MessageEvent(String message) { this.message = message; } }
2) 选择你要订阅的订阅者(subscriber),Activity即在onCreate()加入
EventBus.getDefault().register(this)
,在不需要接收事件发生时可以EventBus.getDefault().unregister(this)
。
在订阅者里需要用注解关键字@Subscribe
来告诉EventBus使用什么方法处理event@Subscribe public void onMessageEvent(MessageEvent event) { Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show(); }
注意方法只能被public修饰,在EventBus3.0之后该方法名字就可以自由的取了,之前要求只能是onEvent().
3)发送事件
EventBus.getDefault().post(new MessageEvent("HelloEveryone"));
这样选择的Activity就会接收到该事件,并且触发onMessageEvent方法。
EventBus源码解析
-
成员变量
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
扫描注册的对象中的所有方法,并将对象与匹配的方法存储在Subscription中,将监听的类与其CopyOnWriteArrayList<Subscription>存储在以下Map中。
-
Subscription:当前注册对象与其中的观察者函数,数据结构:
final Object subscriber; final SubscriberMethod subscriberMethod
-
SubscriberMethod
将扫描到的含有注释@Subscribe
的函数封装为SubscriberMethod,数据结构如下:final Method method; final ThreadMode threadMode; final Class<?> eventType; (监听的事件类型) final int priority; final boolean sticky;
private final Map < Object, List < Class<?>>> typesBySubscriber;
存储每个注册过的对象中监听了的所有事件类型
-
register(Object subscriber)
整个register流程如下:
-
EventBus.getDefault().register(this);
EventBus.getDefault()是一个单例,实现如下:public static EventBus getDefault() { if (defaultInstance == null) { synchronized (EventBus.class) { if (defaultInstance == null) { defaultInstance = new EventBus(); } } } return defaultInstance; }
调用顺序
register(Object subscriber) -> subscriberMethodFinder.findSubscriberMethods(subscriberClass) -> subscribe(subscriber, subscriberMethod);-
findSubscriberMethods
扫描函数当前对象中符合条件的函数,将扫描到的函数传入FindState中,经过校验后存储于FindState的subscriberMethods中。
扫描规则:- 函数非静态,抽象函数;
- 函数为public;
- 函数仅单个参数;
- 函数拥有
@Subscribe
的注解;
-
FindState
为扫描到的函数做校验,在校验后,释放自己持有的资源。第一层校验在checkAdd函数中,如果当前尚未有函数监听过当前事件,就直接跳过第二层检查。
第二层检查为完整的函数签名的检查,将函数名与监听事件类名拼接作为函数签名,如果当前
subscriberClassByMethodKey
中不存在相同methodKey时,返回true,检查结束;若存在相同methodKey时,说明子类重写了父类的监听函数,此时应当保留子类的监听函数而忽略父类。由于扫描是由子类向父类的顺序,故此时应当保留methodClassOld而忽略methodClass。boolean checkAdd(Method method, Class<?> eventType) { // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required. // Usually a subscriber doesn't have methods listening to the same event type. Object existing = anyMethodByEventType.put(eventType, method); if (existing == null) { return true; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { // Paranoia check throw new IllegalStateException(); } // Put any non-Method object to "consume" the existing Method anyMethodByEventType.put(eventType, this); } return checkAddWithMethodSignature(method, eventType); } } private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) { methodKeyBuilder.setLength(0); methodKeyBuilder.append(method.getName()); methodKeyBuilder.append('>').append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); Class<?> methodClass = method.getDeclaringClass(); Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) { // Only add if not already found in a sub class return true; } else { // Revert the put, old class is further down the class hierarchy subscriberClassByMethodKey.put(methodKey, methodClassOld); return false; } }
-
subscribe(subscriber, subscriberMethod)
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); }
在进行非空校验以及确定newSubscription尚未被添加至subscriptionsByEventType后,根据优先级将newSubscription插入subscriptionsByEventType的对应eventType的list中。
typesBySubscriber维护一个键为注册的对象,值为该对象中所监听的事件类型的List,根据subscriber拿到list,并将新扫描得到的eventType加入。
-
-
post(Object event)
post流程:
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
- currentPostingThreadState是一个ThreadLocal类型的,里面存储了PostingThreadState;PostingThreadState包含了一个eventQueue和一些标志位。
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() { @Override protected PostingThreadState initialValue() { return new PostingThreadState(); } }
把传入的event,保存到了当前线程中的一个变量PostingThreadState的eventQueue中。判断当前是否是UI线程,遍历队列中的所有的event,调用
postSingleEvent(ev entQueue.remove(0), postingState)
方法。通过对于isPosting的判断,防止每次post都会去调用整个队列时,造成方法多次调用。-
postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { logger.log(Level.FINE, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
通过lookupAllEventTypes(eventClass)得到当前eventClass的Class,以及父类和接口的Class类型,而后逐个调用postSingleEventForEventType方法。
-
postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
从subscriptionsByEventType中拿到当前eventClass的List<Subscription> ,遍历,通过postToSubscription判断执行线程,逐个调用。
-
postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case MAIN_ORDERED: if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
根据threadMode去判断应该在哪个线程去执行该方法,而invokeSubscriber方法内通过反射调用函数。
- MainThread:
首先去判断当前如果是UI线程,则直接调用;否则, mainThreadPoster.enqueue(subscription, event);把当前的方法加入到队列,然后通过handler去发送一个消息,在handler的handleMessage中,去执行方法。 - BackgroundThread:
如果当前非UI线程,则直接调用;如果是UI线程,则将任务加入到后台的一个队列,最终由Eventbus中的一个线程池去逐个调用
executorService = Executors.newCachedThreadPool()。 - Async:
将任务加入到后台的一个队列,最终由Eventbus中的一个线程池去调用;线程池与BackgroundThread用的是同一个,会动态控制并发。
- MainThread:
-
EventBus的线程池
BackgroundThread和Async的主要区别:
1. BackgroundThread会判断是否是主线程,是主线程会调用线程池来解决,不是主线程则直接在BackgroundThread当前线程中调用;而Async都会在线程池中调用。
2. BackgroundThread的任务会在线程池中顺序执行,而Async在没有限制。-
AsyncPoster
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); } @Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); }
eventBus.getExecutorService()获得如下线程池:
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
在enqueque()中入队并执行当前run(),从队列获取到pendingPost并发执行。
PendingPostQueue为链表结构,入队出队均为同步方法,为保证并发执行时,单个PendingPost不会被执行多次。synchronized PendingPost poll() { PendingPost pendingPost = head; if (head != null) { head = head.next; if (head == null) { tail = null; } } return pendingPost; }
- BackgroundPoster
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }
-
通过executorRunning和同步锁保证任何时刻仅有一个线程在 执行,仅当抛出异常或队列取空后,置executorRunning为false,才能新开线程,实现了 BackgroundThread的任务在线程池中顺序执行。
- 粘性事件
设计初衷:事件的发出早于观察者的注册,EventBus将粘性事件存储起来,在观察者注册后,将其发出。
数据结构:
private final Map<Class<?>, Object> stickyEvents;
保存每个Event类型的最近一次post出的event-
postSticky
public void postSticky(Object event) { synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } // Should be posted after it is putted, in case the subscriber wants to remove immediately post(event); }
将粘性事件保存在stickyEvents,而后post出,此时如果存在已经注册的观察者,则情况同普通事件情况相同;如尚无注册的观察者,在postSingleEvent函数中将时间转化为一个NoSubscriberEvent事件发出,可由EventBus消耗并处理。待观察者注册时,从stickyEvents中将事件取出,重新分发给注册的观察者。
-
register
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); if (subscriberMethod.sticky) { if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
在 if (subscriberMethod.sticky)这段代码中,首先判断是否监听Event的子类,而后调用checkPostStickyEventToSubscription将黏性事件发出,在checkPostStickyEventToSubscription中,判空后按一半事件的post流程将事件传递给观察者。
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null) { // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state) // --> Strange corner case, which we don't take care of here. postToSubscription(newSubscription, stickyEvent, isMainThread()); } }
EventBus优缺点
-
组件间通信与解耦
它是一个基于观察者模式的事件发布/订阅框架,开发者可以通过极少的代码去实现多个模块之间的通信,而不需要以层层传递接口的形式去单独构建通信桥梁。从而降低因多重回调导致的模块间强耦合,同时避免产生大量内部类。
很好的应用于Activity之间,Fragment之间,后台线程之间的通信,避免使用intent或者handler所带来的复杂度
速度快,尤其是在做了优化之后
轻量
有诸多高级特性,例如多种类型的线程模式,subscriber的优先级,等等
缺点:可能会造成接口的膨胀。特别是当程序要求大量形式各异的通知,而没有做出良好的抽象时,代码中会包含大量的接口,接口数量的增长又会带来命名、注释等等一大堆问题。本质上说观察者要求从零开始实现事件的产生、分发与处理过程,这就要求参与者必须对整个通知过程有着良好的理解。当程序代码适量时,这是一个合理的要求,然而当程序太大时,这将成为一种负担。