Adding the dependencies
compile 'io.reactivex.rxjava2:rxjava:2.1.5'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
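Observable: the "observable" (the subject being observed) in the observer pattern; it emits data.
Observer: the "observer" in the observer pattern; it receives the data an Observable emits.
subscribe: subscription; an observer is attached to an observable by calling subscribe().
Subscriber: also a kind of observer. In 2.0 it plays the same role as Observer, except that a Subscriber is paired with a Flowable (another kind of observable, newly added in 2.0, which supports backpressure). An Observer subscribes to an Observable, while a Subscriber subscribes to a Flowable.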
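The relationship between these roles can be sketched in plain Java without the library. This is only an illustration of the observer pattern: WeatherStation, publish and demo are hypothetical names and are not part of the RxJava API. Unlike this sketch, the RxJava sources used in the examples below emit their items afresh for each observer at the moment it subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the Observable/Observer relationship; not the RxJava API.
class WeatherStation {
    private final List<Consumer<String>> observers = new ArrayList<>();

    // Register an observer; it receives every value published afterwards.
    void subscribe(Consumer<String> observer) {
        observers.add(observer);
    }

    // Push a new value to every registered observer, in subscription order.
    void publish(String weather) {
        for (Consumer<String> observer : observers) {
            observer.accept(weather);
        }
    }

    // Demo helper: publish two readings to two observers and return what they saw.
    static List<String> demo() {
        WeatherStation station = new WeatherStation();
        List<String> received = new ArrayList<>();
        station.subscribe(w -> received.add("first:" + w));
        station.subscribe(w -> received.add("second:" + w));
        station.publish("WIND");
        station.publish("SUNSHINE");
        return received;
    }

    public static void main(String[] args) {
        // prints [first:WIND, second:WIND, first:SUNSHINE, second:SUNSHINE]
        System.out.println(demo());
    }
}
```

Each call to publish plays the part of onNext: the subject decides when data flows, and the observers only react.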
1. A minimal weather observation example
When the weather changes, the change is sent to every observer.
Define the weather type
enum Weather {
    WIND, SUNSHINE
}
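Simulate weather changes
Observable<Weather> observable = new Observable<Weather>() {
    @Override
    protected void subscribeActual(Observer<? super Weather> observer) {
        // The Observable protocol expects onSubscribe before any onNext
        // (Disposables is io.reactivex.disposables.Disposables)
        observer.onSubscribe(Disposables.empty());
        observer.onNext(Weather.WIND);
        observer.onNext(Weather.SUNSHINE);
        // Signal that no more items will follow
        observer.onComplete();
    }
};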
The user (the observer)
Observer<Weather> user = new Observer<Weather>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }
    @Override
    public void onNext(@NonNull Weather o) {
        Log.e("tag", "weather: " + o);
    }
    @Override
    public void onError(@NonNull Throwable e) {
    }
    @Override
    public void onComplete() {
    }
};
Subscribe
observable.subscribe(user);
Log output:
tag: weather: WIND
tag: weather: SUNSHINE
Simpler ways to write it
// Observable.create: emit the items through an ObservableEmitter
Observable.create(new ObservableOnSubscribe<Weather>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Weather> e) throws Exception {
        e.onNext(Weather.WIND);
        e.onNext(Weather.SUNSHINE);
    }
}).subscribe(new Observer<Weather>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }
    @Override
    public void onNext(@NonNull Weather weather) {
        Log.e("tag", "weather: " + weather);
    }
    @Override
    public void onError(@NonNull Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
// Observable.just: emits the given items in order, then completes
Observable.just(Weather.WIND, Weather.SUNSHINE).subscribe(new Observer<Weather>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }
    @Override
    public void onNext(@NonNull Weather weather) {
        Log.e("tag", "weather: " + weather);
    }
    @Override
    public void onError(@NonNull Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
// Observable.fromArray: emits each element of the array, then completes
Weather[] weather = {Weather.WIND, Weather.SUNSHINE};
Observable.fromArray(weather).subscribe(new Observer<Weather>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }
    @Override
    public void onNext(@NonNull Weather weather) {
        Log.e("tag", "weather: " + weather);
    }
    @Override
    public void onError(@NonNull Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
2. Operators
map
Converts each emitted item to another type, one to one.
Weather[] weather = {Weather.WIND, Weather.SUNSHINE};
Observable.fromArray(weather)
        // The first type parameter is the type received, the second is the type
        // emitted after conversion; here each Weather is converted to a String
        .map(new Function<Weather, String>() {
            @Override
            public String apply(@NonNull Weather weather) throws Exception {
                return weather.toString();
            }
        })
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onNext(@NonNull String weather) {
                Log.e("tag", "weather: " + weather);
            }
            @Override
            public void onError(@NonNull Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });
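flatMap
flatMap has something in common with map: both transform each emitted item. The difference is that map is strictly one to one, while flatMap can be one to many: each item is turned into another Observable, and the items of all those Observables are merged into a single stream. When the inner Observables emit asynchronously, flatMap does not guarantee the order of the merged items; concatMap keeps the source order.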
enum Weather {
    WIND, SUNSHINE, RAIN, SNOW
}
Weather[] weather1 = {Weather.WIND, Weather.SUNSHINE};
Weather[] weather2 = {Weather.RAIN, Weather.SNOW};
List<Weather[]> list = new ArrayList<>();
list.add(weather1);
list.add(weather2);
Observable.fromIterable(list)
        // Turn each Weather[] into an Observable that emits its elements
        .flatMap(new Function<Weather[], ObservableSource<Weather>>() {
            @Override
            public ObservableSource<Weather> apply(@NonNull Weather[] weathers) throws Exception {
                return Observable.fromArray(weathers);
            }
        }).subscribe(new Consumer<Weather>() {
            @Override
            public void accept(Weather weather) throws Exception {
                Log.e("tag", "weather: " + weather);
            }
        });
Log output:
tag: weather: WIND
tag: weather: SUNSHINE
tag: weather: RAIN
tag: weather: SNOW
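The one-to-one versus one-to-many distinction can also be seen with the standard java.util.stream API, whose map and flatMap are close analogues of the RxJava operators. The following is a minimal sketch of that analogy, not RxJava code; the class and method names are hypothetical, and plain strings stand in for the Weather values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy using streams; not the RxJava API.
class MapVsFlatMap {
    // One to one: each input group yields exactly one output element.
    static List<String> mapped(List<String[]> groups) {
        return groups.stream()
                .map(group -> String.join("+", group))
                .collect(Collectors.toList());
    }

    // One to many: each group becomes a stream of elements, and the streams are flattened.
    static List<String> flatMapped(List<String[]> groups) {
        return groups.stream()
                .flatMap(group -> Arrays.stream(group))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> groups = Arrays.asList(
                new String[]{"WIND", "SUNSHINE"},
                new String[]{"RAIN", "SNOW"});
        System.out.println(mapped(groups));     // [WIND+SUNSHINE, RAIN+SNOW]
        System.out.println(flatMapped(groups)); // [WIND, SUNSHINE, RAIN, SNOW]
    }
}
```

With map the result has as many elements as there were groups; with flatMap it has as many elements as the groups contain in total.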
Schedulers
Android main thread: AndroidSchedulers.mainThread()
Switching threads:
Observable.fromIterable(list)
        .flatMap(new Function<Weather[], ObservableSource<Weather>>() {
            @Override
            public ObservableSource<Weather> apply(@NonNull Weather[] weathers) throws Exception {
                return Observable.fromArray(weathers);
            }
        })
        .subscribeOn(Schedulers.io()) // thread on which the observable (the source) does its work
        .observeOn(AndroidSchedulers.mainThread()) // thread on which the observer receives the results
        .subscribe(new Consumer<Weather>() {
            @Override
            public void accept(Weather weather) throws Exception {
                Log.e("tag", "weather: " + weather);
            }
        });
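The idea behind the two calls, producing on one thread and consuming on another, can be sketched with the standard library alone. This is a rough analogy under stated assumptions, not how RxJava is implemented: the two single-thread executors named "io-thread" and "main-thread" are hypothetical stand-ins for Schedulers.io() and AndroidSchedulers.mainThread().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy for producing on one thread and consuming on another; not the RxJava API.
class ThreadSwitchSketch {
    // Produces a value on the "io" executor, then handles it on the "main" executor.
    // Returns "<producer thread> -> <consumer thread>".
    static String run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            return CompletableFuture
                    // Roughly the role of subscribeOn: where the source does its work
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Roughly the role of observeOn: where the result is delivered
                    .thenApplyAsync(p -> p + " -> " + Thread.currentThread().getName(), main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // io-thread -> main-thread
    }
}
```

The slow work stays off the delivery thread, which is why on Android the source is usually placed on an I/O scheduler and the observer on the main thread.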
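doOnSubscribe
Use doOnSubscribe when something has to happen at the moment of subscription, for example showing a progress dialog before a network request starts: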
Observable.fromIterable(list)
        .flatMap(new Function<Weather[], ObservableSource<Weather>>() {
            @Override
            public ObservableSource<Weather> apply(@NonNull Weather[] weathers) throws Exception {
                return Observable.fromArray(weathers);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // Called when the chain is subscribed, before any item is emitted
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Toast.makeText(MainActivity.this, "simulated dialog", Toast.LENGTH_SHORT).show();
            }
        })
        .subscribe(new Consumer<Weather>() {
            @Override
            public void accept(Weather weather) throws Exception {
                Log.e("tag", "weather: " + weather);
            }
        });