EventBus是目前比较常用的一个消息总线,用于应用程序内部的通信。今天我们就来聊聊EventBus的相关内容。
本文的要点如下:
- 概述
- 使用方法
- 粘性事件
- 源码分析
- 注册过程
- 事件的发布
- 总结
概述
EventBus是一个Android端优化的publish/subscribe消息总线,简化了应用程序内各组件间、组件与后台线程间的通信。
在没使用EventBus之前,我们开发中也会发布订阅模式的机制来处理一些问题,如果需要用到的地方较多,则每一处需要用到事件的发布订阅的地方,都需要实现一整套发布订阅的机制,比如定义Listener接口,定义Notification Center/Observable,定义事件类,定义注册监听者的方法、移除监听者的方法、发布事件的方法等。我们不得不写许多重复的冗余的代码来实现我们的设计目的。
既然有重复冗余,那么就一定可以将整套机制实现成一个框架,来实现高复用,减少代码量,提高效率,EventBus便是如此。
作为一个消息总线,EventBus主要有三个组成部分:
- 事件(Event):可以是任意类型的对象。通过事件的发布者将事件进行传递。
- 事件订阅者(Subscriber):接收特定的事件。
- 事件发布者(Publisher):用于通知 Subscriber 有事件发生。可以在任意线程任意位置发送事件。
使用方法
1.添加依赖:
//eventbus
implementation 'org.greenrobot:eventbus:3.1.1'
2.定义事件类
public class MyEvent {
public String name;
public MyEvent(String name) {
this.name = name;
}
}
要发送事件,我们就要有对应的事件类,没有具体的要求,可以根据需求进行自定义。
3.注册/注销事件
//注册事件
@OnClick(R.id.registevent)
public void setRegistevent(){
EventBus.getDefault().register(this);
}
//处理事件
@Subscribe(threadMode = ThreadMode.MAIN, priority = 100)
public void getMyEvent(MyEvent event){
receive_event.setText(event.name);
}
//注销事件
@Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(this);
}
订阅部分一般分为两步,首先通过register(this)来表示该订阅者进行了订阅,然后通过Subscribe注解,getMyEvent(MyEvent event)方法表示指定对事件MyEvent的订阅。
注意:在注册之后一定要用unregister(this)注销事件,以免内存泄露。
注解Subscribe中的参数threadMode 表示线程模式,有四种选项:
- ThreadMode.POSTING:事件的处理在和事件的发送在相同的进程,所以事件处理时间不应太长,不然影响事件的发送线程。
- ThreadMode.MAIN:事件的处理会在UI线程中执行。事件处理时间不能太长,这个不用说了,长了会ANR的。
- ThreadMode.BACKGROUND:如果事件是在UI线程中发布出来的,那么事件处理就会在子线程中运行,如果事件本来就是子线程中发布出来的,那么事件处理直接在该子线程中执行。所有待处理事件会被加到一个队列中,由对应线程依次处理这些事件,如果某个事件处理时间太长,会阻塞后面的事件的派发或处理。
- ThreadMode.Async:事件处理会在单独的线程中执行,主要用于在后台线程中执行耗时操作,每个事件会开启一个线程。
注解Subscribe中的参数priority 表示事件优先级,事件的优先级类似广播的优先级,优先级越高优先获得消息。
另外,EventBus允许对事件进行截断:
EventBus.getDefault().cancelEventDelivery(event);
4.发送事件
@OnClick(R.id.bt_sendevent)
public void sendEvent(){
EventBus.getDefault().post(new MyEvent("0123"));
}
发送事件就比较简单了,我们可以在代码的任何位置发布事件,当前所有事件类型与发布的事件类型匹配的订阅者都将收到它。
粘性事件
EventBus除了普通事件也支持粘性事件。
粘性事件:订阅在发布事件之后,但同样可以收到事件(类似于粘性广播)。订阅/解除订阅和普通事件一样,但是处理订阅的方法有所不同,需要注解中添加sticky = true。 用法如下:
@Subscribe(priority = 100,sticky = true)
public void getMyEvent(MyEvent event){
receive_event.setText(event.name);
}
发送事件:
EventBus.getDefault().postSticky(new MyEvent("这是一条粘性事件"));
移除粘性事件:
//移除某种特定类型的粘性事件
EventBus.getDefault().removeStickyEvent(MyEvent.class);
//移除所有粘性事件
EventBus.getDefault().removeAllStickyEvents();
源码分析
从用法中不难看出,获取EventBus实例用到的是getDefault方法,那么我们来看看源码:
static volatile EventBus defaultInstance;
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
很熟悉,有木有?单例模式啊,标准的双重检验锁的写法。不过有个问题,EventBus的构造函数是public而不是private,似乎违背了单例模式的原则,但其实是由EventBus的功能决定的,EventBus 是可以创造出很多条总线的,在不同的总线之间是不能传递事件的(不能通信)。这样的设计是为后面可以创建不同的总线。
接着来看EventBus的构造函数:
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
EventBus类提供了一个public的构造函数,可以用默认的EventBusBuilder配置来构造EventBus对象。
注册过程:
还是回过头来看register方法:
public void register(Object subscriber) {
//通过反射获取注册的对象的类型
Class<?> subscriberClass = subscriber.getClass();
//获取注册的对象的订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
首先通过反射获取注册的对象的类型,然后通过subscriberMethodFinder实例的findSubscriberMethods方法来获取该观察者类型中的所有订阅方法,最后通过subscribe方法将所有的订阅方法分别进行订阅。
下面我们先看下查找订阅者的方法:
SubscriberMethod类封装了订阅方法(使用@Subscribe注解的方法)类型的信息。
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;
//省略代码
}
从代码中我们不难看出,实际上该类就是通过几个字段来存储@Subscribe注解中指定的类型信息,以及一个方法的类型变量。
SubscriberMethodFinder类中的findSubscriberMethods方法:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//首先从缓存当中尝试找该订阅者的订阅方法
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
//找到直接返回
return subscriberMethods;
}
//当缓存中没有找到该观察者的订阅方法的时候使用下面的方法获取方法信息
if (ignoreGeneratedIndex) {
//ignoreGeneratedIndex参数表示是否忽略注解器生成的MyEventBusIndex,默认为false
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//获取方法信息
subscriberMethods = findUsingInfo(subscriberClass);
}
//没获取到,抛出异常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//获取到了,加缓存,返回
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
可以看到,逻辑并不难理解,就是先找缓存,如果缓存中没有再获取,获取后加缓存然后返回。
那么我们来具体看看获取方法信息的关键方法:findUsingInfo()
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//过FindState对象用来存储找到的方法信息
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
//从当前类开始遍历该类的所有父类
while (findState.clazz != null) {
//获取订阅者信息
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//如果使用了MyEventBusIndex,将会进入到这里并获取订阅方法信息
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//未使用MyEventBusIndex将会进入这里使用反射获取方法信息
findUsingReflectionInSingleClass(findState);
}
//将findState.clazz设置为当前的findState.clazz的父类,有点类似于链表的遍历
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
其中关键的方法是findUsingReflectionInSingleClass(),对当前类中声明的所有方法进行校验,并将符合要求的方法的信息封装成一个SubscriberMethod对象添加到了FindState对象中的列表中。
获取到订阅方法后,我们回到subscribe()方法:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//将所有的观察者和订阅方法封装成一个Subscription对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//尝试从缓存中根据事件类型来获取所有的Subscription对象
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
//指定的事件类型没有对应的观察对象的时候
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//根据新加入的方法的优先级决定插入到队列中的位置
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//从“订阅者-事件类型”列表中尝试获取该订阅者对应的所有事件类型
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//黏性事件
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
代码比较长,我们来理一下逻辑:
首先,将传进来的观察者和订阅方法封装成一个Subscription对象,这里才算是真正的订阅。
然后,从CopyOnWriteArrayList中根据事件类型来获取所有的Subscription对象组成,根据新加入的方法的优先级决定插入到队列中的哪个位置。
然后,获取指定的观察者对应的全部的观察事件类型,这里也是通过一个哈希表来维护这种映射关系的。
最后,根据当前的订阅方法是否是黏性的,来决定是否将当前缓存中的信息发送给新订阅的方法。
至此,注册过程就基本结束了,注销的逻辑比较比较简单,基本上就是注册操作反过来,删除缓存中的订阅方法。
事件的发布:
发布事件的流程是从post方法开始的:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
public void post(Object event) {
//获取当前线程的状态信息
PostingThreadState postingState = currentPostingThreadState.get();
//获取当前事件队列
List<Object> eventQueue = postingState.eventQueue;
//将当前要发送的事件加入到队列中
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//队列不空就一直循环发送事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//恢复当前线程的信息
postingState.isPosting = false;
postingState.isMainThread = false;
}
逻辑并不复杂,首先从currentPostingThreadState中取出当前线程的状态信息。currentPostingThreadState是ThreadLocal类型的对象,其中保存者PostingThreadState类型的对象,里面储存了当前线程对应的事件列表和线程的状态信息等。
然后,将当前要发送的事件加入到队列eventQueue中。
最后,通过循环调用postSingleEvent()方法取出事件并进行发布。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//eventInheritance为ture会查找该事件的所有父类
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//处理事件
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//找不到事件会抛异常
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
可以看到最终使用的是postSingleEventForEventType()方法对每个事件类型进行处理。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//获取指定的事件对应的所有的观察对象
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
//遍历观察对象,并最终执行事件的分发操作
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
这里的逻辑就非常清晰了,通过传入的事件类型到缓存中取寻找它对应的全部的Subscription,然后对得到的Subscription列表进行遍历,并依次调用postToSubscription方法执行事件的发布操作。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
终于,我们看到了在订阅消息时设置的ThreadMode参数,switch语句中会根据当前的线程状态和订阅方法指定的ThreadMode信息来决定合适触发方法。
这里的invokeSubscriber会在当前线程中立即调用反射来触发指定的观察者的订阅方法。
如果当前线程不符合ThreadMode参数中的信息,会根据具体的情况将事件加入到不同的队列中进行处理。
mainThreadPoster最终继承自Handler,当调用它的enqueue方法的时候,它会发送一个事件并在它自身的handleMessage方法中从队列中取值并进行处理,从而达到在主线程中分发事件的目的。
backgroundPoster实现了Runnable接口,它会在调用enqueue方法的时候,拿到EventBus的ExecutorService实例,并使用它来执行自己。在它的run方法中会从队列中循环取值来进行执行。
至此,事件的发布过程也基本结束。
总结
EventBus是一个不错的开源消息总线库,使用了诸如单例模式、观察者模式、builder模式等设计模式。它简化了应用程序内各组件间、组件与后台线程间的通信,解耦了事件的发送者和接收者,避免了复杂的、易于出错的依赖及生命周期问题,甚至完全可以代替Intent,boardcast等Android传统的方法在Fragment,Activity,Service以及不同线程之间传递数据。可以使我们的代码更加简洁、健壮。