1、线程切换
涉及线程切换操作的操作符主要就俩,subscribeOn和observedOn, 我们还是像RxJava2 的原理浅析 一样,考究源码实现。我们先把主要注意力放在 subsribeActual方法里。
-
FlowableSubscribeOn
我们可以看到与普通的Flowable子类的subscribeActual实现是不太一样的, 通常我们的Flowable实现是这样的,
调用上游的source,执行subscribe就好了,但是在FlowableSubscribeOn这个类里面,我们发现是先执行onSubscribe,也就是说先回溯,然后下游会再递归回来执行对应subscription的request(long )方法
到达这里 request(long n) 有可能执行requestUpstream,向上游递归执行request。w.schedule(sos) 这里,就是执行SubscribeOnSubscriber的run方法,w就是一个Scheduler.Worker,内部包装了一个Java自带的ExectutorService。我们发现,执行subsribe向上游递归时,线程发生切换,后续的代码都执行在w指定的线程池上。
- FlowableObservedOn
FlowableObservedOn它的subscribeActual方法与其他类是一样的,我们把主要注意力放到它的BaseObserveOnSubscriber这个内部类里面,显然,这个类实现了runnable接口的run方法,这会用于线程切换。我们主要看onNext以及接下来要执行的trySchedule(看名字就知道要切换线程了),以及run方法。
在onNext(T )方法里面我们会将数据 t 加入队列queue,稍后会取出数据,调用actual.onNext(),数据将会传递到下游。
在ObserveOnSubscriber方法里,真正执行的三个方法其实是
以runSync为例子,我们可以看到取出队头数据我们会看到一些异常处理,顺便说一下,e 可以看成已经发送的数据个数,r就是请求的数据个数,具体细节就是在处理队列里面残留的数据啦,不过在这里我们暂时先不关心这些。runSync最后就会跑到 a.onNext(v),这里的a就是下游的actual,一个subsriber。整个方法体被封装在run方法里调用,也就是trySchedule方法里面的那个work.scheduler(this),在这个地方发生了线程切换。
2、任务取消的原理
取消任务的做法其实很简单,flowable执行subscribe(Subscriber s) 以后,就会返回一个LambdaSubscriber,它自己实现了disposable接口的dipose方法,所以做法就如下就好。
然后我们来看下它的原理,一共分两种情况讨论,同步任务和异步任务
- 同步任务
还是回到Flowable.subscribeActual方法里
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
关键点在先执行subscribe(ls),然后才返回我们的disposable,先执行subscribe,整条执行链路就都跑起来了,然后又是同步,等到返回的时候必然已经执行完毕,所以同步的情况不需要取消任务。
- 异步任务
异步任务麻烦一点,我们需要看下代码执行调用的一些顺序,我们以这段代码为例子
public void run() {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Runnable unlockAction = () -> {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
};
Runnable lockAction = () -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
};
Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(23);
System.out.println(23);
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.delay(3, TimeUnit.MINUTES)
.subscribe(integer -> {
System.out.println(integer);
});
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(() -> {
System.out.println("dispose start");
disposable.dispose();
unlockAction.run();
}, 3, TimeUnit.SECONDS);
lockAction.run();
}
执行disposable以后通过打断点观看栈帧,dispose方法执行以后会递归执行subscription 的 cancel 方法。
这些个Flowable的子类大多都有个内部类,同时实现subscriber和subscription方法,同时也是 atomic*的子类,有些是AtomicLong的子类,有些是AtomicReference<?>的子类,每个subscription都会在onSubscribe方法里获取到上游的subscription,这样就能够递归执行了。最终到源头的FlowableCreate里面执行cancel方法。
接下来是异步取消任务的核心,代码展示说不太清,需要一点点思考。
当递归执行subscription cancel的时候,总会跑到某个subscriber里面,我们称呼它为A,它的worker正在执行,虽然执行代码的片段不一定是A的run或者是onNext(有可能是它下游的某个subscriber) 但是一定会运行在A的worker里面,所以执行worker.dipose停掉这个worker就可以取消异步任务(所有的runnable都会被封装成为FutureTask 从而可以被取消)
其实原理就辣么简单。