JAVA进阶篇(10)—Guava实现的EventBus(调度算法源码分析)

1. 使用方式

  1. 引入依赖
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>27.0.1-jre</version>
</dependency>
  1. 定义被观察者类

由该类触发事件通知:

public class TestBus {

    /**
     * EventBus,默认使用PerThreadQueuedDispatcher分发器(该分发器内部维护的Executor是执行执行线程run方法,即使用主线程执行监听方法)。
     * 该分发器是每个线程内部维护了一个queue。
     * 每个线程互不干扰(都利于本身线程去串行的执行观察者的方法)
     *
     */
    public static void testPerThreadQueuedDispatcher(){
        EventBus eventBus = new EventBus();

        //观察者1
        DataObserver1 observer1 = new DataObserver1();
        //观察者2
        DataObserver2 observer2 = new DataObserver2();
        
        eventBus.register(observer2);
        eventBus.register(observer1);

        Thread t1 = new Thread(() -> {
            eventBus.post("信息1;");
            eventBus.post("信息5;");
        });

        Thread t2 = new Thread(() -> {
            eventBus.post("信息2;");
        });

        Thread t3 = new Thread(() -> {
            eventBus.post(123);
        });
        
        t1.start();
        t2.start();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t3.start();
    }

}
  1. 定义多个观察者
@Slf4j
public class DataObserver2 {
    /**
     * post() 不支持自动装箱功能,只能使用Integer,不能使用int,否则handlersByType的Class会是int而不是Intege
     * 而传入的int msg参数在post(int msg)的时候会被包装成Integer,导致无法匹配到
     */
    @Subscribe
    public void func(Integer msg) {
        log.info("Integer msg: " + msg);
    }
}
@Slf4j
public class DataObserver1 {
    /**
     * 只有通过@Subscribe注解的方法才会被注册进EventBus
     * 而且方法有且只能有1个参数
     *
     * @param msg
     */
    @Subscribe
//    @AllowConcurrentEvents
    public void func(String msg) throws InterruptedException {
        log.info("消息开始~:" + msg);
        Thread.sleep(2000);
        log.info("消息结束~:" + msg);
    }
}

使用原理:观察者对象注册到EventBus中,而EventBus会通过反射解析观察者及其父类对象是否存在@Subscribe注解,若是存在,则维护一个Map(key是对应方法的参数类型,value是Subscriber对象)。
当被观察者通过post()方法发送事件后,会解析事件的类型,找打对应的Subscriber(消费者对象)。然后循环通过反射调用对应的观察者方法。完成事件通知。

2. EventBus源码分析

事件总线的配置:

@Beta
public class EventBus {

  private static final Logger logger = Logger.getLogger(EventBus.class.getName());
  //id标识符
  private final String identifier;
  //发送事件的线程池
  private final Executor executor;
  //订阅者异常处理器
  private final SubscriberExceptionHandler exceptionHandler;
  //订阅者解析器
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  //分发器策略
  private final Dispatcher dispatcher;
  ...
}

查看其构造方法:

public class EventBus {

 ...
  /** Creates a new EventBus named "default". */
  public EventBus() {
    this("default");
  }

  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  }

  public EventBus(SubscriberExceptionHandler exceptionHandler) {
    this(
        "default",
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        exceptionHandler);
  }

  EventBus(
      String identifier,
      Executor executor,
      Dispatcher dispatcher,
      SubscriberExceptionHandler exceptionHandler) {
    this.identifier = checkNotNull(identifier);
    this.executor = checkNotNull(executor);
    this.dispatcher = checkNotNull(dispatcher);
    this.exceptionHandler = checkNotNull(exceptionHandler);
  }
}

可以看到,EventBus对外暴露的构造方法,只能去修改identifierexceptionHandler两个参数。

  • 发送事件的线程池executor使用的是MoreExecutors.directExecutor()
  • 消息的转发器dispatcher使用的是Dispatcher.perThreadDispatchQueue()

executordispatcher两个参数决定了什么呢?

public class EventBus {

  public void post(Object event) {
    //通过事件,找到所有的订阅者。
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    //存在订阅者
    if (eventSubscribers.hasNext()) {
      //使用dispatcher去分发消息
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }
}

默认:Dispatcher.perThreadDispatchQueue()的作用:

每一个线程内部都有一个queue,从而保证单线程中消息的有序性。

  private static final class PerThreadQueuedDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of EventBus.

    /** Per-thread queue of events to dispatch. */
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };

    /** Per-thread dispatch state, used to avoid reentrant event dispatching. */
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      //放入队尾【1】重入的线程事件会放入到队列尾部
      queueForThread.offer(new Event(event, subscribers));
      //【1】线程再次重入后,该方法!dispatching.get()为false,直接结束
      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          //检索并删除此队列的头,如果此队列为空,则返回 null 。
          while ((nextEvent = queueForThread.poll()) != null) {
            //第一个事件通知给所有的订阅者,才会通知后续的消息。
            while (nextEvent.subscribers.hasNext()) {
              //当订阅者中再次使用同一个EventBus发布消息,线程会冲入【1】
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          //单线程
          dispatching.remove();
          queue.remove();
        }
      }
    }
    //构建事件对象(队列的元素)
    private static final class Event {
      private final Object event;
      private final Iterator<Subscriber> subscribers;

      private Event(Object event, Iterator<Subscriber> subscribers) {
        this.event = event;
        this.subscribers = subscribers;
      }
    }
  }

使用场景:

代码A中发布事件(String类型),B订阅者收到消息后,在B中发布事件(Integer类型)。

该分发器会确保A事件通知给所有订阅者才会执行B事件(同一个线程中,订阅者发布的事件要排到后面去执行)。

默认:MoreExecutors.directExecutor()

class Subscriber {
  final void dispatchEvent(final Object event) {
    //分发事件,是使用的线程池
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }
}

而MoreExecutors.directExecutor()使用如下的线程池,即订阅者使用当前线程同步的处理事件。

enum DirectExecutor implements Executor {
  INSTANCE;

  @Override
  public void execute(Runnable command) {
    command.run();
  }

  @Override
  public String toString() {
    return "MoreExecutors.directExecutor()";
  }
}

订阅者去使用EventBus的线程去消费消息,可以保证消息的有序性。即先post的事件一定会先执行。

总结:

EventBus的特点:

  1. 单个线程上发布的所有事件都按其发布的顺序被调度到所有订阅服务器;
  2. 发布者和多个订阅者使用同一个线程处理。可能会影响发布者的性能,且某个订阅者耗时,也会影响其他订阅者;

EventBus特点的场景:

    public static void testPre1(){
        //单例获取到事件总线
        EventBus eventBus = EventBusCenter.getInstance();
        DataObserver1 observer1 = new DataObserver1();
        DataObserver2 observer2 = new DataObserver2();
        //注册订阅者1
        eventBus.register(observer1);
        //注册订阅者2
        eventBus.register(observer2);
        //通知订阅者1
        eventBus.post("发送事件!");
    }

订阅者1收到消息后,通知订阅者2。但是123事件会存储在ThreadLocal<Queue>中,等待发送事件!事件通知完所有的订阅者,才开始通知123事件。

@Slf4j
public class DataObserver1 {
    /**
     * 只有通过@Subscribe注解的方法才会被注册进EventBus
     * 而且方法有且只能有1个参数
     *
     * @param msg
     */
    @Subscribe
    public void func(String msg) throws InterruptedException {
        log.info("收到消息:{}", msg);
        EventBus eventBus = EventBusCenter.getInstance();
        eventBus.post(123);
    }
}
@Slf4j
public class DataObserver2 {
    /**
     * post() 不支持自动装箱功能,只能使用Integer,不能使用int,否则handlersByType的Class会是int而不是Intege
     * 而传入的int msg参数在post(int msg)的时候会被包装成Integer,导致无法匹配到
     */
    @Subscribe
    public void func(Integer msg) {
        log.info("Integer msg: " + msg);
    }
}

3. AsyncEventBus源码分析

构造方法:

  public AsyncEventBus(String identifier, Executor executor) {
    super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }
  1. 可以传入exector去异步发布消息。
  2. 只能使用Dispatcher.legacyAsync()去调度消息。
  private static final class LegacyAsyncDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of AsyncEventBus.
    //
    // We can't really make any guarantees about the overall dispatch order for this dispatcher in
    // a multithreaded environment for a couple reasons:
    //
    // 1. Subscribers to events posted on different threads can be interleaved with each other
    //    freely. (A event on one thread, B event on another could yield any of
    //    [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
    // 2. It's possible for subscribers to actually be dispatched to in a different order than they
    //    were added to the queue. It's easily possible for one thread to take the head of the
    //    queue, immediately followed by another thread taking the next element in the queue. That
    //    second thread can then dispatch to the subscriber it took before the first thread does.
    //
    // All this makes me really wonder if there's any value in queueing here at all. A dispatcher
    // that simply loops through the subscribers and dispatches the event to each would actually
    // probably provide a stronger order guarantee, though that order would obviously be different
    // in some cases.

    /** Global event queue. */
    //【注意:】若发布者产生消息的速度远远大于生产者消费消息的速度,此处容易造成OOM
    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      //先放入队列中
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      //队头取出,并删除元素
      while ((e = queue.poll()) != null) {
        //使用配置线程池去发布事件。
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }
  1. 多线程事件先存储到ConcurrentLinkedQueue中,然后在循环调用订阅者。
  2. 可以自定义线程池可以异步的去处理事件。
  3. (订阅者发布的事件一定会在队尾,但是可能会被别的线程先消费)故只能某些情况下可以保证事件按照发布的顺序被调度到订阅服务器;
  4. 因为使用了ConcurrentLinkedQueue,所以可能会造成OOM。
  5. 性能没有ImmediateDispatcher好。(采用了队列)

源码中注释:(多线程下不能保证顺序),所有这些让我真的怀疑在这里排队是否有任何价值。LegacyAsyncDispatcher它只是简单地循环通过订阅者并将事件分派给每个订阅者。在某些情况下,可能会提供更强的顺序保证,尽管顺序明显不同。

同一个线程,A发布事件到订阅者B,在订阅者B中再次发布另一个事件到C。线程会重入到dispatch方法,会将B发布的事件放到队列中(排队)。继续从队列头开始消费消息。

【注意:该队列是全局队列,每一个线程都会消费其消息。】

4. ImmediateDispatcher源码:

Guava没有对应的EventBus,但是我们可以继承EventBus类实现自定义的EventBus。

  /** Implementation of {@link #immediate()}. */
  private static final class ImmediateDispatcher extends Dispatcher {
    private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      //收到消息后,直接遍历所有的订阅者
      while (subscribers.hasNext()) {
        //订阅者可以使用线程池去执行
        subscribers.next().dispatchEvent(event);
      }
    }
  }

特点:

  1. 没有使用队列,但凡事件到达后立即使用去处理;
  2. 可以使用线程池异步的去消费消息;
  3. 性能要比LegacyAsyncDispatcher好;

总结

Guava的EventBus源码还是比较简单、清晰的。从源码来看,它一反常用的Observer的设计方式放弃采用统一的接口、统一的事件对象类型。转而采用基于注解扫描的绑定方式。

其实无论是强制实现统一的接口,还是基于注解的实现方式都是在构建一种关联关系(或者说满足某种契约)。很明显接口的方式是编译层面上强制的显式契约,而注解的方式则是运行时动态绑定的隐式契约关系。接口的方式是传统的方式,编译时确定观察者关系,清晰明了,但通常要求有一致的事件类型、方法签名。而基于注解实现的机制,刚好相反,编译时因为没有接口的语法层面上的依赖关系,显得不那么清晰,至少静态分析工具很难展示观察者关系,但无需一致的方法签名、事件参数,至于多个订阅者类之间的继承关系,可以继承接收事件的通知,可以看作既是其优点也是其缺点。

  1. EventBus需要注意:发布者和订阅者使用同一个线程,可能会影响发布者的性能。但可以保证单线程中事件的发布顺序和调度顺序保持一致。
  2. AsyncEventBus需要注意的是:发布者和订阅者可以使用不同的线程处理;发布事件时维护了一个LinkedQueue,若订阅者消费速度慢,可能会造成内存溢出;采用全局队列维护事件顺序性,但不能完全保证调度和发布的顺序;性能不如直接分发好;
  3. guava的EventBus虽然通过注解的方式更加灵活,但是没有接口的语法层面的依赖关系,代码维护性、可读性不是特别好。

推荐阅读

Google-Guava-EventBus源码解读

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容