EventBus 源代码分析-你不知道的EventBus小环节
1.STICKY 粘性事件
在EventBus中有一种事件叫粘性事件,不是只有注册了之后发送的事件才能收到,在注册之前发送的事件,在注册之后也可以收到的事件就是粘性事件。
粘性事件有一些特点:
1.在发送的时候用postSticky(event) 方法。
2.每种类型的事件只存储一个,多个事件会被覆盖成最后一个。
3.新注册的订阅者,在注册之后就会马上收到之前发过的最后一个事件。
4.接受事件的订阅者必须标明为sticky = true。
实现:
在发送事件的时候需要调用postSticky()这个方法
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
可以看得出来最终是调用了post()方法,这个post的方法就是发送普通事件的方法,
在加这个之前将这个事件加入到了stickyEvents中,
Map<Class<?>, Object> stickyEvents = new ConcurrentHashMap<>();
这个stickyEvents 是一个线程安全的map,key为这个事件的类型,即这个class,
value为这个事件。
再来看看新的订阅者注册的时候
subscribe(){
...
if (subscriberMethod.sticky) { //是否是接收粘性事件
if (eventInheritance) { //是否开启父子继承关系的通知,默认true,为false可以优化性能
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) { //eventType类或者子类,可能有多个类
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType); //从map中获取对应类型的粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent); //检查并且发送粘性事件
}
}
...
}
以上这段代码是在订阅者register()的时候调用的。
这里边关于粘性事件的处理关键是
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
这两行代码,如果根据当前订阅事件的类型得到的粘性事件,
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
这一行是检查这个事件是否未null,如果不为null的话,将这个事件发送给newSubscription这个订阅者。
这里用到了一个方法isAssignableFrom()看下面的例子就可以实现明白了。需要和instanceof对比着记忆和使用。
public class Test {
public static void main(String[] args) {
List<String> list = new ArrayList<String>();
System.out.println(list instanceof List);
System.out.println(list instanceof ArrayList);
System.out.println(list.getClass().isAssignableFrom(List.class));
System.out.println(List.class.isAssignableFrom(list.getClass()));
}
}
结果:
true
true
false
true
其中instanceof是子-->父
isAssignableFrom是父-->子
2.Priority 优先级
EventBus中处理事件的优先级是订阅者在注册的时候将该事件的所有订阅者都排成一个list,在处理事件时就直接按顺序处理了。优先级默认值都是0,如果需要设置的话,可以将其调大及数据之间间隔处理,在int范围内。
下面看源码:
存储所有订阅者的是这个list
CopyOnWriteArrayList<Subscription> subscriptions = new CopyOnWriteArrayList<>();
这个是写时复制的容器:通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。用CopyOnWriteArrayList是考虑EventBus在线程中频繁收发消息用同步锁住这个容器,会对性能产生影响。用这种写时复制List可以解决List同时读写产生的异常。
在订阅者注册的方法register会循环该类的每一个订阅的方法subscribe():
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
下面是 subscribe(subscriber, subscriberMethod)方法的实现
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
...
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { //比较订阅方法的优先级,并将其插入队列中
subscriptions.add(i, newSubscription);
break;
}
}
...
}
3.PendingPost--EventBus中的快递包裹
发送快递包裹的时候,有两个要素:准备邮递的东西和邮递的目的地。
PendingPost 是快递包裹,event是要邮递东西、subscription是邮递的目的地。
两个要素: 事件和订阅者
PendingPost 中的两个主要成员:
Object event; 事件
Subscription subscription; 订阅者
这个其实就是一个对象,封装了事件和订阅者。
没有使用咱们常用的get、set的方式。而是使用了对象池的概念,将对象复用。
>//这个是存储对象的池列表即对象池。
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
//下面有两个静态的方法
static PendingPost obtainPendingPost(Subscription subscription, Object event){}
static void releasePendingPost(PendingPost pendingPost){}
//还有一个私有的构造方法
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
因为这个类创建和使用的概率比较大,用对象池的方式减少频繁创建和销毁对象的开销,属于性能优化的一种。但这个对象池需要手动回收使用过的对象。
4.PendingPostQueue--EventBus中的配送站点
即将发送事件的队列 PendingPostQueue, 这里边记载了队列的头元素和尾元素.
开放了三个同步的方法
synchronized void enqueue(PendingPost pendingPost) //加入队列
synchronized PendingPost poll() //取出队列头部的元素
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException //也是取出队列头部的元素,只是当head为null的时候等待一段时间再执行poll()或者在这期间有元素进入了队列。
5.Poster--EventBus中的邮递员
这里的Poster是根据事件处理线程的不同,用来将事件从当前线程邮递到另外一个线程并且处理这个事件。
这里的Poster共有三种:
HandlerPoster 主线程执行
BackgroundPoster 后台线程执行,执行小而多的任务
AsyncPoster 后台线程执行,执行相对耗时的操作。
这个对应着线程模型中的四种模型: POSTING,MAIN,BACKGROUND,ASYNC.
POSTING 是在当前线程中执行的,就不走这三个Poster了。
MAIN和BACKGROUND先判断下是否是当前线程,如果不是就会邮递到另外的Poster中。
ASYNC线程是每一个事件处理都在一个单独的线程中执行,可以执行比较耗时的操作。
ASYNC相比BACKGROUND,后者不能执行耗时的操作,如果执行耗时的操作会影响其他事件的处理速度。。
BACKGROUND是在一个线程中执行里边的所有事件,ASYNC是每一个事件都在一个单独的线程中执行。
这三个Poster的实现都有一个前面介绍的PendingPostQueue来存储即将处理的事件
6.Subscription--EventBus中的收快递
订阅者 subscription 快递到了,该收消息了.
Subscription 有三个关键信息:
>final Object subscriber; //订阅者所在的类,家门地址
final SubscriberMethod subscriberMethod; //订阅的方法,具体是谁的快递
volatile boolean active; //现在是否可用(是否已经注册,或者已经取消注册)是否收取这个快递
这个类前两个属性比较好理解,就是一个封装,把原来订阅的方法,加了一个这个方法所在的类。active这个是在事件邮递(Poster)的过程中,这个事件会加入了另一个队列(PendingPostQueue )。当反取消注册的时候会从当前待处理的队列中清除,但是加入了PendingPostQueue就不能马上移除掉,就用这个active作为是否处理的标识。
这个字段在当前线程处理的话是没有使用到的。这个是处理异常并发的一种方法,加入是否可用的标识。
7.EventBusBuilder--EventBus中的配置中心
每次都用EventBus.getDefault() 获得EventBus实例,但这个实例可以设置吗?
答案是可以的,在EventBusBuilder里
**设置是这样 **
EventBus.builder().eventInheritance(false).installDefaultEventBus().register(this);
下面是installDefaultEventBus()源码
/**
* Installs the default EventBus returned by {@link EventBus#getDefault()} using this builders' values. Must be
* done only once before the first usage of the default EventBus.
*
* @throws EventBusException if there's already a default EventBus instance in place
*/
public EventBus installDefaultEventBus() {
synchronized (EventBus.class) {
if (EventBus.defaultInstance != null) {
throw new EventBusException("Default instance already exists." +
" It may be only set once before it's used the first time to ensure consistent behavior.");
}
EventBus.defaultInstance = build();
return EventBus.defaultInstance;
}
}
注释说的意思是这个installDefaultEventBus()这个方法只能调用一次,必须在 EventBus.getDefault()这个方法调用之前调用。看来放在Applicaton的onCreate()方法里比较合适了。第一次调用installDefaultEventBus(),创建完EventBus对象实例后,以后就用EventBus.getDefault()就可以了。
这个EventBusBuilder有一些关于EventBus相关的设置
例如这个eventInheritance(false)就是设置事件的类型是否是使用父子关系,默认是true的,就是执行父子继承关系的事件。如果使用中没用用到那么复杂的事件关系可以将此事件设置为false,优化EventBus的处理速度。还有日志开关,线程池等相关的设置都在这里。
8.EventBus设计模式之单例
EventBus是只有一个实例,用的单例模式是双重校验锁
static volatile EventBus defaultInstance;
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
两次校验,并且从内存中取出defaultInstance实例。另外要注意的是,EventBus实例只能初始化一次。初始化了之后,在用EventBusBuilder.installDefaultEventBus()会抛出异常的。
9.EventBus设计模式之建造者模式
每次创建EventBus实例,处理调用EventBus.getDefaul()方法外,也可以用EventBusBuilder来配置EventBus默认实例。
先创建 EventBusBuilder实例
EventBusBuilder.build();
public static EventBusBuilder builder() {
return new EventBusBuilder();
}
在设置EventBus相关的配置信息,例如:
/** Default: true */
public EventBusBuilder logSubscriberExceptions(boolean logSubscriberExceptions) {
this.logSubscriberExceptions = logSubscriberExceptions;
return this;
}
/** Default: true */
public EventBusBuilder logNoSubscriberMessages(boolean logNoSubscriberMessages) {
this.logNoSubscriberMessages = logNoSubscriberMessages;
return this;
}
/** Default: true */
public EventBusBuilder sendSubscriberExceptionEvent(boolean sendSubscriberExceptionEvent) {
this.sendSubscriberExceptionEvent = sendSubscriberExceptionEvent;
return this;
}
/** Default: true */
public EventBusBuilder sendNoSubscriberEvent(boolean sendNoSubscriberEvent) {
this.sendNoSubscriberEvent = sendNoSubscriberEvent;
return this;
}
最后在调用installDefaultEventBus()生成EventBus实例
/**
* Installs the default EventBus returned by {@link EventBus#getDefault()} using this builders' values. Must be
* done only once before the first usage of the default EventBus.
*
* @throws EventBusException if there's already a default EventBus instance in place
*/
public EventBus installDefaultEventBus() {
synchronized (EventBus.class) {
if (EventBus.defaultInstance != null) {
throw new EventBusException("Default instance already exists." +
" It may be only set once before it's used the first time to ensure consistent behavior.");
}
EventBus.defaultInstance = build();
return EventBus.defaultInstance;
}
这里的build()是创建EventBus的方法,并不上上面的builder()创建EventBusBuilder的,这两个要区分来。
/** Builds an EventBus based on the current configuration. */
public EventBus build() {
return new EventBus(this);
}
EventBus使用了建造者模式,使EventBus的全局配置脱离开EventBus本身,
以EventBusBuilder的形式作为设置的方式,使配置与使用有效的分离,使得EventBus更专注处理事件相关的处理。
10.EventBus 设计模式之观察者模式
EventBus并没有使用android系统带有的Observable实现,而是在EventBus内部自己维护了相关的容器.
private final Map<Object, List<Class<?>>> typesBySubscriber;
观察者模式中典型的注册和反注册
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
EventBus将订阅者存储起来,待有事件发生时,将这个事件与所有订阅者匹配,有订阅者关心的事件,就将事件发送给订阅者。
11.EventBus中的共享池缓存
EventBus默认的事件是支持父子关系的事件传递的。用eventTypesCache的key来存储这个事件,用这个value存储这个事件的所有父类。便于以后每次查找这个事件的时候都遍历查找一次,使用了空间换时间的做法。
查找事件类型对应的所有父类的,下面这个属性是用来存储事件类型对应的所有父类的。
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();
查找注册类中的所有方法
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
还有一处也是用了此种方法就是在每次反射查找类中的所有方法的时候。
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
这个map的可以是存储的类,value为这个类中所有订阅的方法。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
这个是对其中cache存储的实现,减少了反射带来的代价,第一次就将这个类的所有方法都存储起来,第二次用的时候不用再重复处理了。在存储数量不大的情况下,使用这种空间换时间的方法更有效果。还有要注意的一点是,这两个的map存储的都是类的类型,并不是对象,所以不会带来引用的问题,内存释放不了的问题。
在不同的EventBus版本这个实现是有区别的,这里的代码是eventbus-3.0.0 中的。
12.EventBus中的线程池
在EventBus中的任务执行,在当前线程执行的任务就在当前线程直接执行了。
如果需要在其他线程执行的,例如:从主线程发送任务到非主线程执行,从非主线程需要在另外单独的非主线程中执行的任务。这个另外的线程的就是用线程池实现的。
EventBus是有线程池相关的设置的在EventBusBuilder中
public class EventBusBuilder {
...
//默认线程池Executors.newCachedThreadPool()
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
...
//EventBus可以配置的的线程池,默认使用的是上边默认的线程池类型
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
...
//在这里设置其他类型的线程池
/**
* Provide a custom thread pool to EventBus used for async and background event delivery. This is an advanced
* setting to that can break things: ensure the given ExecutorService won't get stuck to avoid undefined behavior.
*/
public EventBusBuilder executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
...
}
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool:是一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool: 是一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool: 是一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor: 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
EventBus中默认是Executors.newCachedThreadPool(),可以根据需求设置其他类型的线程池处理事件。
13.EventBus中的线程变量
EventBus中会在发送事件的时候,将当前发送的线程会在线程中设置一些线程变量
下面是当前发送事件的线程属性
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
下面是post事件的时候处理线程变量相关的信息。
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
下面是取消事件路由的代码:
/**
* Called from a subscriber's event handling method, further event delivery will be canceled. Subsequent
* subscribers
* won't receive the event. Events are usually canceled by higher priority subscribers (see
* {@link Subscribe#priority()}). Canceling is restricted to event handling methods running in posting thread
* {@link ThreadMode#POSTING}.
*/
public void cancelEventDelivery(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
if (!postingState.isPosting) {
throw new EventBusException(
"This method may only be called from inside event handling methods on the posting thread");
} else if (event == null) {
throw new EventBusException("Event may not be null");
} else if (postingState.event != event) {
throw new EventBusException("Only the currently handled event may be aborted");
} else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
throw new EventBusException(" event handlers may only abort the incoming event");
}
postingState.canceled = true;
}
下面是在线程中设置的信息
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<Object>(); //需要处理的事件列表
boolean isPosting; //是否正在发送
boolean isMainThread; //是否是主线程
Subscription subscription; //订阅者
Object event; //当前处理的事件
boolean canceled; //是否取消发送
}
EventBus中使用线程变量是在发送事件的时候,在当前线程中设置当前信息以及现在处理事件的状态,在使用的时候可以方便处理。