ObservableEmitter<T> emitter
1.发射器发出onComplete()或者onError()后,接收器将不再接收时间。
2.游可以不发送onComplete或onError。
3.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。
注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.
Disposable d
当调用它的
dispose()
方法时, 它就会将两根管道切断, 从而导致下游收不到事件,但上游的还会继续发送剩余事件。
在Activity中将这个Disposable
保存起来, 当Activity退出时切断它即可。多个Disposable
则使用CompositeDisposable
管理,CompositeDisposable.add()
和CompositeDisposable.clear()
总结
ObservableEmitter<T> emitter
的onComplete()
和onError()
,以及Disposable d
的dispose()
都只会让下游接收不到事件,但上游假如还存在事件则会继续发送,以上的方法都可以视为阶段器,
subscribeOn()
和observeOn()
subscribeOn()
指定的是上游发送事件的线程,observeOn()
指定的是下游接收事件的线程.- 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
- 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程
Map操作符
对原数据进行变化操作(其实就是一个方法,接收原数据操作然后返回结果数据)
FlatMap操作符(玩的熟才用,否则容易晕)
将第一次发送的数据和flatMap发送的数据进行组合再此发送。比如第一次发送ABC,第二次发送123,那么
可能(因为不保证顺序)
会出现A1A2A3 B1B2B3 C1C2C3 。保证顺序的话用concatMap
Zip操作符
对多个发送源的数据进行合并,每个源数据的对应角标的元素进行合并,以最短发送源的为准,较长发送源的剩余元素被舍弃。同一线程一定有会有一个发送源先全部发送完毕。
Flowable(默认缓存为128个事件,响应式拉取)
背压策略:BackpressureStrategy(水缸)
。一般的使用场景都是发送量大且异步(因为这两个都可以会引起内存溢出)
- ERROR,上游积压超过128事件则会直接报异常
- BUFFER, 无限制缓存大小,但是会存在OOM风险
- DROP, 丢弃超过128个事件的剩余事件(默认缓存为128,你发了129,那么第129不会进入水缸)。 Drop就是直接把存不下的事件丢弃
- LATEST, Latest就是只保留最新的事件,当水缸(缓存128)已经存满了128个事件,那么这时候还有事件进入的话则前面的事件会被覆盖掉。
背压源码解析
Flowable
// 创建上游的方法
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
// 检查是否为null的工具类,不必深究
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
// RxJavaPlugins.onAssembly()。因为是链式模式,所以返回本身,这个方法就是一个包裹转换的功能,不必深究
// FlowableCreate,这个类才是重点
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
// 订阅下游的方法
public final void subscribe(Subscriber<? super T> s) {
// 一般我都是直接new一个Subscriber,所以走else块。
if (s instanceof FlowableSubscriber) {
subscribe((FlowableSubscriber<? super T>)s);
} else {
ObjectHelper.requireNonNull(s, "s is null");
// 包裹一层
subscribe(new StrictSubscriber<T>(s));
}
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(z); // !!!!!!!!!!真实发起订阅(其他代码可不看,就看这个句)
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
StrictSubscriber 下游类
public class StrictSubscriber<T>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -4945028590049415624L;
final Subscriber<? super T> downstream;
final AtomicThrowable error;
final AtomicLong requested;
final AtomicReference<Subscription> upstream;
final AtomicBoolean once;
volatile boolean done;
public StrictSubscriber(Subscriber<? super T> downstream) {
this.downstream = downstream;
this.error = new AtomicThrowable();
this.requested = new AtomicLong();
this.upstream = new AtomicReference<Subscription>();
this.once = new AtomicBoolean();
}
@Override
public void request(long n) {
if (n <= 0) {
cancel();
onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n));
} else {
SubscriptionHelper.deferredRequest(upstream, requested, n);
}
}
@Override
public void cancel() {
if (!done) {
SubscriptionHelper.cancel(upstream);
}
}
@Override
public void onSubscribe(Subscription s) {
if (once.compareAndSet(false, true)) {
downstream.onSubscribe(this);
SubscriptionHelper.deferredSetOnce(this.upstream, requested, s);
} else {
s.cancel();
cancel();
onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
}
}
@Override
public void onNext(T t) {
HalfSerializer.onNext(downstream, t, this, error);
}
@Override
public void onError(Throwable t) {
done = true;
HalfSerializer.onError(downstream, t, this, error);
}
@Override
public void onComplete() {
done = true;
HalfSerializer.onComplete(downstream, this, error);
}
}
FlowableCreate(继承Flowable)
// 构造方法
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
// 持有把上游对象
this.source = source;
// 持有背压模式对象
this.backpressure = backpressure;
}
// 实际订阅,Flowable的subscribe()内部会调用这个方法。
// 当你使用订阅下游的时候,会把下游对象传到这个方法。
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
// 工厂模式,根据背压模式实例化对应的发射器,且会把下游对象通过发射器的构造方法让发射器内部持有(所以我们在发射器才会知道下游所需的处理能力)。
// 背压的核心就是这些工厂类,执行的条件不同产生的效果就不同
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
// 调用下游的onSubscribe,并且把发射器对象传递过去让下游对象持有。(双向传递,下游和发射器互相持有对方的对象)
t.onSubscribe(emitter);
try {
// 上游持有了发射器对象
// 使用上游对象执行该对象的subscribe,其实就是走发射事件的逻辑
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
BaseEmitter背压发射器基类
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
private static final long serialVersionUID = 7326289992464377023L;
final Subscriber<? super T> downstream;
final SequentialDisposable serial;
BaseEmitter(Subscriber<? super T> downstream) {
// 下游对象
this.downstream = downstream;
// 切断对象
this.serial = new SequentialDisposable();
}
@Override
public void onComplete() {
complete();
}
protected void complete() {
// 如果已经切断了就跳过,所以下游不会收到onComplete()事件
if (isCancelled()) {
return;
}
try {
// 回调下游的onComplete()事件
downstream.onComplete();
} finally {
// 切断
serial.dispose();
}
}
@Override
public final void onError(Throwable e) {
if (!tryOnError(e)) {
// 已经切断,如果接着发送onError内部会抛异常
RxJavaPlugins.onError(e);
}
}
@Override
public boolean tryOnError(Throwable e) {
return error(e);
}
protected boolean error(Throwable e) {
// 判断开发者传递的异常是否为null
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (isCancelled()) {
return false;
}
try {
// 回调下游的方法
downstream.onError(e);
} finally {
// 切断
serial.dispose();
}
return true;
}
@Override
public final void cancel() {
// 切断
serial.dispose();
onUnsubscribed();
}
// 注销订阅,空实现
void onUnsubscribed() {
// default is no-op
}
@Override
public final boolean isCancelled() {
return serial.isDisposed();
}
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
// 将下游请求的事件数存放
BackpressureHelper.add(this, n);
onRequested();
}
}
void onRequested() {
// default is no-op
}
@Override
public final void setDisposable(Disposable d) {
serial.update(d);
}
@Override
public final void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public final long requested() {
return get();
}
@Override
public final FlowableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
ErrorAsyncEmitter背压发射器(继承了NoOverflowBaseAsyncEmitter)
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 338953216916120960L;
ErrorAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
// 回调下游的onError(),直接抛出异常
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) { // 下游所需事件不为0,就是下游还有处理的事件
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
// 调用子类重写的方法
onOverflow();
}
}
// 子类重写
abstract void onOverflow();
}
// BackpressureHelper的方法
public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
// 下游所需事件 - 1
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
// 重置所需事件数
if (requested.compareAndSet(current, update)) {
return update;
}
}
}