RxJava2.0之旅(二)---创建操作符

基于RxJava2

1 常用的创建操作符

常用的创建操作符

2 补充(文字)

2.1 基本创建与快速创建

  • 快速创建1个被观察者对象
  • 直接发送 传入的事件
  • 数组集合遍历
类型 方法 作用 备注
基本创建 create() 完整创建一个被观察者对象(Observable) RxJava中创建被观察者对象最基本的操作符
快速创建 just() 快速创建后,直接发送传入的事件 最多只能发送10个参数
fromArray() 快速创建后,直接发送 传入的数组数据 可发送10个以上事件 (数组形式)
fromIterable() 快速创建后 直接发送 传入的集合List数据 可发送10个以上事件 (集合形式)
empty() 快速创建后,Comolete事件,直接通知完成
error() 快速创建后,仅发送Error事件,直接通知异常
never() 快速创建后不发送任何事件

示例

//集合遍历
private void initDara() {
    dataList = new ArrayList<>();
    dataList.add("123");
    dataList.add("456");
    dataList.add("789");
    dataList.add("101112");
}
// 2. 通过fromIterable()将集合中的对象 / 数据发送出去
private void traverse() {
    Observable.fromIterable(dataList).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "集合遍历");
        }

        @Override
        public void onNext(String value) {
            Log.d(TAG, "集合中的数据元素 = " + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "对Error事件作出响应");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "遍历结束");
        }
    });
}
// 下列方法一般用于测试使用

<-- empty()  -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即观察者接收后会直接调用onCompleted()

<-- error()  -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()

<-- never()  -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用

2.2 延迟创建

类型 方法 作用 备注
延迟创建 defer() 直到有观察者(Observer)订阅时,才动态创建被观察者对象(Observable)&发送事件 * 通过Observable工厂方法创建被观察者(Observable) * 每次订阅后,都会得到一个新创建的最新的Observable对象,这可以确保Observable对象里的数据都是最新的
timer() 快速创建后,延迟指定时间后,发送1个数值0(Long类型) * 本质=延迟指定时间后,调用一次onNext(0) * 注:timer操作符默认运行在一个新线程上
interval() 快速创建后,每隔指定的时间就发送事件 发送的事件序列=从0开始、无限递增1的整数序列
intervalRange() 快速创建后,每隔指定时间就发送事件,可指定发送的数据的数据量 发送的事件序列=从start开始递增1的整数序列(可指定发送的数据的数据量count)
range() 快速创建后,连续发送1个事件序列,可指定范围 * 无延迟发送事件 * 从start开始递增1的整数序列(可指定发送的数据的数据量count
rangeLong() 类似于range(),区别在于该方法支持数据类型long

2.2.1 defer

private void defer() {
    //1. 第1次对i赋值
    i = 10;

    // 2. 通过defer 定义被观察者对象
    // 注:此时被观察者对象还没创建
    Observable<Integer> defer = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {

        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(i);
        }
    });
    //2. 第2次对i赋值
    i = 15;

    //3. 观察者开始订阅
    // 注:此时,才会调用defer()创建被观察者对象(Observable)
    defer.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "开始采用subscribe连接");
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "接收到的整数是" + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "对Error事件作出响应");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "对Complete事件作出响应");
        }
    });

}
TIM截图20180531185545.png

2.2.2 timer

// 注:timer操作符默认运行在一个新线程上
// 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler) 

// 该例子 = 延迟2s后,发送一个long类型数值
private void timer() {
    Observable.timer(2, TimeUnit.SECONDS)
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }

                @Override
                public void onNext(Long value) {
                    Log.d(TAG, "接收到了事件" + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

            });

}
TIM截图20180531185441.png

2.2.3 interval

发送的事件序列 = 从0开始、无限递增1的的整数序列

//每隔指定时间 就发送 事件
private void interval() {
    // 参数说明:
    // 参数1 = 第1次延迟时间;
    // 参数2 = 间隔时间数字;
    // 参数3 = 时间单位;
    Observable.interval(3, 2, TimeUnit.SECONDS)
            // 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
                // 默认最先调用复写的 onSubscribe()

                @Override
                public void onNext(Long value) {
                    Log.d(TAG, "接收到了事件" + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
}
TIM截图20180601105651.png

2.2.4 intervalRange

可指定发送的数据的数量

private void intervalRange() {
    // 参数说明:
    // 参数1 = 事件序列起始点;
    // 参数2 = 事件数量;
    // 参数3 = 第1次事件延迟发送时间;
    // 参数4 = 间隔时间数字;
    // 参数5 = 时间单位
    Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
            // 该例子发送的事件序列特点:
            // 1. 从3开始,一共发送10个事件;
            // 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从0开始递增1,无限个)
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
                // 默认最先调用复写的 onSubscribe()

                @Override
                public void onNext(Long value) {
                    Log.d(TAG, "接收到了事件" + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

            });
}
TIM截图20180601115349.png

2.2.5 range()

无延迟发送事件

private void range() {
    // 参数说明:
    // 参数1 = 事件序列起始点;
    // 参数2 = 事件数量;
    // 注:若设置为负数,则会抛出异常
    Observable.range(3, 5)
            // 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送5个事件
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
                // 默认最先调用复写的 onSubscribe()

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到了事件" + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

            });
}
TIM截图20180601135651.png

参考

https://www.jianshu.com/p/a406b94f3188

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Rxjava2.0概述 通过前面的文章介绍,读者对RxJava2.0应该有了初步的认识。RxJava2.0相对1....
    嘎啦果安卓兽阅读 4,085评论 0 2
  • 本文首发于“随手记技术团队”公众号 大概从2015年开始,RxJava1.0开始快速流行起来,短短两年时间,RxJ...
    HolenZhou阅读 2,214评论 0 19
  • 疏疏黄叶寻碧壤,飞廉为君送银装。 碗底罡风溶翠色,窗外残枝引茶香。 翰墨飞白今古事,展氏《游春》见垂杨。 敢问年间...
    木九洲阅读 792评论 0 1
  • 又是这个梦,李元很无奈,这个梦已经做了几个月了。同样的环境,同样的人,李元甚至知道自己下一秒钟会碰到什么样的事,说...
    木头windy阅读 676评论 4 12
  • 2016年8月 我只身独行,却仿佛带着一万雄兵 2016年7月 推荐阅读: 我的小学我的乡村教师 - 简书 好的老...
    学校教育篇阅读 283评论 0 0

友情链接更多精彩内容