EventBus源码分析(二)

上一篇关于订阅和取消订阅的分析:https://www.jianshu.com/p/3f08a23c4544
上一篇对订阅和取消订阅进行了一个源码分析,简单来讲就是我们在类中调用@Subscribe所订阅事件的方法在订阅过程被封装成了subscriberMethod对象并被逐一添加到subscriptionsByEventType和typesBySubscriber这两个map中去,取消订阅则是分别从这两个map中移除相关的映射关系。
注册订阅事件后,接下来看一下是如何发送订阅事件的,发送订阅事件使用的是:

  EventBus.getDefault().post(TestEvent())

点击进去看一下post方法:

    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };
    /** Posts the given event to the event bus. */
    public void post(Object event) {
        //分装成PostingThreadState 对象
        PostingThreadState postingState = currentPostingThreadState.get();
        //从postingState获取事件队列
        List<Object> eventQueue = postingState.eventQueue;
        // 将当前要发送的事件加入到队列中
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //while 不断轮询发送事件
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

    /** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

currentPostingThreadState是一个ThreadLocal类型的变量(ThreadLocal的作用:ThreadLocal是解决线程安全问题一个很好的思路,它通过为每个线程提供一个独立的变量副本解决了变量并发访问的冲突问题。在很多情况下,ThreadLocal比直接使用synchronized同步机制解决线程安全问题更简单,更方便,且结果程序拥有更高的并发性。)currentPostingThreadState中存储了当前线程对应的事件列表和线程的状态信息等,上述主要调用了轮询调用postSingleEvent方法,看一下postSingleEvent方法:

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        //获取该事件对象类型
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            //是否支持事件继承,默认为true,如果订阅了父类型,当发送子类型事件实也会调用其相关订阅方法
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
         // 找不到该事件的异常处理
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

上述eventInheritance默认为true,表示如果订阅了父类型,当发送子类型事件实也会调用其相关订阅方法,最终是调用postSingleEventForEventType进行分发,看一下postSingleEventForEventType:

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        //CopyOnWriteArrayList是一个线程安全的list,写入时会复制一份数据出来,之后再赋值回去
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //从subscriptionsByEventType map中获取该eventType下的所有订阅对象,subscriptionsByEventType会不会有点熟悉??
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
             // 遍历该eventType对应下的订阅对象,并调用postToSubscription执行分发操作
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

subscriptionsByEventType会不会觉得有点熟悉??这个便是我们上一篇订阅分析所提到的,这个map key为某一事件类型,value为该事件类型下的所有订阅,我们从subscriptionsByEventType中获取该eventType下的所有订阅后对其进行遍历,并逐一调用postToSubscription()把事件分发到每一个订阅对象中去,继续看postToSubscription,这里我们会根据订阅方法指定的threadMode信息来执行不同的发布策略:

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

threadMode总共有以下几种类型
POSTING:执行invokeSubscriber()方法,就是直接反射调用;
MAIN:首先去判断当前是否在UI线程,如果是的话则直接反射调用,否则调用mainThreadPoster#enqueue(),即把当前的方法加入到队列之中,然后通过handler去发送一个消息,在handler的handleMessage中去执行方法。具体逻辑在HandlerPoster.java中;
MAIN_ORDERED:与上面逻辑类似,顺序执行我们的方法;
BACKGROUND:判断当前是否在UI线程,如果不是的话直接反射调用,是的话通过backgroundPoster.enqueue()将方法加入到后台的一个队列,最后通过线程池去执行;
ASYNC:与BACKGROUND的逻辑类似,将任务加入到后台的一个队列,最终由Eventbus中的一个线程池去调用,这里的线程池与BACKGROUND逻辑中的线程池用的是同一个。

这里先取一个分支来看,假设我们指定事件监听最后是回到主线程,也即是平常常使用的 @Subscribe(threadMode = ThreadMode.MAIN),那么这里将会来到 case MAIN:分支,第一步先判断发送事件的时候(即调用event post)是不是在主线程,是的话直接执行invokeSubscriber()使用反射执行方法

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //使用反射执行
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

如果发送事件的时候(即调用event post)不是在主线程,则执行mainThreadPoster.enqueue(subscription, event)方法,那这个mainThreadPoster是什么呢?

//简化代码
private final Poster mainThreadPoster;
....
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
....
    MainThreadSupport getMainThreadSupport() {
        if (mainThreadSupport != null) {
            return mainThreadSupport;
        } else if (AndroidLogger.isAndroidLogAvailable()) {
            Object looperOrNull = getAndroidMainLooperOrNull();
            return looperOrNull == null ? null :
                    new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
        } else {
            return null;
        }
    }
....
public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {

        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}

可以看到,mainThreadPoster内部创建了一个主线程Looper,并最终new了一个HandlerPoster,HandlerPoster是mainThreadPoster的实现类,这里大概可以猜到mainThreadPoster其实就是主线程的handler,看一下HandlerPoster如何实现,先看一下enqueue方法:

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //构建一个PendingPost对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //添加进PendingPost队列
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                //调用sendMessage发送消息,从而触发handleMessage
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
  ......省略

PendingPostQueue是一个简单实现的链表,内部保存了两个PendingPost对象 ,一头一尾,尾部插入,头部移除,调用enqueue()从链表尾部插入,调用poll()从链表头部移除,每次插入后调用sendMessage从而回调到handleMessage,看一下handleMessage是如何从这个PendingPostQueue取出这个订阅事件的:

  ......省略
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            //记录开始时间
            long started = SystemClock.uptimeMillis();
            while (true) {
                //while循环一直从PendingPostQueue取出PendingPost
                PendingPost pendingPost = queue.poll();
                //如果队列为空则取消
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //这里同样使用反射调用了方法
                eventBus.invokeSubscriber(pendingPost);
                //判断执行是否超过了指定时间,是的话重新调用sendMessage方法
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

可以看出,其实mainThreadPoster主要是帮我们回调到主线程,其内部本质上还是调用了反射去执行方法,关于handleMessage,第一步是用while循环不断轮询取出队列中的PendingPost,当队列为空则停止轮询,当执行处理方法的时长过长时则重新调用sendMessage,从而继续回调到handleMessage,这是为了防止执行时间过长,导致while循环阻塞主线程造成卡顿。
本篇博文到此结束,关于其他类型的threadMode执行操作,可自行在研究,差不多。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,869评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,716评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,223评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,047评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,089评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,839评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,516评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,410评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,920评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,052评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,179评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,868评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,522评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,070评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,186评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,487评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,162评论 2 356