提出问题
在使用EventBus的过程中肯定会对这个库提出一些疑问:
- EventBus是怎么实现全局的消息传递和接收;
- EventBus发送的消息内部的流转过程;
- EventBus最后怎么收到的消息;
- EventBus怎么依托@Subscribe来进行线程的切换的;
- 怎么实现的粘性事件.
在查看EventBus的过程中,带着问题去看源码,事半功倍.
查看源码
EventBus初始化
首先看EventBus的构造函数:
/** Convenience singleton for apps using a process-wide EventBus instance. */
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
看到这里大概明白了提出的第一个问题,EventBus用双重判定的单例来实现了一个全局的访问点.
接着看看new EventBus()
/**
* Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
* central bus, consider {@link #getDefault()}.
*/
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
这里采用了建造者模式来对EventBus进行配置, 通过变量我们能猜测一些的作用, 有一些也不清楚, 不过没关系, 在业务中去理解这些变量的含义.
订阅者注册register()
按照逻辑来, 接下来会选择去看订阅者的注册.
也就是我们调用的EventBus.getDefault().register(this);
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
看到这一小段代码, 通过单词(抛开具体实现细节)就很能清楚register的作用了.
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
从订阅者中找出相应的订阅方法, 也就是被@subscribe注解的方法.
而subscribe(subscriber, subscriberMethod);
显然就是将订阅类和它的订阅方法进行绑定存储在某个集合类, 方便我们下次进行查找.
带着这样一个总览的视角在接着去分享它具体实现的细节就很清晰明了了.
订阅方法查找findSubscriberMethods()
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 先从缓存中获取, 其中METHOD_CACHE的定义为线程安全的HashMap
// private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// 两种方式查找订阅方法 注:看到这的时候并不知道ignoreGeneratedIndex的含义. 看到findUsingInfo()才会知道. 先解释一下ignoreGeneratedIndex是忽略产生的编译索引的意思. 3.0 后@Subscriber 方法会在编译时建立索引, 加快查找的速度, 同样由于注解处理的限制, 只能索引public方法. ignoreGeneratedIndex 默认为false.
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
// 这里就是在订阅类中不加订阅方法抛异常的原因.
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 订阅方法存入Cache
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
接下来我们需要分别分析 findUsingReflection(subscriberClass);
和 findUsingInfo(subscriberClass);
两种方法
通过反射查找订阅方法 findUsingReflection()
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
看方法名涉及到了父类子类. 这里看起来稍微有点儿复杂了, 所以要集中注意力来挨着分析了.
没有注释的方法, 就需要看它的方法名, 那就是这个方法的含义. 比如下面这个方法 prepareFindState()
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}
获取 查找状态类
,这里有个复用池的设计思想, 还不快记笔记. FindState
后面也会用到,所以得来看看这个里面到底有什么信息.
static class FindState {
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
Class<?> subscriberClass;
Class<?> clazz;
boolean skipSuperClasses;
SubscriberInfo subscriberInfo;
...
}
FindState
里有着
subscriberMethods
存储订阅方法.
subscriberClass
相应的订阅类
SubscriberInfo
这里保存着有父类的SubscriberInfo.
当然同样还有一些并不知道作用的变量, 这个就需要从后文里去看了.
其中findState.clazz
是在findState.initForSubscriber(subscriberClass);
对findState初始化的时候进行赋值的.
那么我们就能理解FindState
其实就是保存着订阅类的相关信息, 还包括着它父类的订阅信息.
接下来看看真正通过反射去查找订阅方法的内容了,findUsingReflectionInSingleClass(findState):
查找到的订阅方法肯定会放到findState类里保存.
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
// 获取类的方法, 其中getDeclaredMethods 获取本类中的所有方法,包括私有的(private、protected、默认以及public)的方法。
// getMethods(),该方法是获取本类以及父类或者父接口中所有的公共方法(public修饰符修饰的)
// 反射中 getDeclaredMethods() 获取速度比 getMethods() 要更快
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
// issue里说是由于EventBus混淆导致的异常.
// getMethods()能获取父类的还有接口中的公共方法所以这里skipSuperClasses = true来跳过父类的查找
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
// 获取方法的修饰符,也就是PUBLIC, PRIVATE等
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 获取方法的参数
Class<?>[] parameterTypes = method.getParameterTypes();
// @subscribe注解的订阅方法必须有一个入参
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// subscribeAnnotation != null,找到了@subscribe注解并且是合法的方法
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 二重校验 这里找到了anyMethodByEventType的含义
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
// ****将解析出来的订阅信息都加入了findState.subscriberMethods里面*****
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
// 下面主要是@subscribe注解的方法但是又不满足条件的抛出异常
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
寻找一个订阅类的本类订阅方法到这里就结束了, 但是订阅类还会有父类.
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
moveToSuperclass()
会一直移动到系统类后会置findState.clazz
为null
跳出while循环.
最后通过getMethodsAndRelease(findState)
; 将订阅方法返回.
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
// findState 其实就是一个中间类, 保存着处理过程中的信息, 使用完后回收
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
到此, 通过反射查找订阅方法就结束了.
通过索引查找订阅方法findUsingInfo()
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 未获取到 采用反射方式
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
整个方法大体的处理逻辑是和反射查找方式是一样的, 其中getSubscriberInfo(findState);
就是从索引获取订阅方法的.
private SubscriberInfo getSubscriberInfo(FindState findState) {
// 刚进入的时候 findState.subscriberInfo == null
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
// 从编译产生的索引中获取 SubscriberInfo
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
订阅关系绑定subscribe()
其中subscriber 是订阅类, subscriberMethod是其订阅方法
这个方法需要保存订阅方法和其订阅类, 所以需要关注这个void方法里面的成员变量, 也就是subscriptionsByEventType
和 subscribedEvents
, 不同的保存方式是方便后面的查询. 而对于粘性事件则关注 stickyEvents
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// subscriptionsByEventType 以消息类型为键, 订阅者和其订阅方法的包装类为值
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
// 按照优先级对订阅方法进行排序, 这里是优先级priority实现逻辑
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// typesBySubscriber里以订阅类为键,其消息类型为值
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 粘性事件的处理
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
事件发送post()
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
EventBus.getDefault().post();
这个方法比较简单, 就是将消息放入队列, 然后从队列里获取然后执行.
继续看postSingleEvent(eventQueue.remove(0), postingState);
方法:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// 如果在构造的时候设置eventInheritance也就是事件继承, 则会去搜索消息事件的super类.
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
还要继续看 postSingleEventForEventType(event, postingState, eventClass);
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
看到这就很明白了 subscriptionsByEventType
, 在register里面让记住的一个成员变量在这就用上了, 通过消息事件的类型就能获取register中保存下来订阅类和其订阅方法. 这里同样也是threadMode线程管理的地方.
最后再看 postToSubscription(subscription, event, postingState.isMainThread);
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
// 直接执行
case POSTING:
invokeSubscriber(subscription, event);
break;
// 如果当前不是主线程, 则交由一个队列入队, 出队后当然是让Handler一条一条交给主线程进行执行.
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
// 带顺序的主线程执行.
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
// 后台poster入队后,出队后会由ExecutorService依次执行消息事件
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
// 会直接通过线程池执行消息事件, 也就是消息都是异步的.
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
着重关注 invokeSubscriber(subscription, event);
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 通过反射调用订阅类的方法, 并将消息事件传递了过去! subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
这样最终就执行到了我们的订阅方法,整个post流程也就走得差不多了.
unregister()
用完记得调用EventBus.getDefault().unregister(this);
这里面就很简单了, 对应着register中保存着该订阅类的集合进行了去除操作.
那么文章开头的几个问题你能解答了么?