一、 拆分使用
先创建被观察者和观察者,然后建立订阅关系,这样在观察者中就会接收到个生命周期的回调:
@Test
public void test(){
//1. 创建被观察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// 发送消息
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
//2. 创建观察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("建立订阅关系");
}
@Override
public void onNext(Integer integer) {
//接受到消息
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("完成");
}
};
//3. 建立订阅关系
observable.subscribe(observer);
}
运行结果:
建立订阅关系
1
2
完成
二、 链式调用(一般都是这种写法):
@Test
public void test2() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("建立订阅关系");
}
@Override
public void onNext(Integer integer) {
//接受到消息
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
}
三、更简单的观察者
@Test
public void test3() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
}
Consumer
相对Observer
简化了很多,没有了onSubscribe()
onError ()
onComplete ()
,当然也无法对这些进行监听了。
四、创建操作符
上面用的creat
是创建被观察者的一种操作符,另外常用的还有just
、justArrat
、range
、empty
,直接看运行结果去理解就好了。
empty
这里说下,这个使用场景比如一个耗时操作不要任何数据反馈去更新UI,只是显示和隐藏加载动画。(先不用去纠结耗时操作在哪里添加)
@Test
public void test4() {
System.out.println("-----------------just");
Observable.just(1, 2, 3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
System.out.println("-----------------fromArray");
Observable.fromArray(new Integer[]{1,2,3}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
System.out.println("-----------------range");
Observable.range(0, 3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
System.out.println("-----------------empty");
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("建立订阅关系");
}
@Override
public void onNext(Object object) {
//接受到消息
System.out.println(object);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
运行结果:
-----------------just
1
2
3
-----------------fromArray
1
2
3
-----------------range
0
1
2
-----------------empty
建立订阅关系
完成
五、合并操作符
合并操作是指合并被观察者,用同一个观察者去接受,常用的有concatWith
、startWith
、concat
、merge
、zip
,这里为了显示出合并的区别,用了另一个创建创建操作符intervalRange
,比如Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
,这个代表从0开始发送10个数,延迟0秒后开始执行,每1秒发送一次。
用这两个被观察者测试上面几个合并操作符:
//发送0-4
Observable observable1 = Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS);
//发送10-14
Observable observable2 = Observable.intervalRange(10, 5, 0, 1, TimeUnit.SECONDS);
测试函数:
private void concatWith() {
Log.e(TAG, "-----------------concatWith");
observable1.concatWith(observable2).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "concatWith: " + aLong);
}
});
}
private void startWith() {
Log.e(TAG, "-----------------startWith");
observable1.startWith(observable2).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "startWith: " + aLong);
}
});
}
public void concat() {
Log.e(TAG, "-----------------concat");
Observable.concat(observable1,observable2).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "concat: " + aLong);
}
});
}
public void merge() {
Log.e(TAG, "-----------------merge");
Observable.merge(observable1,observable2).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "merge: " + aLong);
}
});
}
运行结果:
E/MainActivity: -----------------concatWith
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: -----------------startWith
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: -----------------concat
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: -----------------merge
E/MainActivity: -----------------merge
E/MainActivity: accept: 10
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 11
E/MainActivity: accept: 2
E/MainActivity: accept: 12
E/MainActivity: accept: 3
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: accept: 4
根据上面结果总结:
-
concatWith
和startWith
是执行的先后顺序不一样,是同步执行的 -
concatWith
和concat
都是顺序执行,只是写法不一样 -
concat
和merge
写法一样,但是merge
是异步的,两个被观察者没有先后顺序,各自执行。
还有一种zip
操作符,把被观察者合并时一一对应,直接看使用方式:
private void zip() {
Observable observable1 = Observable.just("语文", "数学", "英语");
Observable observable2 = Observable.just("100", "80", "60");
Observable.zip(observable1, observable2, new BiFunction() {
@Override
public Object apply(Object o, Object o2) throws Exception {
return o.toString() + ":" + o2.toString();
}
})
.subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "accept: " + o);
}
});
}
运行结果:
E/MainActivity: accept: 语文:100
E/MainActivity: accept: 数学:80
E/MainActivity: accept: 英语:60
六、变换操作符
常见的有map
、concatMap
、flatMap
、groupBy
、buffer
先通过最简单的map
来看看变换操作符是干什么的
private void map() {
Log.e(TAG, "-----------------map");
Observable.just(1)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "转化为String" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String string) throws Exception {
Log.e(TAG, "merge: " + string);
}
});
}
运行结果:
E/MainActivity: -----------------map
E/MainActivity: accept: 转化为String1
也就是说map里可以把被观察者传递过来的数据转换成另一种数据格式传递给观察者,这里是Integer转String,比如你也可以被观察者传递过来一个URL,在Function直接网络请求,转化成请求结果给观察者。
扯多了,继续看上面的操作符flatMap
:
private void flatMap() {
Observable.just(1)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final Integer integer) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("转化为String" + integer);
e.onNext("我还可以再发送" + integer);
e.onNext("我还可以随便发送" + integer);
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String string) throws Exception {
Log.e(TAG, "accept: " + string);
}
});
}
运行结果:
E/MainActivity: accept: 转化为String1
E/MainActivity: accept: 我还可以再发送1
E/MainActivity: accept: 我还可以随便发送1
这个相对map更灵活,map是的Function里直接返回的是转换之后的数据,一对一的,而flatMap的Function返回的是另一个被观察者,所以这个可以在里面随意发送给观察者。
在用concatMap
之前先看flatMap
的另一种操作:
private void flatMap2() {
Observable.just(1, 2, 3)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add(integer + "." + (1 + i));
}
return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String string) throws Exception {
Log.e(TAG, "accept: " + string);
}
});
}
在收到被观察者发来的数据后,生产一个List再延迟1s发送给观察者,看下运行结果:
E/MainActivity: accept: 2.1
E/MainActivity: accept: 2.2
E/MainActivity: accept: 2.3
E/MainActivity: accept: 1.1
E/MainActivity: accept: 3.1
E/MainActivity: accept: 3.2
E/MainActivity: accept: 3.3
E/MainActivity: accept: 1.2
E/MainActivity: accept: 1.3
每个都是先.1再.2再.3没错,但是整体并没有按照1、2、3顺序执行,说明他们是异步执行的,类似合并操作符中的merge(其实内部调用的就是merge)。看完这个问题,就可以猜到concatMap
的作用了,就不贴了,是完全按顺序同步输出的。
然后来groupBy
操作符:
private void group() {
Observable.just(20, 40, 60, 80, 100)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer >= 60 ? "及格" : "不及格";
}
})
.subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: " + integer + ":" + stringIntegerGroupedObservable.getKey());
}
});
}
});
}
输出结果:
E/MainActivity: accept: 20:不及格
E/MainActivity: accept: 40:不及格
E/MainActivity: accept: 60:及格
E/MainActivity: accept: 80:及格
E/MainActivity: accept: 100:及格
buffer
操作符:
private void buffer() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 100; i++) {
e.onNext(i);
}
e.onComplete();
}
})
.buffer(20)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
运行结果:
E/MainActivity: accept: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
E/MainActivity: accept: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
E/MainActivity: accept: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
E/MainActivity: accept: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
E/MainActivity: accept: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
这两种没什么特殊的,groupBy
是按条件分组, buffer
是分批发送。
七、过滤操作符
filter
、take
、distinct
、elementAl
//条件筛选,输出B、C
public void filter() {
Observable.just("A", "B", "C")
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
if ("A".equals(s)) {
return false;
}
return true;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: " + s);
}
});
}
//用于停止定时器,输出0、1、2、3、4
public void take() {
Observable.interval(1, TimeUnit.SECONDS)
.take(5)// 5次之后停下
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: " + aLong);
}
});
}
//过滤重复,输出1、2、3
public void distinct() {
Observable.just(1,1,2,3,3)
.distinct()
.subscribe(new Consumer<Integer>() { // 下游 观察者
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
//制定发送角标,输出B
public void elementAt() {
Observable.just("A", "B", "C")
.elementAt(1, "X")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
八、条件操作符
any
、all
、contains
,这些就是改变Java中if的书写方式,与、或、包含
all:全部为true,才是true,只要有一个为false,就是false
any:全部为 false,才是false, 只要有一个为true,就是true
contains :是否包含
//等于Java中if的连续判断,有一个等于C就返回false,输出false
public void all() {
Observable.just("A", "B", "C", "D")
.all(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return !s.equals("C");
}
})
.subscribe(new Consumer<Boolean>() { // 下游 观察者
@Override
public void accept(Boolean s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
//判断包含
public void contains() {
Observable.just("A", "B", "C", "D")
.contains("C")
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
//和上面的All相反,有一个等于C就返回true,输出true
public void any() {
Observable.just("A", "B", "C", "D")
.any(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return !s.equals("C");
}
})
.subscribe(new Consumer<Boolean>() { // 下游 观察者
@Override
public void accept(Boolean s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
九、异常处理操作符
onErrorReturn
、onErrorResumeNext
、onExceptionResumeNext
、retry
先模拟个错误:
public void onError() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new Throwable("模拟一个错误"));
}
e.onNext(i);
}
e.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
运行结果:
D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onError: 模拟一个错误
上面代码会在观察者的onError中收到回调,然后来看一下异常操作符能干什么,先看onErrorReturn
和onErrorResumeNext
,区别就是onErrorReturn
发送一次,onErrorResumeNext
可以任意发,跟上面很多其他的操作符一样:
private void onErrorReturn() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new Throwable("模拟一个错误"));
}
e.onNext(i);
}
e.onComplete();
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return 400;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
运行结果:
D/MainActivity: onNext: 400
D/MainActivity: onComplete:
onErrorResumeNext:
public void onErrorResumeNext() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new Throwable("模拟一个错误"));
}
e.onNext(i);
}
e.onComplete();
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(400);
e.onNext(4000);
e.onNext(40000);
e.onComplete();
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
运行结果:
D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onNext: 400
D/MainActivity: onNext: 4000
D/MainActivity: onNext: 40000
D/MainActivity: onComplete:
这里两个注意点:
-
onErrorReturn
发生error后会自动调用onComplete()
,而onErrorResumeNext
需要根据需要手动调用 - 都不会再触发观察者的
onError()
回调,除非onErrorResumeNext
中再手动调用e.onError()
然后看下onExceptionResumeNext
代码:
public void onExceptionResumeNext() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new Throwable("模拟一个错误"));
}
e.onNext(i);
}
e.onComplete();
}
})
.onExceptionResumeNext(new ObservableSource<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observer) {
observer.onNext(400);
observer.onNext(4000);
observer.onNext(40000);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
运行结果:
D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onError: 模拟一个错误
跟onErrorResumeNext
的运行结果对比,很明显没有400、4000、40000,说明新的Observer并不会起作用,这里用的是Throwable
,如果是用Exception,同样也会有400、4000、40000,所以:onErrorResumeNext
和onExceptionResumeNext
对Exception的处理是一样的流程,区别在于对Error处理的时候,是否会使用新的Observer发送消息,也就是onExceptionResumeNext
不处理Error,直接回调观察者的onError ()
,onErrorResumeNext
都处理,不会再调用观察者的onError ()
。
然后是retry这个操作符,这个很简单,贴出三种常用的:
public void retry() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 10; i++) {
if (i == 5) {
e.onError(new IllegalAccessError("模拟错误"));
}
e.onNext(i);
}
e.onComplete();
}
})
//不设置重试次数
.retry( new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
//true表示不停地重试 , false表示不重试
return true;
}
})
// //设置重试次数
// .retry(3, new Predicate<Throwable>() {
// @Override
// public boolean test(Throwable throwable) throws Exception {
// //true表示按设置的次数重试 , false表示不重试
// return true;
// }
// })
//
// //可获取重试次数
// .retry(new BiPredicate<Integer, Throwable>() {
// @Override
// public boolean test(Integer integer, Throwable throwable) throws Exception {
// //相对上面两种,这个integer表示重试次数, 返回值跟上面一样
// return true;
// }
// })
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
十、线程切换
默认发送和接收都是在主线程:
private void schedulers() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e(TAG, "发送: " + Thread.currentThread().getName());
e.onNext("123");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "接收: " + Thread.currentThread().getName());
}
});
}
输出:
E/MainActivity: 发送: main
E/MainActivity: 接收: main
可以通过subscribeOn()
会同时修改观察者和被观察者的线程,通过observeOn()
只设置观察者线程,通过AndroidSchedulers.mainThread()
得到主线程,通过Schedulers.io()
得到子线程:
private void schedulers() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e(TAG, "发送: " + Thread.currentThread().getName());
e.onNext("123");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "接收: " + Thread.currentThread().getName());
}
});
}
这一段,先通过subscribeOn(Schedulers.io())
把观察者和被观察者都设置到子线程,如果不写下面这句observeOn(AndroidSchedulers.mainThread())
,会输出:
E/MainActivity: 发送: RxCachedThreadScheduler-1
E/MainActivity: 接收: RxCachedThreadScheduler-1
但是下面又用observeOn(AndroidSchedulers.mainThread())
把观察者改回子线程,所以输出:
E/MainActivity: 发送: RxCachedThreadScheduler-1
E/MainActivity: 接收: main
十一、背压模式
当上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,就会产生背压问题,内存使用越来越多,这时候就需要用Flowable去处理。Flowable会对上游发送的时间进行缓存,缓存池也满了(超出128)的时候会有4种不通的处理方式:
- BackpressureStrategy.ERROR:就会抛出异常
- BackpressureStrategy.DROP:把后面发射的事件丢弃
- BackpressureStrategy.LATEST:把前面发射的事件丢弃
- BackpressureStrategy.BUFFER:这种不会有上限,但是如果上游发送太多,也会造成内存使用越来越大
Flowable的使用跟Observable很类似,简单使用:
private void backpressure() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
// 改成129就会崩溃
for (int i = 0; i < 128; i++) {
e.onNext(i); // todo 1
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收: " + integer);
}
});
}
然后看一种完整模式的观察者:
private void backpressure() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
// 改成129就会崩溃
for (int i = 0; i < 128; i++) {
e.onNext(i); // todo 1
}
e.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "接收: " + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
这一会发现观察者收不到任何消息,这里跟Observable有个区别,就是订阅的方法subscribe()
的参数,Observable订阅对应的是Observer,而Flowable对应的是Subscriber,Observer和Subscriber对应的回调onSubscribe(..)
参数不同,Subscriber的onSubscribe(..)
参数拿到的是一个Subscription,这个需要主动去取数据,比如:
@Override
public void onSubscribe(Subscription s) {
s.request(10);
}
这样就会onNext()
中就会收到前10个。那这个使用就很灵活了,根据代码需要,可以在需要的地方主动调用s.request(..),让观察者接收到数据。
十二、一个展示网络图片的例子
private void getImage(final String path) {
Observable.just(path)
// 通过map变换操作符把String转换成Bitmap
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
URL url = new URL(path);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setConnectTimeout(5000);
int responseCode = httpURLConnection.getResponseCode();
if (HttpURLConnection.HTTP_OK == responseCode) {
Bitmap bitmap = BitmapFactory.decodeStream(httpURLConnection.getInputStream());
return bitmap;
}
return null;
}
})
// 下载图片在子线程中
.subscribeOn(Schedulers.io())
// 设置图片在主线程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
// 开始操作前
public void onSubscribe(Disposable d) {
progressDialog = new ProgressDialog(MainActivity.this);
progressDialog.setMessage("正在下载中...");
progressDialog.show();
}
@Override
// 收到Bitmap
public void onNext(Bitmap bitmap) {
if (imageView != null) {
imageView.setImageBitmap(bitmap);
}
}
@Override
// 下载错误
public void onError(Throwable e) {
if (progressDialog != null) {
progressDialog.dismiss();
}
if (imageView != null) {
imageView.setImageResource(R.mipmap.ic_launcher);
}
Log.e(TAG, "onError: " + e.toString());
}
@Override
// 下载完成
public void onComplete() {
if (progressDialog != null) {
progressDialog.dismiss();
}
}
});
}