Rxjava1.x 回顾入门(这一篇就够了)

Java 是响应式编程(Reactive Programming)在 Java 语言上的实现

主要内容:

RxJava 简单介绍

观察者模式实现

Subscriber 使用

Action 使用

Scheduler 使用

Function 使用

map、flatMap、concatMap 使用

filter、toList使用

关于 RxJava 2.x

RxJava 简单介绍

参考

github 地址 - ReactiveX/RxJava:https://github.com/ReactiveX/RxJava

RxJava 1.x javadoc:http://reactivex.io/RxJava/1.x/javadoc/

观察者模式 Java:http://blog.csdn.net/u012005313/article/details/72236997

官方介绍:

  RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.


  RxJava 是一个在Java虚拟机上实现的响应式扩展:一个使用可观察序列构成异步和基于事件编程的库

RxJava 扩展了观察者模式(Observer Pattern),是一个全新的编程思想(响应式编程)的体现

在学习过程中,比较难以入门。官方文档比较难以理解,网上的博客也并没有很完整的讲解,尤其现在发布了两个版本 - RxJava 1.x / RxJava 2.x, 所以理解其中编程思维还是很困难的。没办法,只能多看,多练了。

Note:目前 RxJava 有两个版本 - RxJava 1.x 和 RxJava 2.x,这两个版本并不兼容,RxJava 2.x 在 RxJava 1.x 的基础上有所发展,同时舍弃了部分内容。由于 RxJava 2.x 发行的时间不长,目前网上的资料也不太多,更多的讲解和资料都是关于 RxJava 1.x 的,所以下面先学习 RxJava 1.x 的内容,再补充 RxJava 2.x 的内容

gradle 依赖

在工程中加入 RxJava 1.x 和 RxAndroid 1.x 依赖:

compile 'io.reactivex:rxjava:1.3.0'

compile 'io.reactivex:rxandroid:1.2.1

12

观察者模式实现

介绍

Observable:被观察者类,这个类就是观察者模式中的被观察对象。当触发某些事件后,就可以向观察者推送通知

Observer:观察者接口,当订阅了 Observable,Observable 就可以调用 Observer 接口函数对其进行通知

现在用 RxJava 实现一个简单的观察者模式

首先创建一个被观察者对象:

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

    @Override

    public void call(Subscriber<? super String> subscriber) {

        subscriber.onNext("Hello World");

        subscriber.onNext("Hi zj");

        subscriber.onCompleted();

    }

});

123456789

然后创建一个观察者对象:

Observer<String> observer = new Observer<String>() {

    @Override

    public void onCompleted() {

        Log.e(TAG, "onCompleted: ");

    }

    @Override

    public void onError(Throwable e) {

        Log.e(TAG, "onError: " + e.getMessage());

    }

    @Override

    public void onNext(String s) {

        Log.e(TAG, "onNext: s = " + s);

    }

};

12345678910111213141516

观察者对象订阅被观察者对象:

observable.subscribe(observer);

1

欧了,就是这么的简单!!!

实现功能:被观察对象调用观察者接口方法 onNext 传输字符串,然后调用 onCompleted 结束,观察者对象收到通知后,打印出得到的字符串

实现结果:

Observer 接口详解

Observer 接口有三个函数:onNext、onCompleted、onError

onNext - void onNext(T t):该方法用于被观察者向观察者的信息传送。当被观察者调用 onCompleted 或者 onError 后,将不再传输 onNext 方法到观察者

onCompleted - void onCompleted():该方法用于通知观察者,被观察者结束发送信息了。当被观察者已经调用 onError 方法后,其后的 onNext 和 onCompleted 方法都将无效

onError - void onError(java.lang.Throwable e):该方法用于通知观察者,自己遭遇了一个错误,结束发送信息了。当被观察者已经调用 onError 方法后,其后的 onNext 和 onCompleted 方法都将无效

Observable 类介绍

Observable 类内容比较复杂,还没有完全理清,下面就记录关于如何创建 Observable,以及如何进行订阅

上面用到了 Observable.create 方法:

@Deprecated

public static  Observable create(Observable.OnSubscribe f)

指定传送信息的类型(比如 String)

也可以直接输入一组数据的方法 Observable.from:

public static  Observable from(T[] array)

实现代码:

String[] arr = {"Hello World", "Hi zj"};

Observable<String> observable = Observable.from(arr);

12

或者使用方法 Observable.just:

public static  Observable just(T value)

实现代码:

Observable<String> observable = Observable.just("Hello World", "Hi zj");

1

链式编程

RxJava 可以实现链式编程,修改上面的程序如下:

Observable.just("Hello World", "Hi zj")

        .subscribe(new Observer<String>() {

            @Override

            public void onCompleted() {

                Log.e(TAG, "onCompleted: ");

            }

            @Override

            public void onError(Throwable e) {

                Log.e(TAG, "onError: " + e.getMessage());

            }

            @Override

            public void onNext(String s) {

                Log.e(TAG, "onNext: s = " + s);

            }

        });

1234567891011121314151617

Note:Observable 的信息发送和 Observer 的信息处理均发生在 subscribe(订阅)之后

Subscriber 使用

Subscriber 是一个抽象类,继承了 Observer 接口:

public abstract class Subscriber<T> extends java.lang.Object

        implements Observer<T>, Subscription

12

Subscriber 同时扩展了一些新的函数,可以使用 Subscriber 类替代 Observer 接口。

比如Subscriber 提供了手动取消订阅的方法 unsubscribe:

public final void unsubscribe()

当在函数 onCompleted 之前调用 unsubscribe,将会不再接收接下来的信息

调用 unsubscribe 还可以避免内存泄漏的风险,所以尽量在不需要使用后调用此函数

参考:RxJava的使用

也可以查询是否订阅 - isUnsubscribed:

public final boolean isUnsubscribed()

返回为 true 表明已取消订阅

Subscriber 类还新增了方法 onStart:

public void onStart()

该方法在被观察者已经和观察者连接,但还没有发出通知之前被调用,比如弹出进度条,增加有用的初始化信息

使用 Subscriber 类实现观察者模式如下:

Observable.just("Hello", " World", "Hi", " zj")

        .subscribe(new Subscriber<String>() {

            @Override

            public void onStart() {

                super.onStart();

                Log.e(TAG, "onStart: " + this.isUnsubscribed());

            }

            @Override

            public void onCompleted() {

                Log.e(TAG, "onCompleted: " + this.isUnsubscribed());

            }

            @Override

            public void onError(Throwable e) {

                Log.e(TAG, "onError: " + e.getMessage());

                Log.e(TAG, "onError: " + this.isUnsubscribed());

            }

            @Override

            public void onNext(String s) {

                Log.e(TAG, "onNext: " + s);

                if (s.equals("Hi")) {

                    this.unsubscribe();

                }

                Log.e(TAG, "onNext: " + this.isUnsubscribed());

            }

        });

123456789101112131415161718192021222324252627282930

实现结果:

Note:接下来将使用 Subscriber 替代 Observer

Action 使用

Action 是一个接口,没有任何函数或参数,但是其他 ActionX(X 表示数字)接口都继承自它,ActionX(X 表示数字)接口仅有一个函数 call,其参数随数字 X 的变化而增大

当没有参数时,可以使用 Action0 接口:

public interface Action0

        extends Action

它有单个函数 call:

void call()

1

当有单个参数时,可以使用 Action1 接口:

public interface Action1

        extends Action

它的函数 call 有一个参数:

void call(T t)

1

以此类推,直到 Action9 接口,如果需要大于 9 个参数的接口,可以使用 ActionN:

public interface ActionN

        extends Action

12

其函数 call 的参数为数组形式:

void call(java.lang.Object... args)

1

使用 Action1 替代 Observer 或者 Subscriber

从上面描述可知,可以显式创建 Observer 接口或者 Subscriber 类作为观察者对象,RxJava 同样也提供了 Action 接口来隐式处理被观察者发送的通知

public final Subscription subscribe(Action1<? super T> onNext)

public final Subscription subscribe(Action1<? super T> onNext,

                Action1<java.lang.Throwable> onError)

public final Subscription subscribe(Action1<? super T> onNext,

                Action1<java.lang.Throwable> onError,

                Action0 onCompleted)

123456

RxJava 提供了三种方式,可以仅处理观察者发出的 onNext 方法,也可以处理 onNext 和 onError,或者实现 onNext,onError,onCompleted,示例代码如下:

Action1<String> onNext = new Action1<String>() {

    @Override

    public void call(String s) {

        Log.e(TAG, "call: " + s);

    }

};

Action1<Throwable> onError = new Action1<Throwable>() {

    @Override

    public void call(Throwable throwable) {

        Log.e(TAG, "call: " + throwable.getMessage());

    }

};

Action0 onCompleted = new Action0() {

    @Override

    public void call() {

        Log.e(TAG, "call: ");

    }

};

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

    @Override

    public void call(Subscriber<? super String> subscriber) {

        subscriber.onNext("Hello");

        subscriber.onNext("World");

        subscriber.onNext("Hi");

        subscriber.onNext("zj");

        Throwable e = new Throwable("eeeeeeee");

        subscriber.onError(e);

    }

});

Observable.just("Hello World", "Hi zj")

        .subscribe(onNext);

observable.subscribe(onNext, onError);

observable.subscribe(onNext, onError, onCompleted);

12345678910111213141516171819202122232425262728293031323334353637383940

结果:

Note:如果被观察者发送了 onError 后,必须在订阅时也实现该方法

Scheduler 使用

参考

RXjava observeOn subscribeOn 解析:http://blog.csdn.net/lazyer_dog/article/details/52586069

RxJava observeOn()与subscribeOn()的关系:http://blog.csdn.net/jdsjlzx/article/details/51685769

介绍

上面描述的程序运行均在同一线程内,使用 Scheduler 类可以实现异步处理

public abstract class Scheduler

        extends java.lang.Object

12

Scheduler 可以调度不同任务在不同的线程内。RxJava 已经封装了一些常用的 Scheduler 对象,可以在 Schedulers 类中找到:

public final class Schedulers

        extends java.lang.Object

12

当进行 io 操作时,可以使用 Schedulers.io:

public static Scheduler io()

当进行密集计算工作时,可以使用 Schedulers.computation:

public static Scheduler computation()

Note:不要混淆上面两个 Scheduler 对象,当进行计算操作时,就调用 computation;当进行io 操作时,就调用 io

* 如果仅需要新建一个线程,可以使用 Schedulers.newThread:

public static Scheduler newThread()

1

对于 Android 开发,还可以调用 AndroidThread.mainThread 来指定主线程:

public static Scheduler mainThread()

observeOn、subscribeOn 和 doOnSubscribe

subscribeOn:

public final Observable<T> subscribeOn(Scheduler scheduler)

1

官方介绍:

Asynchronously subscribes Observers to this Observable on the specified Scheduler.

If there is a create(Action1, rx.Emitter.BackpressureMode) type source up in the chain, it is recommended to use subscribeOn(scheduler, false) instead to avoid same-pool deadlock because requests pile up behind a eager/blocking emitter.

12

observeOn:

public final Observable<T> observeOn(Scheduler scheduler)

1

官方介绍:

Modifies an Observable to perform its emissions and notifications on a specified Scheduler, asynchronously with a bounded buffer of RxRingBuffer.SIZE slots.

Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous. If strict event ordering is required, consider using the observeOn(Scheduler, boolean) overload.

12

doOnSubscribe:

public final Observable<T> doOnSubscribe(Action0 subscribe)

1

官方介绍:

Modifies the source Observable so that it invokes the given action when it is subscribed from its subscribers. Each subscription will result in an invocation of the given action except when the source Observable is reference counted, in which case the source Observable will invoke the given action for the first subscription.

1

通过网上博文学习到:

observeOn 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;

Note:调用 observeOn 后箭头颜色变化了,说明 observeOn 仅改变之后操作所在线程

subscribeOn 影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;

Note:subscribeOn 的使用改变了前后的箭头颜色,说明 subscribeOn 改变的是被观察者所在的线程

当被观察者需要在订阅后,运行前设置初始信息,比如进度条,可以调用 doOnSubscribe。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程

Note:调用 doOnSubscribe 后,将会运行在订阅后最开始的位置

示例:

Observable.create(new Observable.OnSubscribe<Integer>() {

    @Override

    public void call(Subscriber<? super Integer> subscriber) {

        Log.e(TAG, "create call: " + Process.myTid());

        subscriber.onNext(1);

        subscriber.onNext(2);

        subscriber.onNext(3);

        subscriber.onCompleted();

    }

})

        .subscribeOn(Schedulers.newThread())

        .observeOn(Schedulers.computation())

        .map(new Func1<Integer, Integer>() {

            @Override

            public Integer call(Integer integer) {

                Log.e(TAG, "map call: " + Process.myTid());

                return integer + 10;

            }

        })

        .doOnSubscribe(new Action0() {

            @Override

            public void call() {

                Log.e(TAG, "doOnSubscribe call: " + Process.myTid());

                Log.e(TAG, "call: Hello");

            }

        })

        .subscribeOn(AndroidSchedulers.mainThread())

        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(new Action1<Integer>() {

            @Override

            public void call(Integer integer) {

                Log.e(TAG, "onNext call: " + Process.myTid());

                Log.e(TAG, "call: " + integer);

            }

        }, new Action1<Throwable>() {

            @Override

            public void call(Throwable throwable) {

                Log.e(TAG, "onError call: " + Process.myTid());

                Log.e(TAG, "call: " + throwable.getMessage());

            }

        });

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647

结果:

Function 使用

Function 接口使用和 Action 接口类似,区别只是 Function 接口中函数 call 有返回值,比如 Func1 接口:

public interface Func1<T,R>

        extends Function

12

泛型 T 表示参数类型,泛型 R 表示返回值类型。call 函数格式如下:

R call(T t)

1

map、flatMap、concatMap 使用

参考

RxJava(三) flatMap操作符用法详解:http://blog.csdn.net/johnny901114/article/details/51532776

map

public final <R> Observable<R> map(Func1<? super T,? extends R> func)

1

官方介绍:

Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.

1

流程图:

调用函数 map 后,可以转换传输信息的类型,比如从一个类中取出某个字段,将图像 id 值转换为 Bitmap:

Observable.just(android.R.drawable.alert_dark_frame)

        .subscribeOn(Schedulers.newThread())

        .observeOn(Schedulers.io())

        .map(new Func1<Integer, Bitmap>() {

            @Override

            public Bitmap call(Integer integer) {

                return BitmapFactory.decodeResource(getResources(), integer);

            }

        })

        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(new Action1<Bitmap>() {

            @Override

            public void call(Bitmap bitmap) {

                image.setImageBitmap(bitmap);

            }

        });

12345678910111213141516

先开启新线程,传送 Integer 类型信息;

然后在 io 线程中加载图像,从而传输信息类型转为 Bitmap;

最后转回主线程,显示图像

flatmap

public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func)

1

官方介绍:

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.

1

流程图:

flatMap 方法获取了原来的被观察者发送的 item,然后转换为一个 Observable 对象,将所有转换后得到的被观察者对象收集起来后,再向下传送(具体使用场景还没有想出来,可能在实际工程中使用后会更加理解)

示例代码如下:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)

        .subscribeOn(Schedulers.newThread())

        .observeOn(Schedulers.computation())

        .flatMap(new Func1<Integer, Observable<Integer>>() {

            @Override

            public Observable<Integer> call(Integer integer) {

                return Observable.just(integer + 10)

                        .subscribeOn(Schedulers.newThread());

            }

        })

        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(new Action1<Integer>() {

            @Override

            public void call(Integer integer) {

                Log.e(TAG, "call: " + integer);

            }

        });

1234567891011121314151617

Note:可以对 flatMap 方法生成的 Observable 对象调用 subscribeOn 和 observeOn,这样,每个生成的 Observable 运行在不同的线程中

concatMap

public final <R> Observable<R> concatMap(Func1<? super T,? extends Observable<? extends R>> func)

1

官方介绍:

Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then emitting the items that result from concatenating those resulting Observables.

1

流程图:

作用和 flatMap 类似,唯一区别就是返回结果是按输入顺序返回的,而 flatMap 的顺序不定。

示例代码:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)

        .subscribeOn(Schedulers.newThread())

        .observeOn(Schedulers.computation())

        .concatMap(new Func1<Integer, Observable<Integer>>() {

            @Override

            public Observable<Integer> call(Integer integer) {

                return Observable.just(integer + 10)

                        .subscribeOn(Schedulers.newThread());

            }

        })

        .observeOn(AndroidSchedulers.mainThread())

        .subscribe(new Action1<Integer>() {

            @Override

            public void call(Integer integer) {

                Log.e(TAG, "call: " + integer);

            }

        });

1234567891011121314151617

filter、toList 使用

filter

public final Observable<T> filter(Func1<? super T,java.lang.Boolean> predicate)

1

官方介绍:

Filters items emitted by an Observable by only emitting those that satisfy a specified predicate.

1

顾名思义,就是筛选被观察者发送的通知。流程图如下:

示例 - 过滤掉偶数:

Observable.just(1, 2, 3, 4, 5, 6)

        .filter(new Func1<Integer, Boolean>() {

            @Override

            public Boolean call(Integer integer) {

                return integer % 2 == 0;

            }

        })

        .subscribe(new Action1<Integer>() {

            @Override

            public void call(Integer integer) {

                Log.e(TAG, "call: " + integer);

            }

        });

12345678910111213

toList

public final Observable<java.util.List<T>> toList()

1

将被观察者发送的所有信息结合为单个列表,然后发送给观察者

流程图如下:

示例:

Observable.just(1, 2, 3, 4, 5, 6)

        .toList()

        .subscribe(new Action1<List<Integer>>() {

            @Override

            public void call(List<Integer> integers) {

                for (Integer i : integers) {

                    Log.e(TAG, "call: " + i);

                }

            }

        });

---------------------

作者:编号1993

来源:CSDN

原文:https://blog.csdn.net/u012005313/article/details/72818125

版权声明:本文为博主原创文章,转载请附上博文链接!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352

推荐阅读更多精彩内容

  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,190评论 2 8
  • 来自于:CSDNblog.csdn.net/caihongdao123/article/details/51897...
    于加泽阅读 1,275评论 0 5
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 911评论 0 2
  • 在正文开始之前的最后,放上GitHub链接和引入依赖的gradle代码: Github: https://gith...
    苏苏说zz阅读 677评论 0 2
  • 窗簾隔絕太久,拉開 陽光恩澤每一片樹葉,野草昂首 攀藤而上的野草早已突破陰影 在枝枒以上,沐浴生命之光 蝴蝶翩翩起...
    康飄釀阅读 301评论 0 1