简介
- RxJava是基于观察者模式的,能链式调用的一套API。其中包含多个操作符的类型,比如创建操作符、转换操作符、组合操作符、功能操作符、过滤操作符、条件操作符。本文就创建操作符展开详细说明。在用法和达到的效果做一个详解。
创建操作符
观察者与被观察者
- Observer
RxJava中使用Observer来表示观察者
- 创建观察者
private void createObserve() {
observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//当观察者与被观察者进行订阅的时候调用次方法
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
//当订阅的被观察者中调用了onNext方法发送事件时候,调用此方法
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
// 当发送事件出现异常的时候,被观察者调用了onError个,会执行此方法,与omComplete方法互斥,代用了此方法,将不会接受到其他的事件。
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
//当事件成功发送完毕的时候,调用此方法,该方法与onerror互斥的。
Log.d(TAG, "onComplete: ");
}
};
}
- Observable
常规操作一般使用Observable作为被观察者
private void createObservable() {
// Create 创建一个被观察者
observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//通过ObservableEmitter发射器来发送事件序列到观察者中。
emitter.onNext("dashingqi");
emitter.onComplete();
}
});
}
订阅
- subscribe
- observable.subscribe(observer);
观察者与被观察者进行订阅,当被观察者中发生变化,观察者中根据变化做出响应。
create操作符
- create操作是最常用的创建被观察者的操作符。
- 方法原型
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
- 使用方法
private void createObservable() {
// Create 创建一个被观察者
observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("dashingqi");
emitter.onComplete();
}
});
}
/**
* 创建观察者
*/
private void createObserve() {
observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
}
/**
* 订阅
*/
private void subscribe() {
//订阅
observable.subscribe(observer);
}
- 运行结果
2019-06-12 09:40:58.913 12925-12925/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 09:40:58.913 12925-12925/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: dashingqi
2019-06-12 09:40:58.913 12925-12925/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
just操作符
*创建一个被观察者,并且发送一定数量的事件序列给被观察者,这个一定数量局限在1到10之间
- 方法原型
public static <T> Observable<T> just(T item)
.
.
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
- 使用方法
/**
* just 操作符 用来创建被观察者 同时发送事件事件个数不超过10个。
*/
private void createJustObservable() {
justObservable = Observable.just("dashingqi", "dashingqi1");
}
//该方法和上文中的方法一致,这里就不全部展示出来,可参考create操作中的
private void createObserve() {...}
//订阅
/**
* 订阅
*/
private void subscribe() {
//订阅
justObservable.subscribe(observer);
}
- 运行结果
上文中我们发送了两个事件序列给观察者,我们观察运行结果 发现调用了两次onNext方法 并获取到发送的值。
2019-06-12 09:55:59.437 14689-14689/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 09:55:59.437 14689-14689/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: dashingqi
2019-06-12 09:55:59.437 14689-14689/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: dashingqi1
2019-06-12 09:55:59.437 14689-14689/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
From操作符
From操作符中我们分三个来具体讲解 分别是 fromArray,fromCallable,fromIterable
fromArray
- 方法原型
public static <T> Observable<T> fromArray(T... items)
该操作符可以创建一个被观察者并且发送一个不计数量的数组事件序列,也可以发送不计数量的事件序列,和just操作符相比,就是能发很多数量的事件序列。
- 使用方法
/**
* From 操作符
* fromArray 和just作用是一样,只不过fromArray是可以发送数组的,并且发送事件的个数可以大于10个,
* 创建被观察者 并发送事件。
*/
private void createFormArrayObservable() {
String[] arrays = {"1", "2", "3"};
fromArrayObservable = Observable.fromArray(arrays);
}
//创建观察者
private void createObserve() {}
//订阅
/**
* 订阅
*/
private void subscribe() {
//订阅
fromArrayObservable.subscribe(observer);
}
- 运行结果
2019-06-12 10:09:39.379 16074-16074/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 10:09:39.379 16074-16074/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 1
2019-06-12 10:09:39.379 16074-16074/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 2
2019-06-12 10:09:39.379 16074-16074/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 3
2019-06-12 10:09:39.380 16074-16074/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
fromCallable
- 方法原型
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
在方法的参数列表中,入参是一个Callable,该Callable就是java.util.concurrent包下的,和Runnable的功能是一样,只不过它有一个返回值,该返回值就是发送给观察者的。
- 使用方法
private void createFormCallableObservable() {
callAbleObservable = Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "Dashingqi";
}
});
}
//创建观察者
private void createObserve() {}
//订阅
private void subscribe() {
//订阅
callAbleObservable.subscribe(observer);
}
- 运行结果
2019-06-12 10:15:53.301 17041-17041/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 10:15:53.301 17041-17041/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: Dashingqi
2019-06-12 10:15:53.301 17041-17041/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
fromIterable
- 方法原型
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
该操作符是发送一个list事件序列给观察者
- 使用方法
//创建被观察者
private void createFromItneratorObservable() {
ArrayList<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("list");
//发送一个list给观察者
fromIterableObservable = Observable.fromIterable(list);
}
//创建观察者
private void createObserve() {}
//订阅
private void subscribe() {
//订阅
fromIterableObservable .subscribe(observer);
}
- 运行结果
2019-06-12 10:22:34.753 18266-18266/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 10:22:34.754 18266-18266/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 1
2019-06-12 10:22:34.754 18266-18266/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 2
2019-06-12 10:22:34.754 18266-18266/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 3
2019-06-12 10:22:34.754 18266-18266/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 4
2019-06-12 10:22:34.754 18266-18266/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: list
2019-06-12 10:22:34.754 18266-18266/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
defer操作符
- 方法原型
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
defer操作符 是在被观察者与观察者进行订阅的时候,在创建一个新的被观察者,然后新创建的观察者发送一个事件序列到观察者,
- 使用方法
String str = "1";
public void createDeferObservable() {
Observable<String> deferObservable = Observable.defer((Callable<? extends ObservableSource<? extends String>>) new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String
> call() throws Exception {
return Observable.just(str);
}
});
str = "dahiqngi";
deferObservable.subscribe(observer);
str = "zhangqi";
deferObservable.subscribe(observer);
}
- 运行结果
2019-06-12 10:32:36.693 19763-19763/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 10:32:36.693 19763-19763/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: dahiqngi
2019-06-12 10:32:36.693 19763-19763/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
2019-06-12 10:32:36.693 19763-19763/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 10:32:36.693 19763-19763/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: zhangqi
2019-06-12 10:32:36.693 19763-19763/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
由于是在订阅的时候再去创建一个新的被观察者发送事件序列,所以发送的值都是当前最新赋值的。
timer操作符
- 方法原型
// delay :延时发送的时间
public static Observable<Long> timer(long delay, TimeUnit unit)
该操作符是在指定延时的时间将事件发送给订阅的观察者中,发送的事件值为Long类型的 从0开始
- 使用方法
public void createTimerObservable() {
timerObservable = Observable.timer(2, TimeUnit.SECONDS);
timerObservable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
- 运行结果
2019-06-12 10:51:52.682 21459-21459/? D/MainActivity: onSubscribe:
2019-06-12 10:51:54.685 21459-21493/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 0
2019-06-12 10:51:54.686 21459-21493/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
interval操作符
- 方法原型
public static Observable<Long> interval(long period, TimeUnit unit)
interval操作符是定时向观察者发送事件,其中方法原型中 period参数是表示 这个定时的时间间隔是多少。发送的事件值是从0开始的。
- 使用方法
public void createInternalObservable() {
intervalObservalble = Observable.interval(2, TimeUnit.SECONDS);
intervalObservalble.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
- 运行结果
2019-06-12 10:57:04.900 22648-22648/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 10:57:06.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 0
2019-06-12 10:57:08.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 1
2019-06-12 10:57:10.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 2
2019-06-12 10:57:12.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 3
2019-06-12 10:57:14.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 4
2019-06-12 10:57:16.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 5
2019-06-12 10:57:18.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 6
2019-06-12 10:57:20.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 7
2019-06-12 10:57:22.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 8
2019-06-12 10:57:24.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 9
2019-06-12 10:57:26.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 10
2019-06-12 10:57:28.902 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 11
2019-06-12 10:57:30.902 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 12
2019-06-12 10:57:32.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 13
2019-06-12 10:57:34.902 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 14
2019-06-12 10:57:36.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 15
2019-06-12 10:57:38.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 16
2019-06-12 10:57:40.906 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 17
2019-06-12 10:57:42.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 18
2019-06-12 10:57:44.903 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 19
2019-06-12 10:57:46.904 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 20
2019-06-12 10:57:48.904 22648-22670/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 21
........
intervalRange操作符
- 方法原型
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
start:指定发送事件值得起始值,count:事件发送的次数,iniitialDelay:指定 onSubscribe与发送第一次事件发送之间的时间间隔 period: 每次事件发送的时间间隔
- 使用方法
public void intervalRangeObservable() {
Observable<Long> intervalRangeObservable = Observable.intervalRange(2, 3, 2, 2, TimeUnit.SECONDS);
intervalRangeObservable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
- 运行结果
2019-06-12 11:26:09.293 27171-27171/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 11:26:11.296 27171-27196/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 2
2019-06-12 11:26:13.296 27171-27196/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 3
2019-06-12 11:26:15.296 27171-27196/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 4
2019-06-12 11:26:15.296 27171-27196/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
range操作符
- 方法原型
public static Observable<Integer> range(final int start, final int count)
start :发送事件值得起始值 count:发送事件的个数。每次发送的事件都会进行加一的操作。
- 使用方法
public void createRangeObservable() {
Observable<Integer> rangeObservable = Observable.range(3, 8);
rangeObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
- 运行结果
2019-06-12 11:29:49.128 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 11:29:49.128 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 3
2019-06-12 11:29:49.128 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 4
2019-06-12 11:29:49.128 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 5
2019-06-12 11:29:49.129 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 6
2019-06-12 11:29:49.129 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 7
2019-06-12 11:29:49.129 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 8
2019-06-12 11:29:49.129 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 9
2019-06-12 11:29:49.129 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onNext: 10
2019-06-12 11:29:49.129 28085-28085/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
rangeLong 操作符
该操作符的功能是和range操作符是一致的,只不过发送的事件值是Long类型的。
####### empty never error 操作符
- empty操作符:是用来发送onComplete事件的,观察者接受到onComplete事件,只会执行onComplete方法
- never:不发送任何事件序列到观察者中。
- error:发送onError事件的,观察者执行onError方法。
- 使用方法
//创建观察者
public void createObserver(){}
/**
* empty:操作符 直接发送onComplete事件 不会发送其他事件 这样观察者只会调用
* onComplete方法
*/
public void createEmptyObservable() {
emptyObservable = Observable.empty();
}
//订阅empty
emptyObserverable.subscribe(observer);
/**
* never 操作符:不发送任何事件
*/
public void createNeverObservable() {
neverObsrevable = Observable.never();
}
//订阅never
neverObservable.subscribe(observer);
/**
* error操作符:仅仅发送onError()事件
*/
public void createErrorObsrvable() {
errorObservable = Observable.error(new Throwable());
}
//订阅error
errorObservable.subscribe(observer);
- 运行结果
// empty
2019-06-12 11:35:52.972 29097-29097/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 11:35:52.972 29097-29097/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onComplete:
//never
2019-06-12 11:38:33.067 29856-29856/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
//errror
2019-06-12 11:39:07.398 30261-30261/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onSubscribe:
2019-06-12 11:39:07.398 30261-30261/wanandroidqi.dashingqi.com.rxjavademo D/MainActivity: onError:
结语
如果觉得总结的很到位,请点个赞,关注一波吧!如果有误的地方欢迎指出!