作为RxJava的典型特性之一,背压,我们知道他是在生产者与消费者速率不一致时做的一种数据处理策略,只是在想,它是如何实现的?内部原理是什么?
还是从源码入手,撸完源码,想必能解开自己的疑惑。
背压分析
1 来一段背压控制的代码。
Flowable.range(1,100_000)
.onBackpressureDrop()
.subscribe(s-> {
Thread.sleep(100);
System.out.println(s);
});
2 根据前面的分析,知道Flowable.range会产生一个新的Observable,但是对于Flowable则是产生了FlowableRange。
new FlowableRange(start, count)
3 FlowableRange扩展了Flowable,调用其onBackpressureDrop()方法,内部实现为。
// this即上一步的FlowableRange。
// 内部的source = FlowableRange
new FlowableOnBackpressureDrop<T>(this)
4 subscribe方法,类似于Observable,只是这里为LambdaSubscriber。然后subscribe内部调用的是subscribeActual方法,参数为LambdaSubscriber
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
// onNext为我们传入的Consumer类
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
5 查看FlowableOnBackpressureDrop的subscribeActual方法实现。其实不难理解这里调用到的是第一步的FlowableRange的subscribeActual方法,参数为new BackpressureDropSubscribe。
// FlowableOnBackpressureDrop
// s = LambdaSubscriber
// onDrop = this = FlowableOnBackpressureDrop实例
this.source.subscribe(new BackpressureDropSubscriber<T>(s, onDrop))
6 看下FlowableRange的subscribeActual方法。
@Override
public void subscribeActual(Subscriber<? super Integer> s) {
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new RangeConditionalSubscription(
(ConditionalSubscriber<? super Integer>)s, start, end));
} else {
// 走这段逻辑,上面代码忽略
// s = BackpressureDropSubscriber
// start,end为开始时传入的参数,当前的案例start = 1, end = 100_000
s.onSubscribe(new RangeSubscription(s, start, end));
}
}
7 调用BackpressureDropSubscriber的onSubscribe方法。参数为new RangeSubscription
// BackpressureDropSubscriber
@Override
public void onSubscribe(Subscription s) {
// upstream为RangeSubscription
// downstream为LambdaSubscriber
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
downstream.onSubscribe(this);
s.request(Long.MAX_VALUE);
}
}
8 查看LambdaSubscriber的onSubscribe方法。
// LambdaSubscriber
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this, s)) {
try {
// 因为这里我们没有实现onSubscribe方法,这里什么都没处理。
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
}
}
}
9 在BackpressureDropSubscriber的upstream中request方法,其根据n值走那条路径,在我们的代码中走的是fastPath。
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
if (BackpressureHelper.add(this, n) == 0L) {
// 走fastPath
if (n == Long.MAX_VALUE) {
fastPath();
} else {
slowPath(n);
}
}
}
}
10 看一下fastPath内容,实际上就是循环的将range中的代码丢给下游去处理。
@Override
void fastPath() {
// f = 100_0000
int f = end;
Subscriber<? super Integer> a = downstream;
// index = 1,不断的调用下游的onNext方法
for (int i = index; i != f; i++) {
if (cancelled) {
return;
}
a.onNext(i);
}
if (cancelled) {
return;
}
// 最终调用下游的onComplete方法。
a.onComplete();
}
11 BackpressureDropSubscriber在接收到上游的这些数据后如何处理的呢?
@Override
public void onNext(T t) {
if (done) {
return;
}
// 在每一次调用downstream.onSubscribe方法的时候都会请求上游一个数字,默认是Long.Max
long r = get();
if (r != 0L) {
// 调用下游的观察者去处理
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
try {
onDrop.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
cancel();
onError(e);
}
}
}
12 背压的工具类处理。
BackpressureHelper.produced(this, 1);
public static long produced(AtomicLong requested, long n) {
for (;;) {
// 如果设置为默认的Long.Max_value的时候不会进行背压控制
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
// 当前值每次消耗1个,直到小于0,那么启用背压策略。
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
}
}
13 可以看出来在创建FlowableSubscriber时如果指定了要请求的数据量,那么上游返回的数据就可以得到控制。
修改最开始的代码如下:
Flowable.range(1, 100_000)
.onBackpressureDrop()
.subscribe(new FlowableSubscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(100);
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable t) {
System.out.println("on error");
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
Thread.sleep(10000);
// 最终只会打印到100
14 因为这里是同步处理的逻辑,上游生产的数据会同步的被下游处理,而在同步状态下,下游只好指定自己准备消耗多少数据。
其实背压就是在进行Subscription的时候指定自己要处理多少数据,如果这些数据已经全部下发到下游了,后续继续下发的数据就会被背压策略处理。
简述
上面代码分析比较繁琐,看完之后,我觉得背压处理逻辑是这样:
Subscriber: 开始subscribe了,我想要100个数据。
Upstream: 不关心下游的状态,不断发送数据,每次发送的时候,Subscriber设置的请求100个数据都会减少一个,直到最终为0. 当request的数量减少到0的时候,代表发给下游的数据已经超出下游想要处理的数据。
Subscriber:我这边处理完了100个数, 还想要一些数据,再去request一些数据。
Upstream:看到request的数据又不为0了,继续发数据给下游。
Subscriber:我不想要数据了,调用cancel()
upstream: 已经cancel了,那么数据就不再下发了。
upstream:我发现我的数据已经全部给下游了,调用下游的onComplete处理。
最后
背压处理,其实就是让消费者通知生产者自己想要处理多少数据,或者能够处理多少数据,然后上游的数据源收到了消费者请求的内容,做自己的策略处理。