Guava 中EventBus事件总线模式

一.事件总线模式的产生背景以及意义?

 在说事件模式(EventBus)之前,我们可以追朔到Bus 的起源,所谓Bus ,在计算机中就是存在主板上的总线,映射到生活中,就是公交车的含义;
 在计算机上,我们的输入设备,输出设备之间种类繁多,当我们从键盘输入一个字符串,cpu 处理完成之后回显给显示器;那这个过程需要怎么传输呢?最直接的方式就是为这些设备相互之间建立线路,这样可以解决问题,但是随着设备越来越多,这些设备建立的私有的线路越来越繁杂,很难以维护,于是我们暂时丢弃了这个方式,采用公共的总线方式,所有的设备将传输的数据,以及目的设备地址发送出去,总线只是为我们进行运输,这样也很方便维护,类比我们生活中,a用户要给b 用户买礼物,那么当a 自己买到了礼物后,然后坐上公交车去b 的地址,当公交车到达目的地之后,然后下车将礼物送到b 手中;
 
 所以,软件中也借鉴了硬件中总线的背景,产生了事件总线模式,在软件中,组件模块会有很多,相互通信的协议,方式多种多样,如果每个组件都要写一套,维护成本较高,那么我们就要用这个模式来解决这个问题.
 
 这个模式跟发布-订阅模式,观察者模式都很类似。但相比之下,EventBus解除了通知者与观察者之间的关系,是组件之间能够进一步的解耦;

二.EventBus 如何使用?

通过电商中一个案例,来说明一下使用的场景,假如我们买了一件商品,商品发货后,物流状态发生了变化,此时会给我们发通知短信,进行处理;

引入guava 依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
物流事件定义:
 public class LogisticsEvent {
    private String orderStatus;


    public String getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }
    public LogisticsEvent getData() {
        return this;
    }
    }    
用户监听器定义:
/**
 * 用户监听器
 */
public class UserListener {
    
    /**
        @Subscribe 标记了监听器需要处理的方法,可以存在多个方法来处理变更
        
    */
    @Subscribe 
    public  void orderUpdate(LogisticsEvent logisticsEvent){
        System.out.println("用户监听器收到该事件"+ " start process");
        System.out.println("processing");
        System.out.println("status"+ logisticsEvent.getOrderStatus() +"物流状态处理完成");
    }
}

客户端测试:
public class EventBusTest {
    public static void main(String[] args) {
        EventBus eventBus  = new EventBus();
        LogisticsEvent logisticsEvent = new LogisticsEvent();
        logisticsEvent.setOrderStatus("3");
        UserListener userListener = new UserListener();
        eventBus.register(userListener);
        eventBus.post(logisticsEvent);
    }
}
创建一个EventBus 实例,通过将用户监听器注册到eventBus 中,然后通过eventBus 发送事件,在监听器收到该类型事件时,进行处理;也可注册多个监听器,发送多个事件,只要明确不同监听器接收不同的事件类型,这样就能进行处理;

三.EventBus 如何工作?

所涉及的类在eventbus 包下,主要包括:
EventBus: 事件总线,以及消息的广播传输
Subscriber: 订阅者,观察者
Dispatcher: 事件分发,有3种实现,分别是ImmediateDispatcher(同步分发器),LegacyAsyncDispatcher(异步分发器),PerThreadQueuedDispatcher(单线程分发)
SubscriberRegistry:订阅者注册器,通过ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> 维护了事件消息类型和 观察者之间的关系;
SubscriberExceptionContext: 订阅者异常处理器上下文;
SubscriberExceptionHandler:异常处理器;
DeadEvent:死亡事件,事件没有相应的订阅者处理时,则标记为一个死亡事件;
AsyncEventBus:异步事件总线;
  /**
    创建一个事件总线,
    identifier 为事件的标识,
    executor 为 执行器
    dispatcher 分发器
    exceptionHandler:异常处理器
  */
  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  }
  /**
    EventBus 中 观察者注册
  */
  public void register(Object object) {
    subscribers.register(object);
  }
  
  /**
    为事件类型建立订阅者之间的处理关系,通过类的反射确定带有@Subscribe 的方法,为一个具体的观察者,方法上的参数为具体的事件类型,
  */
  void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    
    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();
    
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
    
      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }
      eventSubscribers.addAll(eventMethodsInListener);
    }
    }

注册了订阅者之后,此时创建事件,并广播发送事件,调用EventBus 中post()方法
分发器处理,默认为PerThreadQueuedDispatcher 单线程分发器,分发的过程:

    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      queueForThread.offer(new Event(event, subscribers));

      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }

通过使用ThreadLocal 的方式,创建Queue,存储事件队列,首先将事件和订阅者封装为一个事件,放入队列中,若当前事件状态未被处理,则标记当前事件为已处理,接着从队列中取出当前事件,若有存在的监听器,则分发事件给具体的监听器来处理,处理完成之后,队列清空,状态移除;下来继续接收下一个事件的处理,保证了事件分发处理的顺序性;

观察者对事件的处理:

  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }
  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }
  

通过反射调用观察者的方法进行事件的响应和处理;

在事件分发器中,剩余的两个LegacyAsyncDispatcher(AsyncEventBus 使用这个异步分发器),ImmediateDispatcher(同步分发器);
LegacyAsyncDispatcher:异步分发器分发过程:

    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

使用ConcurrentLinkedQueue 全局队列来存储事件和观察者处理器组成的对象,如果一个事件有多个观察者处理器,则会拆分成1*N 个对象放入队列中,然后进行分发

ImmediateDispatcher:同步分发器处理过程:

    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        subscribers.next().dispatchEvent(event);
      }
    }

循环遍历当前观察者处理器,如果有下一个处理器,则进行事件分发;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容