EventBus源码解析

一、概述

在上篇文章中,讲解了EventBus的入门,并在文章中提到了同步事件模式和异步事件模型,下面将具体分析两种事件模型的实现机制

二、EventBus架构图

同步事件模型:事件的触发与事件的处理在同一个线程中,因此事件的处理是有序的


同步模型.png

异步事件模型:事件的触发与事件的处理在不同线程中,事件的处理是在单独的线程池中,因此处理的过程是无效的


异步模型.png

三、EventBus

EventBus

public class EventBus {

  private static final Logger logger = Logger.getLogger(EventBus.class.getName());

 private final String identifier;
 private final Executor executor;
 private final SubscriberExceptionHandler exceptionHandler;

 private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
 private final Dispatcher dispatcher;

}
  • identifier:事件总线的标识,在应用中可以创建多个EventBus,通过 identifier 标识上面代码是EventBus的定义,下面对EventBus的属性简单讲解:

  • executor:该类实现了Java的Executor接口,用于对订阅者处理事件方法的执行,同步事件模型和异步事件模型采用了不同的 Executor

  • exceptionHandler:用于处理订阅者在执行事件处理方法时抛出的异常

  • subscribers:订阅者注册表,用于存储所有的事件以及事件处理器、订阅对象的对应关系。在上文中,@Subscribe注解标记的方法会被包装为一个Subscriber

  • dispatcher:事件分发器,用于分发事件给订阅对象的事件处理器,在不同事件模型上采用不同的事件分发器,在下面将详细讲解

四、事件订阅

(一)注册事件处理器

  • 注册
EventBus eventBus = new EventBus();
//注册事件处理器
eventBus.register(new MessageListener());

调用register方法想事件总线注册事件处理器,下面查看register方面内部实现。

  • register
void register(Object listener) {
  //扫描@Subscribe注解,分装成对应的Subscriber
  Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
  //将Subscriber添加到eventSubscribers
  for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
    Class<?> eventType = entry.getKey();
    Collection<Subscriber> eventMethodsInListener = entry.getValue();
    //获取已经注册了某个事件的处理器
    CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

    if (eventSubscribers == null) {
      CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
      eventSubscribers =
          MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
    }

    eventSubscribers.addAll(eventMethodsInListener);
  }
}

(二) 事件的触发以及处理

  • 事件的触发
EventBus eventBus = new EventBus();
//注册事件处理器
eventBus.register(new MessageListener());

ItemEvent itemEvent = new ItemEvent();
itemEvent.setTitle(Thread.currentThread().getName());

for (int i = 0; i < 100; i++) {
    //事件的触发
    eventBus.post(itemEvent);
}

通过调用 post 方法,触发一个ItemEvent,下面我们看 post 方法的具体实现

  • post
public void post(Object event) {
  //获取该事件所有的订阅者
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
    //通过dispatcher分发事件
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
 
    // 如果事件没有订阅者,那么该事件为DeadEvent
    post(new DeadEvent(this, event));
  }
}

从上面的代码可以看出,post 方法会获取该事件的所有监听者,如果监听者存在,就通过 dispatcher 分发事件到对应的处理器中,

dispatcher 是一个抽象类,对于不同的事件模型,Dispatcher有不同的实现类,下面从同步事件模型和异步事件模型来分析一下

Dispatcher的实现。

五、事件模型

(一) 同步事件模型

在同步事件模型中,通过 PerThreadQueuedDispatcher 来进行事件的分配,代码具体实现如下:

  • dispatcher
private static final class PerThreadQueuedDispatcher extends Dispatcher {

  /**每个线程一个事件分发队列 */
  private final ThreadLocal<Queue<Event>> queue =
      new ThreadLocal<Queue<Event>>() {
        @Override
        protected Queue<Event> initialValue() {
          return Queues.newArrayDeque();
        }
      };

  /** 每个线程一个事件分发状态,通过状态判断减少锁的操作,值得学习 */
  private final ThreadLocal<Boolean> dispatching =
      new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
          return false;
        }
      };

  @Override
  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    checkNotNull(event);
    checkNotNull(subscribers);
    //获取该线程的事件分发队列
    Queue<Event> queueForThread = queue.get();
    //事件压入队列
    queueForThread.offer(new Event(event, subscribers));

    if (!dispatching.get()) {
      //设置分发状态
      dispatching.set(true);
      try {
        Event nextEvent;
        while ((nextEvent = queueForThread.poll()) != null) {
          while (nextEvent.subscribers.hasNext()) {
            nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
          }
        }
      } finally {
        //删除
        dispatching.remove();
        queue.remove();
      }
    }
  }
}

在上面的代码中,通过 ThreadLocal 保证并发情况下的线程安全

(二) 异步事件模型

  • 异步
private static final class LegacyAsyncDispatcher extends Dispatcher {

  /** 全局的并发队列 */
  private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
      Queues.newConcurrentLinkedQueue();

  @Override
  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    checkNotNull(event);
    while (subscribers.hasNext()) {
      queue.add(new EventWithSubscriber(event, subscribers.next()));
    }

    EventWithSubscriber e;
    while ((e = queue.poll()) != null) {
      e.subscriber.dispatchEvent(e.event);
    }
  }

从上面的代码可以看出,将事件加入一个全局的并发队列中,在LegacyAsyncDispatcher中采用了ConcurrentLinkedQueue,但是这里有个问题,如果事件的发布比事件的消费快,会造成消息的堆积

五、事件的处理

上一小节分析了事件的分发,我们可以看出事件的处理是调用 Subscriber 的 dispatchEvent 方法进行事件的处理,下面进入到 dispatchEvent 中查看具体的实现。

  • dispatchEvent
final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}

从上面的代码可以看出,Subscriber 通过调用 Executor 的 execute 方法来执行一个任务,然后通过反射调用调用到对应的事件处理方法,也就是我们使用 @Subscribe 注解标记的方法,在同步事件模型和异步事件模型下采用的不同的 Executor,AsyncEventbus 采用的使我们做完入参的线程池,所以说异步事件模型的事件触发和事件处理在不同的线程中

在之前我们说个同步线程模型的事件的触发和事件的处理都是在同一个线程中 ?

查看了同步线程模型 Executor 的实现就了解了

  • executor
private enum DirectExecutor implements Executor {
  INSTANCE;

  @Override
  public void execute(Runnable command) {
    command.run();
  }

  @Override
  public String toString() {
    return "MoreExecutors.directExecutor()";
  }
}

同步事件模型的 Executor 的execute 方法是直接调用 run 方法,因此事件的触发和事件的处理在同一个线程中,保证了事件的有序处理。

上面的代码通过枚举实现了单例模式,以后可以尝试

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容

  • 项目地址:EventBus,本文分析版本: 3.1.1 一、概述 EventBus 是一个 Android 事件发...
    Yi__Lin阅读 1,036评论 1 10
  • 前面一篇文章讲解了EventBus的使用,但是作为开发人员,不能只停留在仅仅会用的层面上,我们还需要弄清楚它的内部...
    Lauren_Liuling阅读 5,945评论 7 28
  • 博文出处:EventBus源码解析,欢迎大家关注我的博客,谢谢! 0001B 时近年末,但是也没闲着。最近正好在看...
    俞其荣阅读 1,301评论 1 16
  • 小吴是我的狗头军师,最近谈恋爱了。 狗头军师那事儿,还得从我两相识说起。 朋友原打算把我两凑一对儿,给了我微信让我...
    谢唠钱呗阅读 645评论 0 0
  • 前言 入职场也有数年时间,在职业生涯中遇到了一些坎坷和迷茫,也在慢慢适应职场的规则。这里把关注点放在自身,通过对互...
    落影loyinglin阅读 15,842评论 39 135