原文地址:README.md
前言
- 关于RxJava:详情请查看另一篇文章:Android RxJava 学习笔记。
- 写这篇文章的目的:通过阅读RxJava官方文档,对RxJava有更全面的认识,在翻译的过程中,提高自己的英语水平,并为他人带来便利。
以下内容为译文内容。
RxJava:为JVM而设计的Reactive Extensions
Reactive Extensions(Rx):一个使用可观察序列组成异步的、基于事件的程序的库。RxJava就是Reactive Extensions在Java虚拟机上的实现。
RxJava继承了观察者模式以支持数据、事件序列,以及添加允许你以声明的方式组合序列的运算符,同时抽出对类似于低级线程、同步、线程安全和并发数据结构等问题的关注。
2.x版本(API文档)
- 单独依赖:Reactive-Streams
- 持续支持Java 6+ 以及 Android 2.3+
- 通过设计变更以及Reactive-Streams-Commons研究项目提升了性能
- Java 8 lambda友好API
- 兼容并发源(线程、池、事件循环、Actors、参与者等)
- 异步或同步执行
- 用于参数化并发的虚拟时间和调度器
版本2.x和版本1.x将会在几年内并存。他们拥有不同的Group id(io.reactivex.rxjava2
对比 io.reactivex
)和命名空间(io.reactivex
对比 rx
)。
关于版本1.x和2.x的不同可以查看维基文章《2.0有何不同》。可以在维基主页从整体上了解更多RxJava的内容。
1.x版本
截至2018年3月31日,1.x版本已过期。不再对其进行开发、支持、维护、PRs和更新。最后一个版本1.3.8的API文档,将可持续访问。
入门
设置依赖
首先将RxJava2添加到你的项目中,例如,通过Gradle方式添加compile依赖:
implementation "io.reactivex.rxjava2:rxjava:2.x.y"
(请将x和y换成最新的版本号)
Hello World
接下来我们写一段Hello World程序:
package rxjava.examples;
import io.reactivex.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
如果你的平台不支持Java 8 lambdas(目前为止),你需要手动新建一个Consumer
内部类:
import io.reactivex.functions.Consumer;
Flowable.just("Hello world")
.subscribe(new Consumer<String>() {
@Override public void accept(String s) {
System.out.println(s);
}
});
基本类
RxJava 2 具有几个基本类,你可以在其中发现运算符:
-
io.reactivex.Flowable
:0..N个流,支持响应流和背压 -
io.reactivex.Observable
:0..N个流,无背压 -
io.reactivex.Single
:一条只有一个条目或者错误的流 -
io.reactivex.Completable
:一条没有条目但是只有一个完成或错误标志的流 -
io.reactivex.Maybe
:一条无条目或者只有一个条目或错误的流
一些术语
上游,下游
RxJava中的数据流包含一个数据源,至少0个中间步骤,随后是一个数据消费者或者组合器步骤(该步骤负责以某种方式消费数据流):
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
在这里,如果我们想想自己处于operator2
的位置,向左看,一直到source
,被称为上游。向右看,直到consumer
,被称为下游。这当我们把每个元素分开写时通常更容易理解:
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
运动中的对象
在RxJava的文档中,排放物(emission),发射物(emits),条目(item),信号(signal),数据(data)以及消息(message)被认为是同意词并且代表在整个数据流中运动的对象。
背压
当数据流运行到异步步骤时,每一步可能都以不同的速度执行不同的操作。为了避免此类步骤(通常表现为由于临时缓冲或需要跳过/删除数据而导致内存使用量增加)被吞没,应用了所谓的凡压力,这是一种流控制形式,使得此类步骤能明确他们准备处理的条目数。其允许当数据流中的某个步骤无法直到上游有多少个条目会传递过来时,限制内存的使用。
在RxJava中,Flowable
类专用于支持背压,而Observable
类专用于非背压操作(短序列、GUI交互等)。其他类型,Single
,Maybe
以及Completable
都不支持背压,并且也不应该支持背压;暂时存储一个条目的空间总会有的。
装配时间
数据流通过应用各种中间操作的发生而做的准备被称为装配时间:
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
在此时,数据还未流动并且没有副作用发生。
订阅时间
这是在流中的subscribe()
方法被调用使得队列内部建立起链条关系时的一个短暂的状态:
flow.subscribe(System.out::println)
这是触发订阅副作用的时候(参见doonsubscribe)。在这种状态下,某些源会立即阻止或开始发出条目。
运行时间
这是当流主动地发出条目、错误或完成信号时的状态:
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
实际上,是上面给出的示例的主体执行的时候。
简单的后台计算
在RxJava中有一个很普遍的使用场景就是在后台线程运行一些计算,网络请求并在UI线程显示结果(或错误):
import io.reactivex.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // 模仿复杂的计算
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- 等待流结束
这种类似于建造者模式的链式方法风格被称为流式API。然而,RxJava的响应式类型是不可改变的;每个方法调用后返回一个携带了被添加的行为的新Flowable
对象。为了更容易理解,上方的示例代码可以被写为如下形式:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // 模仿复杂的计算
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常,你可以通过subscribeOn
将计算或者阻塞IO转移到其他线程。一旦数据就绪,你可以通过observeOn
确保他们在前台或者GUI线程上进行处理。
调度器
RxJava运算符不直接与Thread
或ExecutorService
直接工作,而是与所谓的Scheduler
一起工作,后者在统一的API背后抽象出并发源。RxJava 2 具有多个标准调度程序,可通过Schedulers
工具类访问它们。
-
Schedulers.computation()
:在后台中的固定数量的专用线程中运行密集的计算工作。大部分异步操作将此作为默认的Scheduler
。 -
Schedulers.io()
:在一组动态变化的线程池中运行I/O类或阻塞操作。 -
Schedulers.single()
:以顺序和先进先出的方式在单个线程上运行工作。 -
Schedulers.trampoline()
:在一个线程中以顺序和先进先出的方式运行工作,通常用作测试目的。
这些可用于所有JVM平台,但在某些特定的平台,比如Android,定义了典型的Scheduler
:AndroidSchedulers.mainThread()
,SwingScheduler.instance()
或JavaFXSchedulers.gui()
。
此外,可以通过Schedulers.from(Executor)
方法选择一个已有的Executor
(及其子类比如ExecutorService
)。举个例子,持有一个更大但数量固定的线程池(与computation()
和io()
不同)时可以使用这个方法。
末尾的Thread.sleep(2000)
不会引发异常。RxJava中的默认Scheduler
运行在守护进程中,这意味着一旦Java主线程存在,它们都会停止,后台的计算也不会发生。在这个例子中睡眠一段时间,可以让你有足够时间在控制台中看到流的输出。
流中的并发情况
RxJava中的流本质上是连续的,分为几个处理阶段,这些处理阶段可能同时运行:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
这个例子中,流在计算Scheduler
中将1到10的数字作开方,并在“主”线程中消耗计算结果(更精确地说,blockingSubscribe
的调用线程)。然而,lambda表达式v -> v * v
并未与这个流并行运行;它在同一个计算线程上一个接一个地接收1到10的值。
并行处理
并行处理数字1到10会更复杂一些:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
实际上,并行在RxJava中其实是在不同的流中运行最后将他们的结果合并到一个单独的流中。运算符flatMap
通过首先将1到10之间的每个数字银蛇到它自己单独的Flowable
中运行,然后将计算好的平方数合并起来以实现这种平行。
但是,请注意,flatMap
不保证任何顺序,内部流的最终结果可能会交替出现。有一些其他运算符可以代替:
-
concatMap
同时在一个内部流中进行映射以及运行 -
concatMapEager
“同时”运行所有的内部流,但是输出流会按照这些内部流创建时的顺序进行排序
或者,操作符Flowable.parallel()
以及ParallelFlowable
类型可以实现相同的并行处理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
子流依赖
flatMap
是一个能在很多情况下使用的强大的操作符。比如,给定一个返回Flowable
的服务,我们项用第一个服务发出的值调用另一个服务:
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand
-> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
)
.subscribe();
持续
有时候,当一个条目变得可用时,有人希望对它执行一些依赖性的计算。这有时被称为持续,根据应该发生的内容的不同以及设计的类型的不同,可能需要不同的操作符实现。
依赖
最典型的场景是给定一个值,调用另一个服务,等待并继续其结果:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
还有一个场景是后面的序列需要用到之前的映射中的值。这可以通过将外部的flatMap
移动到之前的flatMap
中而解决,如下:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
这样,最初的value
就可以被内部的flatMap
使用了,由lambda变量捕获提供。
非依赖
在其他场景中,第一个源/数据流的结果是无用的,有人想要以另一个差不多的独立的源继续,这种情况,flatMap
同样可用:
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
但是,这个例子中的连续保持着Observable
而不是看起来更合适的Single
。(这是可以理解的,因为从flatMapSingle
的调用者sourceObservable
是一个多值的源,所以映射的可能的结果同样也是多值的)。
通常,即使有一种有点儿更具表现力(同时也降低了开销)方式,通过使用Completable
作为中介及其操作符andThen
以用其他东西继续:
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
sourceObservable
和someSingleSource
之间的唯一的依赖是前者应当正常完成以便后者被消费。
延迟依赖
有时,在之前的序列和新序列中有一个隐藏的依赖,由于某种原因,并未通过“常规通道”而流动。有人会想要将持续写成如下文这样:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是,因为Single.just(count.get())
在数据流还没有运行的时候,作为参数组装时已经运行完成了,所以最终打印了0
。我们需要某种方式以延迟这个Single
源的运算,直到主源完成运行时为止:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
或者
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
类型变换
有时,一个源或服务返回了一个与应该使用它的流不同的类型。比如,在上方的inventory例子中,getDemandAsync
应当返回一个Single<DemandRecord>
。如果示例代码保持不变,最终将引起一个便利错误(但是,经常会抛出一个有关缺乏过载的误导性错误消息)。
待更新……