Android RxJava之创建操作符(二)

作用

  • 用于创建被观察者(Observable)对象和发送事件。

类型

create()

        
        //1. 通过create()创建被观察者对象
        Observable.create(new ObservableOnSubscribe<Integer>() {
            //2. 在复写的subscribe()里定义需要发送的事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                
            } //3. 至此一个观察者对象(Observable)就创建完毕
        }).subscribe(new Observer<Integer>() {
            //4. 以下步骤仅为展示一个完整的demo,可以忽略。
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

just()

        // 1. 创建时传入整型1、2、3、4
        //在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
        Observable.just(1, 2, 3, 4)
                // 2. 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略。
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

fromArray()

        //方式一:发送事件
        
        //1. 设置需要传入的数组
        Integer[] items1 = {1, 2, 3, 4};
        //2. 创建被观察者对象(Observable)时传入数组
        //在创建后就会将该数组转换成Observable和发送该对象中的所有数据
        //3. 可发送10个以上参数
        //若直接传递一个List集合进去,会直接把List当做一个数据元素发送。
        Observable.fromArray(items1)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
        //方式二:数组遍历
        
        //1. 设置需要传入的数组
        Integer[] items2 = {1, 2, 3, 4};
        //2. 创建被观察者对象(Observable)时传入数组
        //在创建后就会将该数组转换成Observable和发送该对象中的所有数据
        //3. 可发送10个以上参数
        //若直接传递一个List集合进去,会直接把List当做一个数据元素发送。
        Observable.fromArray(items2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "数组遍历");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "数组中的元素" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "遍历结束");
                    }
                });
image

fromIterable()

        
        //方式一:快速发送集合
        
        //1. 设置一个集合
        List<Integer> list1 = new ArrayList<>();
        list1.add(1);
        list1.add(2);
        list1.add(3);
        //2. 通过fromIterable()将集合中的对象/数据发送出去。
        Observable.fromIterable(list1)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

        //方式二:集合遍历

        //1. 设置一个集合
        List<Integer> list2 = new ArrayList<>();
        list2.add(1);
        list2.add(2);
        list2.add(3);
        //2. 通过fromIterable()将集合中的对象/数据发送出去。
        Observable.fromIterable(list2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "集合遍历");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "集合中的数据元素" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "遍历结束");
                    }
                });
image

empty()

        //1. 一般用于测试使用
        //2. 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
        //3. 即观察者接收后会直接调用onCompleted()方法
        Observable observable = Observable.empty();

error()

        //1. 一般用于测试使用
        //2. 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
        //3. 即观察者接收后会直接调用onError()方法
        Observable observable = Observable.error(new RuntimeException());

never()

        //1. 一般用于测试使用
        //2. 该方法创建的被观察者对象发送事件的特点:不发送任何事件
        //3. 即观察者接收后什么都不调用
        Observable observable = Observable.never();

defer()

        // 1. 第1次对i赋值 
        Integer i = 10;
        // 通过defer 定义被观察者对象
        //注:此时的被观察者对象还没创建
         Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
             @Override
             public ObservableSource<? extends Integer> call() throws Exception {
                 return Observable.just(i);
             }
         });
        // 2. 第2次对i赋值
        i = 15;
        
        // 3. 观察者开始订阅
        // 注,此时才会调用defer()创建被观察者对象(Observable)
        // 当观察者调订阅时,才创建Observable,并且针对每个观察者创建都是一个新的
        // Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行
        // 相应的回调
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到的整数是" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });

timer()

        // 该例子 = 延迟2s后,发送一个long类型数值
        // timer操作符默认运行在一个新线程上
        // 也可以自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

interval()

        // 参数说明
        // 参数1 = 第一次延迟时间
        // 参数2 = 间隔时间数字
        // 参数3 = 时间单位
        // 注:intervalm默认在computation调度器上执行
        //也可以自定义执行线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)
        
        Observable.interval(3, 1, TimeUnit.SECONDS)
                // 该例子发送到事件序列特点:延迟3后发送,每隔1产生1个数字(从0开始递增,无限个)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }
                    // 默认最先调用复写的 onSubscribe()

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
![](https://upload-images.jianshu.io/upload_images/944365-3db189b868dc2463.gif)

intervalRange()

        /*
        * 参数1 = 事件序列起始点
        * 参数2 = 事件数量
        * 参数3 = 第一次事件延迟发送时间
        * 参数4 = 间隔时间数字
        * 参数5 = 时间单位
        * */
        Observable
                .intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
                /*
                * 该例子发送到事件序列特点:
                * 1. 从3开始,一共发送10个事件
                * 2. 第一次延迟2s发送,之后每隔2s产生1个数字(从0开始递增1,无限个)
                * */
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }
                    // 默认最先调用复写的 onSubscribe()

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

range()

        /*
        * 参数1 = 事件序列起始点
        * 参数2 = 事件数量
        * 注:若设置为负数,则会怕抛出异常
        * */
        Observable
                .range(3,10)
                // 该例子发送的事件序列特点:从3开始发送,每次发送事件递增,一共发送10个事件
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onNext(Integer integer) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

rangeLong()

  • 类似range(),区别在于该方法支持数据类型Long
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352