RxJava2 线程切换与任务取消的原理

1、线程切换

涉及线程切换操作的操作符主要就俩,subscribeOn和observedOn, 我们还是像RxJava2 的原理浅析 一样,考究源码实现。我们先把主要注意力放在 subsribeActual方法里。

  • FlowableSubscribeOn


    Screen Shot 2018-02-03 at 3.22.40 PM.png

我们可以看到与普通的Flowable子类的subscribeActual实现是不太一样的, 通常我们的Flowable实现是这样的,

Screen Shot 2018-02-03 at 3.26.11 PM.png

调用上游的source,执行subscribe就好了,但是在FlowableSubscribeOn这个类里面,我们发现是先执行onSubscribe,也就是说先回溯,然后下游会再递归回来执行对应subscription的request(long )方法

Screen Shot 2018-02-03 at 4.14.35 PM.png

到达这里 request(long n) 有可能执行requestUpstream,向上游递归执行request。w.schedule(sos) 这里,就是执行SubscribeOnSubscriber的run方法,w就是一个Scheduler.Worker,内部包装了一个Java自带的ExectutorService。我们发现,执行subsribe向上游递归时,线程发生切换,后续的代码都执行在w指定的线程池上。

Screen Shot 2018-02-03 at 4.15.22 PM.png
  • FlowableObservedOn

FlowableObservedOn它的subscribeActual方法与其他类是一样的,我们把主要注意力放到它的BaseObserveOnSubscriber这个内部类里面,显然,这个类实现了runnable接口的run方法,这会用于线程切换。我们主要看onNext以及接下来要执行的trySchedule(看名字就知道要切换线程了),以及run方法。

Screen Shot 2018-02-03 at 4.32.55 PM.png
Screen Shot 2018-02-03 at 4.34.37 PM.png
Screen Shot 2018-02-03 at 4.33.12 PM.png

在onNext(T )方法里面我们会将数据 t 加入队列queue,稍后会取出数据,调用actual.onNext(),数据将会传递到下游。

Screen Shot 2018-02-03 at 6.15.17 PM.png

在ObserveOnSubscriber方法里,真正执行的三个方法其实是

Screen Shot 2018-02-03 at 6.18.47 PM.png

以runSync为例子,我们可以看到取出队头数据我们会看到一些异常处理,顺便说一下,e 可以看成已经发送的数据个数,r就是请求的数据个数,具体细节就是在处理队列里面残留的数据啦,不过在这里我们暂时先不关心这些。runSync最后就会跑到 a.onNext(v),这里的a就是下游的actual,一个subsriber。整个方法体被封装在run方法里调用,也就是trySchedule方法里面的那个work.scheduler(this),在这个地方发生了线程切换。

Screen Shot 2018-02-03 at 4.34.37 PM.png

2、任务取消的原理

取消任务的做法其实很简单,flowable执行subscribe(Subscriber s) 以后,就会返回一个LambdaSubscriber,它自己实现了disposable接口的dipose方法,所以做法就如下就好。

Screen Shot 2018-02-03 at 6.34.15 PM.png

然后我们来看下它的原理,一共分两种情况讨论,同步任务和异步任务

  • 同步任务

还是回到Flowable.subscribeActual方法里

    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Subscription> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

关键点在先执行subscribe(ls),然后才返回我们的disposable,先执行subscribe,整条执行链路就都跑起来了,然后又是同步,等到返回的时候必然已经执行完毕,所以同步的情况不需要取消任务。

  • 异步任务

异步任务麻烦一点,我们需要看下代码执行调用的一些顺序,我们以这段代码为例子

 public void run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        Runnable unlockAction = () -> {
            lock.lock();
            try {
                condition.signal();
            } finally {
                lock.unlock();
            }
        };

        Runnable lockAction = () -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        };

        Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(23);
                System.out.println(23);
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .delay(3, TimeUnit.MINUTES)
                .subscribe(integer -> {
                    System.out.println(integer);


                });
        Scheduler.Worker worker = Schedulers.newThread().createWorker();

        worker.schedule(() -> {
            System.out.println("dispose start");
            disposable.dispose();
            unlockAction.run();
        }, 3, TimeUnit.SECONDS);
        lockAction.run();
    }

执行disposable以后通过打断点观看栈帧,dispose方法执行以后会递归执行subscription 的 cancel 方法。

Screen Shot 2018-02-03 at 7.07.48 PM.png

这些个Flowable的子类大多都有个内部类,同时实现subscriber和subscription方法,同时也是 atomic*的子类,有些是AtomicLong的子类,有些是AtomicReference<?>的子类,每个subscription都会在onSubscribe方法里获取到上游的subscription,这样就能够递归执行了。最终到源头的FlowableCreate里面执行cancel方法。

接下来是异步取消任务的核心,代码展示说不太清,需要一点点思考。

当递归执行subscription cancel的时候,总会跑到某个subscriber里面,我们称呼它为A,它的worker正在执行,虽然执行代码的片段不一定是A的run或者是onNext(有可能是它下游的某个subscriber) 但是一定会运行在A的worker里面,所以执行worker.dipose停掉这个worker就可以取消异步任务(所有的runnable都会被封装成为FutureTask 从而可以被取消)
其实原理就辣么简单。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容