1. Create (可以用于获取一个被观察者的对象)
Observable.create((ObservableOnSubscribe<String>) emitter -> {
/**
* 开始发送事件
*/
boolean disposed = emitter.isDisposed();
emitter.onNext(" 发射数据一"+ "\n" );
mRxOperatorsText.append("发射数据一" + "\n");
emitter.onNext(" 发射数据二" );
mRxOperatorsText.append("发射数据二" + "\n");
emitter.onNext(" 发射数据三" );
mRxOperatorsText.append("发射数据三" + "\n");
emitter.onComplete(); // 事件完成
emitter.onNext(" 发射数据四" );
mRxOperatorsText.append("发射数据四" + "\n");
emitter.onNext(" 发射数据五" );
mRxOperatorsText.append("发射数据五" + "\n");
emitter.onNext(String.valueOf(disposed)+ "\n");
mRxOperatorsText.append(String.valueOf(disposed) + "\n");
// emitter.onError(new RuntimeException());
}).subscribe(new Observer<String>() {
/**
* 开始发送事件的订阅
* @param d
*/
@Override
public void onSubscribe(Disposable d) {
boolean disposed = d.isDisposed();
mRxOperatorsText.append(String.valueOf(disposed) + "\n");
mRxOperatorsText.append("开始发送事件的订阅" + "\n");
}
/**
* 事件发送中
* @param s
*/
@Override
public void onNext(String s) {
mRxOperatorsText.append("接受事件"+s+ "\n");
}
/**
* 事件发送失败
* @param e
*/
@Override
public void onError(Throwable e) {
mRxOperatorsText.append("事件发送失败"+ "\n");
}
/**
* 事件发送完成
*/
@Override
public void onComplete() {
mRxOperatorsText.append("事件发送完成"+ "\n");
}
});
运行结果
09-19 14:36:39.707 3120-3120/com.nanchen.rxjava2examples E/RxCreateActivity:
false
开始发送事件的订阅
发射数据一
接受事件 发射数据一
发射数据二
接受事件 发射数据二
发射数据三
接受事件 发射数据三
09-19 14:36:39.717 3120-3120/com.nanchen.rxjava2examples E/RxCreateActivity: 事件发送完成
发射数据四
事件完成
发射数据五
false
注意事件 :
- 1.在开始发送事件的时候不能同时调用
emitter.onError(); //事件异常
跟emitter.onComplete(); // 事件完成
不然程序会出现运行异常。 - 2.当发送事件中 发射数据 三后 当调用了
emitter.onComplete(); // 事件完成
这个后,发现 事件还在发送但是接受事件中并没有接受事。 - 3.Disposable 这事件的作用是可以直接调用切断,可以看到,当它的 isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。 这是Rxjava 2.0 里面新添加的。
2. map (作用是对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化)
这是用Lambda表达式来写
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
Log.e(TAG, "int型的数据 1" + "\n");
emitter.onNext(1);
Log.e(TAG, "int型的数据 2" + "\n");
emitter.onNext(2);
Log.e(TAG, "int型的数据 3" + "\n");
emitter.onNext(3);
Log.e(TAG, "int型的数据 4" + "\n");
emitter.onNext(4);
Log.e(TAG, "int型的数据 5" + "\n");
emitter.onNext(5);
}).map(integer -> "转成字符型" + integer)
.subscribe(s -> Log.e(TAG, s + "\n"));
// 正常的写法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("accept : " + s + "\n");
Log.e(TAG, "accept : " + s + "\n");
}
});
运行结果
09-19 14:58:24.497 3345-3345/com.nanchen.rxjava2examples E/RxMapActivity:
int型的数据 1
转成字符型1
int型的数据 2
转成字符型2
int型的数据 3
转成字符型3
int型的数据 4
转成字符型4
int型的数据 5
转成字符型5
注意事件 :
map 基本作用就是将一个 Observable 通过某种函数关系,转换为另一种 Observable,上面例子中就是把我们的 Integer 数据变成了 String 类型。从Log日志显而易见。 里面的map 可以多次调用操作是同一Observable。
3. FlatMap(将一个发送事件的上游Observable变换成多个发送事件的Observables, 然后将它们发射的时间合并后放进一个单独的Observable里)
-
这里FlatMap 做一个解释说明,其实我在刚开看着个操作符,根本不知道说的是什么意思。
1.用图来表达一下。
先看看上游, 上游发送了三个事件, 分别是1,2,3, 注意它们的颜色.
中间flatMap的作用是将圆形的事件转换为一个发送矩形事件和三角形事件的新的上游Observable.
还是不能理解? 别急, 再来看看分解动作:
上游每发送一个事件, flatMap都将创建一个新的水管, 然后发送转换之后的新的事件, 下游接收到的就是这些新的水管发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 也就是图中所看到的, 并不是事件1就在事件2的前面. 如果需要保证顺序则需要使用。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list)
//延时10毫秒,第一个参数是数值,第二个参数是事件单位
.delay(10,TimeUnit.MILLISECONDS);
}
}) .subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, s);
}
});
如代码所示, 我们在flatMap中将上游发来的每个事件转换为一个新的发送三个String事件的水管, 为了看到flatMap结果是无序的,所以加了10毫秒的延时, 来看看运行结果吧:
运行结果
09-19 16:21:21.837 3819-3849/com.nanchen.rxjava2examples E/RxFlatMapActivity:
I am value 2
I am value 2
I am value 2
I am value 4
I am value 4
09-19 16:21:21.837 3819-3848/com.nanchen.rxjava2examples E/RxFlatMapActivity:
I am value 1
I am value 4
I am value 1
I am value 1
I am value 3
I am value 3
I am value 3
注意事件 :
- 1.flatMap并不保证事件的顺序
4.concatMap (作用和flatMap几乎一模一样,唯一的区别是它能保证事件的顺序)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return
// 此方法接收一个继承自Iterable接口的参数,简单的说就是java中的集合类。因此你可以传入一个list集合等等
Observable.fromIterable(list)
//延时两秒,第一个参数是数值,第二个参数是事件单位
.delay(10,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});
运行结果
09-19 16:35:33.937 3965-3991/com.nanchen.rxjava2examples E/RxConcatMapActivity:
I am value 1
I am value 1
I am value 1
09-19 16:35:33.947 3965-3992/com.nanchen.rxjava2examples E/RxConcatMapActivity:
I am value 2
I am value 2
I am value 2
09-19 16:35:33.957 3965-3991/com.nanchen.rxjava2examples E/RxConcatMapActivity:
I am value 3
I am value 3
I am value 3
09-19 16:35:33.967 3965-3992/com.nanchen.rxjava2examples E/RxConcatMapActivity:
I am value 4
I am value 4
I am value 4
5. Zip(专用于合并事件,该合并不是连接,而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同)
-
zip操作符就是合并多个被观察者的数据流, 然后发送(Emit)最终合并的数据。借用网上的一张图,分析的比较透彻
从上游中可以看出,上游有两根水管,其中一根水管负责发送圆形事件 , 另外一根水管负责发送三角形事件 , 通过Zip操作符, 使得圆形事件 和三角形事件 合并为了一个矩形事件 . 拆分过程如下:
组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形1 事件和三角形B 事件进行合并, 也不可能出现圆形2 和三角形A 进行合并的情况.
最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量 相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了.
/**
事件一
*/
private Observable<String> getStringObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("A");
Log.e(TAG, "String emit : A \n");
e.onNext("B");
Log.e(TAG, "String emit : B \n");
e.onNext("C");
Log.e(TAG, "String emit : C \n");
}
}
});
}
/**
事件二
*/
private Observable<Integer> getIntegerObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(1);
Log.e(TAG, "Integer emit : 1 \n");
e.onNext(2);
Log.e(TAG, "Integer emit : 2 \n");
e.onNext(3);
Log.e(TAG, "Integer emit : 3 \n");
e.onNext(4);
Log.e(TAG, "Integer emit : 4 \n");
e.onNext(5);
Log.e(TAG, "Integer emit : 5 \n");
}
}
});
}
Observable.zip(getIntegerObservable(),getStringObservable(), new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer+s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "zip : accept : " + s + "\n");
}
});
运行结果
09-19 16:45:08.947 4053-4053/com.nanchen.rxjava2examples E/RxZipActivity:
Integer emit : 1
Integer emit : 2
Integer emit : 3
Integer emit : 4
Integer emit : 5
zip : accept : 1A
String emit : A
zip : accept : 2B
String emit : B
zip : accept : 3C
String emit : C
6.doOnNext(让订阅者在接收到数据前干点事情的操作符)
Observable.just(1, 2, 3, 4)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n");
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "doOnNext :" + integer + "\n");
}
});
运行结果
09-19 17:00:26.167 4053-4053/com.nanchen.rxjava2examples E/RxDoOnNextActivity:
doOnNext 保存 1成功
doOnNext :1
doOnNext 保存 2成功
09-19 17:00:26.177 4053-4053/com.nanchen.rxjava2examples E/RxDoOnNextActivity:
doOnNext :2
doOnNext 保存 3成功
doOnNext :3
doOnNext 保存 4成功
doOnNext :4
注意事件
1 是指每发射一件事之前做的操作
- just 这个操作符是指若干个相同 的参数,
7. filter(过滤操作符,取正确的值)
- Filter 你会很常用的,它的作用也很简单,过滤器嘛。可以接受一个参数,让其过滤掉不符合我们条件的值.
Observable.just(1, 20, 65, -5, 7, 19)
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer >= 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "filter : " + integer + "\n");
}
});
运行结果
09-19 17:25:53.987 4053-4053/com.nanchen.rxjava2examples E/RxFilterActivity:
filter : 20
filter : 65
filter : 19
可以看到,我们过滤器舍去了小于 10 的值,所以最好的输出只有 20, 65, 19。
7. skip( 代表跳过 count 个数目开始接收。)
Observable.just(1,2,3,4,5)
.skip(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "skip : "+integer + "\n");
}
});
运行结果
09-19 17:27:15.067 4053-4053/com.nanchen.rxjava2examples E/RxSkipActivity:
skip : 3
skip : 4
skip : 5
从log里面可以看到 是从第二位数开始 接收数据。 (不计算0位)
8. take(用于指定订阅者最多收到多少数据)
Flowable.just(1,2,3,4,5)
.take(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("take : "+integer + "\n");
Log.e(TAG, "accept: take : "+integer + "\n" );
}
});
运行结果
09-19 17:38:14.877 4053-4053/com.nanchen.rxjava2examples E/RxTakeActivity:
accept: take : 1
accept: take : 2
从log里面可以看到 ,只接受前两位的发射事件。
9distinct(去重操作符,其实就是简单的去重)
Observable.fromArray(1,1,1,1,1,1,12,2,3,3,3,4,4,5,6,5,6,7,8,9,0)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "distinct : " + integer + "\n");
}
});
运行结果
09-19 17:51:46.917 4273-4273/com.nanchen.rxjava2examples E/RxDistinctActivity:
distinct : 1
distinct : 12
distinct : 2
distinct : 3
distinct : 4
distinct : 5
distinct : 6
distinct : 7
distinct : 8
distinct : 9
distinct : 0
Log 日志显而易见,我们在经过 dinstinct() 后接收器接收到的事件只有1,12,2,3,4,5,6,7,8,,9,0了。
10 last(操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。)
Observable.just(1, 2, 3)
.takeLast(0)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("last : " + integer + "\n");
Log.e(TAG, "last : " + integer + "\n");
}
});
运行结果
09-19 17:56:29.137 4460-4460/com.nanchen.rxjava2examples E/RxLastActivity:
last : 3
最后需要说明一下 Observable
跟 Flowable
的区别
在rxjava2.x时代,上述的背压逻辑全部挪到Flowable里了,所以说Flowable支持背压。而2.x时代的Observable是没有背压的概念的,Observable如果来不及消费会死命的缓存直到OOM,所以rxjava2.x的官方文档里面有讲,大数据流用Flowable,小数据流用Observable 。