前言
开篇要说声sorry,限于各种原因,Okhttp的下篇和OKIO要delay 了,本周先来一个简单一些的。
EventBus 是一个基于观察者模式的事件发布/订阅框架,开发者可以通过极少的代码去实现多个模块之间的通信,而不需要以层层传递接口的形式去单独构建通信桥梁。从而降低因多重回调导致的模块间强耦合,同时避免产生大量内部类。其可以很好的应用于Activity之间,Fragment之间,后台线程之间的通信,避免使用intent或者handler所带来的复杂度。其缺点则是可能会造成接口的膨胀。特别是当程序要求大量形式各异的通知,而没有做出良好的抽象时,代码中会包含大量的接口,接口数量的增长又会带来命名、注释等等一大堆问题。本质上说观察者要求从零开始实现事件的产生、分发与处理过程,这就要求参与者必须对整个通知过程有着良好的理解。当程序代码适量时,这是一个合理的要求,然而当程序太大时,这将成为一种负担。
EventBus基于观察者模式的Android事件分发总线。
EventBus基本使用
1.定义消息事件MessageEvent,也就是创建事件类型
public class MessageEvent {
public final String message;
public MessageEvent(String message) {
this.message = message;
}
}
2.注册观察者并订阅事件
选择要订阅该事件的订阅者(subscriber),Activity即在onCreate()加入,调用EventBus的register方法,注册。
EventBus.getDefault().register(this);
在不需要接收事件发生时可以
EventBus.getDefault().unregister(this);
在订阅者里需要用注解关键字 @Subscribe
来告诉EventBus使用什么方法处理event。
@Subscribe
public void onMessageEvent(MessageEvent event) {
Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();
}
注意方法只能被public修饰,在EventBus3.0之后该方法名字就可以自由的取了,之前要求只能是onEvent().
3.发送事件
通过EventBus的post方法,发出我们要传递的事件。
EventBus.getDefault().post(new MessageEvent("HelloEveryone"));
这样选择的Activity就会接收到该事件,并且触发onMessageEvent方法。
EventBus源码解析
了解了对于EventBus的基础使用,解析来,我们针对其基础使用的调用流程,来了解EventBus的实现流程和源码细节。
注册观察者
EventBus.getDefault().register(this);
- getDefault()
EventBus.getDefault()是一个单例,实现如下:
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
保证了App单个进程中只会有一个EventBus实例。
- register(Object subscriber)
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
register方法中,首先获得订阅实例的类,然后调用SubscriberMethodFinder实例的findSubscriberMethods
方法来找到该类中订阅的相关方法,然后对这些方法调用订阅方法。注册的过程涉及到两个问题,一个是如何查找注册方法?另一个是如何将这些方法进行存储,方便后面的调用?
SubscriberMethodFinder是如何从实例中查找到相关的注册方法的呢?
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//根据类信息丛缓存中查找订阅方法
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//查找注册方法
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
//将得到的订阅方法加入到缓存中
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
首先从缓存的方法中,通过Class作为Key进行查找,如何查找内容为空,则会调用findUsingReflection或者findUsingInfo来从相关类中查找,得到注册的方法列表之后,将其添加到缓存之中。
缓存的数据结构如下:
Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
订阅方法
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//获取订阅方法要监听的事件类型
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//根据事件类型查找相应的订阅者
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
//如果不存在该事件类型,则创建,如果已经包含该订阅者,抛出异常
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//获得该事件类型的订阅者列表,根据其优先级确定当前插入者的位置
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//在该注册者中加入对应的监听事件类型
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//黏性事件处理
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
subscribe方法的执行流程是先根据事件类型,判断该注册者是否已经进行过注册,如果未注册将其中的方法进行保存,以事件类型为键保存一份,然后以注册者实例为键保存一份。
发送事件
对于事件的发送,调用的是post函数
- post(Object event)
public void post(Object event) {
//获取当前线程的Event队列,并将其添加到队列中
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
//如果当前PostingThreadState不是在post 中
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//遍历事件队列,调用postSingleEvent方法
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post方法中,首先从当前的PostingThreadState中获取当前的事件队列,然后将要post的事件添加到其中,之后判断当前的线程是否处在post中,如果不在,那么则会遍历事件队列,调用postSingleEvent
将其中的事件抛出。
currentPostingThreadState是一个ThreadLocal类型的,里面存储了PostingThreadState。
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
}
PostingThreadState包含了一个eventQueue和一些标志位。类具体结构如下。
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
- postSingleEvent
postSingleEvent的具体实现如下。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
通过lookupAllEventTypes(eventClass)
得到当前eventClass的Class,以及父类和接口的Class类型,而后逐个调用postSingleEventForEventType方法。事件派发的核心方法在postSingleEventForEventType方法中。
- postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
从subscriptionsByEventType中拿到订阅了eventClass的订阅者列表 ,遍历,调用postToSubscription方法,逐个将事件抛出。
- postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据订阅者方法的线程模型进行不同的处理
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
根据threadMode去判断应该在哪个线程去执行该方法,而invokeSubscriber方法内通过反射调用函数。
MainThread
首先去判断当前如果是UI线程,则直接调用;否则, mainThreadPoster.enqueue(subscription, event)
BackgroundThread
如果当前非UI线程,则直接调用;如果是UI线程,则调用backgroundPoster.enqueue方法。
Async
调用asyncPoster.enqueue方法
接下来会针对这几种广播方式展开分析
- invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
通过反射的方式,直接调用订阅该事件方法。
- mainThreadPoster.enqueue
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
mainThreadPoster 通过mainThreadSupport.createPoster创建。
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
返回HandlerPoster实例。
通过Subscription和Event实例构造出PendingPost,然后将其加入到PendingPostQueue之中,然后调用sendMessage,其handleMessage函数将会被回调。
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
消息处理
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
当得到消息之后,开启循环,从队列中取PendingPost,调用invokeSubscriber方法执行。
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
这里调用了releasePendingPost
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
为了避免对象的重复创建,在PendingPost中维护了一个PendingPost列表,方便进行对象的复用。
List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
对于对象的创建,可以通过其obtainPendingPost方法来获得。
- asyncPoster.enqueue
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
将PendingPost添加到PendingPost队列中,线程池会从队列中取数据,然后执行。
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
- backgroundPoster.enqueue
相比于asyncPoster,backgroundPoster可以保证添加进来的数据是顺序执行的,通过同步锁和信号量的方式来保证,只有一个线程是在活跃从事件队列中取事件,然后执行。
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
}
} finally {
executorRunning = false;
}
}
函数扫描
在register方法中对于订阅方法的查找,调用的方法是SubscriberMethodFinder的findSubscriberMethods方法,对于其中方法的查找有两种方式,一个是findUsingInfo
,一个是findUsingReflection
。
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//获取FindState实例
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
//从当前类中查找,然后跳到其父类,继续查找相应方法
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
首先,会获得一个FindState实例,其用来保存查找过程中的一些中间变量和最后结果,首先找当前类中的注册方法,然后跳到其父类之中,其父类会自动过滤掉Java,Android中的相应类,然后继续查找。
查找的核心实现在方法findUsingReflectionInSingleClass中。
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 获取该类中的所有方法,不包括继承的方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//遍历获取的方法,判断添加规则为是否为public函数,其参数是否只有一个,获取其注解,然后调用checkAdd,
//在加入到订阅方法之前
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//多于一个参数
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//非public,abstract,非静态的
}
}
}
按照如下扫描规则,对类中的函数进行扫描
扫描规则:1.函数非静态,抽象函数 2.函数为public;3.函数仅单个参数;4.函数拥有@Subscribe
的注解;
在符合了以上规则之后,还不能够直接将其加入到函数的队列之中,还需要对方法进行校验。
boolean checkAdd(Method method, Class<?> eventType) {
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
throw new IllegalStateException();
}
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
//函数签名校验,来进行
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
为扫描到的函数做校验,在校验后,释放自己持有的资源。第一层校验在checkAdd函数中,如果当前尚未有函数监听过当前事件,就直接跳过第二层检查。第二层检查为完整的函数签名的检查,将函数名与监听事件类名拼接作为函数签名,如果当前subscriberClassByMethodKey
中不存在相同methodKey时,返回true,检查结束;若存在相同methodKey时,说明子类重写了父类的监听函数,此时应当保留子类的监听函数而忽略父类。由于扫描是由子类向父类的顺序,故此时应当保留methodClassOld而忽略methodClass。
上述的方式是通过在运行期通过注解处理的方式进行的,效率是比较慢的,在EventBus最新版中引入了在编译器通过注解处理器,在编译器生成方法索引的方式进行,以此来提升效率。
粘性事件处理
粘性事件的设计初衷是,在事件的发出早于观察者的注册,EventBus将粘性事件存储起来,在观察者注册后,将其发出。通过其内部的一个数据结构:
Map<Class<?>, Object> stickyEvents
保存每个Event类型的最近一次post出的event
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
将粘性事件保存在stickyEvents,而后post出,此时如果存在已经注册的观察者,则情况同普通事件情况相同;如尚无注册的观察者,在postSingleEvent函数中将时间转化为一个NoSubscriberEvent事件发出,可由EventBus消耗并处理。待观察者注册时,从stickyEvents中将事件取出,重新分发给注册的观察者。
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
在对于粘性事件处理这段代码中,首先判断是否监听Event的子类,而后调用checkPostStickyEventToSubscription将黏性事件发出,在checkPostStickyEventToSubscription中,判空后按一半事件的post流程将事件传递给观察者。
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
小结
轮子的每周一篇,已经到了第四周了,下周是对OkHttp的更细致的一个剖析,然后是对于OkIO的剖析