作为一开发者,既然没有造轮子的能力,那就先看看轮子的构造吧
操作符
#创建操作
- just
just将单个数据转换为发射那个数据的
Observable Observable.just(1,2,3)
- Timer
创建一个Observable,它再一个给定的延迟后发射一个特殊的值(数字0)。
Observable.timer(2, TimeUnit.SECONDS)
#变换操作
Map
操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的observable-
Buffer
buffer(3, TimeUnit.SECONDS).//每隔3秒 取出消息 Observable.range(1, 5).buffer(5, 5) --> onNext:[1,2,3,4,5] Observable.range(1, 5).buffer(5, 1) --> onNext:[1,2,3,4,5] --> onNext:[2,3,4,5] --> onNext:[3,4,5] --> onNext:[4,5] --> onNext:[5] 一次全部订阅 Observable.range(1,5).buffer(5) --> onNext:[1, 2, 3, 4, 5] 一次订阅2个 Observable.range(1,5).buffer(2) onNext:[1, 2]-->onNext:[3, 4]-->onNext:[5]
Scan
连续地对数据序列的每一项应用一个函数,然后连续发射结果 操作符对原始Observable发射的第一个数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。
#过滤操作
- Take
只发射前面的N项数据 使用Take操作符让你可以修改Obserable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver());
}
- Filter
只发射通过了测试的数据项
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(getObserver());
- Skip
抑制Observable发射的前N项数据(skip(2)),只保留之后的数据
#Concat 不交错的发射两个或多个Observable的发射物
Concat操作符连接多个Observable的输出,就好像它们是一个Observable,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推。
#辅助操作
- Delay
延迟一段指定的时间 再来发射来自Observable的发射物
Single
#介绍
Rxjava(以及它派生出来的RxGroovy和RxScale)中有一个名为SIngle的Observable变种
Single类似于Observable,不同的是,它总是发射一个值,或者一个错误通知,而不是发射一系列的值。
因此,不同于Obserable需要三个方法onNext,onError,onCompleted,订阅Single只需要两个方法
Single只会调用这两个方法的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止
- onSuccess - Single发射单个的值到这个方法
- onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法