EventBus 是一种用于Android的事件发布-订阅总线,用于代替BroadcastReceiver,可以简化应用程序中各个组件之间的通信复杂度。
今天剖析EventBus的源代码,主要是想了解EventBus的工作原理,学习相关的设计模式、反射、多线程等知识等。
用法
public static class MessageEvent { /* Additional fields if needed */ }
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {
/* Do something */
};
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
public void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
EventBus.getDefault().post(new MessageEvent());
EventBus的用法非常的简单,只需要在Activity中使用@Subscribe
定义了需要接收的事件类型,使用EventBus.getDefault().register(this)
进行注册,就可以收到在其他地方发送的消息,同时也可以进行参数的传递。
问题
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;
boolean sticky() default false;
int priority() default 0;
}
@Subscribe
有三个参数,今天的剖析就是分析这三个参数的作用,及实现原理。从官方文档得知threadMode()
定义接收事件的线程,一共有POSTING、MAIN、MAIN_ORDERED、BACKGROUND、ASYNC这5种线程类型;priority是接收事件的优先级;然后还有sticky,粘性事件,也就是先发送,然后在订阅的时候接收到之前发送的粘性事件,把其消费掉。
代码剖析
EventBus.getDefault()
static volatile EventBus defaultInstance;
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}
volatile:volatile是一个类型修饰符(type specifier).volatile的作用是作为指令关键字,确保本条指令不会因编译器的优化而省略,且要求每次直接读值。
我们就从EventBus.getDefault()开始分析,这里使用了双检锁/双重校验锁(double-checked locking)
方式创建单例,并使用volatile
修饰符,保证指令不会重排序,确保单例的多线程安全。
EventBus.getDefault().register()
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;
...
}
上面的代码是传入对象进行登记,通过findSubscriberMethods
方法查找subscriberClass
带有订阅(@Subscribe)的方法,然后通过subscribe()
进行订阅。SubscriberMethod
包含了方法,线程模式,优先级等。那接下来我们看看EventBus的findSubscriberMethods
的实现方法。
SubscriberMethodFinder.findSubscriberMethods()
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
ConcurrentHashMap:主要为了解决HashMap线程不安全和Hashtable效率不高的问题。JDK1.7之前的ConcurrentHashMap使用分段锁机制实现,JDK1.8则使用数组+链表+红黑树数据结构和CAS原子操作实现。
由于EventBus查找订阅方法是通过反射的方式实现,会有较大的性能开销,所以在这个方法中,使用线程安全的Map来缓存类对应的SubscriberMethod
,同一个类多次使用时候,可以提高查找效率。这里可以看到,一个需要注册的类必须带有订阅的方法,否则就会抛出异常,这点在开发中必须要注意的。这里可以到非常有意思的的一点,通过ignoreGeneratedIndex
来选择通过反射的方式查找
或者通过findUsingInfo
方式实现,然后这个ignoreGeneratedIndex又是从何而来的?findUsingReflection
和findUsingInfo
两种方式又有什么不同呢?
findUsingInfo
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
findUsingInfo和findUsingReflection的代码很接近,findUsingInfo其实也包含了findUsingReflection的功能,只是仅仅多了通过EventBusBuilder.subscriberInfoIndexes
配置实现,具体看到getSubscriberInfo
方法。
findUsingReflection
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
我们先看反射的方式实现,prepareFindState()
是从缓存池中读取或者创建一个FindState,getMethodsAndRelease
就是把上面获取的FindState写入缓存池中。从代码可以看到,findUsingReflectionInSingleClass
是从当前的Class通过反射查找带有注册的方法,然后转移到父类继续查找,直到没有父类为之。那么接下来看看findUsingReflectionInSingleClass的实现方式。
findUsingReflectionInSingleClass
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
方法代码量比较多,但是逻辑其实也是很简单的,就是获取当前类所有的方法,找到带有@Subscribe
注解的方法,获取threadMode
进程类型。getMethods
只能拿到当前类的方法,逐个逐个分析,所以代码还尝试使用getDeclaredMethods
来获取方法数组,由于getDeclaredMethods是会获取到当前类和父类所有的方法,如果执行成功就无法再重复获取父类了,这样的速度也会更加快一些。
EventBus.subscribe(subscriber, subscriberMethod)
到了这里我们重新回到前面的EventBus.register(Object subscriber)
方法,前面已经介绍了通过反射查找SubscriberMethod
,接下来分析EventBus.subscribe(subscriber, subscriberMethod);
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
private final Map<Class<?>, Object> stickyEvents;
private final Map<Object, List<Class<?>>> typesBySubscriber;
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
CopyOnWriteArrayList:传统的List在多线程同时读写的时候会抛出java.util.ConcurrentModificationException,Vector是线程安全的,但是由于使用了synchronized同步锁,导致同一时间内只有一个线程能访问,效率较低。而CopyOnWriteArrayList是使用CopyOnWrite(写时复制)技术解决了这个问题,但是这一般需要很大的内存开销。
isAssignableFrom:isAssignableFrom()方法是从类继承的角度去判断,instanceof关键字是从实例继承的角度去判断。isAssignableFrom()方法是判断是否为某个类的父类,instanceof关键字是判断是否某个类的子类。
EventBus.subscribe() 有2个关键的东西,Object subscriber
注册的对象和SubscriberMethod
订阅的方法,包含的method(方法)
、threadMode(订阅线程)
、eventType(事件类)
、priority(优先级)
、sticky
。一个subscriber当中是可以有多个SubscriberMethod。
- subscriptionsByEventType:是以事件类作为key,来保存Subscription(subscriber, subscriberMethod),并且用priority排序,所以注册和取消注册都是对subscriptionsByEventType进行增删。
- typesBySubscriber:以subscriber注册对象作为key,保存事件类列表,其实他的作用当前只是用于判断subscriber是否已经注册,并没有其他的作用,改成Set类型也未尝不可。
- stickyEvents:是以事件类作为key,保存事件对象,stickyEvents是以key-value形式,意味着粘性事件一个事件类只能保存一个事件对象,后面发送的事件对象对覆盖前面发的。
stickyEvents接收时机:粘性属性的的实现,就是在注册的时候就会收到sticky粘性事件,意味着没有注册完成就收到了,所以我们在使用sticky属性时候,UI的初始化必须放在EventBus.register()之前,避免在收到事件后还没有初始化相关的信息。
eventInheritance:比如 A extends B implements C 发布者post(A()),那么找订阅者的时候不仅要找订阅了事件A的订阅者,还要找订阅了B和C的订阅者。这个参数是EventBusBuilder.eventInheritance配置的,默认是false,意味着只注册A的事件。
到了这里,注册就已经讲解完毕,由于注册还有checkPostStickyEventToSubscription
方法,这个方法就是粘性事件的发送,如果当前存在需要发送的粘性事件,那么就会去调用postToSubscription
方法发送事件,后面会说到postToSubscription方法。
从代码可见,粘性事件就算发送了,也不会从stickyEvents移除,意味着必须手动删除这个粘性事件才能够停止下次的分发。查看EventBus类的代码,就有
removeStickyEvent
、removeAllStickyEvents
实现移除工作,所以使用使用必须注意。
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
发送事件 EventBus.post(event) 、EventBus.postSticky(event)
ThreadLocal:为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
public class ThreadLocalTest {
static ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> "default");
public static void main(String[] args) {
Runnable runnable = () -> {
String name = Thread.currentThread().getName();
System.out.println(name + ":" + threadLocal.get());
threadLocal.set(name);
try { Thread.sleep(2000); } catch (InterruptedException ignored) {}
System.out.println(name + ":" + threadLocal.get());
};
new Thread(runnable,"线程A").start();
new Thread(runnable,"线程B").start();
new Thread(runnable,"线程C").start();
}
}
运行结果:
线程A:default
线程B:default
线程C:default
线程B:线程B
线程A:线程A
线程C:线程C
EventBus代码
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
private final ThreadLocal<PostingThreadState> currentPostingThreadState = null;
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
前面已经把注册事件讲完了,现在就谈谈一个事件在怎么发送的,postSticky
只是在post
的基础上增加了一个存储事件到stickyEvents的功能,前面我们已经讲了其作用。这里使用了了ThreadLocal来保存当前线程的事件发送状态,postingState.isPosting表示是否发送中,如果是已经在发送就不重复进入发送进程,postingState.eventQueue是用于存储发送的事件,接下来就分析postSingleEvent()
。
细心的人可能会问为何这个方法中没有使用同步锁相关的线程管理,难道就不怕会出现多线程冲突问题吗?其实currentPostingThreadState其实就已经解决这个问题,这个属性是基于当前线程存储信息的,不会出现多线程冲突问题,这个设计很精妙。
postSingleEvent()
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
eventInheritance前面已经讲过了,这里就不展开了。如果没有找到注册的事件方法,就会打印到日志里面,同时如果设置了sendNoSubscriberEvent,还会给sendNoSubscriberEvent发送一个NoSubscriberEvent
事件。接下来就到了postSingleEventForEventType。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
这个方法的参数是事件和线程信息还有注册的事件类,这个方法的返回值是如果找不到注册的方法就返回false,方便前面一个类进行其他的操作。这里会以注册的事件类从subscriptionsByEventType
找到对应的注册方法列表,然后遍历一个一个执行发送,所以在这里priority的作用就生效了,因为subscriptionsByEventType中的value列表就是基于priority排序的,前面已经讲过,所以这个方法也没有什么内容类,接下来就到了postToSubscription
,前面的checkPostStickyEventToSubscription
也是调用postToSubscription。
postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
这个方法的逻辑也很简单,其实就是根据@Subscribe的threadMode参数进行不同的线程切换执行,从这里我们可以很好地知道threadMode的意义。
- POSTING:在和post相同的线程执行。
- MAIN:在UI线程执行,如果当前是UI进程马上执行。
- MAIN_ORDERED:和MAIN不同的是,MAIN_ORDERED是放到队列中,不会马上执行,等到前面的UI线程任务执行完毕才去执行。
- BACKGROUND:如果不是UI主进程,就马上执行,否则就放到后台队列。
- ASYNC:和BACKGROUND类似都是在非UI线程执行,但是不同的是BACKGROUND是多个事件共用一个线程先后执行,而ASYNC是为每一个事件分发都新开一个线程执行。
invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
这个方法很简单,就是通过反射执行方法并入事件参数,到这里事件的发送也基本讲完了。
EventBus.unregister()
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
取消注册的代码很简单,就是从Map和List中查找删除注册的对象信息。