RxJava2

本篇文章使用的版本

implementation "io.reactivex.rxjava2:rxjava:2.2.5"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

五种观察者模式

背压机制:在异步场景下,被观察者发送事件的速度 远快于 观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
在进行异步操作时会通过缓存来存储发射的数据。
在 RxJava1.x 时,这些缓存是无界的,当需要缓存的数据非常多的时候,会占用非常多的存储空间,并有可能因为虚拟机不断 GC 而导致程序执行过慢,甚至 OOM。
在 RxJava2.x 中,大多数的异步操作内存都存在一个有界的缓存,当超出这个缓存的时候会抛出 MissingBackpressureException 异常并结束整个队列。

注意:2.0版本 .subscribe(Observer) 方法没有返回值 void,如有需要可参考使用 subscribeWith();但是 不完全参数public final Disposable subscribe(Consumer<? super T> onNext) 有返回值。

  1. Observable.subscribe(Observer)
    不支持背压
// package io.reactivex;
public abstract class Observable<T> implements ObservableSource<T>

// package io.reactivex;
public interface ObservableSource<T> {
    /**
     * 注意:2.0版本没有返回值 void
     * 如有需要可参考使用 subscribeWith()
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

// package io.reactivex;
public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

// package io.reactivex.disposables;
public interface Disposable {
    void dispose();
    boolean isDisposed();
}
  1. Flowable.subscribe(Subscriber)
    支持背压
// package io.reactivex;
public abstract class Flowable<T> implements Publisher<T> 

// package org.reactivestreams;
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

// package org.reactivestreams;
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

// package org.reactivestreams;
public interface Subscription {
    public void request(long n);
    public void cancel();
}
  1. Single. subscribe(SingleObserver)
    只有一个 onSuccess 或者 onError
// package io.reactivex;
public abstract class Single<T> implements SingleSource<T>

// package io.reactivex;
public interface SingleSource<T> {
    void subscribe(@NonNull SingleObserver<? super T> observer);
}

// package io.reactivex;
public interface SingleObserver<T> {
    void onSubscribe(@NonNull Disposable d);
    void onSuccess(@NonNull T t);
    void onError(@NonNull Throwable e);
}

// package io.reactivex.disposables;
public interface Disposable {
    void dispose();
    boolean isDisposed();
}
  1. Completable.subscribe(CompletableObserver)
    只有一个 onComplete 或者 onError
// package io.reactivex;
public abstract class Completable implements CompletableSource 

// package io.reactivex;
public interface CompletableSource {
    void subscribe(@NonNull CompletableObserver co);
}

// package io.reactivex;
public interface CompletableObserver {
    void onSubscribe(@NonNull Disposable d);
    void onComplete();
    void onError(@NonNull Throwable e);
}

// package io.reactivex.disposables;
public interface Disposable {
    void dispose();
    boolean isDisposed();
}
  1. Maybe. subscribe(MaybeObserver)
    只有一个 onSuccess 或者 onError 或者 onComplete
// package io.reactivex;
public abstract class Maybe<T> implements MaybeSource<T> 

// package io.reactivex;
public interface MaybeSource<T> {
    void subscribe(@NonNull MaybeObserver<? super T> observer);
}

// package io.reactivex;
public interface MaybeObserver<T> {
    void onSubscribe(@NonNull Disposable d);
    void onSuccess(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

// package io.reactivex.disposables;
public interface Disposable {
    void dispose();
    boolean isDisposed();
}

Observable 操作符使用

看图即可,之后具体分析可忽略,直接看下一节


RxJava2操作符

创建操作符

create()

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

just()
不超过10个

public static <T> Observable<T> just(T item)
...
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

from 系列

public static <T> Observable<T> fromArray(T... items)
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

defer()
只有在观察者订阅的时候才会创建被观察者

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
Observable.defer(new Callable<ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> call() throws Exception {
        return Observable.just(1);
    }
});

timer()
当到指定时间后发送一个 0L 的值给观察者

public static Observable<Long> timer(long delay, TimeUnit unit)

interval
每隔一段时间发送一个事件,不断增1的数字

public static Observable<Long> interval(long period, TimeUnit unit)

intervalRange

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)

range

public static Observable<Integer> range(final int start, final int count)

rangeLong

public static Observable<Long> rangeLong(long start, long count)

empty & never & error

// 不发送任何事件
public static <T> Observable<T> never()
// 发送 onComplete() 事件
public static <T> Observable<T> empty()
// 发送 onError() 事件
public static <T> Observable<T> error(final Throwable exception)

repeat() & repeatWhen()

转换操作符

map() & cast()
一对一


map
cast

flatMap()& concatMap()
一对多,flatMap不保证事件的顺序,concatMap与上游发送的顺序一致

buffer()
将整个事件流进行分组

Observable.range(1,7)
        .buffer(3)
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.i(TAG,integers.toString());
                Log.i(TAG,"----");
            }
        });

输出
01-29 15:03:09.570 I/TestRxJava2Operate: [1, 2, 3]
01-29 15:03:09.570 I/TestRxJava2Operate: ----
01-29 15:03:09.570 I/TestRxJava2Operate: [4, 5, 6]
01-29 15:03:09.570 I/TestRxJava2Operate: ----
01-29 15:03:09.570 I/TestRxJava2Operate: [7]
01-29 15:03:09.570 I/TestRxJava2Operate: ----
buffer

groupBy()

groupBy

scan()
累加器 accumulator,依次输出


scan

reduce()
和 scan() 一样累加,只是只发送最后一个值

window()
和 buffer 类似,但不是发射来自 Observable 的数据包,发射的是 Observable,最后发射一个 onCompleted 通知
[图片上传失败...(image-d3834c-1548834337169)]

过滤操作符

filter()
规则过滤

distinct() & distinctUntilChanged()
去重过滤

skip() & skipLast() & skipWhile() & skipUntil()
过滤掉前几项

take() & takeLast() & takeUntil() & takeWhile()
只保留前几项

elementAt() & firstElement() & lastElement()
获取队列中指定位置的事件

ignoreElements()
过滤掉所有队列中的事件,只保留 onComplete / onError

throttleFirst() & throttleLast & throttleLatest & throttleWithTimeout
对时间进行切片,选取第一个,最后一个,最近的一个
throttleLast 底层使用的 sample 方法实现

throttleWithTimeout 底层使用的 debounce 方法实现;仅在过了一段指定的时间还没有发射数据时才发射一个数据,如果在一个事件片达到之前,发射的数据之后又紧跟着发射了一个数据,那么这个时间片 之前发射的数据会被丢弃

debounce()
用来限制发射频率过快的,它仅在过了一段指定的时间还没发射数据时才发射一个数据,否则丢弃之前的数据


debounce

sample()
throttleLast 内部实现调用的是 sample


sample

ofType()
过滤掉不符合该类型的事件

组合操作符

startWith() & startWithArray()
在事件队列之前插入数据

merge() & mergeArray() & mergeDelayError & mergeArrayDelayError()
多个事件队列合并起来发射,可能交错无序

merge() 和 mergeError() 只有在处理错误 onError 时不同。mergeError() 在错误之前所有事件发射完毕之后才把错误发射出来,多个错误只发射一个错误;
merge() 在遇到错误时,直接抛出来结束操作

concat() & concatArray() & concatEager()& concatDelayError & concatArrayDelayError()
多个事件队列合并起来发射,严格按顺序发射

concatEager方法,当一个观察者订阅了它的结果,那么就相当于订阅了它拼接的所有ObservableSource,并且会先缓存这些ObservableSource发射的数据,然后再按照顺序将它们发射出来;

zip() & zipArray() & zipIterable()
将多个数据项合并,可以指定合并规则

combineLatest() & combineLatestDelayError()
组合最近的俩个数据


image

collect()
将数据收集到数据结构中

Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
    @Override
    public ArrayList < Integer > call() throws Exception {
        return new ArrayList < > ();
    }
},
new BiConsumer < ArrayList < Integer > , Integer > () {
    @Override
    public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
        integers.add(integer);
    }
})
.subscribe(new Consumer < ArrayList < Integer >> () {
    @Override
    public void accept(ArrayList < Integer > integers) throws Exception {
        Log.d(TAG, "===============accept " + integers);
    }
});

辅助操作符

delay()
在发送事件前延迟
doXXX 系列

doOnSubscribe,在 subscribe() 订阅之前触发
doOnLifecycle,
doOnEach,在每个 onNext() 调用之前触发
doOnNext,在 onNext() 之前触发
doAfterNext,在 onNext() 方法之后触发
doOnTerminate,在 Observable 终止onComplete() / onError()之前触发
doOnComplete,在 onComplete() 之前触发
doOnError,在 onError() 之前触发
doFinally,在 onComplete() 或 onError() 结束之后触发
doAfterTerminate,在 Observable 终止之后触发
doOnDispose,在 dispose 之前触发

输出示例:

01-28 16:08:30.460 I/TestRxJava2: doOnSubscribe 
01-28 16:08:30.460 I/TestRxJava2: doOnLifecycle Consumer
01-28 16:08:30.460 I/TestRxJava2: Observer onSubscribe 
01-28 16:08:30.460 I/TestRxJava2: ObservableEmitter onNext 0
01-28 16:08:30.460 I/TestRxJava2: doOnEach 
01-28 16:08:30.460 I/TestRxJava2: doOnNext 
01-28 16:08:30.460 I/TestRxJava2: Observer onNext 0
01-28 16:08:30.460 I/TestRxJava2: doAfterNext 
01-28 16:08:30.460 I/TestRxJava2: ObservableEmitter onComplete
01-28 16:08:30.460 I/TestRxJava2: doOnTerminate 
01-28 16:08:30.460 I/TestRxJava2: doOnComplete 
01-28 16:08:30.460 I/TestRxJava2: doOnEach 
01-28 16:08:30.460 I/TestRxJava2: Observer onComplete
01-28 16:08:30.460 I/TestRxJava2: doFinally 
01-28 16:08:30.460 I/TestRxJava2: doAfterTerminate 

subscribeOn() & observeOn()
线程切换,subscribeOn()指定观察者运行的线程;observeOn()指定被观察者发射事件所运行的线程

timeout()
设置超时时间,指定时间内没有任何数据,就执行我们的数据项,后续事件不再发送

错误处理操作符

onErrorReturn() & onErrorResumeNext() & onExceptionResumeNext()
onErrorReturn(): 在触发 onError 的时候用一个值代替,并调用 onCompleted() 结束本次队列,而不会将错误传递给观察者
onErrorResumeNext(): 用 Observable 代替
onExceptionResumeNext(): 只处理 Exception() ,如果是 Error 则不处理。(二者都继承 Throwable)

retry() & retryUtil() & retryWhen()
重复试错

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new Throwable("Error1"));
//                        emitter.onError(new Throwable("Error2"));
                emitter.onNext(3);
            }
        })
        .retry(2, new Predicate<Throwable>() {
            @Override
            public boolean test(Throwable throwable) throws Exception {
                return true;
            }
        })
        .subscribe();

// 输出
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onSubscribe false
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onError: java.lang.Throwable: Error1

条件操作符

all() & any()
all(): 是否全部满足
any(): 是否存在一个

contains() & isEmpty()
contains(): 是否包含指定项
isEmpty(): 是否为空

sequenceEqual()
判断俩个序列是否相等

amb()
作用两个或多个 Observable,但是只会发射最先发射数据的那个 Observable 的全部数据

Observable
        .amb(Arrays.asList(Observable.range(1, 5),
                Observable.range(6, 5)))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "testAmb " + integer);
            }
        }); 

defaultIfEmpty()
当指定的序列为空的时候指定一个用于发射的值,需要调用 onCompleted()

转换操作符

toList() & toSortedList()
toMap() & toMultimap()
toFlowable()
to()

Flowable 背压机制

使用示例

示例1

Flowable
    .range(0, 100)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .subscribe(new Subscriber<Integer>() {
        
        Subscription sub;

        @Override
        public void onSubscribe(Subscription s) {
            Log.w(TAG, "onsubscribe start");
            sub = s;
            s.request(2);
            Log.w(TAG, "onsubscribe end");
        }

        @Override
        public void onNext(Integer integer) {
            Log.w(TAG, "onNext--->" + integer);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sub.request(3);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.w(TAG, "onComplete");
        }
    });

输出结果如下

01-28 10:48:09.040 W/TestRxJava2: onsubscribe start
01-28 10:48:09.040 W/TestRxJava2: onsubscribe end
01-28 10:48:09.050 W/TestRxJava2: onNext--->0
01-28 10:48:11.050 W/TestRxJava2: onNext--->1
01-28 10:48:13.050 W/TestRxJava2: onNext--->2
01-28 10:48:15.050 W/TestRxJava2: onNext--->3
01-28 10:48:17.050 W/TestRxJava2: onNext--->4

示例2

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            emitter.onNext(i);
        }
        emitter.onComplete();
    }
}, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "Consumer accept integer: " + integer);
            }
        });

解决背压问题

导致的原因:使用 .observeOn() 方法监听了 非背压PublishProcessortimer()interval()或者自定义的create()
.observeOn() 方法的默认缓存大小是128,当生产的速度过快时,会很快超出该缓存大小,从而导致内存溢出。

1 增加缓存大小

使用.observeOn() 重载方法来设置缓存的大小

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .observeOn(Schedulers.computation(), false, 1024 * 1024)
//                .observeOn(Schedulers.computation())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                compute(integer);
            }
        });

for (int i = 1; i < 1_000_000; i++) {
    source.onNext(i);
}

只解决暂时背压问题,当生产速率过快的时候还是可能造成缓存溢出

2 通过丢弃或者过滤来减轻缓存压力

使用 throttleXXX 或者 sample()等方式减少接收的数据

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .sample(1, TimeUnit.SECONDS)
        .observeOn(Schedulers.computation())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                compute(integer);
            }
        });

for (int i = 1; i < 1_000_000; i++) {
    source.onNext(i);
}

该方式仅用来减少下游接收的数据,不改变上游发送数据和缓存数据,解决背压问题,但还是会导致缓存溢出

3 onBackpressureBuffer

onBackpressureBuffer()

无参的方式使用一个无界的缓存,只要虚拟机没有抛出 OOM 异常,它就会把所有的数据缓存下来,而只会将一小部分的数据传递给 observeOn

Flowable
    .range(1, Integer.MAX_VALUE)
    .onBackpressureBuffer()
    .observeOn(Schedulers.computation(), false, 8)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            compute(integer);
        }
    });

这种处理方式实际上不存在背压,因为 onBackpressureBuffer 缓存了所有的数据

onBackpressureBuffer 系列

onBackpressureBuffer(boolean delayError)
onBackpressureBuffer(int capacity)
onBackpressureBuffer(int capacity, boolean delayError)
onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded)
onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded,Action onOverflow)
onBackpressureBuffer(int capacity, Action onOverflow)
onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy)
capacity:指定有界缓存
delayError:是否延迟抛出异常
unbounded:无界
onOverflow:缓存溢出回调
overflowStrategy:缓存策略

public enum BackpressureOverflowStrategy {
    /** 抛出异常 BufferOverflowException */
    ERROR,
    /** 丢去最老的值*/
    DROP_OLDEST,
    /** 丢弃最新的值 */
    DROP_LATEST
}

4 onBackpressureDrop

不会缓存任何数据,专注当下,新来的数据来不及处理就丢掉,以后会有更好的


bp.obp.drop.png
5 onBackpressureLatest

会缓存一个数据,当正在执行某个人物的时候有新的数据过来,会把它缓存起来,如果又有新的数据过来,那就把之前的替换掉,缓存里面的总是最新的


bp.obp.latest.png

操作符

public interface Action {
    void run() throws Exception;
}
public interface Consumer<T> {
    void accept(T t) throws Exception;
}

public interface BiConsumer<T1, T2> {
    void accept(T1 t1, T2 t2) throws Exception;
}

public interface LongConsumer {
    void accept(long t) throws Exception;
}
public interface Function<T, R> {
    R apply(@NonNull T t) throws Exception;
}

public interface BiFunction<T1, T2, R> {
    R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}

public interface Function3<T1, T2, T3, R> {
    R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3) throws Exception;
}

 ... Function4 ... Function8 ...

public interface Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> {
    R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3, @NonNull T4 t4, @NonNull T5 t5, @NonNull T6 t6, @NonNull T7 t7, @NonNull T8 t8, @NonNull T9 t9) throws Exception;
}

public interface IntFunction<T> {
    T apply(int i) throws Exception;
}
public interface Predicate<T> {
    boolean test(@NonNull T t) throws Exception;
}

public interface BiPredicate<T1, T2> {
    boolean test(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
public interface Cancellable {
    void cancel() throws Exception;
}
public interface BooleanSupplier {
    boolean getAsBoolean() throws Exception; // NOPMD
}

非背压 Subject

Subject 可以同时代表 Observer 和 Observable,允许从数据源中多次发送结果给多个观察者。除了 onSubscribe(), onNext(), onError() 和 onComplete() 之外,所有的方法都是线程安全的。此外,你还可以使用 toSerialized() 方法,也就是转换成串行的,将这些方法设置成线程安全的,如 PublishSubject.create().toSerialized()

public abstract class Subject<T> extends Observable<T> implements Observer<T> {

    public abstract boolean hasObservers();

    public abstract boolean hasThrowable();

    public abstract boolean hasComplete();

    public abstract Throwable getThrowable();

    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }
}

AsyncSubjectBehaviorSubject、SingleSubjectPublishSubjectReplaySubjectUnicastSubjectCompletableSubjectMaybeSubject 等均继承 Subject

AsyncSubject

非粘性,只有先注册后发送事件才能接收
只有调用 onComplete 才能触发

    public void testAsyncSubject() {
        AsyncSubject<String> subject = AsyncSubject.create();
        subject.onNext("one");
        subject.onNext("two");
        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, "testAsyncSubject: " + s);
            }
        });
        subject.onNext("three");
        subject.onComplete();
    }

输出

01-27 18:18:03.190  I/TestRxJava2: testAsyncSubject: three

BehaviorSubject

能收到订阅之前的最后一个事件 和订阅之后发送的事件

        BehaviorSubject<String> subject = BehaviorSubject.create();
        subject.onNext("zero");
        subject.onNext("one");
        subject.onNext("two");
        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, "testBehaviorSubject first: " + s);
            }
        });
        subject.onNext("three");
        subject.onNext("four");

输出

01-27 18:18:27.990  I/TestRxJava2: testBehaviorSubject: two
01-27 18:18:27.990  I/TestRxJava2: testBehaviorSubject: three
 01-27 18:18:27.990 I/TestRxJava2: testBehaviorSubject: four

PublishSubject

非粘性,只有先注册后发送事件才能接收

        PublishSubject<Integer> subject = PublishSubject.create();
        subject.onNext(1);
        subject.onNext(2);
        subject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "testPublishSubject first: " + integer);
            }
        });
        subject.onNext(3);
        subject.onNext(4);
        subject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "testPublishSubject second: " + integer);
            }
        });
        subject.onNext(5);
        subject.onNext(6);

输出

01-27 18:14:33.600  I/TestRxJava2: testPublishSubject first: 3
01-27 18:14:33.600  I/TestRxJava2: testPublishSubject first: 4
01-27 18:14:33.600  I/TestRxJava2: testPublishSubject first: 5
01-27 18:14:33.600  I/TestRxJava2: testPublishSubject second: 5
01-27 18:14:33.600  I/TestRxJava2: testPublishSubject first: 6
01-27 18:14:33.600  I/TestRxJava2: testPublishSubject second: 6

ReplaySubject

粘性事件

        ReplaySubject<String> subject = ReplaySubject.create();
        subject.onNext("zero");
        subject.onNext("one");
        subject.onNext("two");
        subject.onNext("four");
        subject.onNext("five");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, "testRelaySubject first: " + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.i(TAG, "testRelaySubject first error");

            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.i(TAG, "testRelaySubject first onComplete");
            }
        });
        subject.onNext("six");
        subject.onNext("seven");
        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, "testRelaySubject second: " + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.i(TAG, "testRelaySubject second error");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.i(TAG, "testRelaySubject second onComplete");
            }
        });
        subject.onNext("eight");
        subject.onNext("nine");
        subject.onComplete();

输出

01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: zero
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: one
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: two
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: four
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: five
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: six
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: seven
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: zero
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: one
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: two
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: four
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: five
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: six
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: seven
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: eight
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: eight
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: nine
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: nine
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first onComplete
01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second onComplete

UnicastSubject

粘性事件,只能有一个观察者,java.lang.IllegalStateException: Only a single observer allowed.

UnicastSubject<Integer> subject = UnicastSubject.create();
        subject.onNext(0);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        subject.onNext(4);
        subject.onNext(5);
        subject.onNext(6);
        subject.onNext(7);
        subject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "testUnicastSubject first: " + integer);
            }
        });
        subject.onNext(8);
        subject.onNext(9);

输出

01-27 18:22:10.440 I/TestRxJava2: testUnicastSubject first: 0
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 1
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 2
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 3
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 4
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 5
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 6
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 7
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 8
01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 9

背压 Processor

public abstract class FlowableProcessor<T> extends Flowable<T> implements Processor<T, T>, FlowableSubscriber<T> {
    
    public abstract boolean hasSubscribers();

    public abstract boolean hasThrowable();

    public abstract boolean hasComplete();

    public abstract Throwable getThrowable();

    public final FlowableProcessor<T> toSerialized() {
        if (this instanceof SerializedProcessor) {
            return this;
        }
        return new SerializedProcessor<T>(this);
    }
}

以下均继承FlowableProcessor

AsyncProcessor
BehaviorProcessor --
MulticastProcessor
PublishProcessor --
ReplayProcessor
SerializedProcessor --
UnicastProcessor

Scheduler 线程调度

computation()
io()
trampoline()
newThread()
single()
from(@NonNull Executor executor)

统一取消订阅

CompositeDisposable 统一订阅
subscribeWith 返回观察者
ResourceSubscriber 等实现 Disposable 接口

CompositeDisposable compositeDisposable = new CompositeDisposable();

ResourceSubscriber<Integer> resourceSubscriber 
        = Flowable
        .range(1, 8)
        .subscribeWith(new ResourceSubscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
            }
        });

compositeDisposable.add(resourceSubscriber);

类似 ResourceSubscriber 的还有

Subscribers 系列

public abstract class DisposableSubscriber<T> implements FlowableSubscriber<T>, Disposable 

public abstract class ResourceSubscriber<T> implements FlowableSubscriber<T>, Disposable 

// 没有实现 Disposable,而是实现了Subscription
public final class SafeSubscriber<T> implements FlowableSubscriber<T>, Subscription 

// 没有实现 Disposable,而是实现了Subscription
public final class SerializedSubscriber<T> implements FlowableSubscriber<T>, Subscription 

public class TestSubscriber<T>
extends BaseTestConsumer<T, TestSubscriber<T>>
implements FlowableSubscriber<T>, Subscription, Disposable 

Observers 系列

public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable 

public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Disposable 

public abstract class DisposableObserver<T> implements Observer<T>, Disposable 

public abstract class DisposableSingleObserver<T> implements SingleObserver<T>, Disposable 

public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable 

public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable 

public abstract class ResourceObserver<T> implements Observer<T>, Disposable 

public abstract class ResourceSingleObserver<T> implements SingleObserver<T>, Disposable 

public final class SafeObserver<T> implements Observer<T>, Disposable 

public final class SerializedObserver<T> implements Observer<T>, Disposable 

public class TestObserver<T>
extends BaseTestConsumer<T, TestObserver<T>>
implements Observer<T>, Disposable, MaybeObserver<T>, SingleObserver<T>, CompletableObserver 

参考资料

感谢以下文章作者
RxJava2 系列-1:一篇的比较全面的 RxJava2 方法总结
关于 RxJava 最友好的文章—— RxJava 2.0 全新来袭
RxJava2 只看这一篇文章就够了
RxJava 组合操作符

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容