一、简介
EventBus是一个Android端优化的publish/subscribe消息总线,简化了应用程序内各组件内、组件与后台线程间的通信,比如请求网络,等网络返回时,通过Handler、Broadcast更新UI等。它有很多优点:简化应用组件间的通信;解耦事件的发送者和接收者。
二、使用方法
注册: EventBus.getDefault().register(this);
解注册:EventBus.getDefault().unregister(this);
事件Event: 事件可以是任意普通的Java对象,没有任何特殊的要求
发送消息: EventBus.getDefault().post(new MessageEvent("Hello EventBus!"));
三、线程模式
EventBus支持订阅者方法在不同于发布事件所在线程的线程中被调用。你可以使用线程模式来指定调用订阅者方法的线程。EventBus总共支持5种线程模式:
ThreadMode.POSTING 订阅者方法将在发布事件所在的线程中被调用。这是 默认的线程模式。事件的传递是同步的,一旦发布事件,所有该模式的订阅者方法都将被调用。这种线程模式意味着最少的性能开销,因为它避免了线程的切换。因此,对于不要求是主线程并且耗时很短的简单任务推荐使用该模式。使用该模式的订阅者方法应该快速返回,以避免阻塞发布事件的线程,这可能是主线程。
ThreadMode.MAIN 订阅者方法将在主线程(UI线程)中被调用。因此,可以在该模式的订阅者方法中直接更新UI界面。如果发布事件的线程是主线程,那么该模式的订阅者方法将被直接调用。使用该模式的订阅者方法必须快速返回,以避免阻塞主线程。
ThreadMode.MAIN_ORDERED 订阅者方法将在主线程(UI线程)中被调用。因此,可以在该模式的订阅者方法中直接更新UI界面。事件将先进入队列然后才发送给订阅者,所以发布事件的调用将立即返回。这使得事件的处理保持严格的串行顺序。使用该模式的订阅者方法必须快速返回,以避免阻塞主线程。
ThreadMode.BACKGROUND 订阅者方法将在后台线程中被调用。如果发布事件的线程不是主线程,那么订阅者方法将直接在该线程中被调用。如果发布事件的线程是主线程,那么将使用一个单独的后台线程,该线程将按顺序发送所有的事件。使用该模式的订阅者方法应该快速返回,以避免阻塞后台线程。
ThreadMode.ASYNC 订阅者方法将在一个单独的线程中被调用。因此,发布事件的调用将立即返回。如果订阅者方法的执行需要一些时间,例如网络访问,那么就应该使用该模式。避免触发大量的长时间运行的订阅者方法,以限制并发线程的数量。EventBus使用了一个线程池来有效地重用已经完成调用订阅者方法的线程。
@Subscribe(threadMode = ThreadMode.POSTING)
public void onMessageEventPosting(MessageEvent event) {
Log.i(TAG, "onMessageEventPosting(), current thread is "
+ Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEventMain(MessageEvent event) {
Log.i(TAG, "onMessageEventMain(), current thread is "
+ Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.MAIN_ORDERED)
public void onMessageEventMainOrdered(MessageEvent event) {
Log.i(TAG, "onMessageEventMainOrdered(), current thread is "
+ Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.BACKGROUND)
public void onMessageEventBackground(MessageEvent event) {
Log.i(TAG, "onMessageEventBackground(), current thread is "
+ Thread.currentThread().getName());
}
@Subscribe(threadMode = ThreadMode.ASYNC)
public void onMessageEventAsync(MessageEvent event) {
Log.i(TAG, "onMessageEventAsync(), current thread is "
+ Thread.currentThread().getName());
}
四、粘性事件
发布一个粘性事件之后,EventBus将在内存中缓存该粘性事件。当有订阅者订阅了该粘性事件,订阅者将接收到该事件。
// 发布粘性事件
EventBus.getDefault().postSticky(new MessageEvent("Hello EventBus!"));
// 订阅粘性事件
@Subscribe(sticky = true)
public void onMessageEvent(MessageEvent event) {
...
}
五、源码解析
- 从EventBus.getDefault()方法开始说起。主要是获取EventBus对象。
// 典型的双重校验锁的单例模式
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
public EventBus() {
this(DEFAULT_BUILDER);
}
// key 为订阅事件类型,value为订阅该事件的所有订阅者集合
// Subscription则是一个封装了订阅者和方法体的一个类
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>>
subscriptionsByEventType;
// key为订阅者,value为订阅者的所有订阅方法
private final Map<Object, List<Class<?>>> typesBySubscriber;
private final Map<Class<?>, Object> stickyEvents;
// 初始化成员变量
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
//一系列的builder赋值
}
- 消息注册register(this)函数
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 从订阅类中获取所有的订阅方法信息
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder
.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 首先从缓存中读取
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// 默认是false
if (ignoreGeneratedIndex) {
// 利用反射来获取订阅方法中的信息
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 从注解器获取的类中获得订阅方法信息
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with “
+” the @Subscribe annotation");
} else {
// 保存在缓存中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// 准备一个FindState,该FindState保存了订阅者类的信息
FindState findState = prepareFindState();
//对FindState初始化
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
//获得订阅者的信息,一开始会返回null
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo
.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method
, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are
// fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError,
// see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0
&& (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method
.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method
, eventType, threadMode,subscribeAnnotation.priority()
, subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification
&& method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName()
+ "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification
&& method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "."
+ method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public "
+ ", non-static, and non-abstract");
}
}
}
- 发送消息源码
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 如果没有正在发送,则发送消息
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
// 发送消息
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
private void postSingleEvent(Object event, PostingThreadState postingState)
throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event
, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState
, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event "
+ eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class
&& eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
private boolean postSingleEventForEventType(Object event
, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 根据事件类型获取所有的订者
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event
, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled
// from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: "
+ subscription.subscriberMethod.threadMode);
}
}
- 取消注册源码分析
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not
“+registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 获取事件类型的所有订阅者
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
// 遍历订阅者集合将解除的订阅者移除
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
简单的进行总结一下,主要就是注册和发送过程比较重要。
注册:
- 通过反射或者注解获取所有的订阅方法
- 将当前订阅者添加到EventBus总的事件订阅者集合中subscriptionsByEventType
- 将当前订阅者所有订阅的事件类型添加到typesBySubscriber,方便解注册
发送:
- 得到要发送的事件类型
- 根据事件类型获取订阅者,并循环向每个订阅者发送
解注册:
- 通过typesBySubscriber获取当前订阅者所有的订阅事件类型
- 循环遍历每一个事件类型,并删除当前订阅者的订阅的方法