1、什么是响应式编程 ?
响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。
1.1 时间表
- 90年代后期
受微软的一名计算机科学家Erik Meijer启发的思想,用来设计和开发微软的Rx库。
Rx
是微软.NET
的一个响应式扩展。Rx
借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。开发者可以使用Observables模拟异步数据流,使用LINQ
语法查询Observables,并且很容易管理调度器的并发。
Rx
让众所周知的概念变得易于实现和消费,例如push
方法。在响应式的世界里,我们不能假装作用户不关注或者是不抱怨它而一味的等待函数的返回结果,网络调用,或者数据库查询的返回结果。我们时刻都在等待某些东西,这就让我们失去了并行处理其他事情的机会,提供更好的用户体验,让我们的软件免受顺序链的影响,而阻塞编程。 - 2012年
Netflix在2012年开始意识到他们的架构要满足他们庞大的用户群体已经变得步履维艰。因此他们决定重新设计架构来减少REST
调用的次数。取代几十次的REST
调用,而是让客户端自己处理需要的数据,他们决定基于客户端需求创建一个专门优化过的REST
调用。 - 2013年
2013年2月份,Ben Christensen和Jafar Husain发在Netflix技术博客的一篇文章第一次向世界展示了RxJava
。 - 2014年
2014年9月份,发布RxJava 1.0.0
正式版。 - 2016年
2016年9月份,发布RxJava 2.0.0
正式版。 - 2020年
2020年2月份,发布RxJava 3.0.0
正式版。
1.2 定义
这里只贴出链接,不做介绍,感兴趣的自行查看。
1.3 更友好的介绍
参考自:《什么是响应式编程?》by 享学IT
介绍了响应式编程的三大特点:变化传递(propagation of change)、基于数据流(data stream)、声明式(declarative)。
具体形象的例子:“堪称“响应式典范”的强大的生产力工具——电子表格”
1.3.1 【满199减40活动】购物计划
下方【购物计划表】中,【单价】【数量】是原始输入,【商品金额】跟随【单价】和【数量】的变化而变化,【满199减40】跟随【商品金额】的变化而变化,以此类推,【订单总金额】、【邮费】、【最终应付款】也跟随相应的项的变化而变化。具体的公式以及变化的传递流向见【购物计划表 公式】。
- 变化传递(propagation of change)
- 基于数据流(data stream)
- 声明式(declarative)
2、扩展的观察者模式
2.1 观察者模式
- Observable
可观察对象,也有叫做Source
,内部维护一组观察者observers
,当event
有更新时,observable
将event
推(push)给observer
。 - Observer
观察者,也有叫做Consumer
、Subscriber
,观察observable
,接收observable
推(push)过来的event
,做出相应的反应(不同的observer
的反应可能不一样)。 - Event
observer
所关心的事件event
。 - subscrbe
将observer
和observable
连接起来的操作,叫做订阅(subscribe
)。
2.2 扩展的观察者
上述4个概念,也就是RxJava
中,最基本的几个概念。Observer
通过subscribe
方法订阅Observable
,从而,在Event
有变化时,Observable
可以分发给Observer
。
- Observable
- Observer
- Event
- subscribe
与传统的观察者模式不同的是,RxJava
不光会通过onNext
方法分发普通事件(相当于上节描述的Observer
中的accept
方法),另外还会通过onComplete
和onError
方法分发两个特殊事件。
- onComplete
事件流已完成。表明事件流已成功发出所有的事件,后续不会再有新的事件发出。(成功结束) - onError
事件流异常。表明由于发生异常,事件流将被打断,后续不会再有新的事件发出。(异常结束;特殊情况下,可能会人为在事件流过程中刻意发出error
事件)
在一个正确实现的事件流中,都应该有一个onComplete
或onError
作为事件流的最后一个事件,并且这两者也是互斥,发出了其中一个事件,另一个事件就不应该再被发出。
2.3 RxJava = Observer + 异步处理
本节参考自马士兵教育视频
3、相关概念
3.1 函数式编程
- 函数式编程是与面向对象编程有差异的一个编程范式,函数式编程是一个很大的领域,本文不打算对此做深入分析、介绍;
- 在函数式编程范式中,函数是头等公民,可以独立存在(不像面向对象,函数或称为方法,必须属于某个类);并且,函数可以作为方法的入参,也可以作为方法的返回值;
- Java是纯面向对象语言,本质上是不支持函数式编程的,但是,通过函数式接口(一个有且仅有一个抽象方法的接口),可以部分模拟函数式编程;
3.2 函数式接口
- 直接看例子
// 函数式接口实例,最常见的Runnable接口 Runnable runnable = new Runnable() { @Override public void run() { System.out.println("Runnable is a Functional Interface"); } }; // 函数式接口FunctionalInterface,只有一个accept方法,有入参和返回值 interface FunctionalInterface { String accept(int i1, int i2); } // FunctionalInterface实例 FunctionalInterface functionalInterface = new FunctionalInterface() { @Override public String accept(int i1, int i2) { return String.valueOf(i1 + i2); } };
3.3 lambda表达式
一开始不习惯的情况下,可以先像上一节那样,先按显性new实例的方式写出代码,然后光标移动到Android Studio标成灰色字的部分(new FunctionalInterface处),敲击alt + enter,即可通过IDE直接进行lambda改造。
- Lambda 表达式,也可称为闭包,它是推动 Java 8 发布的最重要新特性。
- Lambda 允许把函数作为一个方法的参数(函数作为参数传递进方法中)。
- 使用 Lambda 表达式可以使代码变的更加简洁紧凑。(下一章,看示例代码会深有感触)
- 我们把上节的例子做一下lambda改造
// Runnable实例,lambda形式,() -> { statement; }; Runnable runnable = () -> { System.out.println("Runnable is a Functional Interface"); }; // 当方法体只有一行时,可以进一步简写,() -> statement; Runnable runnable = () -> System.out.println("Runnable is a Functional Interface"); // FunctionalInterface实例,lambda形式,(param1, param2, ...) -> { return expression; }; FunctionalInterface functionalInterface = (i1, i2) -> { return String.valueOf(i1 + i2); }; // 当方法体只有一行时,可以进一步简写,(param1, param2, ...) -> expression; FunctionalInterface functionalInterface = (i1, i2) -> String.valueOf(i1 + i2);
4、RxJava2的使用
基于以下RxJava版本:
'io.reactivex.rxjava2:rxjava:2.2.11'
4.1 最简单的示例(create创建、subscribe订阅)
-
示例代码
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); } }); Consumer<String> consumer = new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); } }; observable.subscribe(consumer);
-
输出
main start to emit main consumer accept: Hello main consumer accept: CodingDog1024
-
lambda化 (对比前面的代码,可以看到代码明显简洁紧凑很多)
Observable<String> observable = Observable.create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }); Consumer<String> consumer = s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); observable.subscribe(consumer);
-
链式调用 (可以看到,结合lambda和链式调用,代码更加的紧凑,少了很多干扰性的代码,让我们可以更加聚焦于业务逻辑)
Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
后续的示例代码都会以lambda和链式调用结合的方式给出,除非特别需要说明具体类型的情况
4.2 map转换
现在我们来将上一节例子里的
String
转成大写字母;相信很多人的第一反应是在
subscribe
方法里,打印时调用s
的toUpperCase
方法;这个方式当然可以实现转换成大写的需求;但是,从业务逻辑解耦、代码复用的角度,我们希望不要改动到原有代码(只扩展新逻辑、不修改原逻辑,开闭原则)。
consumer
的逻辑保持最简单(拿到String
,显示就是,没有任何其他复杂逻辑)-
此时,我们可以使用
map
操作符Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .map(s -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; }) .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
-
输出
main start to emit main map Hello -> HELLO main consumer accept: HELLO main map CodingDog1024 -> CODINGDOG1024 main consumer accept: CODINGDOG1024
-
Function<T, R>
map
方法传入的是一个Function<T, R>
,泛型T
和R
都为String
,为了更清楚的看下map
方法,我们将map
方法里的lambda恢复成匿名类实例的样子(同时,再次感受lambda的简洁、紧凑)Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; } }) .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
-
输出(与上面的输出是一样的)
main start to emit main map Hello -> HELLO main consumer accept: HELLO main map CodingDog1024 -> CODINGDOG1024 main consumer accept: CODINGDOG1024
4.3 异步处理
- 从上面的输出可以看到,目前的逻辑都是跑在主线程的。现在,我们假设emitter发出事件的操作是耗时操作,我们希望这个操作不要阻塞主线程
- 此时,我们可以使用调度器(
Scheduler
),新增一行代码,即可切换线程 -
Thread.sleep(500)
只是为了模拟耗时、CountDownLatch
只是为了测试代码的顺利执行,对这个流程没有任何影响,忽略即可CountDownLatch countDown = new CountDownLatch(2); Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); Thread.sleep(500); emitter.onNext("Hello"); Thread.sleep(500); emitter.onNext("CodingDog1024"); }) .subscribeOn(Schedulers.newThread()) // 这一行是新增,其他的完全没变 .map(s -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; }) .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); countDown.countDown(); }); countDown.await();
- 输出(可以看到,已经切换到新线程执行)
RxNewThreadScheduler-1 start to emit RxNewThreadScheduler-1 map Hello -> HELLO RxNewThreadScheduler-1 consumer accept: HELLO RxNewThreadScheduler-1 map CodingDog1024 -> CODINGDOG1024 RxNewThreadScheduler-1 consumer accept: CODINGDOG1024
4.4 不同操作执行在不同线程
- 示例
CountDownLatch countDown = new CountDownLatch(2); Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); Thread.sleep(500); emitter.onNext("Hello"); Thread.sleep(500); emitter.onNext("CodingDog1024"); }) .subscribeOn(Schedulers.newThread()) // emit操作执行在newThread .observeOn(Schedulers.computation()) // 接下去的操作(即map操作)执行在computation .map(s -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; }) .observeOn(Schedulers.single()) // 接下去的操作(即consumer)执行在single .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); countDown.countDown(); }); countDown.await();
- 输出
emit
操作执行在RxNewThreadScheduler-1
map
操作执行在RxComputationThreadPool-1
consumer
执行在RxSingleScheduler-1RxNewThreadScheduler-1 start to emit RxComputationThreadPool-1 map Hello -> HELLO RxSingleScheduler-1 consumer accept: HELLO RxComputationThreadPool-1 map CodingDog1024 -> CODINGDOG1024 RxSingleScheduler-1 consumer accept: CODINGDOG1024
4.5 subscribeOn 和 observeOn
- subscribeOn
-
subscribeOn
作用于发射事件处(如上一节中的create
方法),多次调用subscribeOn
方法,将只有离create
最近的一处生效。当你需要提供接口给外部调用,如果想要保证发射事件代码执行在指定调度器,则可以直接通过subscribeOn
方法设置调度器,接口调用方就算通过subscribeOn
方法设置其他的调度器,最终结果也是在你指定的调度器里执行。 - 示例
CountDownLatch countDown = new CountDownLatch(2); Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); Thread.sleep(500); emitter.onNext("Hello"); Thread.sleep(500); emitter.onNext("CodingDog1024"); }) .subscribeOn(Schedulers.computation()) // 设置为computation调度器,最终结果为执行在computation调度器 .subscribeOn(Schedulers.io()) // 设置为io调度器 .map(s -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase); return upperCase; }) .subscribeOn(Schedulers.newThread()) // 设置newThread调度器 .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); countDown.countDown(); }); countDown.await();
- 输出
RxComputationThreadPool-1 start to emit RxComputationThreadPool-1 map Hello -> HELLO RxComputationThreadPool-1 consumer accept: HELLO RxComputationThreadPool-1 map CodingDog1024 -> CODINGDOG1024 RxComputationThreadPool-1 consumer accept: CODINGDOG1024
-
- observeOn
-
observeOn
影响的是该操作符后续的事件所运行的线程,多次调用observeOn
方法,每次调用互不影响,可以实现多次切换不同线程。 - 上一节示例代码已经做了使用到该特性,两次调用
observeOn
,使map操作执行在computation
调度器、subscribe
执行在single
调度器。 - Android开发中最常用的场景为:
Observable
前面的操作都是在工作线程执行(io
、computation
等调度器),一切处理逻辑执行妥当后,调用observeOn
方法将最终的subscribe
切换到UI线程执行(AndroidSchedulers.mainThread()
调度器),从而可以在subscribe
方法里更新UI。
-
4.6 Scheduler类型
4.6.1 RxJava内置了5种调度器
-
Schedulers.io()
这个调度器时用于I/O操作。它基于根据需要,增长或缩减来自适应的线程池。我们将使用它来修复我们之前看到的StrictMode违规做法。由于它专用于I/O操作,所以并不是RxJava的默认方法;正确的使用它是由开发者决定的。
重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存。一如既往的是,我们需要在性能和简捷两者之间找到一个有效的平衡点。 -
Schedulers.computation()
这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer()
,debounce()
,delay()
,interval()
,sample()
,skip()
。 -
Schedulers.immediate()
这个调度器允许你立即在当前线程执行你指定的工作。它是timeout()
,timeInterval()
,以及timestamp()
方法默认的调度器。 -
Schedulers.newThread()
这个调度器正如它所看起来的那样:它为指定任务启动一个新的线程。 -
Schedulers.trampoline()
当我们想在当前线程执行一个任务时,并不是立即,我们可以用trampoline()
将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()
和retry()
方法默认的调度器。
4.6.2 RxAndroid额外提供了一个对应UI主线程的调度器
RxAndroid:
'io.reactivex.rxjava2:rxandroid:2.1.1'
-
AndroidSchedulers.mainThread()
在该调度器执行的操作,会被封装到Message
,send
到UI Handler
执行。
4.6.3 自定义调度器
-
Schedulers.from(Executor executor)
如果上述的5种内置调度器都不能满足需求,我们也可以传入自己定义的Executor
。
5、操作符(Operators)
完整介绍见:ReactiveX Operators
Rx
提供了非常非常丰富的操作符,为方便查看操作符含义,ReactiveX官网提供了一种操作符示意图,下面会举几个例子做一下介绍。
5.1 操作符示意图
-
create
看图的下半部分,横向箭头代表整个事件流、有颜色的图形代表事件、竖线代表完成。
以我们上面写过的示例为例,则是:
----------------Hello------------CodingDog1024---------------|----->
-
map
下图显示将原事件流里的每个数都乘以10的转换。
以我们上面写过的示例为例,则是:
----------------Hello------------CodingDog1024---------------|----->
map (s -> s.toUpperCase())
----------------HELLO -----------CODINGDOG1024---------------|----->
-
flatMap
与map
操作符返回的是事件(如我们例子中的s.toUpperCase()
)不同,flatMap
操作符返回的是Observable
。但是,consumer
收到的依旧是Observable
内包含的数据,因此,称为扁平化(flat
)。
- 可以将我们上面写过的例子做一下改造,使用
flatMap
操作符达到同样的功能:Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .flatMap(s -> getUpperCase(s)) .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); }); private Observable<String> getUpperCase(String s) { return Observable.create(emitter -> { String upperCase = s.toUpperCase(); System.out.println(Thread.currentThread().getName() + " getUpperCase " + s + " -> " + upperCase); emitter.onNext(upperCase); emitter.onComplete(); }); }
- 有人可能会有疑问,
flatMap
比起map
明显难理解得多,为什么要用它。一个比较常见的场景是,想象一下如果上述的getUpperCase
方法是其他模块或sdk提供的接口,我们并不清楚其内部实现,但是需要依赖这个接口的功能,此时,就是flatMap
的一个派上用场的时候。
- 可以将我们上面写过的例子做一下改造,使用
-
filter
小于等于10的数据将被过滤掉。
- 示例
Observable .<String>create(emitter -> { System.out.println(Thread.currentThread().getName() + " start to emit"); emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); }) .filter(s -> s.startsWith("C")) .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); });
- 输出
main start to emit main consumer accept: CodingDog1024
- 示例
以我们上面写过的示例为例,则是:
----------------Hello------------CodingDog1024---------------|----->
filter(s -> s.startsWith("C"))
---------------------------------CodingDog1024---------------|----->
- merge
将两个以上Observable
合并一起,得到的事件将是所有事件流的一个合集。
- 示例
Observable hello = Observable.create(emitter -> { System.out.println(Thread.currentThread().getName() + " hello start to emit"); emitter.onNext("Hello"); }); Observable codingDog = Observable.create(emitter -> { System.out.println(Thread.currentThread().getName() + " codingDog start to emit"); emitter.onNext("CodingDog1024"); }); Observable .merge(hello, codingDog) .subscribe(s -> { System.out.println(Thread.currentThread().getName() + " consumer accept: " + s); });
- 输出
main hello start to emit main consumer accept: Hello main codingDog start to emit main consumer accept: CodingDog1024
- 示例
----------------Hello----------------------------------------|----->
---------------------------------CodingDog1024---------------|----->
merge
----------------Hello------------CodingDog1024---------------|----->
5.2 Creating Observables
Operators that originate new Observables.
-
Create
— create an Observable from scratch by calling observer methods programmatically -
Defer
— do not create the Observable until the observer subscribes, and create a fresh Observable for each observer -
Empty
/Never
/Throw
— create Observables that have very precise and limited behavior -
From
— convert some other object or data structure into an Observable -
Interval
— create an Observable that emits a sequence of integers spaced by a particular time interval -
Just
— convert an object or a set of objects into an Observable that emits that or those objects -
Range
— create an Observable that emits a range of sequential integers -
Repeat
— create an Observable that emits a particular item or sequence of items repeatedly -
Start
— create an Observable that emits the return value of a function -
Timer
— create an Observable that emits a single item after a given delay
5.3 Transforming Observables
Operators that transform items that are emitted by an Observable.
-
Buffer
— periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time -
FlatMap
— transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable -
GroupBy
— divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key -
Map
— transform the items emitted by an Observable by applying a function to each item -
Scan
— apply a function to each item emitted by an Observable, sequentially, and emit each successive value -
Window
— periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
5.4 Filtering Observables
Operators that selectively emit items from a source Observable.
-
Debounce
— only emit an item from an Observable if a particular timespan has passed without it emitting another item -
Distinct
— suppress duplicate items emitted by an Observable -
ElementAt
— emit only item n emitted by an Observable -
Filter
— emit only those items from an Observable that pass a predicate test -
First
— emit only the first item, or the first item that meets a condition, from an Observable -
IgnoreElements
— do not emit any items from an Observable but mirror its termination notification -
Last
— emit only the last item emitted by an Observable -
Sample
— emit the most recent item emitted by an Observable within periodic time intervals -
Skip
— suppress the first n items emitted by an Observable -
SkipLast
— suppress the last n items emitted by an Observable -
Take
— emit only the first n items emitted by an Observable -
TakeLast
— emit only the last n items emitted by an Observable
5.5 Combining Observables
Operators that work with multiple source Observables to create a single Observable
-
And
/Then
/When
— combine sets of items emitted by two or more Observables by means ofPattern
andPlan
intermediaries -
CombineLatest
— when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function -
Join
— combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable -
Merge
— combine multiple Observables into one by merging their emissions -
StartWith
— emit a specified sequence of items before beginning to emit the items from the source Observable -
Switch
— convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables -
Zip
— combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
6、Subject = Observable + Observer
subject
是一个神奇的对象,它可以是一个Observable
同时也可以是一个Observer
:它作为连接这两个世界的一座桥梁。一个Subject
可以订阅一个Observable
,就像一个Observer
,并且它可以发射新的数据,或者传递它接受到的数据,就像一个Observable
。很明显,作为一个Observable
,观察者们或者其它Subject
都可以订阅它。
RxJava
提供了4种不同的Subject
:
-
PublishSubject
PublishSubject
会向他的订阅者发送订阅后的数据流。 -
BehaviorSubject
BehaviorSubject
会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。 -
ReplaySubject
ReplaySubject
会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发。 -
AsyncSubject
当Observable
完成时AsyncSubject
只会发布最后一个数据给已经订阅的每一个观察者。
6.1 PublishSubject
- 示例
PublishSubject<String> publishSubject = PublishSubject.create(); Consumer<String> consumer0 = s -> System.out.println(Thread.currentThread().getName() + " consumer0 accept: " + s); publishSubject.subscribe(consumer0); publishSubject.onNext("Hello"); publishSubject.onNext("CodingDog1024"); Consumer<String> consumer1 = s -> System.out.println(Thread.currentThread().getName() + " consumer1 accept: " + s); publishSubject.subscribe(consumer1); publishSubject.onNext("I"); publishSubject.onNext("am"); publishSubject.onNext("RxJava");
- 输出
main consumer0 accept: Hello main consumer0 accept: CodingDog1024 main consumer0 accept: I main consumer1 accept: I main consumer0 accept: am main consumer1 accept: am main consumer0 accept: RxJava main consumer1 accept: RxJava
- 说明
consumer0
收到完整的"Hello"
、"CodingDog1024"
、"I"
、"am"
、"RxJava"
。
consumer1
只收到其订阅之后来到的"I"
、"am"
、"RxJava"
。
PublishSubject
的行为就类似我们常见的addXXXListener
注册监听,consumer
可以接收到其订阅之后的所有event
。
6.1 BehaviorSubject
- 示例
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create(); Consumer<String> consumer0 = s -> System.out.println(Thread.currentThread().getName() + " consumer0 accept: " + s); behaviorSubject.subscribe(consumer0); behaviorSubject.onNext("Hello"); behaviorSubject.onNext("CodingDog1024"); Consumer<String> consumer1 = s -> System.out.println(Thread.currentThread().getName() + " consumer1 accept: " + s); behaviorSubject.subscribe(consumer1); behaviorSubject.onNext("I"); behaviorSubject.onNext("am"); behaviorSubject.onNext("RxJava");
- 输出
main consumer0 accept: Hello main consumer0 accept: CodingDog1024 main consumer1 accept: CodingDog1024 main consumer0 accept: I main consumer1 accept: I main consumer0 accept: am main consumer1 accept: am main consumer0 accept: RxJava main consumer1 accept: RxJava
- 说明
consumer0
收到完整的"Hello"
、"CodingDog1024"
、"I"
、"am"
、"RxJava"
。
consumer1
收到"CodingDog1024"
、"I"
、"am"
、"RxJava"
。(比上节PublishSubject
的例子,多了"CodingDog1024"
)
订阅BehaviorSubject
时,consumer
会先收到最新的一个event
,然后再接收到之后到来的所有event
。 - 使用场景
在很常见的【先获取一次值,执行逻辑,然后值变化时需要重新执行逻辑】的场景下,使用BehaviorSubject
可以很自然的实现。
7、How it works ?
基于以下RxJava版本:
'io.reactivex.rxjava2:rxjava:2.2.11'
7.1 Observable.create() & subscribe()
7.1.1 示例 & 实现代码走读
-
sample
// (1)实例化【observableOnSubscribe1】 ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() { // (11)第(10)步中,调用了subscribe方法,入参为parent,parent为第(5)步observer的一个代理 @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // (12)调用emitter onNext方法。通过parent,最终转调到第(5)步observer的onNext方法 emitter.onNext("CodingDog1024"); } }; // (4)返回值为一个ObservableCreate实例 Observable<String> observable1 = Observable.create(observableOnSubscribe1); // (5)实例化【observer1】 Observer observer1 = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } // (13) 收到事件/数据 @Override public void onNext(String s) { System.out.println("onNext " + s); } @Override public void onError(Throwable e) { System.out.println("Throwable " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; // (6)订阅 observable1.subscribe(observer1);
-
输出
onSubscribe onNext CodingDog1024
-
create
源码// 代码出处:Observable // (2)source即为【observableOnSubscribe1】 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } // 代码出处:RxJavaPlugins public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { // 默认onObservableAssembly为null,因此,该方法可以视为直接return source Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } // 代码出处:ObservableCreate public final class ObservableCreate<T> extends Observable<T> { // source即为【observableOnSubscribe1】 final ObservableOnSubscribe<T> source; // (3)source即为【observableOnSubscribe1】 public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } // (9)observer即为【observer1】 @Override protected void subscribeActual(Observer<? super T> observer) { // CreateEmitter为observer的代理,增加一些异常处理,可以先直接理解为只是转调observer的方法 CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { // (10)该subscribe方法即为第(11)步的【subscribe】方法,这里就是create和subscribe的连接处!!! source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } ... ... }
-
subscribe
源码// 代码出处:Observable // (7)observer即为【observer1】 public final void subscribe(Observer<? super T> observer) { ... ... observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); ... ... } // 代码出处:Observable // (8)observer即为【observer1】 protected abstract void subscribeActual(Observer<? super T> observer);
7.1.2 简单概括一下
-
sample代码
// (1)实例化【observableOnSubscribe1】 ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() { // (11)第(10)步中,调用了subscribe方法,入参为parent,parent为第(5)步observer的一个代理 @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // (12)调用emitter onNext方法。通过parent,最终转调到第(5)步observer的onNext方法 emitter.onNext("CodingDog1024"); } }; // (4)返回值为一个ObservableCreate实例 Observable<String> observable1 = Observable.create(observableOnSubscribe1); // (5)实例化【observer1】 Observer observer1 = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } // (13) 收到事件/数据 @Override public void onNext(String s) { System.out.println("onNext " + s); } @Override public void onError(Throwable e) { System.out.println("Throwable " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; // (6)订阅 observable1.subscribe(observer1);
-
文字概括
-
Observable.create
方法入参为一个ObservableOnSubscribe
实例observableOnSubscribe1
,返回值为一个ObservableCreate
实例observable1
,observable1
持有observableOnSubscribe1
(source
) - 以入参
Observer
实例observer1
调用subscribe
方法时,实际调用的就是ObservableCreate
的subscribe
方法,接着调到ObservableCreate
的subscribeActual(Observer observer)
方法 -
subscribeActual
方法里new
了一个CreateEmitter
实例parent
,parent
持有上述observer1
-
subscribeActual
方法里执行source.subscribe(parent)
,这个source
即为第1步的入参source
,因此,此处就是执行了observableOnSubscribe1
的subscribe
方法 -
observableOnSubscribe1
的subscribe
方法执行了emitter.onNext("CodingDog1024");
- 也就是执行了第3步中
CreateEmitter
实例parent
的onNext
方法 -
CreateEmitter
的核心逻辑是调用持有的Observer
实例observer
的对应方法,即调用第2步入参Observer
实例observer1
的onNext
方法,即打印log的方法@Override public void onNext(String s) { System.out.println("onNext " + s); }
-
-
提取最核心逻辑,简化理解
前一段总共列了7步,出现了数量众多的类,理解起来稍微有点复杂。我们做一下简化,提取最核心逻辑,上述分析中,ObservableCreate
和Observable
描述的是同一个东西,我们统一视为Observable
;CreateEmitter
对象parent
其实就是Observer
对象observer
的一个代理,最终调用的是observer
的方法,我们暂时忽略代理类逻辑,直接将两者统一视为observer
;这么合并之后,前一段的分析就变成了:-
Observable.create
方法入参为observableOnSubscribe1
(或者称为source
),返回值为observable
,observable
持有source
-
observable.subscribe(observer)
,最终会调用到source
的subscribe
方法(入参为observer
) -
source
的subscribe
方法里,执行了observer
的onNext
方法
-
7.2 Observable.map()
7.2.1 示例 & 实现代码走读
-
sample
在上一节例子基础上,修改3处,见下方修改点1 2 3。// (1)实例化【observableOnSubscribe1】 ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() { // (11)第(10)步中,调用了subscribe方法,入参为parent,parent为第(5)步observer的一个代理 @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // (12)调用emitter onNext方法。通过parent,最终转调到第(5)步observer的onNext方法 emitter.onNext("CodingDog1024"); } }; // (4)返回值为一个ObservableCreate实例 Observable<String> observable1 = Observable.create(observableOnSubscribe1); // 修改点1:新增将String转成大写的mapper Function<String, String> mapper1 = new Function<String, String>() { @Override public String apply(String s) throws Exception { return s.toUpperCase(); } }; // 修改点2:observable1应用mapper得到新的Observable实例observable2,具体类型为ObservableMap Observable<String> observable2 = observable1.map(mapper1); // (5)实例化【observer1】 Observer observer1 = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } // (13) 收到事件/数据 @Override public void onNext(String s) { System.out.println("onNext " + s); } @Override public void onError(Throwable e) { System.out.println("Throwable " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; // 修改点3:订阅的是应用了mapper后的observable2 // (6)订阅 observable2.subscribe(observer1);
-
输出
onSubscribe onNext CODINGDOG1024
-
map
源码// 代码出处:Observable public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); // observable1上调用的map方法,因此,this就是observable1,mapper就是mapper1 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); } // 代码出处:ObservableMap public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; // source就是observable1,function就是mapper1 public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { // source就是observable1,function就是mapper1,t就是observer1 source.subscribe(new MapObserver<T, U>(t, function)); } ... ... } // 代码出处:ObservableMap static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; // actual就是observer1,mapper就是mapper1 MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { // super方法,将actual赋给downstream,也就是说: // downstream就是observer1 super(actual); this.mapper = mapper; } @Override public void onNext(T t) { ... ... U v; ... ... // 调用mapper.apply,将上游的输入 t 转为 v v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); ... ... // 调用下游的onNext方法,入参为 v downstream.onNext(v); } ... ... }
-
分析
7.3 Observable.filter()
7.3.1 示例 & 实现代码走读
-
sample
ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Hello"); emitter.onNext("CodingDog1024"); } }; // (4)返回值为一个ObservableCreate实例 Observable<String> observable1 = Observable.create(observableOnSubscribe1); Predicate<String> predicate1 = new Predicate<String>() { @Override public boolean test(String s) throws Exception { return s.startsWith("C"); } }; // observable2具体类型为ObservableFilter,内部只有predicate1 Observable<String> observable2 = observable1.filter(predicate1); Observer observer1 = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(String s) { System.out.println("onNext " + s); } @Override public void onError(Throwable e) { System.out.println("Throwable " + e.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }; observable2.subscribe(observer1);
-
输出
onSubscribe onNext CodingDog1024
-
filter
源码// 代码出处:Observable public final Observable<T> filter(Predicate<? super T> predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); // observable1上调用的filter方法,因此,this就是observable1,predicate就是predicate1 return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate)); } // 代码出处:ObservableFilter public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> { final Predicate<? super T> predicate; // source就是observable1,predicate就是predicate1 public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) { super(source); this.predicate = predicate; } @Override public void subscribeActual(Observer<? super T> observer) { // source就是observable1,predicate就是predicate1,observer就是observer1 source.subscribe(new FilterObserver<T>(observer, predicate)); } ... ... } // 代码出处:ObservableFilter static final class FilterObserver<T> extends BasicFuseableObserver<T, T> { final Predicate<? super T> filter; // actual就是observer1,filter就是predicate1 FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) { // super方法,将actual赋给downstream,也就是说: // downstream就是observer1 super(actual); this.filter = filter; } @Override public void onNext(T t) { ... ... boolean b; ... ... // 调用filter.test方法,入参为上游的输入 t b = filter.test(t); ... ... // filter.test结果为true,才调用下游的onNext方法,入参为 t if (b) { downstream.onNext(t); } ... ... } ... ... }
-
分析