Guava之EventBus原理

EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。

demo

public static void main(String...args) {
    // 定义一个EventBus对象,这里的Joker是该对象的id
    EventBus eventBus = new EventBus("Joker");
    // 向上述EventBus对象中注册一个监听对象   
    eventBus.register(new EventListener());
    // 使用EventBus发布一个事件,该事件会给通知到所有注册的监听者
    eventBus.post(new Event("Hello every listener, joke begins..."));
}

// 事件,监听者监听的事件的包装对象
public static class Event {
    public String message;
    Event(String message) {
        this.message = message;
    }
}

// 监听者
public static class EventListener {
    // 监听的方法,必须使用注解声明,且只能有一个参数,实际触发一个事件的时候会根据参数类型触发方法
    @Subscribe
    public void listen(Event event) {
        System.out.println("Event listener 1 event.message = " + event.message);
    }
}

这里我们封装了一个事件对象Event,一个监听者对象EventListener。然后,我们用EventBus的构造方法创建了一个EventBus实例,并将上述监听者实例注册进去。然后,我们使用上述EventBus实例发布一个事件Event。然后,以上注册的监听者中的使用@Subscribe注解声明并且只有一个Event类型的参数的方法将会在触发事件的时候被触发。

从上面的使用中,我们可以看出,EventBus与观察者模式不同的地方在于:当注册了一个监听者的时候,只有当某个方法使用了@Subscribe注解声明并且参数与发布的事件类型匹配,那么这个方法才会被触发。这就是说,同一个监听者可以监听多种类型的事件,也可以在多次监听同一个事件。

源码分析

下面我们来分析一下在Guava中是如何为我们实现这个API的。不过,首先,我们还是先试着考虑一下自己设计这个API的时候如何设计,并且提出几个问题,然后带着问题到源码中寻找答案。

假如要我们去设计这样一个API,最简单的方式就是在观察者模式上进行拓展:每次调用EventBus.post()方法的时候,会对所有的观察者对象进行遍历,然后获取它们全部的方法,判断该方法是否使用了@Subscribe并且方法的参数类型是否与post()方法发布的事件类型一致,如果一致的话,那么我们就使用反射来触发这个方法。在观察者模式中,每个观察者都要实现一个接口,发布事件的时候,我们只要调用接口的方法就行,但是EventBus把这个限制设定得更加宽泛,也就是监听者无需实现任何接口,只要方法使用了注解并且参数匹配即可。

这里面不仅要对所有的监听者进行遍历,还要对它们的方法进行遍历,找到了匹配的方法之后又要使用反射来触发这个方法。首先,当注册的监听者数量比较多的时候,链式调用的效率就不高;然后我们又要使用反射来触发匹配的方法,这样效率肯定又低了一些。那么在Guava的EventBus中是如何解决这两个问题的?

EventBus事件总线

  • register:把监听器中申明的所有订阅事件方法注册到SubscriberRegistry(订阅者注册器)中。
  • post:发布事件给所有已注册过的订阅者,最终开启线程完成订阅方法。
@Beta
public class EventBus {
  private final String identifier;//事件总线标识:用于自定义标识这个事件总线
  private final Executor executor;//默认的线程执行器,用于把事件转发给订阅者
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//订阅注册器
  private final Dispatcher dispatcher;//事件转发器
 15   //构造器:使用默认字符串
  public EventBus() {
    this("default");
  }
  //构造器:使用自定义字符串
  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  } 58 
  //注册监听者中申明的所有订阅方法(@Subscribe标记的),用以接收事件
  public void register(Object object) {
    subscribers.register(object);
  }
  // 解除订阅
  public void unregister(Object object) {
    subscribers.unregister(object);
  }

  //发布事件给所有已注册过的订阅者
  public void post(Object event) {
        // 找到事件的所有订阅者
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
          // 事件转发器,把事件转发给订阅者
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // 如果该事件即没有订阅者,也没事DeadEvent,那么封装成DeadEvent并重新发布
      post(new DeadEvent(this, event));
    }
...省略非重要方法167 }
  • subscribers是SubscriberRegistry类型的,实际上EventBus在添加、移除和遍历观察者的时候都会使用该实例的方法,所有的观察者信息也都维护在该实例中.
  • executor是事件分发过程中使用到的线程池,可以自己实现; dispatcher是Dispatcher类型的子类,用来在发布事件的时候分发消息给监听者,它有几个默认的实现,分别针对不同的分发方式;
  • exceptionHandler是SubscriberExceptionHandler类型的,它用来处理异常信息,在默认的EventBus实现中,会在出现异常的时候打印出log,当然我们也可以定义自己的异常处理策咯。

如果我们想要了解EventBus是如何注册和取消注册以及如何遍历来触发事件的,就应该从SubscriberRegistry入手.

我们需要在EventBus中维护几个映射,以便在发布事件的时候找到并通知所有的监听者,首先是事件类型->观察者列表的映射。

EventBus中发布事件是针对各个方法的,我们将一个事件对应的类型信息和方法信息等都维护在一个对象中,在EventBus中就是观察者Subscriber. 然后,通过事件类型映射到观察者列表,当发布事件的时候,只要根据事件类型到列表中寻找所有的观察者并触发监听方法即可。 在SubscriberRegistry中通过如下数据结构来完成这一映射:

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();

从上面的定义形式中我们可以看出,这里使用的是事件的Class类型映射到Subscriber列表的。这里的Subscriber列表使用的是Java中的CopyOnWriteArraySet集合,
它底层使用了CopyOnWriteArrayList,并对其进行了封装,也就是在基本的集合上面增加了去重的操作。这是一种适用于读多写少场景的集合,在读取数据的时候不会加锁,
写入数据的时候进行加锁,并且会进行一次数组拷贝。

既然,我们已经知道了在SubscriberRegistry内部会在注册的时候向以上数据结构中插入映射,那么我们可以具体看下它是如何完成这一操作的。

在分析register()方法之前,我们先看下SubscriberRegistry内部经常使用的几个方法,它们的原理与我们上面提出的问题息息相关。
首先是findAllSubscribers()方法,它用来获取指定监听者对应的全部观察者集合。下面是它的代码:

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    // 创建一个哈希表
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    // 获取监听者的类型
    Class<?> clazz = listener.getClass();
    // 获取上述监听者的全部监听方法
    UnmodifiableIterator var4 = getAnnotatedMethods(clazz).iterator(); // 1
    // 遍历上述方法,并且根据方法和类型参数创建观察者并将其插入到映射表中
    while(var4.hasNext()) {
        Method method = (Method)var4.next();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // 事件类型
        Class<?> eventType = parameterTypes[0];
        methodsInListener.put(eventType, Subscriber.create(this.bus, listener, method));
    }
    return methodsInListener;
}

这里注意一下Multimap数据结构,它是Guava中提供的集合结构,与普通的哈希表不同的地方在于,它可以完成一对多操作。这里用来存储事件类型到观察者的一对多映射。

  1. 调用SubscriberRegistry的register(listener)来执行注册监听器。
  2. register步骤如下:
    EventBus-包含-》SubscriberRegistry-包含-》ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 用以维护事件和订阅者的映射

注意下1处的代码,我们上面也提到过,当新注册监听者的时候,用反射获取全部方法并进行判断的过程非常浪费性能,而这里就是这个问题的答案:

这里getAnnotatedMethods()方法会尝试从subscriberMethodsCache中获取所有的注册监听的方法(即使用了注解并且只有一个参数),下面是这个方法的定义:

private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
    return (ImmutableList)subscriberMethodsCache.getUnchecked(clazz);
}

这里的subscriberMethodsCache的定义是:

private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
    public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { // 2
        return SubscriberRegistry.getAnnotatedMethodsNotCached(concreteClass);
    }
});

这里的作用机制是:当使用subscriberMethodsCache.getUnchecked(clazz)获取指定监听者中的方法的时候会先尝试从缓存中进行获取,如果缓存中不存在就会执行2处的代码,
调用SubscriberRegistry中的getAnnotatedMethodsNotCached()方法获取这些监听方法。其实就是使用反射并完成一些校验,并不复杂。

private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
       //获取超类class集合
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    //遍历超类
    for (Class<?> supertype : supertypes) {
      //遍历超类中的所有定义的方法  
         for (Method method : supertype.getDeclaredMethods()) {
           //如果方法上有@Subscribe注解
        if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
          // 方法的参数类型数组
          Class<?>[] parameterTypes = method.getParameterTypes();
          // 校验:事件订阅方法必须只能有一个参数,即事件类
        checkArgument(
              parameterTypes.length == 1,
              "Method %s has @Subscribe annotation but has %s parameters."
                  + "Subscriber methods must have exactly 1 parameter.",
              method,
              parameterTypes.length);
      // 封装方法定义对象
          MethodIdentifier ident = new MethodIdentifier(method);
        // 去重并添加进map
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    // map转ImmutableList
    return ImmutableList.copyOf(identifiers.values());
  }

这样,我们就分析完了findAllSubscribers()方法,整理一下:当注册监听者的时候,首先会拿到该监听者的类型,然后从缓存中尝试获取该监听者对应的所有监听方法,如果没有的话就遍历该类的方法进行获取,并添加到缓存中;
然后,会遍历上述拿到的方法集合,根据事件的类型(从方法参数得知)和监听者等信息创建一个观察者,并将事件类型-观察者键值对插入到一个一对多映射表中并返回。

下面,我们看下EventBus中的register()方法的代码:

void register(Object listener) {
    // 获取事件类型-观察者映射表
    Multimap<Class<?>, Subscriber> listenerMethods = this.findAllSubscribers(listener);
    Collection eventMethodsInListener;
    CopyOnWriteArraySet eventSubscribers;
    // 遍历上述映射表并将新注册的观察者映射表添加到全局的subscribers中
    for(Iterator var3 = listenerMethods.asMap().entrySet().iterator(); var3.hasNext(); eventSubscribers.addAll(eventMethodsInListener)) {
        Entry<Class<?>, Collection<Subscriber>> entry = (Entry)var3.next();
        Class<?> eventType = (Class)entry.getKey();
        eventMethodsInListener = (Collection)entry.getValue();
        eventSubscribers = (CopyOnWriteArraySet)this.subscribers.get(eventType);
        // 如果指定事件对应的观察者列表不存在就创建一个新的
        if (eventSubscribers == null) {
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet();
            eventSubscribers = (CopyOnWriteArraySet)MoreObjects.firstNonNull(this.subscribers.putIfAbsent(eventType, newSet), newSet);
        }
    }
}

SubscriberRegistry中的register()方法与unregister()方法类似,我们不进行说明。下面看下当调用EventBus.post()方法的时候的逻辑。下面是其代码:

public void post(Object event) {
    // 调用SubscriberRegistry的getSubscribers方法获取该事件对应的全部观察者
    Iterator<Subscriber> eventSubscribers = this.subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        // 使用Dispatcher对事件进行分发
        this.dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
        this.post(new DeadEvent(this, event));
    }
}

当调用EventBus.post()方法的时候回先用SubscriberRegistry的getSubscribers方法获取该事件对应的全部观察者

Iterator<Subscriber> getSubscribers(Object event) {
    // 获取事件类型的所有父类型和自身构成的集合
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); // 3
    List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());
    UnmodifiableIterator var4 = eventTypes.iterator();
    // 遍历上述事件类型,并从subscribers中获取所有的观察者列表
    while(var4.hasNext()) {
        Class<?> eventType = (Class)var4.next();
        CopyOnWriteArraySet<Subscriber> eventSubscribers = (CopyOnWriteArraySet)this.subscribers.get(eventType);
        if (eventSubscribers != null) {
            subscriberIterators.add(eventSubscribers.iterator());
        }
    }
    return Iterators.concat(subscriberIterators.iterator());
}

Dispatcher
从EventBus.post()方法可以看出,当我们使用Dispatcher进行事件分发的时候,需要将当前的事件和所有的观察者作为参数传入到方法中。然后,在方法的内部进行分发操作。最终某个监听者的监听方法是使用反射进行触发的,这部分逻辑在Subscriber内部,而Dispatcher是事件分发的方式的策略接口。EventBus中提供了3个默认的Dispatcher实现,分别用于不同场景的事件分发:

  • ImmediateDispatcher:直接在当前线程中遍历所有的观察者并进行事件分发;
  • LegacyAsyncDispatcher:异步方法,存在两个循环,一先一后,前者用于不断往全局的队列中塞入封装的观察者对象,后者用于不断从队列中取出观察者对象进行事件分发;实际上,EventBus有个字类AsyncEventBus就是用该分发器进行事件分发的。
  • PerThreadQueuedDispatcher:这种分发器使用了两个线程局部变量进行控制,当dispatch()方法被调用的时候,会先获取当前线程的观察者队列,并将传入的观察者列表传入到该队列中;然后通过一个布尔类型的线程局部变量,判断当前线程是否正在进行分发操作,如果没有在进行分发操作,就通过遍历上述队列进行事件分发。

上述三个分发器内部最终都会调用Subscriber的dispatchEvent()方法进行事件分发:

final void dispatchEvent(final Object event) {
    // 使用指定的执行器执行任务
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                // 使用反射触发监听方法
                Subscriber.this.invokeSubscriberMethod(event);
            } catch (InvocationTargetException var2) {
                // 使用EventBus内部的SubscriberExceptionHandler处理异常
                Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
            }
        }
    });
}

上述方法中的executor是执行器,它是通过EventBus获取到的;处理异常的SubscriberExceptionHandler类型也是通过EventBus获取到的。(原来EventBus中的构造方法中的字段是在这里用到的!)

小结

EventBus中维护了三个缓存和四个映射:

  • 事件类型到观察者列表的映射(缓存)
  • 事件类型到监听者方法列表的映射(缓存)
  • 事件类型到事件类型及其所有父类的类型的列表的映射(缓存)
  • 观察者到监听者的映射,观察者到监听方法的映射;

观察者Subscriber内部封装了监听者和监听方法,可以直接反射触发。而如果是映射到监听者的话,还要判断监听者的方法的类型来进行触发

每次使用EventBus注册和取消注册监听者的时候,都会先从缓存中进行获取,不是每一次都会用到反射的,这可以提升获取的效率,也解答了我们一开始提出的效率的问题。当使用反射触发方法的调用貌似是不可避免的了。

ventBus中使用了非常多的数据结构,比如MultiMap、CopyOnWriteArraySet等,还有一些缓存和映射的工具库,这些大部分都来自于Guava。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容