基于RxJava2
1 常用的创建操作符
常用的创建操作符
2 补充(文字)
2.1 基本创建与快速创建
- 快速创建1个被观察者对象
- 直接发送 传入的事件
- 数组集合遍历
类型 | 方法 | 作用 | 备注 |
---|---|---|---|
基本创建 | create() | 完整创建一个被观察者对象(Observable) | RxJava中创建被观察者对象最基本的操作符 |
快速创建 | just() | 快速创建后,直接发送传入的事件 | 最多只能发送10个参数 |
fromArray() | 快速创建后,直接发送 传入的数组数据 | 可发送10个以上事件 (数组形式) | |
fromIterable() | 快速创建后 直接发送 传入的集合List数据 | 可发送10个以上事件 (集合形式) | |
empty() | 快速创建后,Comolete事件,直接通知完成 | ||
error() | 快速创建后,仅发送Error事件,直接通知异常 | ||
never() | 快速创建后不发送任何事件 |
示例
//集合遍历
private void initDara() {
dataList = new ArrayList<>();
dataList.add("123");
dataList.add("456");
dataList.add("789");
dataList.add("101112");
}
// 2. 通过fromIterable()将集合中的对象 / 数据发送出去
private void traverse() {
Observable.fromIterable(dataList).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "集合遍历");
}
@Override
public void onNext(String value) {
Log.d(TAG, "集合中的数据元素 = " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "遍历结束");
}
});
}
// 下列方法一般用于测试使用
<-- empty() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
Observable observable1=Observable.empty();
// 即观察者接收后会直接调用onCompleted()
<-- error() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()
<-- never() -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用
2.2 延迟创建
类型 | 方法 | 作用 | 备注 |
---|---|---|---|
延迟创建 | defer() | 直到有观察者(Observer)订阅时,才动态创建被观察者对象(Observable)&发送事件 | * 通过Observable工厂方法创建被观察者(Observable) * 每次订阅后,都会得到一个新创建的最新的Observable对象,这可以确保Observable对象里的数据都是最新的 |
timer() | 快速创建后,延迟指定时间后,发送1个数值0(Long类型) | * 本质=延迟指定时间后,调用一次onNext(0) * 注:timer操作符默认运行在一个新线程上 | |
interval() | 快速创建后,每隔指定的时间就发送事件 | 发送的事件序列=从0开始、无限递增1的整数序列 | |
intervalRange() | 快速创建后,每隔指定时间就发送事件,可指定发送的数据的数据量 | 发送的事件序列=从start开始递增1的整数序列(可指定发送的数据的数据量count) | |
range() | 快速创建后,连续发送1个事件序列,可指定范围 | * 无延迟发送事件 * 从start开始递增1的整数序列(可指定发送的数据的数据量count | |
rangeLong() | 类似于range(),区别在于该方法支持数据类型long |
2.2.1 defer
private void defer() {
//1. 第1次对i赋值
i = 10;
// 2. 通过defer 定义被观察者对象
// 注:此时被观察者对象还没创建
Observable<Integer> defer = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
//2. 第2次对i赋值
i = 15;
//3. 观察者开始订阅
// 注:此时,才会调用defer()创建被观察者对象(Observable)
defer.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到的整数是" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
TIM截图20180531185545.png
2.2.2 timer
// 注:timer操作符默认运行在一个新线程上
// 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
// 该例子 = 延迟2s后,发送一个long类型数值
private void timer() {
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
TIM截图20180531185441.png
2.2.3 interval
发送的事件序列 = 从0开始、无限递增1的的整数序列
//每隔指定时间 就发送 事件
private void interval() {
// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
Observable.interval(3, 2, TimeUnit.SECONDS)
// 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
TIM截图20180601105651.png
2.2.4 intervalRange
可指定发送的数据的数量
private void intervalRange() {
// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 参数3 = 第1次事件延迟发送时间;
// 参数4 = 间隔时间数字;
// 参数5 = 时间单位
Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
// 该例子发送的事件序列特点:
// 1. 从3开始,一共发送10个事件;
// 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从0开始递增1,无限个)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
TIM截图20180601115349.png
2.2.5 range()
无延迟发送事件
private void range() {
// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 注:若设置为负数,则会抛出异常
Observable.range(3, 5)
// 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送5个事件
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
TIM截图20180601135651.png