首先从订阅开始
EventBus.getDefault().register(this);
register
方法会获取传入的object对象的class,通过findSubscriberMethods
方法来查找这个class中订阅的方法,如下
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
findSubscriberMethods
方法实现如下,其中有一个ConcurrentHashMap
类型的静态对象METHOD_CACHE
,是用来缓存对应类的订阅方法的,以便后续再次订阅时不用重新去findMethods,可以直接从缓存中读取。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
查找订阅方法通过ignoreGeneratedIndex
字段分为两种方式
第一种findUsingReflection
是通过反射来查找,找到被@Subscribe
注解修饰的方法,并且根据具体的注解以及方法参数生成一个SubscriberMethod
对象:
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
第二种findUsingInfo
是通过apt的方式,提前找到订阅的方法,可以避免通过反射查找方法带来的耗时。
具体使用方法:在gradle配置apt,rebuild项目,会生成一个注解方法索引类,在EventBusBuilder
中通过addIndex
方法新建一个该类的对象传入即可。
这边还有一个问题,对于子类重写父类的订阅方法如何处理。在上面的两种方式中在查找完子类的订阅方法后都会继续去查找父类的订阅方法,都通过一个叫做checkAdd
的方法进行支撑,该方法返回true
表示可以添加到订阅方法的集合中去。
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2ndlevelwith complete signature when required.
// Usually a subscriber doesn't have methods listening to thesameevent type.
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method)existing,eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" theexistingMethod
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
checkAdd
中设置了两种检查方式,第一种是通过eventType
也就是订阅方法的入参来检查,这种方式比较快,只需要看下之前有没有这种入参的方法就可以了。注释中也指出了通常一个类不会有多个入参相同的订阅方法。
第二种是通过checkAddWithMethodSignature
方法来检查,源码如下:
private boolean checkAddWithMethodSignature(Method method,Class<?>eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld=subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null||methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down theclasshierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
通过method
以及eventType
来生成一个key,来存储方法所在的类。其中methodClassOld == null ||methodClassOld.isAssignableFrom(methodClass)
这个判断条件对应着两种情况,methodClassOld == null
说明是入参相同但是方法名不同的方法正在被添加,直接返回true
就可以了methodClassOld.isAssignableFrom(methodClass)
这个条件是为了过滤掉父类被子类重写的方法,前面说过了查找订阅方法是从子类开始遍历的,此时如果子类重写了父类的订阅方法,那么methodClassOld
对应的是子类,methodClass
对应的是父类,显然这个判断就会为false,之后进入下面的else分支return false
,也就是忽略掉父类被子类重写的方法。
查找订阅方法基本就这么点,查找完毕之后需要执行订阅操作,也就是register
方法中的subscribe(subscriber, subscriberMethod);
操作,直接看下该方法的实现:
private void subscribe(Object subscriber, SubscriberMethodsubscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber,subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions =subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " +subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents =typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventTypehave to be considered.
// Note: Iterating over all events may be inefficientwith lots of sticky events,
// thus data structure should be changed to allow a moreefficient lookup
// (e.g. an additional map storing sub classes of superclasses: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries =stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscriptin, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription,stickyEvent);
}
}
}
subscriptionsByEventType
这个Map是将eventType
作为key保存其所对应的订阅方法的集合。该方法将刚查找到的方法添加到对应的集合中去,添加时有这样一层判断i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority
这表示这个集合里的方法会按照所设定的优先级进行排序。紧接着又出现了个MaptypesBySubscriber
将订阅者作为key保存一个Class
的集合,暂时看不出有啥用,就先不管,最后再检查下是不是粘性事件,如果是粘性事件就根据所保存的粘性事件来执行该方法。eventInheritance
也是在bulider中设置的,如果为true
则会考虑事件的继承性,如果现在有eventType
为正在订阅的方法的eventType
的子类的粘性事件存在,那么这个粘性事件也会被正在订阅的方法接收到,直接说可能比较绕,举个栗子,现在我有两个事件,其中一个是另一个的子类,并且有两个粘性订阅方法,如下:
class EventMessage {
}
class SubEventMessage extends EventMessage {
}
@Subscribe(sticky = true)
public void onEvent(EventMessage message) {
// do something
}
@Subscribe(sticky = true)
public void onSubEvent(SubEventMessage message) {
// do something
}
当执行register
时,如果内存中存在着一个类型为SubEventMessage
的事件,那么订阅的时候onEvent
方法会被执行,入参是内存中类型为SubEventMessage
的事件。
现在register
大致就分析完了,再来看下unregister
方法:
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
unregister
方法十分简单,typesBySubscriber
是刚才在进行订阅的时候不知道用来干什么的Map,现在知道是在取消订阅时用到的,这个Map将订阅者作为key,将其所有的订阅方法的eventType
存入到对应的List中去,取消订阅时将这个List取出来,遍历去移除对应的订阅方法,具体实现在unsubscribeByEventType
中,也十分简单,就不赘述了。
订阅和取消订阅都看过了,还差个发送事件,发送事件分为post
和postSticky
两种,先看post
:
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
currentPostingThreadState
是个ThreadLocal
,然后从中取出当前线程的postingState
,也就是说每个线程都会维护一个自己的posting
状态,之后会有个循环将事件队列清空,通过postSingleEvent
方法来进一步处理:
private void postSingleEvent(Object event, PostingThreadStatepostingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event,postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event,postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered forevent " + eventClass);
}
if (sendNoSubscriberEvent && eventClass !=NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
同样是通过eventInheritance
来判断是否要涉及eventType
的父类,之后再通过postSingleEventForEventType
方法的返回值来得到该事件是否被处理,如果没有被处理,那么会返回false
进入下一个分支,logNoSubscriberMessages
和sendNoSubscriberEvents
都是在builder
中传入的,前者用于没有订阅者处理事件时打印日志,后者用于没有订阅者处理事件时发送一个NoSubscriberEvent
类型的事件,所以具体是怎么处理事件的还要继续看postSingleEventForEventType
方法:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
postSingleEventForEventType
方法从subscriptionsByEventType
中去获取对应事件类型的所有订阅者,如果没有订阅者就返回false
表示事件没有被处理,否则就遍历所有的订阅者,通过postToSubscription
方法来处理事件,接着往里看:
private void postToSubscription(Subscription subscription, Objectevent, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster notdecoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " +subscription.subscriberMethod.threadMode);
}
}
在这个方法内终于看到通过区分注解中的threadMode
来区分不同的处理方式了,先来看下这几种threadMode
分别代表什么意思:
Mode | 含义 |
---|---|
POSTING | 在当前线程执行 |
MAIN | 在主线程执行 |
MAIN_ORDERED | 在主线程有序执行 |
BACKGROUND | 在后台线程执行 |
ASYNC | 在新的线程执行 |
可以看到有几个差不多,那具体有什么区别呢?直接从代码里看,先说明几个东西,invokeSubscriber
就是直接调用订阅方法,还有几个后缀为poster
的变量暂时先理解为调用了enqueue
方法后,订阅方法就会在某个时间被执行,后面再详细讲。
现在可以看代码了,POSTING
没什么好说的,直接调用invokeSubscriber
,也就是说在调用eventBus.post
的线程执行。
MAIN
和MAIN_ORDERED
都是在主线程执行,后者的ORDERED
体现在什么地方呢,先看下MAIN
的分支,其中通过mainThreadPoster.enqueue
插入的事件会在主线程执行,判断当前线程是否是主线程来决定直接调用订阅方法还是通过mainThreadPoster
来发布,这里应该没什么疑惑的,主要是MAIN_ORDERED
:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster notdecoupled from subscriber
invokeSubscriber(subscription, event);
}
mainThreadPoster
不为空时,通过mainThreadPoster
来发布事件,为空时直接调用订阅方法,说好的在主线程调用呢?这里注释也说明了是不正确的,实际上mainThreadPoster
为空本身就是种异常情况,具体可以看下它的初始化过程,这里就不细说了。所以下面的else
分支就先不管了,那么为什么说通过mainThreadPoster
发布的事件就是“有序”的呢,实际上mainThreadPoster
内部实现是个handler
,可以将事件post
到主线程中去执行,所以说是有序的,这里简单说明下原因:
主线程维护着一个消息队列,循环从里面取出消息来处理,我们知道可以通过view
的post
方法来获取它绘制完成之后的宽高,原因是post
方法里的事件会被插入到消息队列的尾部,而view
的measure
,layout
,draw
都在新插入的消息的前面,所以当post
的方法执行时,view
肯定已经绘制好了。
而handler
通过sendMessage
发送的消息也会被插入到主线程消息队列的尾部,这就是“有序”,比如现在有一个ImageView
,在它的onMeasure
中去发布一个事件,如果订阅方法的模式是MAIN
那么会在onMeasure
中调用订阅方法,而如果模式是MAIN_ORDERED
那么会在ImageView
绘制完成后调用订阅方法。
再来看下BACKGROUND
和ASYNC
的区别:
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
其中backgroundPoster
和asyncPoster
都会开启一个新线程来执行订阅方法,暂时当成是一样的就行,那么区别就是BACKGROUND
模式如果在子线程post
一个事件,那么会直接在该线程调用订阅方法,只有在主线程post
事件才会开启一个新线程。而ASYNC
模式,不管是在哪post
事件,都会开启一个新线程来调用订阅方法。
最后再看下几个poster
基本上就看完了,几个poster
都实现了同一个接口Poster
:
interface Poster {
/**
* Enqueue an event to be posted for a particular subscription.
*
* @param subscription Subscription which will receive the event.
* @param event Event that will be posted to subscribers.
*/
void enqueue(Subscription subscription, Object event);
}
可以看到里面只有一个需要实现的方法enqueue
,是用来插入事件的,这个接口被三个类实现,分别是HandlerPoster
,BackgroundPoster
和AsyncPoster
,上面的mainThreadPoster
对应的就是HandlerPoster
,这三个类中都有个类型为PendingPostQueue
的成员变量,这是个事件队列,具体实现就不看了,这个队列提供了入队和出队的方法。
先看下HandlerPoster
的enqueue
方法:
public class HandlerPoster extends Handler implements Poster {
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
}
HandlerPoster
继承了Handler
,调用enqueue
方法后会向事件队列中插入一个事件,然后将标记位handlerActive
设置为true
表示正在处理事件,然后调用sendMessage
发送消息通知处理事件。PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
这行是用来获取一个消息队列的Node
用来插入到队列中去,EventBus
维护着一个pool
用来保存闲置的Node
当有需要时从中取出一个给事件使用,pool
不够用时才会new
新的Node
出来,具体可以看下PendingPost
,这样做的好处是可以避免频繁创建对象带来的开销。
再看下HandlerPoster
的handleMessage
方法:
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
首先会记录下开始处理事件的时间,然后从事件队列中取出事件,如果为空就将handlerActive
设置为false
直接return
了,如果不为空,就调用eventBus.invokeSubscriber(pendingPost);
来调用订阅方法,执行完后,再看下时间,如果超出了规定的时间那么重新发送一条消息,本次消息处理结束,等下次轮到自己的时候再处理事件,毕竟不能一直处理队列里的事件而阻塞了主线程,如果没有超出规定事件,那么说明还可以有事件可以处理下一个事件,就会再次进入循环。
BackgroundPoster
和AsyncPoster
其实和HandlerPoster
差不多,只是没有用Handler
而是用了线程池去处理事件,具体就不看了。
对了,还有个发送粘性事件:
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriberwants to remove immediately
post(event);
}
就是在stickyEvents
这个map
里存一下。
好了,完了。