本文分为以下几个部分:创建、注册、发送事件、粘性事件来讲解它的实现原理,本文使用Eventbus版本为3.1.1。
注册
在使用EventBus时第一步得注册一下
EventBus.getDefault().register(this);
我们先看getDefault()的源码,EventBus#getDefault()
getDefault
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
这里使用了双重检锁加同步的方式实现单例对象,确保在不同线程中只有一个实例。
除了使用单例的方式创建对象外,我们发现Eventbus还提供了一个静态的builder()来创建实例对象,通过建造者方式来创建具有不同功能的Eventbus实例。先看一下EventBusBuilder源码中的属性
EventBusBuilder
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
boolean logSubscriberExceptions = true;
boolean logNoSubscriberMessages = true;
boolean sendSubscriberExceptionEvent = true;
boolean sendNoSubscriberEvent = true;
boolean throwSubscriberException;
boolean eventInheritance = true;
boolean ignoreGeneratedIndex;
boolean strictMethodVerification;
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
List<Class<?>> skipMethodVerificationForClasses;
List<SubscriberInfoIndex> subscriberInfoIndexes;
Logger logger;
MainThreadSupport mainThreadSupport;
}
通过建造者方式来配置各种日志打印,消息事件的处理。我们可能通过具体事件单独创建一个实例来发送消息,这样可以避免一些不必要的处理判断。创建EventBus实例可能通过这两种方式来创建,再看一下EventBus构造方法,对属性做了一系列的初始化,我们以部分属性来分析。
EventBus
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
eventInheritance = builder.eventInheritance;
}
subscriptionsByEventType:以事件类为key,以订阅列表为value,支持多个订阅方法。事件发送后,在这里寻找订阅者,而Subscription是CopyOnWriteArrayList,线程安全的容器,封装了订阅者,订阅方法。
typesBySubscriber:这是一个用HashMap实现的订阅管理类,负责register与unregister
stickyEvents:使用ConcurrentHashMa来保存粘性事件
subscriberMethodFinder:用于查找订阅类中的Subscribe注解方法
eventInheritance:Eventbus默认会考虑事件的父类,如果事件继承自父类,那么该父类也会作为事件发送给订阅者,设为false则不考虑父类
注册
接下来再看一下注册方法EventBus#register()
register
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
首先是获取订阅类的class,接着是查找订阅类中的注解方法,并保存在List集合中,再分析SubscriberMethodFinder#findSubscriberMethods方法
findSubscriberMethods
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
首先从缓存中获取,若缓存中有,则直接返回。我们在初始化时一般也不设置ignoreGeneratedIndex的值,findUsingReflection()方法是通过反射获取注解方法,所以我们直接分析findUsingInfo()方法
findUsingInfo
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
这里有个FindState类,Eventbus会将注册后的订阅信息保存在其中,接着再分析initForSubscriber()方法
initForSubscriber
void initForSubscriber(Class<?> subscriberClass) {
this.subscriberClass = clazz = subscriberClass;
skipSuperClasses = false;
subscriberInfo = null;
}
这里是对FindState部分属性赋值,其中subscriberInfo初始化为null,再看findState.subscriberInfo = getSubscriberInfo(findState);这一步是查找当前类以及父类中的subscriberInfo的值,由此可知,若有多个子类需要订阅处理消息,可以直接在父类中进行注册。由前面可知findState.subscriberInfo的值为null,我们再接着看findUsingReflectionInSingleClass()方法
findUsingReflectionInSingleClass
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
这里利用反射的方式,对订阅类进行扫描,找出符合要求的订阅方法,并用Map进行保存。订阅方法需要是public,参数为1,并且用Subscribe注解修饰。这里会将相关数据保存在findState中,数据包括符合要求的方法,事件类型,线程,优先级以及sticky事件等等。在保存前还做了一些检测,我们接着分析checkAdd()方法
checkAdd
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
这里做了双重检测,第一次是判断eventType的类型,而第二次检验是判断方法的完整签名。首先通过anyMethodByEventType.put(eventType, method) 将eventType以及method放进anyMethodByEventType这个Map中,同时该put方法会返回同一个key的上一个value值,所以如果之前没有别的方法订阅了该事件,那么existing应该为null,可以直接返回true;否则为某一个订阅方法的实例,要进行下一步的判断。接着分析checkAddWithMethodSignature()方法
checkAddWithMethodSignature
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
从上面的代码看出,该方法首先获取了当前方法的methodKey、methodClass等,并赋值给subscriberClassByMethodKey,如果方法签名相同,那么返回旧值给methodClassOld,接着是if判断,判断methodClassOld是否为空,由于第一次调用该方法的时候methodClassOld肯定是null,此时就可以直接返回true了。但是,后面还有一个判断即methodClassOld.isAssignableFrom(methodClass),这个的意思是:methodClassOld是否是methodClass的父类或者同一个类。如果这两个条件都不满足,则会返回false,那么当前方法就不会添加为订阅方法了。
那么这两个方法到底有什么作用呢?从这两个方法的逻辑来看,第一层判断根据eventType来判断是否有多个方法订阅该事件,而第二层判断根据完整的方法签名来判断。
第一种情况:比如一个类有多个订阅方法,方法名不同,但它们的参数类型都是相同的,那么遍历这些方法的时候,会多次调用到checkAdd方法,由于existing不为null,那么会进而调用checkAddWithMethodSignature方法,但是由于每个方法的名字都不同,因此methodClassOld会一直为null,因此都会返回true。也就是说,允许一个类有多个参数相同的订阅方法。
第二种情况:类B继承自类A,而每个类都是有相同订阅方法,它们都有着一样的方法签名。方法的遍历会从子类开始,即B类,在checkAddWithMethodSignature方法中,methodClassOld为null,那么B类的订阅方法会被添加到列表中。接着,向上找到类A的订阅方法,由于methodClassOld不为null而且显然类B不是类A的父类,methodClassOld.isAssignableFrom(methodClass)也会返回false,那么会返回false。也就是说,子类继承并重写了父类的订阅方法,那么只会把子类的订阅方法添加到订阅者列表,父类的方法会忽略。
分析完findSubscriberMethods()逻辑,我们再接着分析subscribe()方法
subscribe
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//处理粘性事件
...
}
该方法主要实现了订阅方法与事件直接的关联。以事件为key,方法为value保存在subscriptionsByEventType中。处理订阅事件的优先级,优先级高的会先被通知,最后处理sticky事件
注销
注册完我们还得注销订阅
EventBus.getDefault().unregister(this)
我们再分析一下注销逻辑EventBus#unregister
unregister
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
首先是获取所有订阅事件,再遍历订阅事件进行注销,注销完后移除订阅者,我们再看一下unsubscribeByEventType()方法
unsubscribeByEventType
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
根据事件类型获取订阅信息subscriptions集合,对其进行遍历移除,相比注册简单了很多。
发送事件
注册注销分析完后,我们再看一下发送消息逻辑,以最简单的发送字符串为例
EventBus.getDefault().post("aloe");
查看post()方法
post
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
第一行里的PostingThreadState封装了当前线程信息,订阅者以及订阅事件,currentPostingThreadState是ThreadLocal对象,是线程安全的。后面是将事件放入消息队列中。我们再看一下postSingleEvent()方法
postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
对于一个事件,默认地会搜索出它的父类,并把父类也作为事件之一发送给订阅者,我们再看一下postSingleEventForEventType()方法
postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
这里获取subscriptions并调用postToSubscription()发送事件
postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
首先获取订阅方法运行的线程,如果是POSTING,那么直接调用invokeSubscriber()方法即可,如果是MAIN,则要判断当前线程是否是MAIN线程,如果是也是直接调用invokeSubscriber()方法,否则会交给mainThreadPoster来处理,其他情况相类似。最后利用反射的方式来调用订阅方法,将事件发送给订阅者。
粘性事件的发送及接收
粘性事件与一般的事件不同,粘性事件是先发送出去,然后让后面注册的订阅者能够收到该事件。粘性事件的发送是通过EventBus#postSticky方法进行发送的
EventBus.getDefault().postSticky("aloe");
我们看一下postSticky()源码
postSticky
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
把该事件放进了 stickyEvents这个map中,接着调用了post()方法,那么流程和上面分析的一样了,只不过是找不到相应的subscriber来处理这个事件罢了。那么当注册订阅者的时候是怎么匹配的呢?我们再来看一下subscribe()方法
subscribe
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
...
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
EventBus并不知道当前的订阅者对应了哪个粘性事件,因此需要全部遍历一次,找到匹配的粘性事件后,会调用checkPostStickyEventToSubscription()方法,内部又调用了postToSubscription。因此无论对于普通事件还是粘性事件,都会根据threadMode来选择对应的线程来执行订阅方法,而切换线程的关键就是mainThreadPoster、backgroundPoster和asyncPoster。
HandlerPoster
我们先看mainThreadPoster,在EventBus构造方法中初始化了mainThreadSupport,分析createPoster可知mainThreadSupport是HanlderPoster对象
HanlderPoster
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
...
}
HandlerPoster内部有一个PendingPostQueue,这是一个队列,保存了PendingPost,即待发送的post,该PendingPost封装了event和subscription,方便在线程中进行信息的交互。在postToSubscription方法中,当前线程如果不是主线程的时候,会调用HandlerPoster#enqueue方法
enqueue
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
首先会从PendingPostPool中获取一个可用的PendingPost,接着把该PendingPost放进PendingPostQueue,发送消息,那么由于该HandlerPoster在初始化的时候获取了UI线程的Looper,所以它的handleMessage()方法运行在UI线程。
handleMessage
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
...
eventBus.invokeSubscriber(pendingPost);
...
}
} finally {
handlerActive = rescheduled;
}
}
这里调用了EventBus#invokeSubscriber方法,在这个方法里面,将PendingPost解包,进行正常的事件分发。
BackgroundPoster
BackgroundPoster继承自Runnable,与HandlerPoster相似的,它内部都有PendingPostQueue这个队列,当调用到它的enqueue的时候,会将subscription和event打包成
enqueue
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
该方法通过Executor来运行run()方法,run()方法内部也是调用到了EventBus#invokeSubscriber方法。
AsyncPoster
与BackgroundPoster类似,它也是一个Runnable,实现原理与BackgroundPoster大致相同,但有一个不同之处,就是它内部不用判断之前是否已经有一条线程已经在运行了,它每次post事件都会使用新的一条线程。