(1)create
Observable.create();
Flowable.create();
Single.create();
Completable.create();
Maybe.create();
用来创建被观察者, 5种被观察者都可以使用create操作符创建。
(2)just
Observable.just("1", "2", "3");
Flowable.just("1", "2", "3");
Single.just("1");
Maybe.just("1");
只有四种被观察者可以使用just,一旦使用just就必须发射一条数据。
-
Observable 可以发射0个或多个数据,标准写法如下
Observable.just("1"); Observable.just("1", "2"); Observable.just("1", "2", "3");
-
Flowable可以发射0个或多个数据,标准写法如下
Flowable.just("1"); Flowable.just("1", "2"); Flowable.just("1", "2", "3");
-
Single只能发射一条数据,标准写法如下
Single.just("1");
Completable不能发射数据,由于just至少要有一条数据,所以Completable没有just操作符;
-
Maybe只能发射0条或1条数据,由于just至少要有一条数据,所以标准写法如下
Maybe.just("1");
另外,需要注意的是:
just操作符最多可以设置10个参数。
(3)from
from只要是将其他类种的对象和数据类型转成Observable
-
fromPublisher 将Publisher转成Observable
Observable.fromPublisher(new Publisher<String>() { @Override public void subscribe(Subscriber<? super String> s) { s.onNext("A"); } }).subscribe(new Consumer<String>() { @Override public void accept(String o) throws Exception { System.out.println(o); } });
Publisher将在后续介绍压背的时候重点提出。
-
fromArray 将数组转成Observable
Observable.fromArray("A", "B", "C");
看一下源码
在这里被我圈出来的可变长度参数,也就是说参数的个数是可变的。
-
fromIterable 将集合转成Observable
List<String> list = new ArrayList<>(); Observable.fromIterable(list);
fromIterable的参数是Iterable类型, Collection是Iterable的子接口,所以只要是最终实现Collection接口的集合都可以作为参数,以下的java集合框架图可以作为参考:
-
fromCallable 将Callable转成Observable
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); Callable callable = new Callable<String>() { @Override public String call() throws Exception { return "A"; } }; singleThreadExecutor.submit(callable); Observable.fromCallable(callable).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
-
fromFuture 将Future转成Observable
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); Future future = singleThreadExecutor.submit(new Runnable() { @Override public void run() { } }, "A"); Observable.fromFuture(future).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
注意:其他被观察者也可以使用from操作符
(4)range
发射0~9之间的整数,左闭右开[0,9)
Observable.range(0, 9).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(String.valueOf(integer));
}
});
其他方式
Observable.rangeLong();
Flowable.range();
Flowable.rangeLong();
(5) defer
defer只有在订阅的时候才会创建被观察者,以保证每次发射的数据是最新的。
Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just("A", "B");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
其他方式
Flowable.defer();
Single.defer();
Completable.defer();
Maybe.defer();
(6)interval
创建一个按固定时间间隔发射整数序列的Observable
//延迟initialDelay秒后,按period秒定时时间间隔发射整数序列,调度器为computation
Observable.interval(1000, 1000, TimeUnit.MILLISECONDS, Schedulers.computation());
//延迟period秒后,按period秒定时时间间隔发射整数序列,调度器为computation
Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(String.valueOf(aLong));
}
});
有关Schedulers的讲解:
Schedulers.computation() :
用于cpu密集型计算任务,即不会被被I/O等操作限制性能的耗时操作,例如XML,JSON文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU核数,不可以用于IO操作,因为IO操作的等待时间会浪费cpuSchedulers.from(@NonNull Excutor excutor):指定一个线程调度器,由此调度器来控制任务的执行策略。
Schedulers.io():用于IO密集型的操作,例如写磁盘操作,查询数据库,网络访问,具有线程缓存机制,在此调度器接收到任务之后,先检查线程缓存池中是否有空闲的线程可用,如果有,复用,如果没有则 创建新的线程,并将其加入到线程池中,如果每次都没有空闲的线程使用,可以无上限的创建线程。
Schedulers.newThread(): 在每执行一个任务时创建一个新的线程,不具有线程缓存机制,由于创建一个线程比起复用一个线程更加耗时耗力,虽然使用Schedulers.io()的地方都可以使用Schedulers.newThread(),但是总体上的Schedulers.newThread()的效率没有Schedulers.io()的高。
Schedulers.trampoline():在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停下来,等插入进来的任务执行完成之后,再将未完成的任务接着执行。
Schedulers.single():拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务在执行的时候其他任务将按照队列先进先出的顺序依次执行。
AndroidSchedulers.mainThread():在Andriod UI线程中执行任务,属于Android的专属定制。
注意:在Rxjava 2.x版本中,废弃了1.x版本Schedulers.immediate(),在1.x中,Schedulers.immediate的作用是在当前线程中立即执行任务,功能等同于Rxjava中的2.x版本中的Schedulers.trampoline(),而在Schedulers.trampoline()在1.x版本的时候,作用是:当其他排队的任务执行完成之后,在当前线程排队开始执行接收到的任务,有点像2.x版本的Schedulers.single(),但是也不完全相同,因为Schedulers.single()不是在当前线程而是在一个线程单例中排队执行任务。
其他方式
Flowable.interval();
(7)timer
实现延迟执行
Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(String.valueOf(aLong));
}
});
Observable.timer(1000, TimeUnit.MILLISECONDS, Schedulers.computation());
其他方式
Flowable.timer();
Single.timer();
Completable.timer();
Maybe.timer();
(8)empty
使直接完成发射数据,也就是直接执行了onComplete。
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("11111111111111");
}
@Override
public void onNext(Object o) {
System.out.println("2222222222222");
}
@Override
public void onError(Throwable e) {
System.out.println("33333333333333");
}
@Override
public void onComplete() {
System.out.println("4444444444444444");
}
});
其他方式
Flowable.empty();
Maybe.empty();
(9)error
使直接发生异常结束发射数据,也就是直接执行了onError。
Observable.error(new Throwable("nullpoint exception")).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("11111111111111");
}
@Override
public void onNext(Object o) {
System.out.println("2222222222222");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
System.out.println("4444444444444444");
}
});
其他方式
Flowable.error();
Single.error();
Completable.error();
Maybe.error();
(10)never
不发射数据,也不结束发射。
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("11111111111111");
}
@Override
public void onNext(Object o) {
System.out.println("2222222222222");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
System.out.println("4444444444444444");
}
});
其他方式
Flowable.never();
Single.never();
Completable.never();
Maybe.never();