尝试翻译RxJava Github首页README文档

原文地址:README.md

前言

  • 关于RxJava:详情请查看另一篇文章:Android RxJava 学习笔记
  • 写这篇文章的目的:通过阅读RxJava官方文档,对RxJava有更全面的认识,在翻译的过程中,提高自己的英语水平,并为他人带来便利。

以下内容为译文内容。

RxJava:为JVM而设计的Reactive Extensions


Reactive Extensions(Rx):一个使用可观察序列组成异步的、基于事件的程序的库。RxJava就是Reactive Extensions在Java虚拟机上的实现。

RxJava继承了观察者模式以支持数据、事件序列,以及添加允许你以声明的方式组合序列的运算符,同时抽出对类似于低级线程、同步、线程安全和并发数据结构等问题的关注。

2.x版本(API文档
  • 单独依赖:Reactive-Streams
  • 持续支持Java 6+ 以及 Android 2.3+
  • 通过设计变更以及Reactive-Streams-Commons研究项目提升了性能
  • Java 8 lambda友好API
  • 兼容并发源(线程、池、事件循环、Actors、参与者等)
  • 异步或同步执行
  • 用于参数化并发的虚拟时间和调度器

版本2.x和版本1.x将会在几年内并存。他们拥有不同的Group id(io.reactivex.rxjava2 对比 io.reactivex)和命名空间(io.reactivex 对比 rx)。
关于版本1.x和2.x的不同可以查看维基文章《2.0有何不同》。可以在维基主页从整体上了解更多RxJava的内容。

1.x版本

截至2018年3月31日,1.x版本已过期。不再对其进行开发、支持、维护、PRs和更新。最后一个版本1.3.8的API文档,将可持续访问。

入门


设置依赖

首先将RxJava2添加到你的项目中,例如,通过Gradle方式添加compile依赖:

implementation "io.reactivex.rxjava2:rxjava:2.x.y"

(请将x和y换成最新的版本号

Hello World

接下来我们写一段Hello World程序:

package rxjava.examples;

import io.reactivex.*;
public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

如果你的平台不支持Java 8 lambdas(目前为止),你需要手动新建一个Consumer内部类:

import io.reactivex.functions.Consumer;
Flowable.just("Hello world")
  .subscribe(new Consumer<String>() {
      @Override public void accept(String s) {
          System.out.println(s);
      }
  });

基本类

RxJava 2 具有几个基本类,你可以在其中发现运算符:

一些术语

上游,下游

RxJava中的数据流包含一个数据源,至少0个中间步骤,随后是一个数据消费者或者组合器步骤(该步骤负责以某种方式消费数据流):

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

在这里,如果我们想想自己处于operator2的位置,向左看,一直到source,被称为上游。向右看,直到consumer,被称为下游。这当我们把每个元素分开写时通常更容易理解:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)
运动中的对象

在RxJava的文档中,排放物(emission)发射物(emits)条目(item)信号(signal)数据(data)以及消息(message)被认为是同意词并且代表在整个数据流中运动的对象。

背压

当数据流运行到异步步骤时,每一步可能都以不同的速度执行不同的操作。为了避免此类步骤(通常表现为由于临时缓冲或需要跳过/删除数据而导致内存使用量增加)被吞没,应用了所谓的凡压力,这是一种流控制形式,使得此类步骤能明确他们准备处理的条目数。其允许当数据流中的某个步骤无法直到上游有多少个条目会传递过来时,限制内存的使用。
在RxJava中,Flowable类专用于支持背压,而Observable类专用于非背压操作(短序列、GUI交互等)。其他类型,SingleMaybe以及Completable都不支持背压,并且也不应该支持背压;暂时存储一个条目的空间总会有的。

装配时间

数据流通过应用各种中间操作的发生而做的准备被称为装配时间

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

在此时,数据还未流动并且没有副作用发生。

订阅时间

这是在流中的subscribe()方法被调用使得队列内部建立起链条关系时的一个短暂的状态:

flow.subscribe(System.out::println)

这是触发订阅副作用的时候(参见doonsubscribe)。在这种状态下,某些源会立即阻止或开始发出条目。

运行时间

这是当流主动地发出条目、错误或完成信号时的状态:

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

实际上,是上面给出的示例的主体执行的时候。

简单的后台计算

在RxJava中有一个很普遍的使用场景就是在后台线程运行一些计算,网络请求并在UI线程显示结果(或错误):

import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  模仿复杂的计算
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- 等待流结束

这种类似于建造者模式的链式方法风格被称为流式API。然而,RxJava的响应式类型是不可改变的;每个方法调用后返回一个携带了被添加的行为的新Flowable对象。为了更容易理解,上方的示例代码可以被写为如下形式:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  模仿复杂的计算
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

通常,你可以通过subscribeOn将计算或者阻塞IO转移到其他线程。一旦数据就绪,你可以通过observeOn确保他们在前台或者GUI线程上进行处理。

调度器

RxJava运算符不直接与ThreadExecutorService直接工作,而是与所谓的Scheduler一起工作,后者在统一的API背后抽象出并发源。RxJava 2 具有多个标准调度程序,可通过Schedulers工具类访问它们。

  • Schedulers.computation():在后台中的固定数量的专用线程中运行密集的计算工作。大部分异步操作将此作为默认的Scheduler
  • Schedulers.io():在一组动态变化的线程池中运行I/O类或阻塞操作。
  • Schedulers.single():以顺序和先进先出的方式在单个线程上运行工作。
  • Schedulers.trampoline():在一个线程中以顺序和先进先出的方式运行工作,通常用作测试目的。

这些可用于所有JVM平台,但在某些特定的平台,比如Android,定义了典型的SchedulerAndroidSchedulers.mainThread()SwingScheduler.instance()JavaFXSchedulers.gui()
此外,可以通过Schedulers.from(Executor)方法选择一个已有的Executor(及其子类比如ExecutorService)。举个例子,持有一个更大但数量固定的线程池(与computation()io()不同)时可以使用这个方法。
末尾的Thread.sleep(2000)不会引发异常。RxJava中的默认Scheduler运行在守护进程中,这意味着一旦Java主线程存在,它们都会停止,后台的计算也不会发生。在这个例子中睡眠一段时间,可以让你有足够时间在控制台中看到流的输出。

流中的并发情况

RxJava中的流本质上是连续的,分为几个处理阶段,这些处理阶段可能同时运行:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

这个例子中,流在计算Scheduler中将1到10的数字作开方,并在“主”线程中消耗计算结果(更精确地说,blockingSubscribe的调用线程)。然而,lambda表达式v -> v * v并未与这个流并行运行;它在同一个计算线程上一个接一个地接收1到10的值。

并行处理

并行处理数字1到10会更复杂一些:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

实际上,并行在RxJava中其实是在不同的流中运行最后将他们的结果合并到一个单独的流中。运算符flatMap通过首先将1到10之间的每个数字银蛇到它自己单独的Flowable中运行,然后将计算好的平方数合并起来以实现这种平行。
但是,请注意,flatMap不保证任何顺序,内部流的最终结果可能会交替出现。有一些其他运算符可以代替:

  • concatMap同时在一个内部流中进行映射以及运行
  • concatMapEager“同时”运行所有的内部流,但是输出流会按照这些内部流创建时的顺序进行排序

或者,操作符Flowable.parallel()以及ParallelFlowable类型可以实现相同的并行处理模式:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

子流依赖

flatMap是一个能在很多情况下使用的强大的操作符。比如,给定一个返回Flowable的服务,我们项用第一个服务发出的值调用另一个服务:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource.flatMap(inventoryItem ->
    erp.getDemandAsync(inventoryItem.getId())
    .map(demand 
        -> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
  )
  .subscribe();

持续

有时候,当一个条目变得可用时,有人希望对它执行一些依赖性的计算。这有时被称为持续,根据应该发生的内容的不同以及设计的类型的不同,可能需要不同的操作符实现。

依赖

最典型的场景是给定一个值,调用另一个服务,等待并继续其结果:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

还有一个场景是后面的序列需要用到之前的映射中的值。这可以通过将外部的flatMap移动到之前的flatMap中而解决,如下:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

这样,最初的value就可以被内部的flatMap使用了,由lambda变量捕获提供。

非依赖

在其他场景中,第一个源/数据流的结果是无用的,有人想要以另一个差不多的独立的源继续,这种情况,flatMap同样可用:

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

但是,这个例子中的连续保持着Observable而不是看起来更合适的Single。(这是可以理解的,因为从flatMapSingle的调用者sourceObservable是一个多值的源,所以映射的可能的结果同样也是多值的)。
通常,即使有一种有点儿更具表现力(同时也降低了开销)方式,通过使用Completable作为中介及其操作符andThen以用其他东西继续:

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

sourceObservablesomeSingleSource之间的唯一的依赖是前者应当正常完成以便后者被消费。

延迟依赖

有时,在之前的序列和新序列中有一个隐藏的依赖,由于某种原因,并未通过“常规通道”而流动。有人会想要将持续写成如下文这样:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

不幸的是,因为Single.just(count.get())在数据流还没有运行的时候,作为参数组装时已经运行完成了,所以最终打印了0。我们需要某种方式以延迟这个Single源的运算,直到主源完成运行时为止:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

或者

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

类型变换

有时,一个源或服务返回了一个与应该使用它的流不同的类型。比如,在上方的inventory例子中,getDemandAsync应当返回一个Single<DemandRecord>。如果示例代码保持不变,最终将引起一个便利错误(但是,经常会抛出一个有关缺乏过载的误导性错误消息)。

待更新……

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容