响应式编程(reactive programming)简介 -- 基于RxJava2

1、什么是响应式编程 ?

部分内容参考自:《RxJava Essentials 中文版》by yuxingxin

响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

1.1 时间表

  • 90年代后期
    受微软的一名计算机科学家Erik Meijer启发的思想,用来设计和开发微软的Rx库。
    Rx是微软.NET的一个响应式扩展。Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。开发者可以使用Observables模拟异步数据流,使用LINQ语法查询Observables,并且很容易管理调度器的并发。
    Rx让众所周知的概念变得易于实现和消费,例如push方法。在响应式的世界里,我们不能假装作用户不关注或者是不抱怨它而一味的等待函数的返回结果,网络调用,或者数据库查询的返回结果。我们时刻都在等待某些东西,这就让我们失去了并行处理其他事情的机会,提供更好的用户体验,让我们的软件免受顺序链的影响,而阻塞编程。
  • 2012年
    Netflix在2012年开始意识到他们的架构要满足他们庞大的用户群体已经变得步履维艰。因此他们决定重新设计架构来减少REST调用的次数。取代几十次的REST调用,而是让客户端自己处理需要的数据,他们决定基于客户端需求创建一个专门优化过的REST调用。
  • 2013年
    2013年2月份,Ben ChristensenJafar Husain发在Netflix技术博客的一篇文章第一次向世界展示了RxJava
  • 2014年
    2014年9月份,发布RxJava 1.0.0正式版。
  • 2016年
    2016年9月份,发布RxJava 2.0.0正式版。
  • 2020年
    2020年2月份,发布RxJava 3.0.0正式版。

1.2 定义

这里只贴出链接,不做介绍,感兴趣的自行查看。

1.3 更友好的介绍

参考自:《什么是响应式编程?》by 享学IT
介绍了响应式编程的三大特点:变化传递(propagation of change)基于数据流(data stream)声明式(declarative)
具体形象的例子:“堪称“响应式典范”的强大的生产力工具——电子表格”

1.3.1 【满199减40活动】购物计划

下方【购物计划表】中,【单价】【数量】是原始输入,【商品金额】跟随【单价】和【数量】的变化而变化,【满199减40】跟随【商品金额】的变化而变化,以此类推,【订单总金额】、【邮费】、【最终应付款】也跟随相应的项的变化而变化。具体的公式以及变化的传递流向见【购物计划表 公式】

  • 变化传递(propagation of change)
  • 基于数据流(data stream)
  • 声明式(declarative)
购物计划表

购物计划表 公式

2、扩展的观察者模式

2.1 观察者模式

观察者模式
  • Observable
    可观察对象,也有叫做Source,内部维护一组观察者observers,当event有更新时,observableevent推(push)给observer
  • Observer
    观察者,也有叫做ConsumerSubscriber,观察observable,接收observable推(push)过来的event,做出相应的反应(不同的observer的反应可能不一样)。
  • Event
    observer所关心的事件event
  • subscrbe
    observerobservable连接起来的操作,叫做订阅(subscribe)。

2.2 扩展的观察者

上述4个概念,也就是RxJava中,最基本的几个概念。Observer通过subscribe方法订阅Observable,从而,在Event有变化时,Observable可以分发给Observer

  • Observable
  • Observer
  • Event
  • subscribe

与传统的观察者模式不同的是,RxJava不光会通过onNext方法分发普通事件(相当于上节描述的Observer中的accept方法),另外还会通过onCompleteonError方法分发两个特殊事件。

  • onComplete
    事件流已完成。表明事件流已成功发出所有的事件,后续不会再有新的事件发出。(成功结束)
  • onError
    事件流异常。表明由于发生异常,事件流将被打断,后续不会再有新的事件发出。(异常结束;特殊情况下,可能会人为在事件流过程中刻意发出error事件)

在一个正确实现的事件流中,都应该有一个onCompleteonError作为事件流的最后一个事件,并且这两者也是互斥,发出了其中一个事件,另一个事件就不应该再被发出。

2.3 RxJava = Observer + 异步处理

本节参考自马士兵教育视频

RxJava = Observer + 异步处理

3、相关概念

3.1 函数式编程

  • 函数式编程是与面向对象编程有差异的一个编程范式,函数式编程是一个很大的领域,本文不打算对此做深入分析、介绍;
  • 在函数式编程范式中,函数是头等公民,可以独立存在(不像面向对象,函数或称为方法,必须属于某个类);并且,函数可以作为方法的入参,也可以作为方法的返回值;
  • Java是纯面向对象语言,本质上是不支持函数式编程的,但是,通过函数式接口(一个有且仅有一个抽象方法的接口),可以部分模拟函数式编程;

3.2 函数式接口

  • 直接看例子
        // 函数式接口实例,最常见的Runnable接口
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("Runnable is a Functional Interface");
            }
        };
    
        // 函数式接口FunctionalInterface,只有一个accept方法,有入参和返回值
        interface FunctionalInterface {
            String accept(int i1, int i2);
        }
    
        // FunctionalInterface实例
        FunctionalInterface functionalInterface = new FunctionalInterface() {
            @Override
            public String accept(int i1, int i2) {
                return String.valueOf(i1 + i2);
            }
        };
    

3.3 lambda表达式

一开始不习惯的情况下,可以先像上一节那样,先按显性new实例的方式写出代码,然后光标移动到Android Studio标成灰色字的部分(new FunctionalInterface处),敲击alt + enter,即可通过IDE直接进行lambda改造。

  • Lambda 表达式,也可称为闭包,它是推动 Java 8 发布的最重要新特性。
  • Lambda 允许把函数作为一个方法的参数(函数作为参数传递进方法中)。
  • 使用 Lambda 表达式可以使代码变的更加简洁紧凑。(下一章,看示例代码会深有感触
  • 我们把上节的例子做一下lambda改造
        // Runnable实例,lambda形式,() -> { statement; };
        Runnable runnable = () -> {
            System.out.println("Runnable is a Functional Interface");
        };
    
        // 当方法体只有一行时,可以进一步简写,() -> statement;
        Runnable runnable = () -> System.out.println("Runnable is a Functional Interface");
        
        // FunctionalInterface实例,lambda形式,(param1, param2, ...) -> { return expression; };
        FunctionalInterface functionalInterface = (i1, i2) -> {
            return String.valueOf(i1 + i2);
        };
    
        // 当方法体只有一行时,可以进一步简写,(param1, param2, ...) -> expression;
        FunctionalInterface functionalInterface = (i1, i2) -> String.valueOf(i1 + i2);
    

4、RxJava2的使用

基于以下RxJava版本:'io.reactivex.rxjava2:rxjava:2.2.11'

4.1 最简单的示例(create创建、subscribe订阅)

  • 示例代码

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println(Thread.currentThread().getName() + " start to emit");
                emitter.onNext("Hello");
                emitter.onNext("CodingDog1024");
            }
        });
    
        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
            }
        };
    
        observable.subscribe(consumer);
    
  • 输出

        main start to emit
        main consumer accept: Hello
        main consumer accept: CodingDog1024
    
  • lambda化 (对比前面的代码,可以看到代码明显简洁紧凑很多)

        Observable<String> observable = Observable.create(emitter -> {
            System.out.println(Thread.currentThread().getName() + " start to emit");
            emitter.onNext("Hello");
            emitter.onNext("CodingDog1024");
        });
    
        Consumer<String> consumer = s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
    
        observable.subscribe(consumer);
    
  • 链式调用 (可以看到,结合lambda和链式调用,代码更加的紧凑,少了很多干扰性的代码,让我们可以更加聚焦于业务逻辑)

        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    emitter.onNext("Hello");
                    emitter.onNext("CodingDog1024");
                })
                .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
    
  • 后续的示例代码都会以lambda和链式调用结合的方式给出,除非特别需要说明具体类型的情况

4.2 map转换

  • 现在我们来将上一节例子里的String转成大写字母;

  • 相信很多人的第一反应是在subscribe方法里,打印时调用stoUpperCase方法;这个方式当然可以实现转换成大写的需求;

  • 但是,从业务逻辑解耦、代码复用的角度,我们希望不要改动到原有代码(只扩展新逻辑、不修改原逻辑,开闭原则)。consumer的逻辑保持最简单(拿到String,显示就是,没有任何其他复杂逻辑)

  • 此时,我们可以使用map操作符

        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    emitter.onNext("Hello");
                    emitter.onNext("CodingDog1024");
                })
                .map(s -> {
                    String upperCase = s.toUpperCase();
                    System.out.println(Thread.currentThread().getName() + " map "  + s + " -> " + upperCase);
                    return upperCase;
                })
                .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
    
  • 输出

        main start to emit
        main map Hello -> HELLO
        main consumer accept: HELLO
        main map CodingDog1024 -> CODINGDOG1024
        main consumer accept: CODINGDOG1024
    
  • Function<T, R>
    map方法传入的是一个Function<T, R>,泛型TR都为String,为了更清楚的看下map方法,我们将map方法里的lambda恢复成匿名类实例的样子(同时,再次感受lambda的简洁、紧凑)

        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    emitter.onNext("Hello");
                    emitter.onNext("CodingDog1024");
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        String upperCase = s.toUpperCase();
                        System.out.println(Thread.currentThread().getName() + " map "  + s + " -> " + upperCase);
                        return upperCase;
                    }
                })
                .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
    
  • 输出(与上面的输出是一样的)

        main start to emit
        main map Hello -> HELLO
        main consumer accept: HELLO
        main map CodingDog1024 -> CODINGDOG1024
        main consumer accept: CODINGDOG1024
    

4.3 异步处理

  • 从上面的输出可以看到,目前的逻辑都是跑在主线程的。现在,我们假设emitter发出事件的操作是耗时操作,我们希望这个操作不要阻塞主线程
  • 此时,我们可以使用调度器(Scheduler),新增一行代码,即可切换线程
  • Thread.sleep(500)只是为了模拟耗时、CountDownLatch只是为了测试代码的顺利执行,对这个流程没有任何影响,忽略即可
        CountDownLatch countDown = new CountDownLatch(2);
    
        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    Thread.sleep(500);
                    emitter.onNext("Hello");
                    Thread.sleep(500);
                    emitter.onNext("CodingDog1024");
                })
                .subscribeOn(Schedulers.newThread()) // 这一行是新增,其他的完全没变
                .map(s -> {
                    String upperCase = s.toUpperCase();
                    System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase);
                    return upperCase;
                })
                .subscribe(s -> {
                    System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                    countDown.countDown();
                });
    
        countDown.await();
    
  • 输出(可以看到,已经切换到新线程执行)
        RxNewThreadScheduler-1 start to emit
        RxNewThreadScheduler-1 map Hello -> HELLO
        RxNewThreadScheduler-1 consumer accept: HELLO
        RxNewThreadScheduler-1 map CodingDog1024 -> CODINGDOG1024
        RxNewThreadScheduler-1 consumer accept: CODINGDOG1024
    

4.4 不同操作执行在不同线程

  • 示例
        CountDownLatch countDown = new CountDownLatch(2);
    
        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    Thread.sleep(500);
                    emitter.onNext("Hello");
                    Thread.sleep(500);
                    emitter.onNext("CodingDog1024");
                })
                .subscribeOn(Schedulers.newThread()) // emit操作执行在newThread
                .observeOn(Schedulers.computation()) // 接下去的操作(即map操作)执行在computation
                .map(s -> {
                    String upperCase = s.toUpperCase();
                    System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase);
                    return upperCase;
                })
                .observeOn(Schedulers.single()) // 接下去的操作(即consumer)执行在single
                .subscribe(s -> {
                    System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                    countDown.countDown();
                });
    
        countDown.await();
    
  • 输出
    emit操作执行在RxNewThreadScheduler-1
    map操作执行在RxComputationThreadPool-1
    consumer执行在RxSingleScheduler-1
        RxNewThreadScheduler-1 start to emit
        RxComputationThreadPool-1 map Hello -> HELLO
        RxSingleScheduler-1 consumer accept: HELLO
        RxComputationThreadPool-1 map CodingDog1024 -> CODINGDOG1024
        RxSingleScheduler-1 consumer accept: CODINGDOG1024
    

4.5 subscribeOn 和 observeOn

  • subscribeOn
    • subscribeOn作用于发射事件处(如上一节中的create方法),多次调用subscribeOn方法,将只有离create最近的一处生效。当你需要提供接口给外部调用,如果想要保证发射事件代码执行在指定调度器,则可以直接通过subscribeOn方法设置调度器,接口调用方就算通过subscribeOn方法设置其他的调度器,最终结果也是在你指定的调度器里执行。
    • 示例
          CountDownLatch countDown = new CountDownLatch(2);
      
          Observable
                  .<String>create(emitter -> {
                      System.out.println(Thread.currentThread().getName() + " start to emit");
                      Thread.sleep(500);
                      emitter.onNext("Hello");
                      Thread.sleep(500);
                      emitter.onNext("CodingDog1024");
                  })
                  .subscribeOn(Schedulers.computation()) // 设置为computation调度器,最终结果为执行在computation调度器
                  .subscribeOn(Schedulers.io())  // 设置为io调度器
                  .map(s -> {
                      String upperCase = s.toUpperCase();
                      System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase);
                      return upperCase;
                  })
                  .subscribeOn(Schedulers.newThread())  // 设置newThread调度器
                  .subscribe(s -> {
                      System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                      countDown.countDown();
                  });
      
          countDown.await();
      
    • 输出
          RxComputationThreadPool-1 start to emit
          RxComputationThreadPool-1 map Hello -> HELLO
          RxComputationThreadPool-1 consumer accept: HELLO
          RxComputationThreadPool-1 map CodingDog1024 -> CODINGDOG1024
          RxComputationThreadPool-1 consumer accept: CODINGDOG1024
      
  • observeOn
    • observeOn影响的是该操作符后续的事件所运行的线程,多次调用observeOn方法,每次调用互不影响,可以实现多次切换不同线程。
    • 上一节示例代码已经做了使用到该特性,两次调用observeOn,使map操作执行在computation调度器、subscribe执行在single调度器。
    • Android开发中最常用的场景为:Observable前面的操作都是在工作线程执行(iocomputation等调度器),一切处理逻辑执行妥当后,调用observeOn方法将最终的subscribe切换到UI线程执行(AndroidSchedulers.mainThread()调度器),从而可以在subscribe方法里更新UI。

4.6 Scheduler类型

4.6.1 RxJava内置了5种调度器

本节内容参考自 《RxJava Essentials 中文版》by yuxingxin

  • Schedulers.io()
    这个调度器时用于I/O操作。它基于根据需要,增长或缩减来自适应的线程池。我们将使用它来修复我们之前看到的StrictMode违规做法。由于它专用于I/O操作,所以并不是RxJava的默认方法;正确的使用它是由开发者决定的。
    重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存。一如既往的是,我们需要在性能和简捷两者之间找到一个有效的平衡点。
  • Schedulers.computation()
    这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()
  • Schedulers.immediate()
    这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。
  • Schedulers.newThread()
    这个调度器正如它所看起来的那样:它为指定任务启动一个新的线程。
  • Schedulers.trampoline()
    当我们想在当前线程执行一个任务时,并不是立即,我们可以用trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()retry()方法默认的调度器。

4.6.2 RxAndroid额外提供了一个对应UI主线程的调度器

RxAndroid: 'io.reactivex.rxjava2:rxandroid:2.1.1'

  • AndroidSchedulers.mainThread()
    在该调度器执行的操作,会被封装到MessagesendUI Handler执行。

4.6.3 自定义调度器

  • Schedulers.from(Executor executor)
    如果上述的5种内置调度器都不能满足需求,我们也可以传入自己定义的Executor

5、操作符(Operators)

完整介绍见:ReactiveX Operators

Rx提供了非常非常丰富的操作符,为方便查看操作符含义,ReactiveX官网提供了一种操作符示意图,下面会举几个例子做一下介绍。

5.1 操作符示意图

  • create
    看图的下半部分,横向箭头代表整个事件流、有颜色的图形代表事件、竖线代表完成。


    create

以我们上面写过的示例为例,则是:

----------------Hello------------CodingDog1024---------------|----->
  • map
    下图显示将原事件流里的每个数都乘以10的转换。


    map

    以我们上面写过的示例为例,则是:

----------------Hello------------CodingDog1024---------------|----->
                   map (s -> s.toUpperCase())
----------------HELLO -----------CODINGDOG1024---------------|----->
  • flatMap
    map操作符返回的是事件(如我们例子中的s.toUpperCase())不同,flatMap操作符返回的是Observable。但是,consumer收到的依旧是Observable内包含的数据,因此,称为扁平化(flat)。

    flatMap

    • 可以将我们上面写过的例子做一下改造,使用flatMap操作符达到同样的功能:
          Observable
                  .<String>create(emitter -> {
                      System.out.println(Thread.currentThread().getName() + " start to emit");
                      emitter.onNext("Hello");
                      emitter.onNext("CodingDog1024");
                  })
                  .flatMap(s -> getUpperCase(s))
                  .subscribe(s -> {
                      System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                  });
      
      private Observable<String> getUpperCase(String s) {
          return Observable.create(emitter -> {
              String upperCase = s.toUpperCase();
              System.out.println(Thread.currentThread().getName() + " getUpperCase " + s + " -> " + upperCase);
              emitter.onNext(upperCase);
              emitter.onComplete();
          });
      }
      
    • 有人可能会有疑问,flatMap比起map明显难理解得多,为什么要用它。一个比较常见的场景是,想象一下如果上述的getUpperCase方法是其他模块或sdk提供的接口,我们并不清楚其内部实现,但是需要依赖这个接口的功能,此时,就是flatMap的一个派上用场的时候。
  • filter
    小于等于10的数据将被过滤掉。


    filter
    • 示例
          Observable
                  .<String>create(emitter -> {
                      System.out.println(Thread.currentThread().getName() + " start to emit");
                      emitter.onNext("Hello");
                      emitter.onNext("CodingDog1024");
                  })
                  .filter(s -> s.startsWith("C"))
                  .subscribe(s -> {
                      System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                  });
      
    • 输出
          main start to emit
          main consumer accept: CodingDog1024
      

以我们上面写过的示例为例,则是:

----------------Hello------------CodingDog1024---------------|----->
                   filter(s -> s.startsWith("C"))
---------------------------------CodingDog1024---------------|----->
  • merge
    将两个以上Observable合并一起,得到的事件将是所有事件流的一个合集。
    merge
    • 示例
          Observable hello = Observable.create(emitter -> {
              System.out.println(Thread.currentThread().getName() + " hello start to emit");
              emitter.onNext("Hello");
          });
      
          Observable codingDog = Observable.create(emitter -> {
              System.out.println(Thread.currentThread().getName() + " codingDog start to emit");
              emitter.onNext("CodingDog1024");
          });
      
          Observable
                  .merge(hello, codingDog)
                  .subscribe(s -> {
                      System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                  });
      
    • 输出
          main hello start to emit
          main consumer accept: Hello
          main codingDog start to emit
          main consumer accept: CodingDog1024
      
----------------Hello----------------------------------------|----->
---------------------------------CodingDog1024---------------|----->
                           merge
----------------Hello------------CodingDog1024---------------|----->

5.2 Creating Observables

Operators that originate new Observables.

  • Create — create an Observable from scratch by calling observer methods programmatically
  • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
  • Empty/Never/Throw — create Observables that have very precise and limited behavior
  • From — convert some other object or data structure into an Observable
  • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
  • Just — convert an object or a set of objects into an Observable that emits that or those objects
  • Range — create an Observable that emits a range of sequential integers
  • Repeat — create an Observable that emits a particular item or sequence of items repeatedly
  • Start — create an Observable that emits the return value of a function
  • Timer — create an Observable that emits a single item after a given delay

5.3 Transforming Observables

Operators that transform items that are emitted by an Observable.

  • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
  • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
  • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
  • Map — transform the items emitted by an Observable by applying a function to each item
  • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

5.4 Filtering Observables

Operators that selectively emit items from a source Observable.

  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
  • Distinct — suppress duplicate items emitted by an Observable
  • ElementAt — emit only item n emitted by an Observable
  • Filter — emit only those items from an Observable that pass a predicate test
  • First — emit only the first item, or the first item that meets a condition, from an Observable
  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
  • Last — emit only the last item emitted by an Observable
  • Sample — emit the most recent item emitted by an Observable within periodic time intervals
  • Skip — suppress the first n items emitted by an Observable
  • SkipLast — suppress the last n items emitted by an Observable
  • Take — emit only the first n items emitted by an Observable
  • TakeLast — emit only the last n items emitted by an Observable

5.5 Combining Observables

Operators that work with multiple source Observables to create a single Observable

  • And/Then/When — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries
  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
  • Merge — combine multiple Observables into one by merging their emissions
  • StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
  • Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
  • Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

6、Subject = Observable + Observer

subject是一个神奇的对象,它可以是一个Observable同时也可以是一个Observer:它作为连接这两个世界的一座桥梁。一个Subject可以订阅一个Observable,就像一个Observer,并且它可以发射新的数据,或者传递它接受到的数据,就像一个Observable。很明显,作为一个Observable,观察者们或者其它Subject都可以订阅它。
RxJava提供了4种不同的Subject

  • PublishSubject
    PublishSubject会向他的订阅者发送订阅后的数据流。
  • BehaviorSubject
    BehaviorSubject会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。
  • ReplaySubject
    ReplaySubject会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发。
  • AsyncSubject
    Observable完成时AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。

6.1 PublishSubject

  • 示例
        PublishSubject<String> publishSubject = PublishSubject.create();
    
        Consumer<String> consumer0 = s -> System.out.println(Thread.currentThread().getName() + " consumer0 accept: " + s);
        publishSubject.subscribe(consumer0);
    
        publishSubject.onNext("Hello");
        publishSubject.onNext("CodingDog1024");
    
        Consumer<String> consumer1 = s -> System.out.println(Thread.currentThread().getName() + " consumer1 accept: " + s);
        publishSubject.subscribe(consumer1);
    
        publishSubject.onNext("I");
        publishSubject.onNext("am");
        publishSubject.onNext("RxJava");
    
  • 输出
        main consumer0 accept: Hello
        main consumer0 accept: CodingDog1024
        main consumer0 accept: I
        main consumer1 accept: I
        main consumer0 accept: am
        main consumer1 accept: am
        main consumer0 accept: RxJava
        main consumer1 accept: RxJava
    
  • 说明
    consumer0收到完整的"Hello""CodingDog1024""I""am""RxJava"
    consumer1只收到其订阅之后来到的"I""am""RxJava"
    PublishSubject的行为就类似我们常见的addXXXListener注册监听,consumer可以接收到其订阅之后的所有event

6.1 BehaviorSubject

  • 示例
        BehaviorSubject<String> behaviorSubject = BehaviorSubject.create();
    
        Consumer<String> consumer0 = s -> System.out.println(Thread.currentThread().getName() + " consumer0 accept: " + s);
        behaviorSubject.subscribe(consumer0);
    
        behaviorSubject.onNext("Hello");
        behaviorSubject.onNext("CodingDog1024");
    
        Consumer<String> consumer1 = s -> System.out.println(Thread.currentThread().getName() + " consumer1 accept: " + s);
        behaviorSubject.subscribe(consumer1);
    
        behaviorSubject.onNext("I");
        behaviorSubject.onNext("am");
        behaviorSubject.onNext("RxJava");
    
  • 输出
        main consumer0 accept: Hello
        main consumer0 accept: CodingDog1024
        main consumer1 accept: CodingDog1024
        main consumer0 accept: I
        main consumer1 accept: I
        main consumer0 accept: am
        main consumer1 accept: am
        main consumer0 accept: RxJava
        main consumer1 accept: RxJava
    
  • 说明
    consumer0收到完整的"Hello""CodingDog1024""I""am""RxJava"
    consumer1收到"CodingDog1024""I""am""RxJava"。(比上节PublishSubject的例子,多了"CodingDog1024")
    订阅BehaviorSubject时,consumer会先收到最新的一个event,然后再接收到之后到来的所有event
  • 使用场景
    在很常见的【先获取一次值,执行逻辑,然后值变化时需要重新执行逻辑】的场景下,使用BehaviorSubject可以很自然的实现。

7、How it works ?

基于以下RxJava版本:'io.reactivex.rxjava2:rxjava:2.2.11'

7.1 Observable.create() & subscribe()

7.1.1 示例 & 实现代码走读

  • sample

        // (1)实例化【observableOnSubscribe1】
        ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() {
            // (11)第(10)步中,调用了subscribe方法,入参为parent,parent为第(5)步observer的一个代理
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                // (12)调用emitter onNext方法。通过parent,最终转调到第(5)步observer的onNext方法
                emitter.onNext("CodingDog1024");
            }
        };
    
        // (4)返回值为一个ObservableCreate实例
        Observable<String> observable1 = Observable.create(observableOnSubscribe1);
    
        // (5)实例化【observer1】
        Observer observer1 = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            // (13) 收到事件/数据
            @Override
            public void onNext(String s) {
                System.out.println("onNext " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("Throwable " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    
        // (6)订阅
        observable1.subscribe(observer1);
    
  • 输出

        onSubscribe
        onNext CodingDog1024
    
  • create源码

        // 代码出处:Observable
        // (2)source即为【observableOnSubscribe1】
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { 
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    
        // 代码出处:RxJavaPlugins
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            // 默认onObservableAssembly为null,因此,该方法可以视为直接return source
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
        
        // 代码出处:ObservableCreate
        public final class ObservableCreate<T> extends Observable<T> {
            // source即为【observableOnSubscribe1】
            final ObservableOnSubscribe<T> source; 
        
            // (3)source即为【observableOnSubscribe1】
            public ObservableCreate(ObservableOnSubscribe<T> source) { 
                this.source = source;
            }
        
             // (9)observer即为【observer1】
            @Override
            protected void subscribeActual(Observer<? super T> observer) { 
                // CreateEmitter为observer的代理,增加一些异常处理,可以先直接理解为只是转调observer的方法
                CreateEmitter<T> parent = new CreateEmitter<T>(observer); 
                observer.onSubscribe(parent);
        
                try {
                     // (10)该subscribe方法即为第(11)步的【subscribe】方法,这里就是create和subscribe的连接处!!!
                    source.subscribe(parent);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    parent.onError(ex);
                }
            }
            ... ...
        }
    
  • subscribe源码

         // 代码出处:Observable
         // (7)observer即为【observer1】
        public final void subscribe(Observer<? super T> observer) {
                ... ...
                observer = RxJavaPlugins.onSubscribe(this, observer);
                subscribeActual(observer);
                ... ...
        }
        
        // 代码出处:Observable
        // (8)observer即为【observer1】
        protected abstract void subscribeActual(Observer<? super T> observer); 
    

7.1.2 简单概括一下

  • sample代码

        // (1)实例化【observableOnSubscribe1】
        ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() {
            // (11)第(10)步中,调用了subscribe方法,入参为parent,parent为第(5)步observer的一个代理
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                // (12)调用emitter onNext方法。通过parent,最终转调到第(5)步observer的onNext方法
                emitter.onNext("CodingDog1024");
            }
        };
    
        // (4)返回值为一个ObservableCreate实例
        Observable<String> observable1 = Observable.create(observableOnSubscribe1);
    
        // (5)实例化【observer1】
        Observer observer1 = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            // (13) 收到事件/数据
            @Override
            public void onNext(String s) {
                System.out.println("onNext " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("Throwable " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    
        // (6)订阅
        observable1.subscribe(observer1);
    
  • 文字概括

    1. Observable.create方法入参为一个ObservableOnSubscribe实例observableOnSubscribe1,返回值为一个ObservableCreate实例observable1observable1持有observableOnSubscribe1source
    2. 以入参Observer实例observer1调用subscribe方法时,实际调用的就是ObservableCreatesubscribe方法,接着调到ObservableCreatesubscribeActual(Observer observer)方法
    3. subscribeActual方法里new了一个CreateEmitter实例parentparent持有上述observer1
    4. subscribeActual方法里执行source.subscribe(parent),这个source即为第1步的入参source,因此,此处就是执行了observableOnSubscribe1subscribe方法
    5. observableOnSubscribe1subscribe方法执行了
              emitter.onNext("CodingDog1024");
      
    6. 也就是执行了第3步中CreateEmitter实例parentonNext方法
    7. CreateEmitter的核心逻辑是调用持有的Observer实例observer的对应方法,即调用第2步入参Observer实例observer1onNext方法,即打印log的方法
          @Override
          public void onNext(String s) {
              System.out.println("onNext " + s);
          }
      
  • 提取最核心逻辑,简化理解
    前一段总共列了7步,出现了数量众多的类,理解起来稍微有点复杂。我们做一下简化,提取最核心逻辑,上述分析中,ObservableCreateObservable描述的是同一个东西,我们统一视为ObservableCreateEmitter对象parent其实就是Observer对象observer的一个代理,最终调用的是observer的方法,我们暂时忽略代理类逻辑,直接将两者统一视为observer;这么合并之后,前一段的分析就变成了:

    1. Observable.create方法入参为observableOnSubscribe1(或者称为source),返回值为observableobservable持有source
    2. observable.subscribe(observer),最终会调用到sourcesubscribe方法(入参为observer
    3. sourcesubscribe方法里,执行了observeronNext方法
Observable create subscribe

7.2 Observable.map()

7.2.1 示例 & 实现代码走读

  • sample
    在上一节例子基础上,修改3处,见下方修改点1 2 3。

        // (1)实例化【observableOnSubscribe1】
        ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() {
            // (11)第(10)步中,调用了subscribe方法,入参为parent,parent为第(5)步observer的一个代理
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                // (12)调用emitter onNext方法。通过parent,最终转调到第(5)步observer的onNext方法
                emitter.onNext("CodingDog1024");
            }
        };
    
        // (4)返回值为一个ObservableCreate实例
        Observable<String> observable1 = Observable.create(observableOnSubscribe1);
    
        // 修改点1:新增将String转成大写的mapper
        Function<String, String> mapper1 = new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s.toUpperCase();
            }
        };
    
        // 修改点2:observable1应用mapper得到新的Observable实例observable2,具体类型为ObservableMap
        Observable<String> observable2 = observable1.map(mapper1);
    
        // (5)实例化【observer1】
        Observer observer1 = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            // (13) 收到事件/数据
            @Override
            public void onNext(String s) {
                System.out.println("onNext " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("Throwable " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    
        // 修改点3:订阅的是应用了mapper后的observable2
        // (6)订阅
        observable2.subscribe(observer1);
    
  • 输出

        onSubscribe
        onNext CODINGDOG1024
    
  • map源码

       // 代码出处:Observable
        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            // observable1上调用的map方法,因此,this就是observable1,mapper就是mapper1
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    
        // 代码出处:ObservableMap
        public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
            final Function<? super T, ? extends U> function;
        
             // source就是observable1,function就是mapper1
            public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
                super(source);
                this.function = function;
            }
        
            @Override
            public void subscribeActual(Observer<? super U> t) {
                // source就是observable1,function就是mapper1,t就是observer1
                source.subscribe(new MapObserver<T, U>(t, function));
            }
            ... ...
        }
    
        // 代码出处:ObservableMap
        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            final Function<? super T, ? extends U> mapper;
    
            // actual就是observer1,mapper就是mapper1
            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                // super方法,将actual赋给downstream,也就是说:
                // downstream就是observer1
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                ... ...
                U v;
                ... ...
                    // 调用mapper.apply,将上游的输入 t 转为 v 
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                ... ...
                // 调用下游的onNext方法,入参为 v 
                downstream.onNext(v);
            }
            ... ...
        }
    
  • 分析


    Observable create map subscribe

7.3 Observable.filter()

7.3.1 示例 & 实现代码走读

  • sample

            ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("Hello");
                    emitter.onNext("CodingDog1024");
                }
            };
    
            // (4)返回值为一个ObservableCreate实例
            Observable<String> observable1 = Observable.create(observableOnSubscribe1);
    
            Predicate<String> predicate1 = new Predicate<String>() {
                @Override
                public boolean test(String s) throws Exception {
                    return s.startsWith("C");
                }
            };
    
            // observable2具体类型为ObservableFilter,内部只有predicate1
            Observable<String> observable2 = observable1.filter(predicate1);
    
            Observer observer1 = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe");
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println("onNext " + s);
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("Throwable " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            };
    
            observable2.subscribe(observer1);
    
  • 输出

        onSubscribe
        onNext CodingDog1024
    
  • filter源码

        // 代码出处:Observable
        public final Observable<T> filter(Predicate<? super T> predicate) {
            ObjectHelper.requireNonNull(predicate, "predicate is null");
            // observable1上调用的filter方法,因此,this就是observable1,predicate就是predicate1
            return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
        }
    
        // 代码出处:ObservableFilter
        public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
            final Predicate<? super T> predicate;
    
            // source就是observable1,predicate就是predicate1
            public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
                super(source);
                this.predicate = predicate;
            }
        
            @Override
            public void subscribeActual(Observer<? super T> observer) {
                // source就是observable1,predicate就是predicate1,observer就是observer1
                source.subscribe(new FilterObserver<T>(observer, predicate));
            }
            ... ...
        }
    
        // 代码出处:ObservableFilter
        static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
            final Predicate<? super T> filter;
    
            // actual就是observer1,filter就是predicate1
            FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
                // super方法,将actual赋给downstream,也就是说:
                // downstream就是observer1
                super(actual);
                this.filter = filter;
            }
    
            @Override
            public void onNext(T t) {
                    ... ...
                    boolean b;
                    ... ...
                        // 调用filter.test方法,入参为上游的输入 t 
                        b = filter.test(t);
                    ... ...
                    // filter.test结果为true,才调用下游的onNext方法,入参为 t
                    if (b) {
                        downstream.onNext(t);
                    }
                    ... ...
            }
            ... ...
        }
    
  • 分析


    Observable create filter subscribe
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342