2-RxJava源码分析之 --- 订阅过程和线程切换

RxJava订阅过程和线程切换原理

1 - Observable.just("hello world").subscribe(observer)

这是RxJava中的生产/消费模式中最简单的一种,就是生产发送“hello world"在用observer去监听消费数据,那么具体内部RxJava是如何实现的呢?

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        // 1.创建实际的Observable类ObservableJust,并把数据传进去
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
    
    // subscribe操作
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            // 2.Observable的subscribeActual是abstract,实现在ObservableJust中
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }   
}

// Observable.just("Hello world"),创建的Observable实现类
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    // subscribe时具体实现逻辑
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        // 3.先调用observer.onSubscribe()方法
        observer.onSubscribe(sd);
        // 4.执行后续的调用
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
    @Override
    public void run() {
        // 5.如果时START,设置为ON_NEXT
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            // 6.调用observer.onNext()方法
            observer.onNext(value);
            if (get() == ON_NEXT) {
                // 7.设置为ON_COMPLETE
                lazySet(ON_COMPLETE);
                // 8.调用observer.onComplete()方法
                observer.onComplete();
            }
        }
    }
}

上面代码的注释可以清晰的了解到整个Observable.just("Hello world").subscribe(observer)的调用过程,首先是在Observable.just("Hello world")中创建实际的Observable对象ObservableJust实例,然后在subscribe(observer)时,调用ObservableJust的subscribeActual方法,在subscribeActual中先调用obsever.onSubscribe(),再调用ScalarDisposable的run()方法,run()方法中处理了onNext()/onComplete()逻辑。

2 - Observable.just("hello world").subscribeOn(Schedulers.IO).subscribe(observer),subscribeOn(scheduler)的线程切换分析,数据生产方的线程切换

public abstract class Observable<T> implements ObservableSource<T> {
 public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // 1.创建ObservableSubscribeOn,即把原始Observable转为ObservableSubscribeOn对象
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
}

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // 2.把原始Observable保存为source
        super(source);
        this.scheduler = scheduler;
    }
    
     @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 3.把原始observer转成另一个代理类,代理类中对onSubscribe和dispose做了处理
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // 4.调用Observer.onSubscribe()方法
        s.onSubscribe(parent);
        // 5.此处是关键,创建SubscribeTask,它是个Runnable,给scheduler去调用,即进行线程切换,
        // 在SubscribeTask中包含了orignalObservable.subscribe(orignalObserver)逻辑,
        // 这样就使订阅逻辑执行在scheduler线程中。此处的scheduler.scheduleDirect()逻辑后面在分析,
        // 只要理解为把一个Runnable放在执行线程执行即可。
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    final class SubscribeTask implements Runnable {
        @Override
        public void run() {
            // 5.source即为原始Observable,parent为原始Observer的代理类(SubscribeOnObserver),
            //即这里实际就是执行原始未切换线程逻辑下的Observable.subscribe(observer),
            // 注意因为上面已经调用了originObserver.onSubscribe(parent);
            // 所以在代理类onSubscribe方法中没有调用originObserver.onSubscribe()
            source.subscribe(parent);
        }
    }
}

从上面代码注释可以了解到,RxJava subscribe(发送端) 切换线程subscribe(Schedulers.IO)逻辑会把原始Observable转为另一个订阅时线程切换的Observable(ObservableSubscribeOn),在ObservableSubscribeOn.subscribeActual()中把原始observer转为一个代理对象parent,调用originObserver.onSubscribe()方法,并把未切换线程的subscribe逻辑包装为Runnable,再把Runnable给Scheduler去调用执行,从而达到切换线程执行subscribe逻辑。

3 - Observable.just("hello world").observeOn(Schedulers.IO).subscribe(observer), observeOn(scheduler)的线程切换分析,数据消费方线程切换

public abstract class Observable<T> implements ObservableSource<T> {
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //  1.创建ObservableObserveOn,即把原始Observable转为ObservableObserveOn对象
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
}


public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // 2.创建ObserveOnObserver,即把原始observer转为ObserveOnObserver对象,
            // ObserveOnObserver中会把observer相关的回调通过worker切换到指定线程去调用
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 3.切换线程调用
            schedule();
        }
        
         void schedule() {
            if (getAndIncrement() == 0) {
                // 4.切换到worker线程调用,里面具体调用哪个方法执行逻辑较多,总体来说到这里就达到了observer回调切换线程的目的
                worker.schedule(this);
            }
        }
    }
}

从上面代码注释可以了解到,RxJava observer(接收端) 切换线程observerOn(Schedulers.IO)逻辑会把原始Observable转为另一个订阅时线程切换的Observable(ObservableObserveOn),在ObservableObserveOn.subscribeActual()中把原始observer转为一个代理对象ObserveOnObserver,在ObserveOnObserver中对observer相关的回调做线程切换处理,从而达到observer回调切换线程的目的。

4 - 生产端和消费端都切换线程的分析, Observable.just("hello world").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer)

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        // 1.创建实际的Observable类ObservableJust,并把数据传进去
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
    
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // 2.创建ObservableSubscribeOn,即把ObservableJust转为ObservableSubscribeOn对象
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
     public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        // 3.创建ObservableObserveOn,即把ObservableSubscribeOn转为ObservableObserveOn对象
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
}

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> s) {
        // 4.这是ObservableJust执行subscribe的逻辑。记:TAG-4
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }
}

public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
    public void run() {
        // 5.ObservableJust发送端逻辑,这里会调线程切换转化后的Observer,记后面的TAG-10位置
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
    }
}

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // 6.把原始Observable保存为source
        super(source);
        this.scheduler = scheduler;
    }
    
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 注意此处的s不是原始的observer,是在ObservableObserveOn中转化过的observer,可以见下面分析
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        //7.把observable.subscribe(observer)包装进Runnable中,并给scheduler去执行,留意此处在哪调用的,后面会分析。记:TAG-7,这里会调用到第六步TAG-8
        //注意注意注意!!!:这里达到发送端切换线程目的
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    final class SubscribeTask implements Runnable {
        @Override
        public void run() {
            // 8.在io线程中执行observable.subscribe(observer),注意此处的source是ObservableJust,它会调用TAG-4。记:TAG-8
            source.subscribe(parent);
        }
    }
}


public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // 9.创建ObserveOnObserver,即把原始observer转为ObserveOnObserver对象,并调用到上面第7步的流程,即TAG-7的地方
            // 注意注意注意!!!:ObserveOnObserver会把原始observer的回调放在指定线程去回调,达到接收端切换线程目的
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 10.observer切换线程的地方。记TAG-10
            schedule();
        }
    }
}

从上面代码注释可以了解到,其实RxJava对于发送端和接收端切换线程的逻辑就是把对应的Observable转为另一个Observable,且对于observeOn(scheduler)会把observer转为切换线程调用的oberver,当subscribe时,就会从转化后的Observable一级一级调用到原始的Observable方法,当然中间做了subscribe的切换线程操作,在原始Observable上再调用转化后的可线程切换的Observer的回调,在切换线程Observer中对原始Observer回调进行线程切换后调用。

RxJava源码分析系列文章主题目录:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容