EventBus源码阅读
怎么阅读EventBus?核心无非就两个,一个是注册事件,一个是发布事件!从这两个方法往下看,就很清晰了.
包结构

包结构
如何使用
public static void main(String[] args) {
// 定义一个EventBus对象,这里的Joker是该对象的id
EventBus eventBus = new EventBus("Joker");
// 向上述EventBus对象中注册一个监听对象
eventBus.register(new EventAListener());
eventBus.register(new EventBListener());
// 使用EventBus发布一个事件,该事件会给通知到所有注册的监听者
eventBus.post(new DickEvent(18, "black"));
}
核心构造器,核心属性
EventBus(
String identifier,
Executor executor,
Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}
核心变量
//
private final String identifier;
//
private final Executor executor;
//
private final SubscriberExceptionHandler exceptionHandler;
//
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
//
private final Dispatcher dispatcher;
注册方法
public void register(Object object) {
subscribers.register(object);
}
void register(Object listener) {
//
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
//
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
//
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
//
if (eventSubscribers == null) {
//
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
//
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
//修改了subscribers里的value的值.
eventSubscribers.addAll(eventMethodsInListener);
}
}
找到listener里所有加注解的监听方法.
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
//
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
//
Class<?> clazz = listener.getClass();
//
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
//每个方法生成一个对象.
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
//synchronized
: new SynchronizedSubscriber(bus, listener, method);
}
获取注解方法.
private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
return subscriberMethodsCache.getUnchecked(clazz);
}
用了自己的cache.
/**
* A thread-safe cache that contains the mapping from each class to all methods in that class and
* all super-classes, that are annotated with {@code @Subscribe}. The cache is shared across all
* instances of this class; this greatly improves performance if multiple EventBus instances are
* created and objects of the same class are registered on all of them.
*/
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
CacheBuilder.newBuilder()
.weakKeys()
.build(
new CacheLoader<Class<?>, ImmutableList<Method>>() {
@Override
public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
return getAnnotatedMethodsNotCached(concreteClass);
}
});
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
//
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
//
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
//
for (Class<?> supertype : supertypes) {
//
for (Method method : supertype.getDeclaredMethods()) {
//
if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
// TODO(cgdecker): Should check for a generic parameter type and error out
Class<?>[] parameterTypes = method.getParameterTypes();
checkArgument(
parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters."
+ "Subscriber methods must have exactly 1 parameter.",
method,
parameterTypes.length);
//
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}
MethodIdentifier
方法名+参数
private final String name;
private final List<Class<?>> parameterTypes;
发布事件
核心,调用注册者的注解方法.
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
Dispatcher 事件分发器
Handler for dispatching events to subscribers, providing different event ordering guarantees that
make sense for different situations.
那么问题来了,事件的分发是不是可以变化的呢,比如有的是要立即分发,有的是异步分发,有的是一个线程一个分发.
所以这个类是抽象类.
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
静态的工厂方法.返回三个实现类.
static Dispatcher legacyAsync() {
return new LegacyAsyncDispatcher();
}
static Dispatcher immediate() {
return ImmediateDispatcher.INSTANCE;
}
static Dispatcher perThreadDispatchQueue() {
return new PerThreadQueuedDispatcher();
}
ImmediateDispatcher
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
LegacyAsyncDispatcher
全局事件队列.
/** Global event queue. */
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
PerThreadQueuedDispatcher
默认的分发策略.
Dispatcher.perThreadDispatchQueue()
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
//遍历
while ((nextEvent = queueForThread.poll()) != null) {
//有下一个.
while (nextEvent.subscribers.hasNext()) {
//获取监听者.next()
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
回到了Subscriber.每个有注解的方法都生成一个Subscriber对象.
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
总体还是很简单的,谷歌的代码抽象的是真的好,包括每个方法的归属都恰到好处!事件总线,注册者,事件分发者,注册器,太优雅了.
触发监听者的方法该放在哪个类里呢?肯定是注册者,因为注册者知道自己的方法名,知道自己的参数,就是火车启动的方法是火车的一样!