RxJava 的使用与理解(一)

ReactiveX编程简称Rx编程,又叫响应式编程、响应式扩展,英文为Reactive Extensions。可以查看官方网站www.reactive.io,就像其网站说的"Expertise makes better software.",响应式编程的目标是提供一致的编程接口,
帮助开发者更方便的处理异步数据流,使软件开发更高效、更简洁。Rx是一个多语言的实现,已经支持多种语言包括Java、Swift、C++、.NET、JavaScript、Ruby、Groovy、Scala等等,支持的库包括:RxJavaRxSwift、Rx.NET、RxJS、RXRuby等等,真是屌炸天。在Android上我们添加 RxAndroid 库就可以,RxAndroid 是对 RxJava 一种更接地气的扩展。下面让我们通过 RxAndroid 去使用、理解 RxJava 吧。

Rx使用观察者模式

创建:Rx可以方便的创建事件流和数据流
组合:Rx使用查询式的操作符组合和变换数据流
监听:Rx可以订阅任何可观察的数据流并执行操作

Rx使代码简化

函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

先看简单的例子,通过RxJava将Integer类型转成String类型

private void funcDemo() {
    Observable.OnSubscribe<Integer> onSubscribe1 = new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(100);
        }
    };

    Func1<Integer, String> func1 = new Func1<Integer, String>() {
        @Override
        public String call(Integer integer) {
            return String.valueOf(integer);
        }
    };

    Subscriber<String> subscriber1 = new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(String s) {
            Log.d("onNext(): ", s);
        }
    };

    Observable.create(onSubscribe1)
            .map(func1)
            .subscribe(subscriber1);

    // 将上面分解成三步执行
    // Observable<Integer> observable1 = Observable.create(onSubscribe1);
    // Observable<String> observable2 = observable1.map(func1);
    // observable2.subscribe(subscriber1);
}

主要关注这里面生成的四个对象:observable1、onSubscribe1、func1、subscriber1,如果对相应的单词不清晰,移步下面的备注。
这是典型的的被观察者与观察者的关系,或者叫被订阅者与订阅者的关系;下面理一下他们的角色:

observable1:被观察者,是subscriber1要订阅的对象。  
onSubscribe1:被观察者的行为,是subscriber1要订阅的行为,subscribe()时被执行。  
subscriber1:订阅者,是抽象类实现了Observer接口,可以叫观察者,其实就是观察者的角色。

// 分解成三步执行的代码
Observable<Integer> observable1 = Observable.create(onSubscribe1);
Observable<String> observable2 = observable1.map(func1);
observable2.subscribe(subscriber1);

这段代码的意思就是:创建一个被观察者observable1,给被观察者指定其所发布的行为(onSubscribe1来实现);
指定观察者时,只需指定相应的观察回调即可;在完成订阅的操作时,是先调用subscriber1的onStart方法,
之后通过订阅行为onSubscribe1来调用subscriber1完成相应的订阅操作;最后若出现异常则会回调subscriber1的onError方法。

换句话说就是:被观察者发布的行为是传递Integer类型的数值100,map()变换后,观察者收到了String类型的字符串"100"。

简单点说就是:观察者订阅了被观察者要发布的行为。

源码参考个人作品【图灵机器人】

现在结合源码,一步步看它到底是怎么执行的!

第一步 Observable.create(onSubscribe1)

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

这里面泛型比较多,先忽略<T>泛型符号;hook又是什么东东?翻译过来就是“钩子”,到底是什么钩子呢,查看源码后知道, hook 是一个 proxy 对象, 仅仅用作调试的时候可以插入一些测试代码的,那也先忽略。

所以Observable.create(onSubscribe1)干了两件事,第一 new 一个 observable1 对象,第二将 new 出的 onSubscribe1 对象通过 Observable 的构造函数赋值给它的成员变量 onSubscribe。

第二步 observable1.map(func1)

先看func1对象,RxJava有一系列的(Func+数字)的接口和一系列(Action+数字)接口,这些接口中都只有一个call方法,其中(Func+数字)接口的call方法都有返回值,而(Action+数字)接口的call方法都没有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。看一下Func1和Action1的源码:

/**
 * Represents a function with one argument.
 */
public interface Func1<T, R> extends Function {
    R call(T t);
}

/**
 * A one-argument action.
 */
public interface Action1<T> extends Action {
    void call(T t);
}

Func1 和 Action(Action1继承Action) 都继承 Function 接口,Func1 接收一个 T 泛型类型的参数,call 回调后,返回一个 R 泛型类型的值,是一种“变换”函数接口,我们可以在 call 回调中处理这种“变换”的需求。
接下来看看 map(func1) 干了神马,上源码。。。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            // 核心代码
            Subscriber<? super T> subscriber2 = hook.onLift(operator).call(o);
            subscriber2.onStart();
            onSubscribe1.call(subscriber2);
        }
    });
}

map()函数里直接调用的lift()函数,先看看OperatorMap和lift()函数的参数Operator是什么玩意,再上源码。。。

/**
 * Operator function for lifting into an Observable.
 */
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
    // cover for generics insanity
}

public final class OperatorMap<T, R> implements Operator<R, T> {

    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {
            @Override
            public void onCompleted() {
                o.onCompleted();
            }
            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }
            @Override
            public void onNext(T t) {
                o.onNext(transformer.call(t));
            }
        };
    }
}

Operator 继承 Func1,Operator 和 Func1 都是一种“变换”接口,比如输入Integer类型参数经过处理返回String类型值,
OperatorMap 继承 Operator,但它的参数又和Operator相反,难道经过OperatorMap又把String类型变换成Integer类型值啦,
其实 OperatorMap 类有个Func1属性transformer(transformer就是func1),执行o.onNext(transformer.call(t))就将func1变换的结果传递下去了。

lift()返回的是个新的被观察者对象observable2,同时创建一个新的OnSubscribe对象,暂时标记为onSubscribe2;
在onSubscribe2回调中调用hook.onLift(operator).call(o),变换后并生成新的观察者subscriber2对象,
新的观察者subscriber2对象被绑定到原来创建onSubscribe1对象上,可以理解为subscriber2已经订阅到经过变换后的
observable1要发布的行为,最后这种变化后的行为继续被发送到subscriber1观察者那里。

最后一步 subscribe(subscriber1)

subscriber1开始订阅被观察者的行为,也可以说被观察者的行为subscribe()时被执行;我们可以在观察者的回调中处理我们最终得到的结果;
在执行订阅后返回了Subscription对象,里面包含两个方法:

public interface Subscription {
    // 取消订阅
    void unsubscribe();
    // 订阅是否被取消
    boolean isUnsubscribed();
}

最后总结下吧,为了了解RxJava的运行机制;我使用一个简单的函数,实现的功能就是通过RxJava将Integer类型转成String类型并打印出来,并结合源码理解其执行过程;这里面泛型、接口的回调和各种角色的变换确实不好理解;接下来我会将RxJava和Retrofit2结合起来使用,
所以【图灵机器人】就这样产生啦。

参考文章:

彻底搞懂 RxJava — 基础篇
彻底搞懂 RxJava — 中级篇
彻底搞懂 RxJava — 高级篇

大头鬼Bruce
RxJava基本流程和lift源码分析

扔物线
给 Android 开发者的 RxJava 详解

备注:

observable:
adj. 显著的;觉察得到的;看得见的
n. [物] 可观察量;感觉到的事物

observer:
n. 观察者;[天] 观测者;遵守者

subscribe:
vi. 订阅;捐款;认购;赞成;签署
vt. 签署;赞成;捐助

subscriber:
n. 订户;签署者;捐献者

关于我

个人邮箱:sfsheng0322@126.com
GitHub主页
简书主页
个人博客
新浪微博

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容