Android 开源框架 02 --- RxJava01

在Rxjava里面有几个角色我们需要弄明白:

  • Observable:
    俗称被订阅者,被订阅者是事件的来源,接收订阅者(Observer)的订阅,然后通过发射器(Emitter)发射数据给订阅者。

  • Observer:
    俗称订阅者,注册过程传给被订阅者,订阅者监听开始订阅,监听订阅过程中会把Disposable传给订阅者,然后在被订阅者中的发射器(Emitter)发射数据给订阅者(Observer)。

  • Emitter:
    俗称发射器,在发射器中会接收下游的订阅者(Observer),然后在发射器相应的方法把数据传给订阅者(Observer)。

  • Consumer:
    俗称消费器,消费器其实是Observer的一种变体,Observer的每一个方法都会对应一个Consumer,比如Observer的onNext、onError、onComplete、onSubscribe都会对应一个Consumer。

  • Disposable:
    是释放器,通常有两种方式会返回Disposable,一个是在Observer的onSubscribe方法回调回来,第二个是在subscribe订阅方法传consumer的时候会返回。

这里以一种最基本的订阅来介绍它们之间的关系:
image.png

RxJava怎么通过被订阅者传给订阅者的过程是什么样的?

通常我们写一个从订阅到发送数据的示例如下:

Observable.create(new ObservableOnSubscribe<Integer>() {

  @Override
  public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
      emitter.onNext(1);
      emitter.onNext(2);
      emitter.onNext(3);
      emitter.onComplete();
  }
}).subscribe(new Observer<Integer>() {
  @Override
  public void onSubscribe(@NonNull Disposable d) {
      Log.d(TAG, "onSubscribe:" + d.getClass().getName());
  }

  @Override
  public void onNext(@NonNull Integer integer) {
      Log.d(TAG, "onNext: " + integer);
  }

  @Override
  public void onError(@NonNull Throwable e) {
      Log.d(TAG, "onError: " + e.getMessage());
  }

  @Override
  public void onComplete() {
      Log.d(TAG, "onComplete");
  }
});

相信这是最简单的事件发送的示例,这没什么好说的,那它们是怎么发送数据,接收数据的呢,下面我会把代码拆分来看,因为现在是链式调用,我把代码拆分如下:

image.png

订阅过程
这里把创建observable和observer、以及发起订阅分别拆开来写,后面方便我们分析代码,首先是第一步发起订阅observable.subscribe(observer):

@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
  try {
      subscribeActual(observer);
  } catch (NullPointerException e) { // NOPMD
      throw e;
  } catch (Throwable e) {
      NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
      throw npe;
  }
}

observable的订阅方法关键一句subscribeActual(observer),这里提一句,所有的被观察者的订阅入口都是subscribeActual方法,而subscribeActual在被观察者中是抽象方法,因此看对应的observable子类实现的逻辑,在上面通过Observable.create创建的被观察者是ObservableCreate, 它是Observable的子类,我么需要明确,RxJava中的操作符都会对应一个Observable的子类,比如just操作符对应的是ObservableJust的被观察者,好了,我们看ObservableCreate的subscribeActual实现:

@Override
protected void subscribeActual(Observer<? super T> observer) {
  //创建发射器,并且把下游的observer给发射器
  CreateEmitter<T> parent = new CreateEmitter<>(observer);
  //给下游的observer添加被订阅的监听
  observer.onSubscribe(parent);
  try {
      //给上游的ObservableOnSubscribe添加订阅,并且把下游的observer给上游的ObservableOnSubscribe
      source.subscribe(parent);
  } catch (Throwable ex) {
      Exceptions.throwIfFatal(ex);
      parent.onError(ex);
  }
}

先是创建CreateEmitter类型的发射器,把下游的observer传给发射器,注意此处的发射器是实现了Disposable接口,所以紧接着会把发射器通过下游的观察者的onSubscribe方法传给下游观察者,注意此处传的是Disposable对象。

接着会给上游的ObservableOnSubscribe添加订阅,并且把下游的observer给上游的ObservableOnSubscribe。 为了描述订阅的过程,我们画一张时序图:
image.png

小总结
订阅是从下游的Observer向上游的Observable发送订阅,然后在订阅的过程中,给下游的Observer发送订阅监听,并且给上游的被观察者添加订阅。

发送数据
上面我们知道在ObservableCreate的subscribeActual方法中给上游的ObservableOnSubscribe添加了onSubscribe订阅过程,并且把当前的发射器传给了ObservableOnSubscribe,而在我们上面的示例中定义的ObservableOnSubscribe内部类的subscribe方法通过传过来的发射器添加了如下代码:

emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();

所以到这里可以看到是通过发射器的onNext和onComplete发送数据,而emitter是上面订阅过程传过来的CreateEmitter,所以直接看它的onNext和onComplete:

@Override
public void onNext(T t) {
  if (t == null) {
      onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
      return;
  }
  //如果isDisposed为false,则可以继续发送数据
  if (!isDisposed()) {
      observer.onNext(t);
  }
}

很简单,给observer发送数据,而当前的observer是订阅过程中传进来的下游observer,所以大家明白了吧,最终是下游的observer接收到数据。

小总结
发送主要通过上游的被观察者通知发射器,然后发射器会发送给下游的observer。

Observer处理完onComplete后会还能onNext吗?

上面我们看到emitter.onNext三次完了后,会发送onComplete事件,那onComplete处理啥呢:

@Override
public void onComplete() {
  if (!isDisposed()) {
      try {
          observer.onComplete();
      } finally {
          dispose();
      }
  }
}

这是发射器中onComplete的定义,dispose方法是控制是否还能发送数据,其实这里的 CreateEmitter它是一个AtomicReference<Disposable>原子类包装Disposable的实现类,而我们dispose方法正是将该原子类添加了常量的DISPOSED,而在onNext方法中通过判断isDisposed是否为false才能继续发送数据。而isDisposed什么时候为false呢?当AtomicReference<Disposable>中的包装对象不是DISPOSED。所以我们的onComplete是用来控制不能发送数据的。
可以通过如下代码测试:

emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
emitter.onNext(3);

看看下游的observer是否还能收到3的数据。
小总结
onComplete是用来控制不能发送数据的,也就是不能onNext了,包括onError也是不能再发送onNext数据了,该方法中也是调用了dispose方法。

RxJava中map、flatMap的区别,你还用过其他哪些操作符?

map和flatMap是我们经常用的转换操作,我们先看看map如何使用:

Observable<Integer> createObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

      @Override
      public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
      }
  });
  Observable<String> mapObservable = createObservable.map(new Function<Integer, String>() {

      @Override
      public String apply(Integer integer) throws Throwable {
          return String.valueOf(integer + 1);
      }
  });
  Observer<String> observer = new Observer<String>() {
      @Override
      public void onSubscribe(@NonNull Disposable d) {
          Log.d(TAG, "onSubscribe:" + d.getClass().getName());
      }

      @Override
      public void onNext(@NonNull String string) {
          Log.d(TAG, "onNext: " + string);
      }

      @Override
      public void onError(@NonNull Throwable e) {
          Log.d(TAG, "onError: " + e.getMessage());
      }

      @Override
      public void onComplete() {
          Log.d(TAG, "onComplete");
      }
  };
  mapObservable.subscribe(observer);
}

通过createObservable的map操作生成了一个mapObservable的被观察者,最终通过mapObservable与observer形成订阅关系,通过createObservable的map操作生成了一个mapObservable的被观察者,最终通过mapObservable与observer形成订阅关系,

Observable<Integer> flatMapObservable = mapObservable.flatMap(new Function<String, ObservableSource<Integer>>() {
  @Override
  public ObservableSource<Integer> apply(String s) throws Throwable {
      return Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
              emitter.onNext(Integer.valueOf(s)+1);
              emitter.onComplete();
          }
      });
  }
});
flatMapObservable.subscribe(observer);

在上面的mapObservable基础上通过flatMap返回flatMapObservable,最后通过flatMapObservable订阅observer。flatMap的Function第二个泛型是ObservableSource类型的,Observable的父类是ObservableSource类型,因此第二个参数返回Observable也可以。

从上面可以看出map是通过原始数据类型返回另外一种数据类型,而flatMap是通过原始数据类型返回另外一种被观察者。

关于面试也有问flatMap和concatMap的区别,下面我通过一个例子来演示他们的区别:

Observable<String> createObservable = Observable.just("1", "2", "3", "4", "5", "6", "7", "8", "9");
Observable<Integer> flatMapObservable = createObservable.flatMap(new Function<String, ObservableSource<Integer>>() {
  @Override
  public ObservableSource<Integer> apply(String s) throws Throwable {
      if (s.equals("2")) {
          return Observable.create(new ObservableOnSubscribe<Integer>() {
              @Override
              public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                  emitter.onNext(Integer.valueOf(s) + 1);
                  emitter.onComplete();
              }
          }).delay(500, TimeUnit.MILLISECONDS);
      } else {
          return Observable.create(new ObservableOnSubscribe<Integer>() {
              @Override
              public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                  emitter.onNext(Integer.valueOf(s) + 1);
                  emitter.onComplete();
              }
          });
      }

  }
});
Observable<Integer> observeOnObservable = flatMapObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Observer<Integer> observer = new Observer<Integer>() {
  @Override
  public void onSubscribe(@NonNull Disposable d) {
      Log.d(TAG, "onSubscribe:" + d.getClass().getName());
  }

  @Override
  public void onNext(@NonNull Integer string) {
      Log.d(TAG, "onNext: " + string);
  }

  @Override
  public void onError(@NonNull Throwable e) {
      Log.d(TAG, "onError: " + e.getMessage());
  }

  @Override
  public void onComplete() {
      Log.d(TAG, "onComplete");
  }
};
observeOnObservable.subscribe(observer);

在上面flatMap操作过程中为了演示flatMap和concatMap的区别,在数据为2的时候让返回的observable延迟500毫秒,我们看到的结果如下:


image.png

上面例子中3是由2的发射数据发射过来的,而正好数据为2的时候让延迟了500毫秒,那如果换成concatMap结果是按照发射数据的顺序来返回的。

concatMap和flatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并ObservablesflatMap采用的merge,而concatMap采用的是连接(concat)。

他们的区别在于:concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。

关于其他的操作符比如merge、concat、zip都是合并,interval是周期执行,timer是延迟发送数据。

Maybe、Observer、Single、Flowable、Completable几种观察者的区别,以及他们在什么场景用?

其实想知道它们的区别,我们直接看对应的Observer的方法有哪些:

  • Maybe
    Maybe从字面意思是可能的意思,看下MaybeObserver接口:
public interface MaybeObserver<@NonNull T> {  
    void onSubscribe(@NonNull Disposable d);
    void onSuccess(@NonNull T t);  
    void onError(@NonNull Throwable e);
    void onComplete();
}

它没有onNext方法,也就是说不能发多条数据,如果回调到onSuccess再不能发消息了,如果直接回调onComplete相当于没发数据,也就是说Maybe可能不发送数据,如果发送数据只会发送单条数据

  • Observer
    这个不用多说了,它是能发送多条数据的,直到发送onError或onComplete才不会再发送数据了,当然它也是可以不发送数据的,直接发送onError或onComplete。

  • Single

public interface SingleObserver<@NonNull T> {
   void onSubscribe(@NonNull Disposable d);
   void onSuccess(@NonNull T t);
   void onError(@NonNull Throwable e);
}

single也是发送单条数据,单是它要么成功要么失败。

  • Flowable
    Flowable没有FlowableObserver接口,它是由FlowableSubscriber代表观察者,Flowable在后面被压的时候讲,我们只要知道它是被压策略的一个被观察者。

  • Completable

public interface CompletableObserver {
   void onSubscribe(@NonNull Disposable d);
   void onComplete();
   void onError(@NonNull Throwable e);
}

Completable不发送数据,只会发送成功或失败的事件,当然这个用得很少。

小总结
从上面各个对应的observer接口来看,如果只想发一条数据,或者不发数据就用Maybe,如果想法多条数据或者不发数据就用Observable,如果只发一条数据或者失败就用Single,如果想用背压策略使用Flowable,如果不发数据就用Completable。

RxJava切换线程是怎么回事?

大家都知道RxJava切换线程使用subscribeOn指定被观察者的在哪个线程执行,使用observeOn指定观察者在哪个线程执行,通常我们写法如下:

subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

subscribeOn(Schedulers.io())
image.png

subscribeOn会返回一个ObservableSubscribeOn,它是一个Observable,根据前面介绍的订阅流程,我们直接看ObservableSubscribeOn的subscribeActual操作:

@Override
public void subscribeActual(final Observer<? super T> observer) {
  //创建了内部的Observer,其实这里类似上面介绍的Observable.create创建的发射器,只不过发射器是Emitter
  final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
  //给下游的observer添加订阅的监听
  observer.onSubscribe(parent);
  //给SubscribeOnObserver设置disposable对象
  parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

创建了SubscribeOnObserver对象,它是Observer类型的,其实类似上面介绍的Observable.create创建的发射器,只不过发射器是Emitter类型。接着给下游的observer添加订阅的监听,最后是给SubscribeOnObserver设置disposable对象,还记得在observable.create最后一步是给上游的ObservableOnSubscribe添加订阅吗,那我们看看此处是如果给上游的observable添加订阅的,首先scheduler是Schedulers.io(),最终它是一个IoScheduler对象,里面是通过CachedWorkerPool内部类创建了线程池,创建线程池如下:

evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);

而scheduler.scheduleDirect(new SubscribeTask(parent))中的SubscribeTask是一个Runnable,所以最终通过线程池执行SubscribeTask的run方法:
image.png

到了最后还是通过线程池执行Runnable来添加上游Observable的订阅,并且把当前创建的SubscribeOnObserver传给了上游的observable,这个跟我们上面介绍Observable.create中给上游的ObservableOnSubscribe添加订阅是一样的。

小总结
subscribeOn实际是创建了ObservableSubscribeOn的Observable,它的订阅方法里面创建了SubscribeOnObserver,通过线程池执行Runnable来达到上游Observable的订阅在子线程中执行,这就是为什么subscribeOn能控制observable在哪个线程中执行的原因。

observeOn(AndroidSchedulers.mainThread())
同样如此observeOn也会有对应的observable,它是ObservableObserveOn,我们直接看它订阅的方法:


image.png

同样如此,可以看到先是拿到AndroidSchedulers中的worker,它是HandlerWorker类型,按道理说应该给下游的observer添加订阅监听啊,怎么没有呢,看官别急,我们继续看ObserveOnObserver的订阅方法:
image.png

我们的重点不在下游的observer订阅监听这,在ObserveOnObserver的onNext方法中,会调用schedule方法,最终是通过HandlerWorker的schedule执行ObserveOnObserver,因为ObserveOnObserver也是一个runnable实现类,HandlerWorker中的schedule方法是通过主线程的Handler给主线程发送了一个Message,所以我们回到ObserveOnObserver的run方法,在run方法中会执行下游的onNext、onError等方法,所以这就是为什么observeOn能让observer能在主线程中执行。

小总结
observeOn实际是创建了ObservableObserveOn的Observable,它的订阅方法里面创建了ObserveOnObserver,而ObserveOnObserver是实现了Runnable接口,把它包装成message给主线程的Handler发送一条消息,而ObserveOnObserver的run方法中会给下游的Observer发送数据。所以这就是observeOn能让observer在哪个线程中执行。

RxJava的subscribeOn只有第一次生效?

如果你理解了订阅的过程,其实该问题很好理解,subscribeOn是规定上游的observable在哪个线程中执行,如果我们执行多次的subscribeOn的话,从下游的observer到上游的observable的订阅过程,最开始调用的subscribeOn返回的observable会把后面执行的subscribeOn返回的observable给覆盖了,因此我们感官的是只有第一次的subscribeOn能生效。

那如何才能知道它实际在里面生效了呢,我们可以通过doOnSubscribe来监听切实发生线程切换了

RxJava的observeOn多次调用哪个有效?

上面分析了observeOn是指定下游的observer在哪个线程中执行,所以这个更好理解,看observeOn下一个observer是哪一个,所以多次调用observeOn肯定是最后一个observeOn控制有效。

RxJava1.0、RxJava2.0、RxJava3.0有什么区别?

RxJava2.0相比于RxJava1.0

  • 添加背压的策略Flowable
  • 添加Observer的变体consumer
  • ActionN 和 FuncN 改名(Action0 改名成Action,Action1改名成Consumer,而Action2改名成了BiConsumer,而Action3 - Action9都不再使用了,ActionN变成了Consumer<Object[]> 。Func改名成Function,Func2改名成BiFunction,Func3 - Func9 改名成 Function3 - Function9,FuncN 由 Function<Object[], R> 取代。)
  • Observable.OnSubscribe 变成 ObservableOnSubscribe
  • ObservableOnSubscribe 中使用 ObservableEmitter 发射数据给 Observer,在RxJava中使用Subscriber发射数据。
  • Subscription 改名为 Disposable

RxJava3.0相比与RxJava2.0

  • 提供Java 8 lambda友好的API
  • 删除Maybe.toSingle(T)
  • 删除Flowable.subscribe(4 args)
  • 删除Observable.subscribe(4 args)
  • 删除Single.toCompletable()

RxJava内存泄漏问题?

  • 在页面销毁后,Observable仍然还有事件等待发送和处理,这个时候会导致Activity回收失败,从而内存泄漏。

  • 解决rxjava内存泄漏的主要方式是在页面关闭之前取消所有的订阅。
    a.使用Disposable,关闭页面时调用dispose()取消订阅
    b.使用CompositeDisposable,添加一组Disposable,在关闭页面时同时取消订阅。

  • 使用框架自定义取消订阅,其原理是跟Activity生命周期进行绑定,在摧毁是取消订阅。(RxLifeCycle)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容