RxJava 系列 (一)RxJava 1.0 简介

RxJava 在业内越来越受欢迎,对于老手来说,RxJava 太好用了,RxJava 简直无所不能;然而对于新手来说,RxJava 更像一座陡峭的大山,似乎不能识得 RxJava 的真面目,无法逾越。纠其原因,新手更像是只知其然不知其所以然,只停留在会用的角度是无法了解 RxJava 精髓的。

我们学习一个新概念,应该多去思考,如 RxJava 出现的背景,它解决了什么问题,它是如何解决的,它有什么优点,它的核心思想是什么,它能应用在哪些场景等。如果对这几个问题都有答案的话,对 RxJava 的基本认知就算完成了。对于 RxJava 小白来说,非常不建议一来就源码分析,否则很容易陷入代码黑洞无法自拔,源码分析可以在 RxJava 进阶阶段去做。本文将会围绕上面提出的几个问题做一些简要回答。

RxJava 背景

我们都知道 Java 是一门编程语言,那么 Rx 是什么呢?
Github 对 Rx 的介绍如下:

Reactive Extensions for Async Programming

Rx (Reactive Extension)是异步编程的响应式扩展,再具体一步说, Rx 是微软.NET 的一个响应式扩展。Rx 借助可观测的序列提供一种简单的方式来创建异步的基于事件驱动的程序。开发者可以使用Observables模拟异步数据流,使用LINQ语法查询Observables,并且很容易管理调度器的并发。

Netflix 在2012年开始意识到他们的架构要满足他们庞大的用户群体已经变得步履维艰。因此他们决定重新设计架构来减少 REST 调用的次数。取代几十次的 REST 调用,而是让客户端自己处理需要的数据,他们决定基于客户端需求创建一个专门优化过的 REST 调用。

为了实现这一目标,他们决定尝试响应式,开始将.NET Rx 迁移到 JVM 上面。他们不想只基于 Java 语言;而是整个 JVM,从而有可能为市场上的每一种基于 JVM 的语言:如 Java、Clojure、Groovy、Scala 等等提供一种新的工具。

2013年二月份,Ben Christensen 和 Jafar Husain 发在 Netflix 技术博客的一篇文章第一次向世界展示了 RxJava。主要特点有:

  • 易于并发从而更好的利用服务器的能力。
  • 易于有条件的异步执行。
  • 一种更好的方式来避免回调地狱。
  • 一种响应式方法。

Github 对 RxJava 的介绍如下:

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

RxJava 是一个在 JVM 上使用可观测的序列来组成异步的、基于事件的程序的库。所以要学习 RxJava 理念和精髓,重点应该是放在 Rx 上,而不是放在 Java 这门编程语言上。并且 Jake Wharton 大神在 Github 上贡献了 RxAndroid 扩展库。RxAndroid 是在 RxJava 的基础上结合 Android 系统特性而开发的一个库,如主线程、UI 事件等。

核心思想

RxJava 的核心思想:异步响应式

在响应式的世界里,为了给提供更好的用户体验,一些函数的计算、网络请求和数据库查询等操作应该异步执行,软件的使用者不应该等待这些结果。同理对于开发者而言,也不需要等待结果,而是在结果返回时通知他,在这期间开发者可以做想做的任何事情。

响应式编程不同于命令式或面向对象编程,参考以下代码:

int a = 1;
int b = 2;
int c = a + b; // c = 3
a = 4;  // c 仍然是 3

对于命令式编程来说, c = a+b 意思是将表达式的结果赋值给 c ,而之后 a 和 b 的值改变均不会影响到 c 。所以在给 a 赋值为4 时,c 的值仍然是 3;但是在响应式编程中,c 的值会随着 a 或 b 的值更新而更新,当给 a 赋值为4 时,c 的值会自动更新为 6,响应式就是要关注值的变化。


举例: 一个最直观的例子就是 Excel 中,若规定 C1 = SUM(A1,B1),C1 的值一直会随着 A1 或 B1 的值变化。

响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

响应式编程的一个关键概念是事件。事件可以被等待,可以触发过程,也可以触发其它事件。事件是唯一的以合适的方式将我们的现实世界映射到我们的软件中:如果屋里太热了我们就打开一扇窗户。同样的,当我们更改电子表(变化的传播)中的一些数值时,我们需要更新整个表格或者我们的机器人碰到墙时会转弯(响应事件)。

今天,响应式编程最通用的一个场景是 UI:我们的移动 App 必须做出对网络调用、用户触摸输入和系统弹框的响应。在这个世界上,软件之所以是事件驱动并响应的是因为现实生活也是如此。
响应式编程 文章链接:Reactive Programming

扩展的观察者模式

观察者模式

观察者模式是最常见的设计模式之一。它主要基于 Subject 这个概念,Subject 是一种特殊的对象,当他改变时,由它保存的一系列对象将会得到通知。而这一系列对象被称作 Observers 它们会对外暴露一个通知方法,当 Subject 状态变化时会调用的这个方法。


观察者模式.jpg

上图中,展示了 Subject/Observer 是怎样的一个 一对多的关系,如果有需要,一个 Subject 可以有无限多个 Observers,当 subject 状态发生变化时,这些 Observers 中的每一个都会收到通知。

观察者模式很适合下面这些场景中的任何一个:

  • 当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响或者是独立复用它们时。
  • 当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。
  • 当一个变化的对象通知那些无需推断具体是谁的对象时。
扩展

在 RxJava 中共有4个角色:

  • Observable
  • Observer
  • Subscriber
  • Subjects

Observables 和 Subjects 是两个“生产”实体,Observers 和 Subscribers 是两个“消费”实体。同时相比于观察者模式,RxJava 添加了三个观察者缺少的功能:

  • 生产者在没有更多数据可用时能够发出信号通知:onCompleted() 事件。
  • 生产者在发生错误时能够发出信号通知:onError() 事件。
  • RxJava Observables 能够组合而不是嵌套,从而避免开发者陷入回调地狱。

核心概念

Observables

当我们异步执行一些复杂的事情,Java提供了传统的类,例如 Thread、Future、FutureTask、CompletableFuture 来处理这些问题。当复杂度提升,这些方案就会变得麻烦和难以维护。最糟糕的是,它们都不支持链式调用。

RxJava Observables 被设计用来解决这些问题。它们灵活,且易于使用,也可以链式调用,并且可以作用于单个结果程序上,更有甚者,也可以作用于序列上。无论何时你想发射单个标量值,或者一连串值,甚至是无穷个数值流,你都可以使用 Observable。

Observable的生命周期包含了三种可能的易于与Iterable生命周期事件相比较的事件,


使用Iterable时,消费者从生产者那里以同步的方式得到值,在这些值得到之前线程处于阻塞状态。相反,使用Observable时,生产者以异步的方式把值推给观察者,无论何时,这些值都是可用的。这种方法之所以更灵活是因为即便值是同步或异步方式到达,消费者在这两种场景都可以根据自己的需要来处理。

为了更好地复用Iterable接口,RxJava Observable 类扩展了GOF观察者模式的语义。引入了两个新的接口:

  • onCompleted() 即通知观察者Observable没有更多的数据。
  • onError() 即观察者有错误出现了。

从发射物的角度来看,有两种不同的 Observables :热的和冷的。一个"热"的 Observable 典型的只要一创建完就开始发射数据,因此所有后续订阅它的观察者能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的 Observable 会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。

Subject

Subject 是一个神奇的对象,它可以是一个Observable同时也可以是一个 Observer:它作为连接这两个世界的一座桥梁。一个 Subject 可以订阅一个 Observable,就像一个观察者,并且它可以发射新的数据,或者传递它接受到的数据,就像一个 Observable。很明显,作为一个Observable,观察者们或者其它 Subject 都可以订阅它。

一旦 Subject 订阅了 Observable,它将会触发 Observable开始发射。如果原始的 Observable 是“冷”的,这将会对订阅一个“热”的 Observable 变量产生影响。RxJava 提供四种不同的 Subject:

  • PublishSubject,普通的 Subject 对象
  • BehaviorSubject,BehaviorSubject会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。
  • ReplaySubject,会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发:
  • AsyncSubjec,当Observable 完成时 AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。

优势

RxJava 的优点一搜一箩筐,如提升开发效率,降低维护成本,简化逻辑代码,提升可读性等。其逆天之处在于当程序逻辑变得复杂的情况下,它已然能够保持简洁。
看一个 RxJava 保持简洁的范例(此例来源于 扔物线大神 的文章):界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。贴出一种实现如下:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

而用 RxJava 实现如下:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

如果 IDE 是 Android Studio ,每次打开某个 Java 文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

流式的 API 调用风格看着很爽有木有!!! 良好的编码规范逻辑清楚,能方便我们更好的定位问题、解决问题,进而能提升我们的效率。

这里引用《代码大全》第 31 章中“把布局当做一种信仰”的一段话:

良好布局的目标:
1、准确表现代码的逻辑结构
2、始终如一的表现代码的逻辑结构
3、改善可读性
4、经得起修改

RxJava 的代码布局既准确的表示了代码的逻辑结构,又增强了可读性和可维护性,对于有代码洁癖的小伙伴来说是大大的福利啊。

使用范例

1、在项目的 build.gradle 文件中添加对 RxJava 的支持

对于 RxJava 1.x 版本,

compile 'io.reactivex:rxjava:x.y.z‘

对于 RxJava 2.x 版本

compile "io.reactivex.rxjava2:rxjava:2.x.y"

需要注意的是,在一个项目中, RxJava 1.x 和 2.x 是可以同时使用的 ,因为不同版本的依赖路径是不一样的,即对于同一个类 1.x 版本和 2.x 版本的全路径名不一样,我们在使用时只需要注意引入的包名是否正确就可以了。

2、使用范例
  • 创建被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello,RxJava");
        subscriber.onCompleted();
    }
});
  • 创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
  • 订阅
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

RxJava 的使用还是比较简单的, 如果有更复杂的需求,可以熟悉相关操作符。 RxJava 的操作符基本能够满足所有的使用需求。

应用场景

  • RxJava + Retrofit ,最热门的框架组合,实现网络请求及结果的异步处理
  • RxBinding,Jake Wharton 的一个开源库,本质上为 View 设置一些 Listenter,只不过是用 Observable 实现,如界面按钮防止连续点击、Android 一些 CheckBox 点击更新响应视图等
  • 线程切换,在后台线程取数据,主线程展示”的模式中看见
  • 异步操作, 耗时操作,如复杂计算、网络请求、I/O操作等
  • RxBus,RxBus并不是一个库,而是一种模式。其思想是用 RxJava 实现解耦,同 GreenRobot 的 EventBus 和 Otto 类似。

对于 RxJava 的基础部分就介绍就到这里了,后续文章将会介绍 RxJava 中的常用操作符及实现原理。

参考文献

Git官网
给 Android 开发者的 RxJava 详解
RxJava Essentials

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352