ReactiveX
是Reactive Extensions
的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源。Rx现在已经支持几乎全部的流行编程语言,RxJava
是ReactiveX
对Java
语言的支持库。简单来说,RxJava
是使用观察者模式的基于事件流处理的响应式编程库(如果还不是特别了解观察者设计模式,可看看我的这篇文章观察者设计模式-RecyclerView中的观察者),下面是官方的RxJava原理图
本篇将从工作线程下载网络图片转成
Bitmap
,切换到主线程ImageView.setImageBitmap()
将图片显示出来的过程分析RxJava
源码,主要涉及以下部分
- RxJava的基本使用
- 创建操作符Just
- 变换操作符Map
- 辅助操作符SubscribeOn
- 辅助操作符ObserveOn
基本使用
导入依赖
implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
Observable.just("https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1587923166363&di=050c57334c9bf2e16e19389161657cb3&imgtype=0&src=http%3A%2F%2Fp2.so.qhimgs1.com%2Ft01dfcbc38578dac4c2.jpg")
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String urlStr) throws Throwable {
URL url = new URL(urlStr);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.connect();
InputStream inputStream = connection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Throwable {
iv.setImageBitmap(bitmap);
}
});
Observable.just()
创建网络图片链接的被观察者,map
转换操作符中将String转换成Bitmap,subscribeOn
和observeOn
将下载和转换Bitmap放在工作线程,并将下面的流程切换到主线程,subscribe
将bitmap发送给观察者Consumer
创建操作符Just
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(@NonNull T item) {//参数item在这里是网络图片url
Objects.requireNonNull(item, "item is null");//判空抛异常,不是重点
return RxJavaPlugins.onAssembly(new ObservableJust<>(item));
}
RxJavaPlugins.onAssembly(new ObservableJust<>(item))
,将网络图片Url作为构造参数创建ObservableJust
对象,传入RxJavaPlugins
静态方法onAssembly()
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onObservableAssembly
是一个钩子参数,默认为空,不需要关心,即onAssembly()
方法就是将传入的对象source
直接返回了,这里的source
就是刚才创建的ObservableJust
,ObservableJust
是Observable
的子类。ObservableJust
构造方法中将传入参数赋值给了成员变量value
。
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
为了方便更好的理解,我将just()
操作符的代码简化一下
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(@NonNull T item) {
return new ObservableJust<>(item);
}
小结一下
创建操作符Just
,实例化了ObservableJust
对象,并给成员变量value赋值,并返回ObservableJust
对象,给后面的流程链式调用。
变换操作符Map
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
map
操作符代码结构和just
操作符一样。为了方便更好的理解,我将map()
操作符的代码简化一下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
return new ObservableMap<>(this, mapper);
}
map()
实例化了ObservableMap
对象,ObservableMap
传入了两个参数this
和mapper
- this(类型为ObservableSource)
Observable的对象,在这里的this
,就是在just()
操作符创建的ObservableJust
- mapper(类型为Function<? super T, ? extends U>)
Function
第一个泛型参数规定了下限,必须为T或者T的父类,在这里就是网络图片的url,String
类型。第一个参数规定了上限,是下游方法希望拿到的参数类型,在这里就是Bitmap
类型
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
//-------------------------------------下方代码暂时不用关心--------------------------------------------//
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Throwable {
T t = qd.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
在ObservableMap
构造方法中,将function
赋值给了ObservableMap
的成员变量function
。
source
赋值给了父类AbstractObservableWithUpstream
的成员变量source
,类型为ObservableSource
。在这里的source
是刚才创建的ObservableJust
对象
辅助操作符SubscribeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
按照套路继续简化代码
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
return new ObservableSubscribeOn<>(this, scheduler);
}
这里的this
是上游的Observable
,在这里是ObservableMap
对象
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
//下方代码暂时不用关心,订阅之后再回来看
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//此处省略大量代码
}
SubscribeOn重点Schedulers.io()
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
//此处省略无关代码
IO = RxJavaPlugins.initIoScheduler(new IOTask());
//此处省略无关代码
}
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
private static final long serialVersionUID = -7789753024099756196L;
final String prefix;
final int priority;
final boolean nonBlocking;
public RxThreadFactory(String prefix) {
this(prefix, Thread.NORM_PRIORITY, false);
}
public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}
@Override
public Thread newThread(@NonNull Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
小结一下
subscribeOn(Schedulers.io())
就是将事件流作为runable,传入到newScheduledThreadPool线程池中,达到线程切换到工作线程的目的。
辅助操作符ObserveOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
老套路 简化代码
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return new ObservableObserveOn<>(this, scheduler, false, 128);
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//-------------------------------------下方代码暂时不用关心--------------------------------------------//
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
//此处省略大量代码
}
ObserveOn重点AndroidSchedulers.mainThread()
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD =
RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
依然简化代码
public static Scheduler mainThread() {
return new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
//省略下方大量代码
}
再次小结一下
从上方的源码分析可以看到,这些操作符的代码套路都是一样的,ObserveOn
在这里传入了HandlerScheduler
,HandlerScheduler
构造方法中传入了主线程的Handler,实现主线程切换。在后面的订阅方法中,我们需要重点关注实例化的Observable
子类的subscribeActual()
方法
subscribe订阅
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete) {
Objects.requireNonNull(onNext, "onNext is null");
Objects.requireNonNull(onError, "onError is null");
Objects.requireNonNull(onComplete, "onComplete is null");
LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
subscribe(ls);
return ls;
}
老套路,简化代码
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
LambdaObserver<T> ls = new LambdaObserver<>(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
subscribe(ls);
return ls;
}
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
//此处省略大量代码
}
subscribe(ls);
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
//此处只列出重点方法
subscribeActual(observer);
}
subscribeActual(observer)是一个抽象方法,所以从后往回看上游Observable
子类的实现方法,也就是前面注释暂时不关心的方法。
辅助操作符ObserveOn
subscribeActual(observer)
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
//此处省略大量代码
}
通过前面的分析可以知道,这里的source是上游传入的ObservableObserveOn
对象
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//此处省略大量代码
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//此处省略大量代码
}
在这里的worker
代表的是主线程,将事件作为一个runnable
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
final class HandlerScheduler extends Scheduler {
//此处省略大量代码
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
@SuppressLint("NewApi")
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposable.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposable.disposed();
}
return scheduled;
}
//此处省略大量代码
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed; // Tracked solely for isDisposed().
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
handler.removeCallbacks(this);
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
schedule()
中将事件切换到主线程执行。
总结一下
RxJava让复杂的事件流分步骤编码,让代码更具拓展性和可读性,更重要的是给我们带来了响应式的编程思想,它的操作符还是比较多的,但是基本所有的操作符,实现思路都和上面分析的这些操作符大同小异。
下面列出ReactiveX/RxJava文档中文版上的大量的操作符(应该大部分都不常用),有需要的同学可以去了解下
创建操作符
-
just( )
— 将一个或多个对象转换成发射这个或这些对象的一个Observable -
from( )
— 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable -
repeat( )
— 创建一个重复发射指定数据或数据序列的Observable -
repeatWhen( )
— 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据 -
create( )
— 使用一个函数从头创建一个Observable -
defer( )
— 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable -
range( )
— 创建一个发射指定范围的整数序列的Observable -
interval( )
— 创建一个按照给定的时间间隔发射整数序列的Observable -
timer( )
— 创建一个在给定的延时之后发射单个数据的Observable -
empty( )
— 创建一个什么都不做直接通知完成的Observable -
error( )
— 创建一个什么都不做直接通知错误的Observable -
never( )
— 创建一个不发射任何数据的Observable
变换操作符
-
map( )
— 对序列的每一项都应用一个函数来变换Observable发射的数据序列 -
flatMap( )
,concatMap( )
, andflatMapIterable( )
— 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable -
switchMap( )
— 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据 -
scan( )
— 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值 -
groupBy( )
— 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据 -
buffer( )
— 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个 -
window( )
— 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项 -
cast( )
— 在发射之前强制将Observable发射的所有数据转换为指定类型
过滤操作符
-
filter( )
— 过滤数据 -
takeLast( )
— 只发射最后的N项数据 -
last( )
— 只发射最后的一项数据 -
lastOrDefault( )
— 只发射最后的一项数据,如果Observable为空就发射默认值 -
takeLastBuffer( )
— 将最后的N项数据当做单个数据发射 -
skip( )
— 跳过开始的N项数据 -
skipLast( )
— 跳过最后的N项数据 -
take( )
— 只发射开始的N项数据 -
first( )
andtakeFirst( )
— 只发射第一项数据,或者满足某种条件的第一项数据 -
firstOrDefault( )
— 只发射第一项数据,如果Observable为空就发射默认值 -
elementAt( )
— 发射第N项数据 -
elementAtOrDefault( )
— 发射第N项数据,如果Observable数据少于N项就发射默认值 -
sample( )
orthrottleLast( )
— 定期发射Observable最近的数据 -
throttleFirst( )
— 定期发射Observable发射的第一项数据 -
throttleWithTimeout( )
ordebounce( )
— 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据 -
timeout( )
— 如果在一个指定的时间段后还没发射数据,就发射一个异常 -
distinct( )
— 过滤掉重复数据 -
distinctUntilChanged( )
— 过滤掉连续重复的数据 -
ofType( )
— 只发射指定类型的数据 -
ignoreElements( )
— 丢弃所有的正常数据,只发射错误或完成通知
结合操作符
-
startWith( )
— 在数据序列的开头增加一项数据 -
merge( )
— 将多个Observable合并为一个 -
mergeDelayError( )
— 合并多个Observables,让没有错误的Observable都完成后再发射错误通知 -
zip( )
— 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果 -
and( )
,then( )
, andwhen( )
— (rxjava-joins
) 通过模式和计划组合多个Observables发射的数据集合 -
combineLatest( )
— 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果 -
join( )
andgroupJoin( )
— 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射 -
switchOnNext( )
— 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据
错误处理
-
onErrorResumeNext( )
— 指示Observable在遇到错误时发射一个数据序列 -
onErrorReturn( )
— 指示Observable在遇到错误时发射一个特定的数据 -
onExceptionResumeNext( )
— instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)指示Observable遇到错误时继续发射数据 -
retry( )
— 指示Observable遇到错误时重试 -
retryWhen( )
— 指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable
辅助操作符
-
materialize( )
— 将Observable转换成一个通知列表convert an Observable into a list of Notifications -
dematerialize( )
— 将上面的结果逆转回一个Observable -
timestamp( )
— 给Observable发射的每个数据项添加一个时间戳 -
serialize( )
— 强制Observable按次序发射数据并且要求功能是完好的 -
cache( )
— 记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者 -
observeOn( )
— 指定观察者观察Observable的调度器 -
subscribeOn( )
— 指定Observable执行任务的调度器 -
doOnEach( )
— 注册一个动作,对Observable发射的每个数据项使用 -
doOnCompleted( )
— 注册一个动作,对正常完成的Observable使用 -
doOnError( )
— 注册一个动作,对发生错误的Observable使用 -
doOnTerminate( )
— 注册一个动作,对完成的Observable使用,无论是否发生错误 -
doOnSubscribe( )
— 注册一个动作,在观察者订阅时使用 -
doOnUnsubscribe( )
— 注册一个动作,在观察者取消订阅时使用 -
finallyDo( )
— 注册一个动作,在Observable完成时使用 -
delay( )
— 延时发射Observable的结果 -
delaySubscription( )
— 延时处理订阅请求 -
timeInterval( )
— 定期发射数据 -
using( )
— 创建一个只在Observable生命周期存在的资源 -
single( )
— 强制返回单个数据,否则抛出异常 -
singleOrDefault( )
— 如果Observable完成时返回了单个数据,就返回它,否则返回默认数据 -
toFuture( )
,toIterable( )
,toList( )
— 将Observable转换为其它对象或数据结构
条件操作符
-
amb( )
— 给定多个Observable,只让第一个发射数据的Observable发射全部数据 -
defaultIfEmpty( )
— 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据 - (
rxjava-computation-expressions
)doWhile( )
— 发射原始Observable的数据序列,然后重复发射这个序列直到不满足这个条件为止 - (
rxjava-computation-expressions
)ifThen( )
— 只有当某个条件为真时才发射原始Observable的数据序列,否则发射一个空的或默认的序列 -
skipUntil( )
— 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据 -
skipWhile( )
— 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据 - (
rxjava-computation-expressions
)switchCase( )
— 基于一个计算结果,发射一个指定Observable的数据序列 -
takeUntil( )
— 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知 -
takeWhile( )
andtakeWhileWithIndex( )
— 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据 - (
rxjava-computation-expressions
)whileDo( )
— 如果条件为true
,则发射源Observable数据序列,并且只要条件保持为true
就重复发射此数据序列
布尔操作符
-
all( )
— 判断是否所有的数据项都满足某个条件 -
contains( )
— 判断Observable是否会发射一个指定的值 -
exists( )
andisEmpty( )
— 判断Observable是否发射了一个值 -
sequenceEqual( )
— 判断两个Observables发射的序列是否相等
算数模块的操作符
-
averageInteger( )
— 求序列平均数并发射 -
averageLong( )
— 求序列平均数并发射 -
averageFloat( )
— 求序列平均数并发射 -
averageDouble( )
— 求序列平均数并发射 -
max( )
— 求序列最大值并发射 -
maxBy( )
— 求最大key对应的值并发射 -
min( )
— 求最小值并发射 -
minBy( )
— 求最小Key对应的值并发射 -
sumInteger( )
— 求和并发射 -
sumLong( )
— 求和并发射 -
sumFloat( )
— 求和并发射 -
sumDouble( )
— 求和并发射
其它聚合操作符
-
concat( )
— 顺序连接多个Observables -
count( )
andcountLong( )
— 计算数据项的个数并发射结果 -
reduce( )
— 对序列使用reduce()函数并发射最终的结果 -
collect( )
— 将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的Observable -
toList( )
— 收集原始Observable发射的所有数据到一个列表,然后返回这个列表 -
toSortedList( )
— 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表 -
toMap( )
— 将序列数据转换为一个Map,Map的key是根据一个函数计算的 -
toMultiMap( )
— 将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的
异步操作符
-
start( )
— 创建一个Observable,它发射一个函数的返回值 -
toAsync( )
orasyncAction( )
orasyncFunc( )
— 将一个函数或者Action转换为已Observable,它执行这个函数并发射函数的返回值 -
startFuture( )
— 将一个返回Future的函数转换为一个Observable,它发射Future的返回值 -
deferFuture( )
— 将一个返回Observable的Future转换为一个Observable,但是并不尝试获取这个Future返回的Observable,直到有订阅者订阅它 -
forEachFuture( )
— 传递Subscriber方法给一个Subscriber,但是同时表现得像一个Future一样阻塞直到它完成 -
fromAction( )
— 将一个Action转换为Observable,当一个订阅者订阅时,它执行这个action并发射它的返回值 -
fromCallable( )
— 将一个Callable转换为Observable,当一个订阅者订阅时,它执行这个Callable并发射Callable的返回值,或者发射异常 -
fromRunnable( )
— convert a Runnable into an Observable that invokes the runable and emits its result when a Subscriber subscribes将一个Runnable转换为Observable,当一个订阅者订阅时,它执行这个Runnable并发射Runnable的返回值 -
runAsync( )
— 返回一个StoppableObservable,它发射某个Scheduler上指定的Action生成的多个actions
连接操作符
-
ConnectableObservable.connect( )
— 指示一个可连接的Observable开始发射数据 -
Observable.publish( )
— 将一个Observable转换为一个可连接的Observable -
Observable.replay( )
— 确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅 -
ConnectableObservable.refCount( )
— 让一个可连接的Observable表现得像一个普通的Observable