- buffer
- buffer(int):将源Observable每n个作为一个list发射
注意:- 最后发射的list的数量可能少于n
- 当源Observable触发onError()时,buffer会立即执行onError()
-
buffer(int,int)
buffer(n,m):每次发射n个数据,每隔m个数据忽略
- buffer(Func0)
-
buffer(Observable)
buffer(Observable,int) - buffer(Observable,Func1)
-
buffer(long,TimeUnit)
buffer(long,TimeUnit,Scheduler) -
buffer(long,TimeUnit,int)
buffer(long,TimeUnit,int,Scheduler) -
buffer(long,long,TimeUnit)
buffer(long,long,TimeUnit,Scheduler)
-
flatMap
flatMap(Func1):Func1将一个源Observable转换成多个Observable,并返回一个Observable,flatMap将返回的结果融合成自己的序列进行发射。
注意:顺序可能会乱,如果需要保证顺序可以用ConcatMap
flatMap(Func1,int):第二个参数会限制观察者的最大数量,当达到最大限制时,若有订阅者订阅,会等待其他的观察者结束
注意:当源Observable触发onError()时,会立即执行onError()
-
GroupBy
groupBy(Func1)
groupBy(Func1,Func1)
将数据转换成Observables,每一个Observable发射一个子序列
-
Map
map(Func1):将源数据通过func1转化后发射其结果
scan
-
scan(Func2)
示例代码:
Observable.just(1, 2, 3, 4, 5)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
- scan(R,Func2):提供第一次调用的数据R
- window
-
window(closingSelector)
-
window(windowOpenings, closingSelector)
-
window(count)
-
window(count, skip)