Guava之EventBus源码

最近需要使用事件驱动,打算使用EventBus管理事件的注册和分发。于是仔细阅读了下Guava的EventBus实现,并在此做了些整理。
EventBus是基于设计模式中的Observer模式的实现。Observer模式是非常常用的设计模式之一,jdk中的EventObject、EventListener、Observable、Observer都是为观察者模式服务的。但随着业务场景复杂度的不断提高,我们希望能在管理事件的同时提供更多的扩展。所以我们通过EventBus来优雅的实现这些。


Observer模式

我们先简单回顾下Observer模式:


观察者模式.png

定义对象之间的一对多依赖关系,以便当一个对象更改状态时,它的所有依赖关系都会被通知并自动更新。

Observer Pattern: Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

一些文中的“发布-订阅(Publish/Subscribe)模式”其实就是Observer模式,他所做的事情和我们将要做的事情一样:丰富Subject的功能。


EventBus

首先,我们先来看一下EventBus模块的类:


EventBus.jpg

EventBus.class

EventBus.class:它对应于Subject类,是整个模块的核心,也是功能扩展的中心点。
首先看下EventBus.class包含的以下几个变量:

  private final String identifier;
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  private final Dispatcher dispatcher;
  • identifier:定义了EventBus的名称,用来区分项目中的多个EventBus,在之后的Exception等日志输出,和线程命名时都会有使用。
private static Logger logger(SubscriberExceptionContext context) {
      return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
    }
  • executor:java的异步执行的实现类。用来执行订阅者处理Event的方法。值得注意的是在'EventBus'中对Executor变量赋值的构造器是私有的,也就是说我们只能使用它所指定的Executor:'DirectExecutor.class'。但'AsyncEventBus'的Executor是被允许传入的。这也是这两者的区别所在之一。
  public EventBus(String identifier) {
    this(identifier, MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE);
  }
  EventBus(String identifier, Executor executor, Dispatcher dispatcher,
      SubscriberExceptionHandler exceptionHandler) {...}
  • exceptionHandler:SubscriberExceptionHandler的实现类,用于处理过程中产生的异常。
  • subscribers:创建了一个SubscriberRegistry用来维护Subscriber与Event的对应关系。
  • dispatcher:Dispatcher的实现类,他是一个Event的分发器,所有Event都会经过dispatcher传递给Subscriber。和executor一样,它也不能被从外部传入,在'EventBus'中默认使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默认使用'LegacyAsyncDispatcher'。这是两个类的唯二区别了。

然后我们看下EventBus的方法,作为一个核心类,它一共只有三个public方法:

  • register:注册event。
  • unregister:取消event注册。
  • post:发布event。
 public void register(Object object) {
    subscribers.register(object);
  }
public void unregister(Object object) {
    subscribers.unregister(object);
  }
public void post(Object event) {
  ...
      dispatcher.dispatch(event, eventSubscribers);
  ...
  }

仅有的三个方法也都异常的简单,'register'和'unregister'都调用了SubscriberRegistry类,'post'交给了Dispatcher类。而多线程的控制也通过'executor'交给了Subscriber,异常的处理不在自身管理同样传递给了Subscriber,作为中心的EventBus只做了功能的定义和分配,事件的转发,完美的实现了功能的解耦,做到了职责单一原则。

AsyncEventBus.class

AsyncEventBus.class是EventBus.class的异步多线程的子类,上面也有提到过,二者之间只在构造器中有两处区别:

  1. Executor:EventBus默认使用'DirectExecutor.class',他是一个线程执行器,简单的直接执行传入的Runnable。AsyncEventBus正好相反,它的Executor必须是传入的。
 private enum DirectExecutor implements Executor {
    INSTANCE;
    @Override public void execute(Runnable command) {
      command.run();
    }
}
  1. Dispatcher:在'EventBus'中默认使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默认使用'LegacyAsyncDispatcher'。前者是单线程同步,后者是多线程同步。两者的具体区别在下面介绍。

通过上面的描述,两者并不能通过他们类名简单的区别为一个单线程,一个多线程。他们的区别同样可以总结为两点:

  • Subscriber中都是多线程调用方法执行event,区别是'EventBus'只简单的run()了线程,而'AsyncEventBus'能过定义线程池。
  • Dispatcher中都是同步分发,区别是'EventBus'使用了ThreadLocal实现了单线程同步,而'AsyncEventBus'通过ConcurrentLinkedQueue使多线程同步分发。

Dispatcher.class

Dispatcher是一个抽象类,它本身是default的,因此无法被外部继承,EventBus也没有可以传入Dispatcher的构造器,所以对于Dispatcher我们是无法正常扩展的。
Dispatcher中只有一个抽象方法:来实现消息的分发。

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

还有三个静态方法来创建它的三个实现类:

  • PerThreadQueuedDispatcher:将收到的Event保存在了ThreadLoacl中,意味着多线程中即使使用了同一个Dispatcher实现收到的event都会分开保存互不影响。
private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {...};
private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {...};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
...
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
}

里面的dispatching用于避免重入事件分派,例如循环发起Event的场景。

  • LegacyAsyncDispatcher:创建了一个ConcurrentLinkedQueue来保存收到的Event。多个线程中使用同一个LegacyAsyncDispatcher实现的话,线程收到的Event会保存在一起,并共同完成所有Event的分发。
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();
 @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }
      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

值得注意的是由于分发也是多线程共同完成,这使得它将无法保证Event的顺序性。

  • ImmediateDispatcher:该dispatcher在事件发布时立即将事件分发给订阅者不使用中间队列。

Subscriber.class

对应于Observer的抽象类,但它更像是一种封装,Subscriber自身提供了静态创建方法,将真正的Observer实现类和执行Event的方法都与EventBus封装在了一起,通过反射实现了对应于不同Observer的抽象。

static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
  }
final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
         method.invoke(target, checkNotNull(event));
        } catch (InvocationTargetException e) {
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
  }

SynchronizedSubscriber:在dispatchEvent()方法上加了synchronized同步锁,如果正在的Observer方法是线程不安全的话就需要用到此类。他是通过@AllowConcurrentEvents注解来判断的,这里就不多讲了。

SubscriberRegistry.class

维护了Subscriber与Event的对应关系,对EventBus进行了解耦,使EventBus职责单一。

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();
  • register/unregister/getSubscribers:这三个都是维护Subscriber与Event的对应关系的基本方法,这里就不多讲了,他们只会在EventBus中被调用。
  • findAllSubscribers:这是一个private方法,他在register/unregister中被调用,之所以单独拿出来,主要是想说明一下,他也是基于Annotation来实现的。在register时,我们只会传入Observer类,但Observer类需要订阅哪个Event,Event到底又需要调用哪个方法,都是在这个方法中能通过对@Subscriber找个Annotation的读取和method的反射。
  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }

最后多提一句,@Subscriber标注的那些Method都是事先通过'getAnnotatedMethodsNotCached' 方法获取,保存在了一个LoadingCache中的。由于和EventBus的机制没有太大关系,这里就不展开了。

  private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
      CacheBuilder.newBuilder().weakKeys()
          .build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
            @Override
            public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
              return getAnnotatedMethodsNotCached(concreteClass);
            }
          });

总结

Guava的EventBus以EventBus类为中心,对于Event的发布、订阅者的管理、异常的处理都提供了专门的实现类,流程非常清楚。而且基于Annotation扫描绑定的方式会使代码非常的简洁。但由于这种方式,在EventBus中对于事件类型和事件参数等等并不能提供很好的支撑,而且由于基本所有的类都是default权限的,这使得扩展异常的艰难T~T


Classfier扩展

待续...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容