RxJava3源码分析

ReactiveXReactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源。Rx现在已经支持几乎全部的流行编程语言,RxJavaReactiveXJava语言的支持库。简单来说,RxJava是使用观察者模式的基于事件流处理的响应式编程库(如果还不是特别了解观察者设计模式,可看看我的这篇文章观察者设计模式-RecyclerView中的观察者),下面是官方的RxJava原理图

RxJava原理图

本篇将从工作线程下载网络图片转成Bitmap,切换到主线程ImageView.setImageBitmap()将图片显示出来的过程分析RxJava源码,主要涉及以下部分

  • RxJava的基本使用
  • 创建操作符Just
  • 变换操作符Map
  • 辅助操作符SubscribeOn
  • 辅助操作符ObserveOn

基本使用

导入依赖

implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
Observable.just("https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1587923166363&di=050c57334c9bf2e16e19389161657cb3&imgtype=0&src=http%3A%2F%2Fp2.so.qhimgs1.com%2Ft01dfcbc38578dac4c2.jpg")
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(String urlStr) throws Throwable {
                        URL url = new URL(urlStr);
                        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                        connection.connect();
                        InputStream inputStream = connection.getInputStream();
                        Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                        return bitmap;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Throwable {
                        iv.setImageBitmap(bitmap);
                    }
                });

Observable.just()创建网络图片链接的被观察者,map转换操作符中将String转换成Bitmap,subscribeOnobserveOn 将下载和转换Bitmap放在工作线程,并将下面的流程切换到主线程,subscribe将bitmap发送给观察者Consumer

创建操作符Just

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(@NonNull T item) {//参数item在这里是网络图片url
        Objects.requireNonNull(item, "item is null");//判空抛异常,不是重点
        return RxJavaPlugins.onAssembly(new ObservableJust<>(item));
    }

RxJavaPlugins.onAssembly(new ObservableJust<>(item)),将网络图片Url作为构造参数创建ObservableJust对象,传入RxJavaPlugins静态方法onAssembly()

    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

onObservableAssembly是一个钩子参数,默认为空,不需要关心,即onAssembly()方法就是将传入的对象source直接返回了,这里的source就是刚才创建的ObservableJust,ObservableJustObservable的子类。ObservableJust构造方法中将传入参数赋值给了成员变量value

public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T get() {
        return value;
    }
}

为了方便更好的理解,我将just()操作符的代码简化一下

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(@NonNull T item) {
        return new ObservableJust<>(item);
    }

小结一下
创建操作符Just,实例化了ObservableJust对象,并给成员变量value赋值,并返回ObservableJust对象,给后面的流程链式调用。

变换操作符Map

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    }

map操作符代码结构和just操作符一样。为了方便更好的理解,我将map()操作符的代码简化一下

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        return new ObservableMap<>(this, mapper);
    }

map()实例化了ObservableMap对象,ObservableMap传入了两个参数thismapper

  • this(类型为ObservableSource)
    Observable的对象,在这里的this,就是在just()操作符创建的ObservableJust
  • mapper(类型为Function<? super T, ? extends U>)
    Function第一个泛型参数规定了下限,必须为T或者T的父类,在这里就是网络图片的url,String类型。第一个参数规定了上限,是下游方法希望拿到的参数类型,在这里就是Bitmap类型
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

  //-------------------------------------下方代码暂时不用关心--------------------------------------------//

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
      
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Throwable {
            T t = qd.poll();
            return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

ObservableMap构造方法中,将function赋值给了ObservableMap的成员变量function
source赋值给了父类AbstractObservableWithUpstream的成员变量source,类型为ObservableSource。在这里的source是刚才创建的ObservableJust对象

辅助操作符SubscribeOn

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

按照套路继续简化代码

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        return new ObservableSubscribeOn<>(this, scheduler);
    }

这里的this是上游的Observable,在这里是ObservableMap对象

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    
    //下方代码暂时不用关心,订阅之后再回来看
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    //此处省略大量代码
}

SubscribeOn重点Schedulers.io()

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
static {
        //此处省略无关代码

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        //此处省略无关代码
    }
    static final class IOTask implements Supplier<Scheduler> {
        @Override
        public Scheduler get() {
            return IoHolder.DEFAULT;
        }
    }
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {

    private static final long serialVersionUID = -7789753024099756196L;

    final String prefix;

    final int priority;

    final boolean nonBlocking;

    public RxThreadFactory(String prefix) {
        this(prefix, Thread.NORM_PRIORITY, false);
    }

    public RxThreadFactory(String prefix, int priority) {
        this(prefix, priority, false);
    }

    public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
        this.prefix = prefix;
        this.priority = priority;
        this.nonBlocking = nonBlocking;
    }

    @Override
    public Thread newThread(@NonNull Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
}
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<>(NONE);
        start();
    }
    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

小结一下
subscribeOn(Schedulers.io())就是将事件流作为runable,传入到newScheduledThreadPool线程池中,达到线程切换到工作线程的目的。

辅助操作符ObserveOn

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

老套路 简化代码

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    @NonNull
    public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return new ObservableObserveOn<>(this, scheduler, false, 128);
    }
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

 //-------------------------------------下方代码暂时不用关心--------------------------------------------//

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }
  //此处省略大量代码
}

ObserveOn重点AndroidSchedulers.mainThread()

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    private static final Scheduler MAIN_THREAD =
        RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }

依然简化代码

    public static Scheduler mainThread() {
        return new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }
final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    //省略下方大量代码
}

再次小结一下
从上方的源码分析可以看到,这些操作符的代码套路都是一样的,ObserveOn在这里传入了HandlerScheduler,HandlerScheduler构造方法中传入了主线程的Handler,实现主线程切换。在后面的订阅方法中,我们需要重点关注实例化的Observable子类的subscribeActual()方法

subscribe订阅

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
    }
@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
            @NonNull Action onComplete) {
        Objects.requireNonNull(onNext, "onNext is null");
        Objects.requireNonNull(onError, "onError is null");
        Objects.requireNonNull(onComplete, "onComplete is null");

        LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());

        subscribe(ls);

        return ls;
    }

老套路,简化代码

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @NonNull
    public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {

    LambdaObserver<T> ls = new LambdaObserver<>(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());

    subscribe(ls);

    return ls;
    }
public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable, LambdaConsumerIntrospection {

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }
    //此处省略大量代码
}

subscribe(ls);

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
        //此处只列出重点方法
        subscribeActual(observer);
    }

subscribeActual(observer)是一个抽象方法,所以从后往回看上游Observable子类的实现方法,也就是前面注释暂时不关心的方法。

辅助操作符ObserveOn

subscribeActual(observer)

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }
  //此处省略大量代码
}

通过前面的分析可以知道,这里的source是上游传入的ObservableObserveOn对象

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

    //此处省略大量代码

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

      
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    //此处省略大量代码
    }

在这里的worker代表的是主线程,将事件作为一个runnable

        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
final class HandlerScheduler extends Scheduler {
    //此处省略大量代码
    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private final boolean async;

        private volatile boolean disposed;

        HandlerWorker(Handler handler, boolean async) {
            this.handler = handler;
            this.async = async;
        }

        @Override
        @SuppressLint("NewApi") 
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposable.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposable.disposed();
            }

            return scheduled;
        }
      //此处省略大量代码
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            handler.removeCallbacks(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}

schedule()中将事件切换到主线程执行。

总结一下
RxJava让复杂的事件流分步骤编码,让代码更具拓展性和可读性,更重要的是给我们带来了响应式的编程思想,它的操作符还是比较多的,但是基本所有的操作符,实现思路都和上面分析的这些操作符大同小异。
下面列出ReactiveX/RxJava文档中文版上的大量的操作符(应该大部分都不常用),有需要的同学可以去了解下

创建操作符

  • just( ) — 将一个或多个对象转换成发射这个或这些对象的一个Observable
  • from( ) — 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
  • repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
  • repeatWhen( ) — 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
  • create( ) — 使用一个函数从头创建一个Observable
  • defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
  • range( ) — 创建一个发射指定范围的整数序列的Observable
  • interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
  • timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
  • empty( ) — 创建一个什么都不做直接通知完成的Observable
  • error( ) — 创建一个什么都不做直接通知错误的Observable
  • never( ) — 创建一个不发射任何数据的Observable

变换操作符

  • map( ) — 对序列的每一项都应用一个函数来变换Observable发射的数据序列
  • flatMap( ), concatMap( ), and flatMapIterable( ) — 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
  • switchMap( ) — 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
  • scan( ) — 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
  • groupBy( ) — 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
  • buffer( ) — 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • window( ) — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
  • cast( ) — 在发射之前强制将Observable发射的所有数据转换为指定类型

过滤操作符

结合操作符

  • startWith( ) — 在数据序列的开头增加一项数据
  • merge( ) — 将多个Observable合并为一个
  • mergeDelayError( ) — 合并多个Observables,让没有错误的Observable都完成后再发射错误通知
  • zip( ) — 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果
  • and( ), then( ), and when( ) — (rxjava-joins) 通过模式和计划组合多个Observables发射的数据集合
  • combineLatest( ) — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
  • join( ) and groupJoin( ) — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
  • switchOnNext( ) — 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据

错误处理

  • onErrorResumeNext( ) — 指示Observable在遇到错误时发射一个数据序列
  • onErrorReturn( ) — 指示Observable在遇到错误时发射一个特定的数据
  • onExceptionResumeNext( ) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)指示Observable遇到错误时继续发射数据
  • retry( ) — 指示Observable遇到错误时重试
  • retryWhen( ) — 指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable

辅助操作符

条件操作符

  • amb( ) — 给定多个Observable,只让第一个发射数据的Observable发射全部数据
  • defaultIfEmpty( ) — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
  • (rxjava-computation-expressions) doWhile( ) — 发射原始Observable的数据序列,然后重复发射这个序列直到不满足这个条件为止
  • (rxjava-computation-expressions) ifThen( ) — 只有当某个条件为真时才发射原始Observable的数据序列,否则发射一个空的或默认的序列
  • skipUntil( ) — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
  • skipWhile( ) — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
  • (rxjava-computation-expressions) switchCase( ) — 基于一个计算结果,发射一个指定Observable的数据序列
  • takeUntil( ) — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
  • takeWhile( ) and takeWhileWithIndex( ) — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据
  • (rxjava-computation-expressions) whileDo( ) — 如果条件为true,则发射源Observable数据序列,并且只要条件保持为true就重复发射此数据序列

布尔操作符

算数模块的操作符

其它聚合操作符

  • concat( ) — 顺序连接多个Observables
  • count( ) and countLong( ) — 计算数据项的个数并发射结果
  • reduce( ) — 对序列使用reduce()函数并发射最终的结果
  • collect( ) — 将原始Observable发射的数据放到一个单一的可变的数据结构中,然后返回一个发射这个数据结构的Observable
  • toList( ) — 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
  • toSortedList( ) — 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表
  • toMap( ) — 将序列数据转换为一个Map,Map的key是根据一个函数计算的
  • toMultiMap( ) — 将序列数据转换为一个列表,同时也是一个Map,Map的key是根据一个函数计算的

异步操作符

  • start( ) — 创建一个Observable,它发射一个函数的返回值
  • toAsync( ) or asyncAction( ) or asyncFunc( ) — 将一个函数或者Action转换为已Observable,它执行这个函数并发射函数的返回值
  • startFuture( ) — 将一个返回Future的函数转换为一个Observable,它发射Future的返回值
  • deferFuture( ) — 将一个返回Observable的Future转换为一个Observable,但是并不尝试获取这个Future返回的Observable,直到有订阅者订阅它
  • forEachFuture( ) — 传递Subscriber方法给一个Subscriber,但是同时表现得像一个Future一样阻塞直到它完成
  • fromAction( ) — 将一个Action转换为Observable,当一个订阅者订阅时,它执行这个action并发射它的返回值
  • fromCallable( ) — 将一个Callable转换为Observable,当一个订阅者订阅时,它执行这个Callable并发射Callable的返回值,或者发射异常
  • fromRunnable( ) — convert a Runnable into an Observable that invokes the runable and emits its result when a Subscriber subscribes将一个Runnable转换为Observable,当一个订阅者订阅时,它执行这个Runnable并发射Runnable的返回值
  • runAsync( ) — 返回一个StoppableObservable,它发射某个Scheduler上指定的Action生成的多个actions

连接操作符

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350