EventBus源码阅读

EventBus源码阅读

怎么阅读EventBus?核心无非就两个,一个是注册事件,一个是发布事件!从这两个方法往下看,就很清晰了.

包结构

包结构

如何使用

  public static void main(String[] args) {
        // 定义一个EventBus对象,这里的Joker是该对象的id
        EventBus eventBus = new EventBus("Joker");
        // 向上述EventBus对象中注册一个监听对象
        eventBus.register(new EventAListener());
        eventBus.register(new EventBListener());
        // 使用EventBus发布一个事件,该事件会给通知到所有注册的监听者
        eventBus.post(new DickEvent(18, "black"));
    }

核心构造器,核心属性

EventBus(
    String identifier,
    Executor executor,
    Dispatcher dispatcher,
    SubscriberExceptionHandler exceptionHandler) {
  this.identifier = checkNotNull(identifier);
  this.executor = checkNotNull(executor);
  this.dispatcher = checkNotNull(dispatcher);
  this.exceptionHandler = checkNotNull(exceptionHandler);
}

核心变量

//
private final String identifier;
//
private final Executor executor;
//
private final SubscriberExceptionHandler exceptionHandler;
//
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
//
private final Dispatcher dispatcher;

注册方法

  public void register(Object object) {
    subscribers.register(object);
  }
void register(Object listener) {
  //
  Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    //
  for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
    Class<?> eventType = entry.getKey();
    Collection<Subscriber> eventMethodsInListener = entry.getValue();
        //
    CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
        //
    if (eventSubscribers == null) {
      //
      CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
      //
      eventSubscribers =
          MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
    }
        //修改了subscribers里的value的值.
    eventSubscribers.addAll(eventMethodsInListener);
  }
}

找到listener里所有加注解的监听方法.

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
  //
  Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
  //
  Class<?> clazz = listener.getClass();
  //
  for (Method method : getAnnotatedMethods(clazz)) {
    Class<?>[] parameterTypes = method.getParameterTypes();
    Class<?> eventType = parameterTypes[0];
    //每个方法生成一个对象.
    methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
  }
  return methodsInListener;
}
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
      ? new Subscriber(bus, listener, method)
        //synchronized
      : new SynchronizedSubscriber(bus, listener, method);
}

获取注解方法.

private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
  return subscriberMethodsCache.getUnchecked(clazz);
}

用了自己的cache.

/**
 * A thread-safe cache that contains the mapping from each class to all methods in that class and
 * all super-classes, that are annotated with {@code @Subscribe}. The cache is shared across all
 * instances of this class; this greatly improves performance if multiple EventBus instances are
 * created and objects of the same class are registered on all of them.
 */
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
    CacheBuilder.newBuilder()
        .weakKeys()
        .build(
            new CacheLoader<Class<?>, ImmutableList<Method>>() {
              @Override
              public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
                return getAnnotatedMethodsNotCached(concreteClass);
              }
            });
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
  //
  Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
  //
  Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
  //
  for (Class<?> supertype : supertypes) {
    //
    for (Method method : supertype.getDeclaredMethods()) {
      //
      if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
        // TODO(cgdecker): Should check for a generic parameter type and error out
        Class<?>[] parameterTypes = method.getParameterTypes();
        checkArgument(
            parameterTypes.length == 1,
            "Method %s has @Subscribe annotation but has %s parameters."
                + "Subscriber methods must have exactly 1 parameter.",
            method,
            parameterTypes.length);
                //
        MethodIdentifier ident = new MethodIdentifier(method);
        if (!identifiers.containsKey(ident)) {
          identifiers.put(ident, method);
        }
      }
    }
  }
  return ImmutableList.copyOf(identifiers.values());
}

MethodIdentifier

方法名+参数

   private final String name;
   private final List<Class<?>> parameterTypes;

发布事件

核心,调用注册者的注解方法.

public void post(Object event) {
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}

Dispatcher 事件分发器

Handler for dispatching events to subscribers, providing different event ordering guarantees that
make sense for different situations.

那么问题来了,事件的分发是不是可以变化的呢,比如有的是要立即分发,有的是异步分发,有的是一个线程一个分发.

所以这个类是抽象类.

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

静态的工厂方法.返回三个实现类.

static Dispatcher legacyAsync() {
  return new LegacyAsyncDispatcher();
}
static Dispatcher immediate() {
  return ImmediateDispatcher.INSTANCE;
}
static Dispatcher perThreadDispatchQueue() {
  return new PerThreadQueuedDispatcher();
}

ImmediateDispatcher

@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    subscribers.next().dispatchEvent(event);
  }
}

LegacyAsyncDispatcher

全局事件队列.

/** Global event queue. */
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
    Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    queue.add(new EventWithSubscriber(event, subscribers.next()));
  }

  EventWithSubscriber e;
  while ((e = queue.poll()) != null) {
    e.subscriber.dispatchEvent(e.event);
  }
}

PerThreadQueuedDispatcher

默认的分发策略.

Dispatcher.perThreadDispatchQueue()
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  checkNotNull(subscribers);
  Queue<Event> queueForThread = queue.get();
  queueForThread.offer(new Event(event, subscribers));

  if (!dispatching.get()) {
    dispatching.set(true);
    try {
      Event nextEvent;
      //遍历
      while ((nextEvent = queueForThread.poll()) != null) {
        //有下一个.
        while (nextEvent.subscribers.hasNext()) {
          //获取监听者.next()
          nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
        }
      }
    } finally {
      dispatching.remove();
      queue.remove();
    }
  }
}

回到了Subscriber.每个有注解的方法都生成一个Subscriber对象.

final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
  try {
    method.invoke(target, checkNotNull(event));
  } catch (IllegalArgumentException e) {
    throw new Error("Method rejected target/argument: " + event, e);
  } catch (IllegalAccessException e) {
    throw new Error("Method became inaccessible: " + event, e);
  } catch (InvocationTargetException e) {
    if (e.getCause() instanceof Error) {
      throw (Error) e.getCause();
    }
    throw e;
  }
}

总体还是很简单的,谷歌的代码抽象的是真的好,包括每个方法的归属都恰到好处!事件总线,注册者,事件分发者,注册器,太优雅了.

触发监听者的方法该放在哪个类里呢?肯定是注册者,因为注册者知道自己的方法名,知道自己的参数,就是火车启动的方法是火车的一样!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容