RxJava代码分析之订阅

当知道了RxJava是如何创建被观察者,接下来就是更重要的一步了,Rxjava是如何订阅的呢?
还是先看下使用过程:

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("我是RxJava");
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Rxjava被订阅");
            }

            @Override
            public void onNext(String s) {
                System.out.println("Observer接收消息: " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Observer出现错误: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Rxjava 完成");
            }
        });

当我们点进去看subscribe方法的代码:

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {

        // 还是一样的判断是否为空
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //和创建时是一样的,为我们提供hook点
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            // 需要重点关注
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

当点进去查看时,发现这个方法是一个抽象类的抽象方法:

    protected abstract void subscribeActual(Observer<? super T> observer);

还记得我们创建时返回的是什么吗?
ObservableCreate,查看代码也知道ObservableCreate继承了Observable自然也实现了subscribeActual方法,这就知道了需要查看的代码:ObservableCreate#subscribeActual()

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

我们先看下observer.onSubscribe(parent):在这里就调用了我们new出来匿名内部类的onSubscribe()方法;
至于CreateEmitter,就是将我们的观察者又包装了一层
看下CreateEmitter的继承的类型

   static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable { 
           private static final long serialVersionUID = -3434801548987643227L;
    }

创建出来了发射器,这还没有完,继续回到subscribeActual():


Snipaste_2020-06-01_22-20-35.png

可以看出把创建出来的发射器交给了我们的被观察者
直到这我们才完成了订阅过程,看看订阅过程的时序图:


RxJava订阅.png

仅仅这样是不行的还有一个RxJava中最重要的东西,就是线程的切换

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。