前言
EventBus3.0之后添加了一项新功能——订阅者索引,该功能不是强制使用的,若是我们只导入implementation 'org.greenrobot:eventbus:3.1.1'
是不能够使用索引的。该功能是在项目编译时生成索引文件,项目运行时执行这些文件,以达到提高运行速度的目的,因为在不使用索引功能情况下,使用的是反射,因此要耗时一些。
本文就主要分析EventBus在不使用索引功能的情况下的源码,最后稍微分析下索引功能的实现。
EventBus使用
方式
//发送对象
//Object obj = ...
//直接发送
EventBus.getDefault().post(obj)
//发送粘性事件
EventBus.getDefault().postSticky(obj)
以上是发送事件的方法,常见就这两种使用方式,可在任意线程使用。
//对象初始化时-注册
EventBus.getDefault().register(this)
//对象销毁时-解绑
EventBus.getDefault().unregister(this)
//实现一个方法,该方法接收想要的Event类型
//这里的MessageEvent自己定义的一种Event类型
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
fun onRecived(even: MessageEvent) {
tv_recived.text = even.message
}
这就是接收事件的使用方法,自定义一个方法,方法名任意,但是参数类型必须是我们想要接收的数据的类或者接口类型,再使用EventBus提供的注解去注释该方法:
- threadMode: 定义接收线程模型,例如
MAIN
是指当前方法在主线程中执行,默认是POSTING
指与调用者保持在同一线程中(保证开销最小)。- sticky:是否是粘性事件,即订阅者是否可以延时接收。
- priority:接收的优先级。
注意
- 每次注册、使用之后,一定要注销。例如在一个活动中注册了,若未注销,那么在这个活动进入
onStop()
之后,仍然能够接收消息(当然包括粘性消息),若此时更新UI,就可能引发程序崩溃。 - 发送事件之后,所有接收参数类型跟该事件类型相同的订阅方法都会被执行。
源码分析
订阅者注册
我们注意到我们订阅的方法是EventBus实现调用的,那它肯定要收集订阅的方法,因此我们从EventBus.register(this)
注册开始分析源码,先弄懂它的注册的机制是怎样的。
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder
.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
这几句代码很清楚:
首先找到订阅者的所有订阅方法,放入List集合中,这里使用了SubscriberMethod对象来表示订阅方法,里面包含了method
、eventType
等字段,这里的eventType
就是我们发送事件的对象类型,例如上面的MessageEvent类型。
然后是订阅List中的方法,在这个过程中要检查粘性事件是否发送给订阅者(发送粘性事件的实现)。
接着往下看,我们要找到订阅方法,需要去到SubscriberMethodFinder这个类中去看,它的findSubscriberMethods()
方法:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//缓存中拿取
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//区分是否使用索引功能
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//放入缓存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
上面的代码就是使用索引查找和不使用索引查找,默认是使用索引查找。下面是findUsingInfo()
方法
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//初始化FindState对象
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
//使用索引
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//不使用索引,回到反射拿去方法
findUsingReflectionInSingleClass(findState);
}
//移动到父类
findState.moveToSuperclass();
}
//生成方法list,回收FindState对象
return getMethodsAndRelease(findState);
}
先介绍下新出现的FindState类,它就是一个中间变量,最后是会被回收的,当中主要包含着:
//就是作为订阅者及其订阅方法的信息
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
Class<?> subscriberClass;
Class<?> clazz;
boolean skipSuperClasses;
SubscriberInfo subscriberInfo;
而SubscriberMethod就是一个订阅方法类:
//包含订阅方法的方法体、注解内容、参数类型
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
String methodString;
回到上面的方法中去,在索引功能不可用的情况下是进行findUsingReflectionInSingleClass(findState)
方法:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
//反射拿到方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
//失败则使用另外一种方式获取
methods = findState.clazz.getMethods();
//跳过父类检索
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
//要求是公开并只有一个参数的方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
//检查能否添加
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//将订阅方法加入进FindState的容器
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
}
}
}
}
反射不用多说,这里的checkAdd()
方法主要是FindState内部自己的检查,是否能够添加进去,就不贴代码了。最终实现的是将订阅方法全部放入FindState内部的subscriberMethods
容器。
再次回到上面的findUsingInfo()
方法中去,它的代码还没有执行完,应该执行到最后一步getMethodsAndRelease(findState)
:
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
//将订阅方法拿出
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
//释放findState,插入数组
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
这个方法重要的地方在于findState
的释放和回收,POOL_SIZE
默认是4,那么之前findState
的初始化也是优先考虑FIND_STATE_POOL
中不为空元素。
到此,我们终于拿到了订阅者中的所有订阅方法,接下来应该回到最上面的register()
方法中,继续往下看for (SubscriberMethod subscriberMethod : subscriberMethods) subscribe(subscriber, subscriberMethod);
将每一个方法进行订阅:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//生成订阅者
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass()
+ " already registered to event " + eventType);
}
}
//改变插入位置,实现优先级功能
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//投入容器
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//针对粘性事件,是否post事件出去
if (subscriberMethod.sticky) {
//事件类型是否有继承类型
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
//若事件类型是key类型或者父类类型的话
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
//post 实现粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
这一大段代码实现的功能很简单,就是将订阅方法投入EventBus的订阅方法容器中去,并在最后判断粘性事件则post一次。
这里的CopyOnWriteArrayList是一种可在并发条件下操作的容器,它在每次操作时都会将数组拷贝一份出去,进行操作,所以它在并发条件下不用加锁。
最后的粘性事件的实现其实很简单:在当前的对象实现订阅者注册时,EventBus检测到某个注册方法是允许接收粘性事件,它就调用stickyEvents.get(eventType);
拿到对应事件类型的事件,并单独为这个newSubscription
post一次,将该事件单独发送给它,也就是checkPostStickyEventToSubscription(newSubscription, stickyEvent)
这句代码。
发起者post
先讲postSticky()
,因为它就比post()
方法多了一行代码stickyEvents.put(event.getClass(), event)
,当然是加了线程锁的,就是将该事件投递进粘性事件的容器里面去。
再讲post()
方法:
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//一条一条地发送事件(根据优先级排序好的容器)
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//重置状态
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
这里新出现了一个PostingThreadState类,先说它获取的方式currentPostingThreadState.get()
,其实postingState
是放在了ThreadLocal中去,作为当前线程独有的一个变量,我们看下这个变量里面有什么:
//就只有这几个属性
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
可见,在每一个线程中都有一个“事件队列”,每一次发送都会将该队列清空。并且每一个线程只有能
接下来再看postSingleEvent()
方法的实现
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//事件是否使用继承检查
if (eventInheritance) {
//查找事件的所有类型(接口、父类)
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//依次发送事件
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
进入postSingleEventForEventType()
方法,就不贴它的源码了,从它再次进入postToSubscription(subscription, event, postingState.isMainThread);
:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING: //同一线程执行订阅方法
invokeSubscriber(subscription, event);
break;
case MAIN: //在主线程实现
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED: //进入在主线程的事件队列中去,以实现事件串行
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND: //保证在后台线程实现订阅方法
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC: //在线程池中实现
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: "
+ subscription.subscriberMethod.threadMode);
}
一目了然,根据订阅方法需要的不同来在不同的环境下实现它。
订阅方法的实现
上面的invokeSubscriber(subscirption, event)
方法是通过反射实现订阅方法:
//参数一:订阅者类 参数二:事件对象
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
我们再根据threadMode
的值来看不同情况下,订阅方法的实现:
1. MAIN和MAIN_ORDERED
它们的实现都主要是依靠mainThreadPoster
,将event
投递进mainThreadPoster
的队列中去。看一下mainThreadPoster
的由来:
//这里的looper自然就是主线程的looper
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
这里的HandlerPoster是一个自定义的类,它有许多细节实现,就不再贴代码了。简述一下,它就是继承自Handler(绑定主线程的Looper),每次插入消息的时候,就使用handler.sendMessage(), handler.handleMessage()
这两个方法来执行订阅方法实现的步骤。
2. BACKGROUND
它的实现主要是依靠backgroundPoster
,也是同样执行enqueue()
方法来将该事件插入队列,backgroundPoster
是直接实例出来的,它的类实现接口Runnnable,类中主要实现代码:
eventBus.getExecutorService().execute(this);
借助EventBus中的线程池来执行自身的run()
方法,而该线程池是newCachedThreadPool()
,就是线程可复用,没啥特殊的。backgroundPoster
的run()
方法想必都能够猜到了,就是依次从事件队列中拿取事件并执行订阅方法,但是依旧有部分细节代码,就不贴了。
3. ASYNC
它的实现主要是依靠asyncPoster
,它跟backgroundPoster
的实现效果差不多,只是backgroundPoster
保证事件是根据订阅方法优先级依次执行的,而asyncPoster
将所有订阅方法一起执行(线程池的特性)。
自定义EventBus——Builder
我们常用的是EventBus.getDefault()拿到EventBus,但是也可以使用EventBus.Builder().build()来自定义我们的EventBus,下面罗列出Builder的主要方法:
//关于Exception的方法就不罗列了
//是否检查事件继承(事件类的父类、接口),关乎是否收集其父类的订阅方法,默认true
public EventBusBuilder eventInheritance(boolean eventInheritance) {
this.eventInheritance = eventInheritance;
return this;
}
//自定义线程池,如上面所说,另外两种threadMode都使用该线程池
public EventBusBuilder executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
//是否进行方法验证,例如验证其修饰符...
public EventBusBuilder skipMethodVerificationFor(Class<?> clazz) {
if (skipMethodVerificationForClasses == null) {
skipMethodVerificationForClasses = new ArrayList<>();
}
skipMethodVerificationForClasses.add(clazz);
return this;
}
//是否忽略索引功能,文章顶上有说明(不使用时建议自定义为true,后面少验证)
public EventBusBuilder ignoreGeneratedIndex(boolean ignoreGeneratedIndex) {
this.ignoreGeneratedIndex = ignoreGeneratedIndex;
return this;
}
//......
EventBus的Builder不是很复杂,可以自己在使用的时候自己查看方法。
拓展——索引功能
导库请看官方讲解:http://greenrobot.org/eventbus/documentation/subscriber-index/
我这里贴出生成类关键代码:
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
putIndex(new SimpleSubscriberInfo(Main2Activity.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onRecived", MessageEvent.class, ThreadMode.MAIN, 0, true),
}));
}
private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}
@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
分析:EventBus就是在编译时期找到被注解的类,并由此生成一个类。生成类内部维护着一个static Map,该Map的key是订阅者类,value是订阅者信息对象(包含订阅方法名等重要信息),因此在我们register()
的时候,就会使用getSubscriberInfo()
方法将value提出来放入EventBus中的订阅者集合中去,这样在post的时候,就可以通过EventBus中的集合找到我们对应的订阅方法,并去invoke()
实现订阅方法,从而达到传递消息功能。
对比:在register()
的时候:没有索引功能的情况下,只能通过反射找到订阅类中的注解,而使用反射寻找订阅方法是比较消耗时间的。而有索引功能的情况下,在项目编译时就已经生成了索引类,只需直接调用getSubscriberInfo()
拿到订阅信息,而无需再使用反射去找注解等系列过程,时间上肯定是节约了不少。
最后
EventBus使用时业务代码是相当简洁的,其也实现了发送者和接收者之间很大程度的解耦,用户只需关心其事件类型即可,而且不像Intent必须使用可序列化的数据,并且EventBus支持粘性事件。
我觉得EventBus也有些缺点,个人拙见:EventBus采用的模式是多对一的模式,由于规定发送、接收方的只有事件类型,这导致发送者不知道其接收者究竟有哪些,在接收者错误发生时,接收者也不知道该事件来自于哪个发送者。还有一个问题在于其粘性事件的处理,粘性事件在实际开发中是比较常见的,接收者只要已注册即会接收到粘性事件,一旦粘性事件堆积过多,或者说已经“过期”的粘性事件没能及时清除,这将导致不可预测的结果。
笔者水平有限,有写得不好的地方,请大胆指出~
转载请注明出处~