Observable概述(RxJava2)

上一篇文章中定义了Rx = Observable + Operator + Scheduler。Rx以经典观察者模式为骨架、并扩展之使得我们能够以类似使用Iterable的方式使用Observable。

Rx最为重要的两个要素是:数据流和异步(实际上Rx把数据流都视作异步的)。今天的主角便是数据流——Observable。根据上下文语义的需要,本系列文中可能另称之为数据序列事件流被观察者

观察者

在Rx的世界中,(几乎)每一个故事都从“观察者订阅了数据流”开始。观察者——Observer——好比哨兵,时刻监视着数据流的动静,一旦有数据发射或通知发送便立即响应。观察者实现了以下三个方法的子集:

  • onNext -- 当数据流发射流中任意一个数据时会调用观察者的onNext方法,并将发射的数据作为参数。
  • onError -- 当数据流产生数据失败或发生其他异常时会调用观察者的onError方法,并将失败原因(Throwable)作为参数。
  • onComplete -- 当数据流中的所有数据全部正常发射完会调用观察着的onComplete方法。

当数据流调用观察者的onError/onComplete时,我们称它发送了错误/完成通知观察者只能收到来自某个数据流的一个通知,也就是说如果收到了流的错误通知,就不可能再收到该流的完成通知,反之亦然。一旦观察者收到了通知,便不能接收任何由该流发射的数据

注:数据流可以发送多个通知,也可以在发送通知之后继续发送数据,只是观察者收到通知后就单方面把该流“拉黑”了而已。有时候为了实现一些特殊功能,我们不得不允许Observer不受限制地接收数据和通知(RxJava2的源码中也存在着这样的实现,比如:ObservableConcatMap.SourceObserver.InnerObserver就可以多次接收onComplete通知)。


Rx编程模型

我们先看一个常规的方法调用过程,程序会按照代码书写的顺序逐步地执行指令并返回结果,以同步的方式完成任务:

  1. 先调用某个方法。
  2. 把方法的返回值赋值给某个变量。
  3. 使用该变量执行后续指令以完成任务。

在Rx中,数据流用于定义产生、处理数据的机制,一旦有观察者订阅(subscribe)了该流,其预定义的机制立即生效,观察者等待数据发射或通知发送并响应:

  1. 定义一个数据流,该流定义了一个异步操作,可以产生一个或多个数据。
  2. 定义一个观察者,并为它定义一个方法(onNext),该方法用来消费第一步的异步操作发射的数据。
  3. 观察者订阅数据流(于是故事开始了),数据流的异步操作被触发,然后生产发射数据,或发送通知(以结束整个故事)。

如果程序需要完成多个不存在互相依赖的任务,由于Rx中指令可以异步并发地执行,我们可以同时启动多个任务,而不用依次地等待某个任务完成再启动下一个。


Observable操作符

掌握数据流和观察者之后,我们能比以前更好地处理数据序列(而不限于单个数据)。然而Rx真正的核武器是操作符Operator。我们先了解一下Rx有哪些操作符。

  • 创建型
    Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  • 变换型
    Buffer, FlatMap, GroupBy, Map, Scan, Window
  • 过滤型
    Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  • 组合型
    And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  • 容错型
    Catch, Retry
  • 工具型
    Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  • 条件型
    All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  • 聚合型
    Average, Concat, Count, Max, Min, Reduce, Sum
  • 转换型
    To
  • 连接型
    Connect, Publish, RefCount, Replay
  • 背压型

多数操作符仍然返回一个数据流,这种方式允许我们在程序中链式地对数据流调用操作符——联想一下builder(构建者)模式的链式调用——与builder模式不同的是,Observable的操作符返回了一个新Observable,这个新Observable原Observable的代理。
应用了操作符后,单单用“数据流”已经无法准确描述Observable的含义。我增加“原始流”、“上游”和“下游”以及“支流(流中流)”来区分不同意义的Observable。“数据流”是Observable的泛称。

  • 原始流——全称为“原始数据流”,指代由创建型操作符返回的Observable
  • 上游和下游——二者必须成对地出现。对Observable调用非创建型操作符后,“上游”指代原Observable,“下游”指代返回的新Observable
  • 支流或流中流——仅仅在应用FlatMapConcatMap操作符的场景中使用这一称谓。“支流”指代这两个操作符的mapper返回的子Observable,“支流”亦称“流中流”。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

flatMap的方法签名分析,它接受一个mapper参数,此mapper将上游中的数据变换成一个ObservableSource新Observable中的数据是ObservableSource类型——颇有子Observable的味道——这不就是“流中流”(“流中流”自带解释功能,理解之后还是叫“支流”比较自然)吗?
需要注意的是:最终Rx将整合子Observable(支流)中的所有数据而不是子Observable本身汇入下游。后面的文章会详细地对FlatMapConcatMap进行源码分析。


RxJava2#Observable类(源码基于v2.1.5)

Observable是一个抽象类,实现了ObservableSource(void subscribe())接口。该类有且仅有一个抽象方法subscribeActual,其他非privateprivate方法也就3个)方法要么是static的,要么是final的。这意味着定义自己的ObservableCustom是件非常简单的事情,Observable类已经完成了99.99%的工作,我们只需要override subscribeActual方法就够了。

Observable所有的创建型操作符都是静态的,比如Just:

public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    // 通过RxJavaPlugins的setters可以在运行时改变默认的行为
    // 如果程序中没有调用RxJavaPlugins.setOnObservableAssembly(xxx),下面一行代码跟其后一行注释完全等效
    return RxJavaPlugins.onAssembly(new ObservableJust(item));
    // return new ObservableJust(item); 
}

我们可以看到Just操作符本质上构造了一个ObservableJust对象。RxJava2内置了大量的ObservableXXXXXX往往是操作符的名字比如Just)。

再来看一个非创建型的操作符Map:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
}

以及ObservableMap类的核心代码:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    public void subscribeActual(Observer<? super U> t) {
        this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
    }
}

重点看ObservableMap构造方法,它接收ObservableSource类型的对象作为第一个参数——回忆一下代理模式——创建了原Observable的代理,也就是新ObservableMap实例。
RxJava2中大量运用了代理模式,细心的你或许已经发现:在subscribeActual方法中,还创建了一个原Observer的代理——MapObserver的实例。


本系列将持续更新

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,853评论 0 10
  • 参考:给 Android 开发者的 RxJava 详解-扔物线深入浅出RxJava 基础 "a library f...
    Vincen1024阅读 542评论 0 1
  • RxJava技术分享 京金所—时光 2016.9.22 这里我拿出来给 Android 开发者的 RxJava 详...
    JC_Mobile阅读 5,566评论 3 55
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,643评论 8 93
  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,816评论 0 1