RxJava(三)操作符的使用

一:创建操作符
1.create:建议你在传递给create方法的函数中检查观察者的isUnsubscribed状态,以便在没有观察者的时候,让你的Observable停止发射数据或者做昂贵的运算。
       Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()){
                  e.onNext("1");
                  e.onNext("2");
                  e.onNext("3");
                }
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.d(TAG,s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
2.Defer :操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

在某些情况下,等待直到最后一分钟(就是知道订阅发生时)才生成Observable可以确保Observable包含最新的数据。


public class CourseThreeActivity extends AppCompatActivity {
    private final static  String TAG = "CourseThreeActivity";
    private int num = 10;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_course_three);
        Observable<Integer> observable = Observable.just(num);
        num =20;
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,integer+"");
            }
        };
        observable.subscribe(subscriber);
    }
}

运行结果如下

20244-20244/com.pse.rxandroid E/CourseThreeActivity: 10

可见当我们调用Just 时该Observerable 的数据已经固定。即使在下面做出更改也不会造成影响。
那我们再来看看defer

public class CourseThreeActivity extends AppCompatActivity {
    private final static  String TAG = "CourseThreeActivity";
    private int num = 10;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_course_three);
        Observable<Integer> observable = Observable.defer(new Func0<Observable<Integer>>() {
            @Override
            public Observable<Integer> call() {
                return Observable.just(num);
            }
        });
        num =20;
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,integer+"");
            }
        };
        observable.subscribe(subscriber);
    }
}

运行结果如下

21658-21658/com.pse.rxandroid E/CourseThreeActivity: 20

现在想必大家能够理解defer 的作用了吧(等待订阅,获取最新值)。

3.From :将其它种类的对象和数据类型转换为Observable

使用方法如下:

   Integer[] items = { 0, 1, 2, 3, 4, 5 };
        Observable myObservable = Observable.from(items);

        myObservable.subscribe(
                new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        System.out.println(item);
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable error) {
                        System.out.println("Error encountered: " + error.getMessage());
                    }
                },
                new Action0() {
                    @Override
                    public void call() {
                        System.out.println("Sequence complete");
                    }
                }
        );

结果大家可想而知,不过在这里有一个新的操作符Action0,其实看过源码就会知道Action可以有多个参数,方便我们来重写,处理自己的需求,这里我们只是模拟Observer创建了 onNext(),onError(),onComplete().

4.Range:创建一个发射特定整数序列的Observable

示例代码如下

  Observable.range(5,10).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e(TAG,integer+"");
            }
        });

这里我们只是为了测试,所以只处理onNext()事件。
运行结果如下

03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 5
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 6
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 7
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 8
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 9
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 10
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 11
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 12
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 13
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 14
5.Repeat :创建一个发射特定数据重复多次的Observable
  Observable<String> observable = Observable.just("hell Word");
        observable.repeat(4).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e(TAG,s);
            }
        });

运行结果如下,会连续发射4四次Observerable(同一个)

03-07 14:13:11.723 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
6.Timer:创建一个Observerable,并在一定的延时后发射一个特定的值“0”

示例如下

        Observable<Long> observable = Observable.timer(2, TimeUnit.SECONDS);
        observable.subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.e(TAG,aLong+"");
            }
        });

运行结果如下

26279-26302/com.pse.rxandroid E/CourseThreeActivity: 0

在日常中我们经常做倒计时操作,之前使用的是Handler 现在你可以尝试改用RX实现了。

7.MAP():最重要的变换操作符,对Observable发射的每一项数据应用一个函数,执行变换操作

大家可以试着理解下这个图:



上面这张图我可以举个例子,比如幼儿园举行活动,我们要给每个孩子穿上礼服,那么这个map()就是我们换衣服这个方法,所有的孩子经过map()后都会穿上华丽的礼服。这样的方式我们就会很快理解上图的意义。
示例如下

     Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                    Log.e(TAG,1+"");
                    subscriber.onNext(1);
                    subscriber.onCompleted();
            }
        }).map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer+"||";
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e(TAG,s);
            }
        });
03-07 14:38:08.358 7900-7900/? E/CourseThreeActivity: 1
03-07 14:38:08.358 7900-7900/? E/CourseThreeActivity: 1||

从上面看出来,我们发射的数据是整数类型,但经过Map()后就变为String类型,这个操作符默认不在任何特定的调度器上执行。(有序)

8.FlatMap():将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable,听着有点绕口,那么我们看下示例

我们先在页面建立以下代码

  Observable.just(1, 2, 3, 4, 5, 6, 7)
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(Integer integer) {
                        return getObserverable(integer);
                    }
                }).subscribe(
                new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e(TAG,s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e(TAG,throwable.getMessage());
                    }
                }
        );

    private Observable<String> getObserverable(final int id) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("This is the" + id);
                subscriber.onCompleted();
            }
        });
    }

结果如下

om.pse.rxandroid E/CourseThreeActivity: This is the1 
om.pse.rxandroid E/CourseThreeActivity: This is the2 
om.pse.rxandroid E/CourseThreeActivity: This is the3 
om.pse.rxandroid E/CourseThreeActivity: This is the4 
om.pse.rxandroid E/CourseThreeActivity: This is the5 
om.pse.rxandroid E/CourseThreeActivity: This is the6 
om.pse.rxandroid E/CourseThreeActivity: This is the7 
9.Merge合并多个Observables的发射物

示例代码如下


Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下

Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

以上是我们日常开发要用到的一些操作符,但是RxJava 可不止这些,如有需要可以去这里学习。
RxJava中文学习文档

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,853评论 0 10
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,642评论 8 93
  • 前言 按照官方的分类,操作符大致分为以下几种: Creating Observables(Observable的创...
    小玉1991阅读 1,043评论 0 1
  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,816评论 0 1
  • RxJava正在Android开发者中变的越来越流行。唯一的问题就是上手不容易,尤其是大部分人之前都是使用命令式编...
    刘启敏阅读 1,850评论 1 7