Func1 和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于 Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
1.flatMapIterable: (将数据转换后再发送)
private Observable<String> flatMapIterableObserver(){
return Observable.just(1,2,3)
.flatMapIterable(new Func1<Integer, Iterable<? extends String>>() {
@Override
public Iterable<? extends String> call(Integer integer) {
ArrayList<String> strings = new ArrayList<>();
for (int i=0;i<3;i++){
strings.add("flatMapIterableObserver "+integer);
}
return strings;
}
});
}
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 3
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 3
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver 3
2.map:(将数据源Observable发送给每个数据进行指定函数转换,再将转换后的数据发送出去)
private Observable<Integer> mapObservable(){
return Observable.just(1,2,3)
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer*10;
}
});
}
09-13 19:30:11.802 32095-32095/com.example.administrator.rxjavademo E/call: map: 10
09-13 19:30:11.803 32095-32095/com.example.administrator.rxjavademo E/call: map: 20
09-13 19:30:11.803 32095-32095/com.example.administrator.rxjavademo E/call: map: 30
- flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。
- flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中,map返回的是结果集。
- flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat
- map适用于一对一转换,flatmap适用于一对多,多对多的场景
3.groupBy:(将数据源转换为groupBy筛选后的Observable对象)
private Observable<GroupedObservable<Integer,String>> groupedByStringObservable(){
return Observable.just(1,2,3,4,5,6,7,8,9)
.groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer % 2;
}
}, new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "groupBy: "+integer;
}
});
}
groupedByStringObservable().subscribe(new Action1<GroupedObservable<Integer, String>>() {
@Override
public void call(GroupedObservable<Integer, String> integerStringGroupedObservable) {
if (integerStringGroupedObservable.getKey()==0){
integerStringGroupedObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("call", s);
}
});
}
}
});
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 2
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 4
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 6
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 8
4.cast:(将Observable发送的数据强转成另外一种类型)
5.scan:(做一次计算,有条件、有筛选的输出最终结果)
6.throttleWithTimeout:(时间限流, 低于指定时间的都将被过滤)
private Observable<Integer> createObserver(){
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=0;i<10;i++){
if (!subscriber.isUnsubscribed()){
subscriber.onNext(i);
}
int sleep = 100;
if (i%3==0){
sleep = 300;
}
try{
Thread.sleep(sleep);
} catch (Exception e){
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
}
private Observable<Integer> throttleWithThimoutObserver(){
return createObserver().throttleWithTimeout(200,TimeUnit.MILLISECONDS);
}
对小于3的倍数的数据延迟100毫秒后发送新数据, 100毫秒小于过滤的时间,数据被过滤掉;
输出日志如下:
09-14 10:06:19.673 2916-2964/com.example.administrator.rxjavademo E/call: integer: 0
09-14 10:06:20.174 2916-2964/com.example.administrator.rxjavademo E/call: integer: 3
09-14 10:06:20.674 2916-2964/com.example.administrator.rxjavademo E/call: integer: 6
09-14 10:06:21.175 2916-2964/com.example.administrator.rxjavademo E/call: integer: 9
7.distinct:(去重,不能List元素出重, 可以list对象出重)
private Observable<Integer> distinctObserver(){
return Observable.just(1,2,4,5,6,8,4,3,2,1).distinct();
}
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 1
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 2
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 4
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 5
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 6
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 8
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 3
8.elementAt:(下标顺序过滤数据源)
private Observable<Integer> elementAtObserver(){
return Observable.just(1,3,8,10,9).elementAt(3);
}
09-14 11:05:48.591 5006-5006/com.example.administrator.rxjavademo E/call: integer: 10
9.filter:(根据函数进行过滤操作,返回true就往下执行,否则过滤掉 , 和last类似)
private Observable<Integer> filterObserver(){
return Observable.just(0,1,2,3,4,5,6,7,8,9)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer<3;
}
});
}
09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 0
09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 1
09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 2
10.first:(只会返回第一条或者满足条件的第一条数据)
Observable.just(0,1,2,3,4,5).first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("call","integer: "+integer);
}
});
//BlockingObservable不会做任何处理,只会阻塞;
int result = Observable.just(0,1,2,3,4,5)
.toBlocking()
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
});
09-14 11:56:10.987 6328-6328/com.example.administrator.rxjavademo E/call: integer: 4
11.skip和take: (skip操作符过滤掉前面的数据,take只取前面数据 ,将后面的数据全部过滤掉)
12.sample和throttleFrist:(sample一次性发送间隔的几个数据,throttleFrist间隔时间后发送一个数据)
13.join:(将两个Observable在有效的时间内拼接)
private Observable<String> getLeftObservable(){
return Observable.just("a","b","c");
}
private Observable<Long> getRightObservable(){
return Observable.just(1L,2L,3L);
}
private Observable<String> joinObserver(){
return getLeftObservable().join(getRightObservable(), new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
}, new Func2<String, Long, String>() {
@Override
public String call(String s, Long aLong) {
return s+" : "+aLong;
}
});
}
09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 1
09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 1
09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 1
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 2
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 2
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 2
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 3
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 3
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 3
14.merge:(将多个Observable发送的数据整合在一起后发送,但发送的数据可能是错乱的,如果不想错乱可以使用concat)
private Observable<Integer> mergeObserver(){
return Observable.merge(Observable.just(1,2,3),Observable.just(4,5,6));
}
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 3
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 5
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 6
15.mergeDelayError:(类似merge ,但是遇到异常后会继续组合操作,等所有数据发送完成后才将这个异常抛出)
private Observable<Integer> mergeDelayErrorObserver(){
return Observable.mergeDelayError(Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=0;i<5;i++){
if (i==3){
subscriber.onError(new Throwable("onError"));
}
subscriber.onNext(i);
}
}
}),Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=0;i<5;i++){
if (i==3){
subscriber.onNext(i+5);
}
subscriber.onCompleted();
}
}
}));
}
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 0
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 3
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 8
09-14 15:44:31.090 10353-10353/com.example.administrator.rxjavademo E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.example.administrator.rxjavademo, PID: 10353
rx.exceptions.OnErrorNotImplementedException: onError
16.startWith:(需要发送的数据源前面插入数据)
private Observable<Integer> startWithObserver(){
return Observable.just(1,2,3).startWith(-1,4);
}
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: -1
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 3
17.onErrorReturn:(捕获异常并返回了指定的字符串给订阅者)
private Observable<String> createObserver(){
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i=1;i<=6;i++){
if (i<3){
subscriber.onNext("onNext: "+i);
} else {
subscriber.onError(new Throwable("onError"));
}
}
}
});
}
private Observable<String> onErrorReturnObserver() {
return createObserver().onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
return "onErrorReturn";
}
});
}
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onNext: 1
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onNext: 2
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onErrorReturn
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onCompleted
18.onErrorResumeNext:(发生异常的时候, 创建新的Observable来继续发送数据)
19.onExceptionResumeNext:(类似onErrorResumeNext,不同之处是对异常数据做判断 ,如果是Exception就会使用另一个Observable代替原来的继续发数据,否则将错误分发给Subscriber)
20.retry:(发生错误的时候会重新订阅,而且可以重复多次,但是这样也就有可能造成死循环,建议指定最大重复次数)
private Observable<Integer> createObserver(){
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e("createObserver","subscriber");
for (int i=0;i<3;i++){
if (i==2){
subscriber.onError(new Exception("Exception"));
} else {
subscriber.onNext(i);
}
}
}
});
}
createObserver().retry(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e("retry","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e("retry","onError: "+e.getMessage());
}
@Override
public void onNext(Integer integer) {
Log.e("retry","call: "+integer);
}
});
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: onError: Exception
21.delay:(延迟发送)
22.do:(给Observable的各个阶段加上监听,执行到的时候就触发)
- doOnEach() :Observable每次发送一个数据的时候就会触发这个回调,无论Observable调用的是onNext,onError还是onCompleted.
- doOnNext: 只有Observable调用onNext 发送数据的时候才会调用;
- doOnError: 只有Observable通过onError 分发错误的时候才会触发回调,并且调用Throwble对象作为参数传递到回调函数去;
- doOnComplete:只有Observable调用doOnComplete 发送结束事件的时候才会触发回调;
- doOnSubscribe和doOnUnsubscribe: 会在Subscrible进行订阅和反订阅的时候才会触发回调;
- doOnTerminate:会在Observable结束前触发回调,无论是正常结束还是异常结束;
- finallyDo: 会在Observable结束后触发回调,无论是正常结束还是异常结束;
23.subscribeOn和observeOn:(subscribeOn是在哪个线程上订阅,也就是用subscribeOn指定要工作的线程;observeOn是在哪个线程上观察,也就是结果被使用的线程)
private Observable<Integer> observableOnserver(){
return createObserver()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.newThread());
}
private Observable<Integer> createObserver(){
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e("createObserver","call: "+Thread.currentThread().getName());
subscriber.onNext(1);
subscriber.onCompleted();
}
});
}
private Observable<Integer> subscribeOnObserver(){
return createObserver()
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.immediate());
}
09-18 10:34:52.832 3048-3092/com.example.administrator.rxjavademo E/createObserver: call: RxComputationThreadPool-1
09-18 10:34:52.833 3048-3092/com.example.administrator.rxjavademo E/computation: call: 1 RxComputationThreadPool-1
09-18 10:34:52.843 3048-3091/com.example.administrator.rxjavademo E/createObserver: call: RxNewThreadScheduler-1
09-18 10:34:52.844 3048-3048/com.example.administrator.rxjavademo E/mainThread: call: 1 main
24.using:(创建一个在Observable生命周期内存活的资源,但是Observable终止的时候,资源也会被销毁)
25.all:(对Observable发送的所有数据进行判断,如果全部满足就返回true,否则返回false)
26.amb:(最多将9个Observable结合起来,看哪个先发送(包括onError和onComplete),后发送的将被丢弃)
27.contains:(判断发送的所有数据有没有包含某个数据,如果包含就返回true,Observable没发送完所有数据前不会返回数据)
28.isEmpty:(判断Observable是否发送过数据,如果发送过了就返回false;如果Observavble已经结束了都还没发送这个数据,则返回true)
29.concat:(将发送的数据组合起来,类似startWith和merge)
30.from:(接收一个对象作为参数来创建Observable,参数对象可以是Iterable,Callable,Future和数组)
31.just:(接收对象作为输入,然后创建一个发送该对象的Observable,对象可以是数字,字符串,数组,Iterate对象等)
- from()创建方式和just()操作符类似,但是just操作符创建的Observable会将整个参数对象作为数据一下子发送出去,例如参数是个含有10个数字的数组,使用from创建Observable就会发送10次,而just创建的Observable会一次将整个数组发送出去;
- 一般如果用from转换多个数据,比如 ArrayList等包含多个数据的数组或者列表, just用于处理单个的数据。
- from 遇到只包含一个数据的时候,会使用just创建Observable; just 遇到多于一个的情况,会使用from 创建 Observable
32.自定义操作符:
A. 可以多次调用Subscriber的onNext方法,但是同个数据只能调用一次;
B. 可以调用Subscriber的onComplete或者onError方法,但是这两个方法是互斥的,调用了其中一个就不能调用另一个,并且一旦调用了两者中的任何一个方法就不能调用onNext方法;
C. 如果无法保证无法保证上面两条原则,可以对自定义操作符加上serialize操作符,这个操作符会强制性发送正确的数据;
D. 自定义操作内部不能阻塞;
E.如果有异常的时候,不能继续发送正常的数据,要立刻调用Subscriber的onError() 来将异常抛出;
F.null也属于一种数据, 可以正常发送,和完全不发送是两回事;
G.如果通过组合多个Rxjava原生操作符就能达到目的, 就不要使用自定义操作符实现;例如:
- first(操作符是通过take(1).single()来实现的;
- ignoreElements()是通过filter(alwaysFalse())来实现的;
- reduce(a)是通过scan(a).last()来实现的;
参考:
- RxJava响应式编程
- RxJavaX GitHub
- RxJavaX 官方
- RxJavaX 中文文档
- 给 Android 开发者的 RxJava 详解