RxJava2框架源码分析二(Create篇)

1.回顾

上篇已经介绍了RxJava的基本概念以及用法 RxJava2基本框架分析一(基础篇)

2.实例讲解

       // RxJava的链式操作
        // 1. 创建被观察者(Observable) & 定义需发送的事件
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
        // 2. 创建观察者(Observer) & 定义响应事件的行为
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                System.out.println("对Next事件" + value + "作出响应");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                System.out.println("对Complete事件作出响应");
            }

        };
        // 3. 通过订阅(subscribe)连接观察者和被观察者
        observable.subscribe(observer);
  • 运行结果


    示意图

3. 源码分析

下面,我讲根据 使用步骤 进行RxJava2的源码进行分析
步骤1:创建被观察者(Observable)&定义需发送的事件
步骤2:创建观察者(Observer)&定义响应事件的行为
步骤3:通过订阅(subscribe)连接观察者和被观察者

步骤一:创建被观察者(Observable)

  • 源码分析如下
// 1. 创建被观察者(Observable) & 定义需发送的事件
 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
 /**
  * 源码分析 Observable.create(object : ObservableOnSubscribe<Int>{...])
  *  create 操作主要是创建了 ObservableCreate 对象并且返回出去
 */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //判断source是否为空  
        ObjectHelper.requireNonNull(source, "source is null");
        //hook函数:判断是否需要再原对象加上一些代码操作(暂时可以当做返回对象本身)
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
  
  /**
   * 下面我们来看看 ObservableCreate 对象里面做了什么操作
   */
    public final class ObservableCreate<T> extends Observable<T> {
    // ObservableCreate 是Observable的子类
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //构造函数
        //传入source对象,并且赋值全局 = 手动创建的ObservableOnSubscribe匿名内部类对象(Observable.create(new ObservableOnSubscribe<Integer>())
        this.source = source;
    }
  //这里需要留心关注subscribeActual方法后面会讲到

  • 步骤1总结:创建被观察者的操作已经完成了,调用 Observable.create()返回了一个ObservableCreate 对象。

步骤二创建观察者(Observer)

  • 源码分析
/** 
  * 使用步骤2:创建观察者 & 定义响应事件的行为(方法内的创建对象代码)
  **/
 // 2. 创建观察者(Observer) & 定义响应事件的行为
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                System.out.println("对Next事件" + value + "作出响应");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                System.out.println("对Complete事件作出响应");
            }

        };
/** 
  * 源码分析Observer类
  **/
     public interface Observer<T> {
        // 注:Observer本质 = 1个接口
        // 接口内含4个方法,分别用于 响应 对应于被观察者发送的不同事件
        void onSubscribe(@NonNull Disposable d); // 内部参数:Disposable 对象,可结束事件
        void onNext(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    }
  • 步骤2总结:创建观察者的操作已经完成了,通过new了一个Observer的匿名内部类

步骤三:通过订阅(subscribe)连接观察者和被观察者

  • 源码分析
 // 3. 通过订阅(subscribe)连接观察者和被观察者
        observable.subscribe(observer);

/** 
  * 源码分析:Observable.subscribe(observer)
  * 说明:该方法属于 Observable 类的方法(注:传入1个 Observer 对象)
  **/  
public abstract class Observable<T> implements ObservableSource<T> {
     ...
    // 仅贴出关键源码
  @Override
  public final void subscribe(Observer<? super T> observer) {
         ...
         // 仅贴出关键源码
        //可以看到调用的是本类的下面抽象方法
         subscribeActual(observer); 
   }
    //定义了一个抽象方法当调用subscribe时会跟这个调用Observable子类的实现方法(就是调用者)
   protected abstract void subscribeActual(Observer<? super T> observer);
}

/**
*  现在我们回到先前创建的被观察者中 ObservableCreate类 
**/
public final class ObservableCreate<T> extends Observable<T> {
 // ObservableCreate 是Observable的子类
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //构造函数
        //传入source对象,并且赋值全局 = 手动创建的ObservableOnSubscribe匿名内部类对象(Observable.create(new ObservableOnSubscribe<Integer>())
        this.source = source;
    }

   /** 
      * 重点关注:复写了subscribeActual()
      * 作用:订阅时,通过接口回调 调用被观察者(Observerable) 与 观察者(Observer)的方法
      **/
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
      //1. 创建1个CreateEmitter对象(封装成一个Disposable对象)
      //作用:发射事件
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
      //2. 调用观察者(Observer)的onSubscribe()
     // onSubscribe()的实现 = 使用步骤2(创建观察者(Observer))时复写的onSubscribe()
       //将Disposable(CreateEmitter) 传到观察者onSubscribe(Disposable d) 参数中,使之可以解除订阅
        observer.onSubscribe(parent);

        try {
            //3.调用source对象的subscribe()方法
            // source对象 = 使用步骤1(创建被观察者(Observable))中创建的ObservableOnSubscribe对象
            //subscribe()的实现 = 使用步骤1(创建被观察者(Observable))中复写的subscribe()
            //将CreateEmitter对象传递给被观察者进行对象方法的调用(onNext(),onComplete()...)
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

  /** 
    * 分析2:emitter.onNext("1");
    * 此处仅讲解subscribe()实现中的onNext()
    * onError()、onComplete()类似,此处不作过多描述
    **/
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            //初始化讲观察者赋值到全局变量observer
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
          //当被观察者调用onNext()方法时,回调此方法(步骤一中创建Observable.create()匿名内部类中的onNext())
            //发送的事件不能为null
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
          //判断是否断开连接(调用Disposable.dispose())
          //没有断开的话,则调用观察者中的onNext()方法
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
}

步骤3总结:当被观察者订阅观察者的时候,会调用被观察者ObservablesubscribeActual()抽象方法,回调其子类重新的subscribeActual()方法。这方法里面有三个步骤:

  • 创建1个CreateEmitter对象(封装成一个Disposable对象)
  • 调用观察者(Observer)的onSubscribe(CreateEmitter parent ) 使其可以取消订阅
  • 调用source对象的subscribe(CreateEmitter parent)方法,通过 parent发送事件回调

4. 源码总结

  • 在步骤1(创建被观察者(Observable))、步骤2(创建观察者(Observer))时,仅仅只是定义了发送的事件 & 响应事件的行为;
  • 只有在步骤3(订阅时),才开始发送事件 & 响应事件,真正连接了被观察者 & 观察者
  • 具体源码总结如下


    总结
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • RxJava——目前最热门的响应式函数编程框架。笔者也是初涉Rx,所以打算通过这篇文章来理解Rx的操作流程,加深自...
    Robin_Lrange阅读 11,137评论 10 44
  • 序言 RxJava是现在最流行的响应式函数编程框架,之前的项目中一直使用RxJava,结合Retrofit+OkH...
    左大人阅读 3,228评论 5 16
  • 前言:学习了这么多天的RxJava系列文章,虽然会用了,但是确不懂的具体是怎么回事,所以说会用的话还是不行,要去了...
    六_六阅读 269评论 0 0
  • 第四次拿起绿底白边的残次品瓷杯大口大口灌可乐时,像煎铁板鱿鱼一样滋滋作响的气泡又把我的眼泪呛出来。 是的没错,我不...
    南逢酒馆阅读 555评论 0 1
  • 今天没有加班,和老公相约在四惠地铁一起回家,在外面吃了饭,为了少长点肉,我俩又商量去超市溜溜食,一路上聊聊这聊...
    凤_b89a阅读 173评论 0 1