Scheduler
/Worker
需要满足以下的要求:
- 所有的方法都需要是线程安全的;
-
Worker
需要保证即时、串行提交的任务按照先进先出(FIFO)的顺序被执行; -
Worker
需要尽可能保证被取消订阅时要取消还未执行的任务; -
Worker
被取消订阅时不能影响同一个 Scheduler 的其他 Worker;
public final class ContextAwareScheduler extends Scheduler {
public static final ContextAwareScheduler INSTANCE =
new ContextAwareScheduler(); // (1)
final NewThreadWorker worker;
private ContextAwareScheduler() {
this.worker = new NewThreadWorker(
new RxThreadFactory("ContextAwareScheduler")); // (2)
}
@Override
public Worker createWorker() {
return new ContextAwareWorker(worker); // (3)
}
static final class ContextAwareWorker extends Worker {
final CompositeDisposable tracking; // (4)
final NewThreadWorker worker;
public ContextAwareWorker(NewThreadWorker worker) {
this.worker = worker;
this.tracking = new CompositeSubscription();
}
@Override
public Subscription schedule(Action0 action) {
// implement
}
@Override
public void dispose() {
tracking.dispose(); // (5)
}
@Override
public boolean isDisposed() {
return tracking.isDisposed();
}
}
}
上面是我们定义的一个 Scheduler
:
- 由于
scheduler
不能存在多个实例,因此我们使用了一个静态的单例。 -
scheduler
会把几乎所有的工作都转交给内部的一个worker
。 - 我们不能对外只提供一个
worker
实例,否则一旦 worker 被取消订阅,所有人的 worker 都被取消订阅了。因此我们需要单独为每个 worker 实例记录提交过来的任务。以便我们可以检查是否已经取消了订阅,以及进行取消订阅操作
接下来,我们就需要前面提到的线程局部上下文了:
public final class ContextManager {
static final ThreadLocal<Object> ctx = new ThreadLocal<>();
private ContextManager() {
throw new IllegalStateException();
}
public static Object get() {
return ctx.get();
}
public static void set(Object context) {
ctx.set(context);
}
}
ContextManager
仅仅是包装了一个静态的 ThreadLocal
变量。在实际情况中,你可能要把 Object
替换成你需要的类型。
现在让我们继续看 schedule()
的实现:
// ...
@Override
public Disposable schedule(final Runnable run, long delay, TimeUnit unit) {
if (isDisposed()) { // (2)
return EmptyDisposable.INSTANCE;
}
final Object context = ContextManager.get(); // (3)
Runnable a = new Runnable() {
@Override
public void run() {
ContextManager.set(context); // (4)
run.run();
}
};
return worker.scheduleActual(a, delay, unit, tracking);
}
// ...
让我们尝试一下:
Worker w = INSTANCE.createWorker();
CountDownLatch cdl = new CountDownLatch(1);
ContextManager.set(1);
w.schedule(() -> {
System.out.println(Thread.currentThread());
System.out.println(ContextManager.get());
});
ContextManager.set(2);
w.schedule(() -> {
System.out.println(Thread.currentThread());
System.out.println(ContextManager.get());
cdl.countDown();
});
cdl.await();
ContextManager.set(3);
Observable.timer(500, TimeUnit.MILLISECONDS, INSTANCE)
.doOnNext(v -> {
System.out.println(Thread.currentThread());
System.out.println(ContextManager.get());
}).blockingFirst();
w.dispose();
运行输出如下:
Thread[ContextAwareScheduler1,5,main]
1
Thread[ContextAwareScheduler1,5,main]
2
Thread[ContextAwareScheduler1,5,main]
3
在引入了 Scheduler/Worker
之后我们需要记录这些任务,并取消它们。一旦 Future
被记录了,那就需要在它们完成或者被取消时取消记录,否则就会发生内存泄漏。这就意味着我们不能直接把一个 Action0/Runnable
提交到 ExecutorService
上,我们需要包装一下它们,让它们可以在完成或者被取消时被取消记录。解决方案就是 ScheduledAction
类。
public final class ScheduledAction implements Runnable, Subscription {
final Action0 action; // (1)
final SubscriptionList slist; // (2)
public ScheduledAction(Action0 action) {
this.action = action;
this.slist = new SubscriptionList();
}
@Override
public void run() {
try {
action.call(); // (3)
} finally {
unsubscribe(); // (4)
}
}
@Override
public boolean isUnsubscribed() {
return slist.isUnsubscribed();
}
@Override
public void unsubscribe() {
slist.unsubscribe();
}
public void add(Subscription s) { // (5)
slist.add(s);
}
}
接下来就是要在任务被提交到 ExecutorService
之前把所有的记录以及清理任务都串起来。为了简单起见,我们假设我们的 ExecutorService
是个单线程的 service。我们将在后面处理多线程的情况。
// ...
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed()) { // (1)
return Subscriptions.unsubscribed();
}
ScheduledAction sa = new ScheduledAction(action); // (2)
tracking.add(sa); // (3)
sa.add(Subscriptions.create(
() -> tracking.remove(sa)));
Future<?> f;
if (delayTime <= 0) { // (4)
f = exec.submit(sa);
} else if (exec instanceof ScheduledExecutorService) { // (5)
f = ((ScheduledExecutorService)exec)
.schedule(sa, delayTime, unit);
} else {
f = genericScheduler.schedule(() -> { // (6)
Future<?> g = exec.submit(sa);
sa.add(Subscriptions.create( // (7)
() -> g.cancel(false)));
}, delayTime, unit);
}
sa.add(Subscriptions.create( // (8)
() -> f.cancel(false)));
return sa; // (9)
}
// ...
代码工作机制如下:
- 如果
worker
已经被取消订阅,我们就返回一个表示已经取消的常量subscription
。注意,如果有 schedule 调用(由于多线程竞争)通过了这个检查,它将会收到一个来自底层线程池的RejectedExecutionException
。你可以把函数中后面的代码都用一个try-catch
包裹起来,并在异常发生时返回同样的表示已经取消的常量 subscription。 - 我们把任务包装为
ScheduledAction
。 - 在实际调度这个任务之前,我们把它加入到
tracking
中,并且增加一个取消订阅的回调,以便在它执行完毕或者被取消时可以将其从tracking
中移除。注意,由于幂等性,remove()
不会调用ScheduledAction
的unsubscribe()
,从而不会导致死循环。 - 如果调度是没有延迟的,我们就立即将其提交,并且保存返回的
Future
。 - 如果我们的 ExecutorService 是
ScheduledExecutorService
,我们就可以直接调用它的schedule()
函数了。 - 否则我们就需要借助
ScheduledExecutorService
来实现延迟调度了,但我们不能直接把任务调度给它,因为这样它会在错误的线程中执行。我们需要创建一个中间任务,它将在延迟结束之后,向正确的线程池调度一个即时的任务。 - 我们需要保证提交后返回的
Future
能在unsubscribe()
调用时被取消。这里我们把内部的 Future 加入到了 ScheduledAction 中。 - 无论是立即调度,还是延迟调度,我们都需要在取消订阅时取消这个调度,所以我们把返回的 Future 加入到
ScheduledAction
中(通过把 Future#cancel() 包装到一个 Subscription 中)。在这里,你就可以控制是否需要强行(中断)取消了。(RxJava 会根据取消订阅时所处的线程来决定:如果取消订阅就是在执行任务的线程中,那就没必要中断了) -
ScheduledAction
也是任务发起方用来取消订阅的凭证(token)。
最后缺失的一点代码就是 genericScheduler
了。你可以为 worker 添加一个static final
成员,并像下面这样设置:
static final ScheduledExecutorService genericScheduler;
static {
genericScheduler = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r, "GenericScheduler");
t.setDaemon(true);
return t;
});
}
Worker
最重要的一个规则就是有序提交的非延迟任务要按序执行,但是 Executor 的线程是随机取走任务,而且是并发乱序执行的。
解决办法就是使用我们以前介绍过的“队列漏”,并且对调度的任务进行一个中继操作.
首先是类结构
public final class ExecutorScheduler extends Scheduler {
final Executor exec;
public ExecutorScheduler(Executor exec) {
this.exec = exec;
}
@Override
public Worker createWorker() {
return new ExecutorWorker();
}
final class ExecutorWorker extends Worker implements Runnable { // (1)
// data fields here
@Override
public Subscription schedule( Action0 action) {
// implement
}
@Override
public Subscription schedule( Action0 action, long delayTime, TimeUnit unit) {
// implement
}
@Override
public void run() {
// implement
}
@Override
public boolean isUnsubscribed() {
// implement
}
@Override
public void unsubscribe() {
// implement
}
}
}
接着是一些变量
// 漏逻辑需要
// 一个正在执行的标记、
final AtomicInteger wip = new AtomicInteger();
// 一个队列、
final Queue<ScheduledAction> queue = new ConcurrentLinkedQueue<>();
//以及一个 subscription 容器类型,以便集中取消订阅:
final CompositeSubscription tracking = new CompositeSubscription();
首先让我们看看无延迟的 schedule()
:
@Override
public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction sa =
new ScheduledAction(action);
tracking.add(sa);
sa.add(Subscriptions.create(
() -> tracking.remove(sa))); // (1)
queue.offer(sa); // (2)
sa.add(Subscriptions.create(
() -> queue.remove(sa))); // (3)
if (wip.getAndIncrement() == 0) { // (4)
exec.execute(this); // (5)
}
return sa;
}
主要逻辑是:
- 创建一个
ScheduledAction
,并且在其被取消订阅时将自己从tracking
中移除。 - 把任务加入到队列中,队列会保证任务按照提交顺序先进先出(FIFO)执行。
- 在任务被取消时,将任务从队列中移除。注意这里的移除操作复杂度为 O(n),n 表示队列中在该任务之前等待执行的任务数。
- 我们只允许一个漏线程,也就是把
wip
从 0 增加到 1 的线程。 - 如果当前线程赢得了漏权利,那我们就把 worker 自己提交到
Executor
上,并在run()
函数中从队列中取出任务执行。注意,这里的this
接下来是 ExecutorWorker
的 run()
方法
@Override
public void run() {
do {
if (isUnsubscribed()) { // (1)
queue.clear();
return;
}
ScheduledAction sa = queue.poll(); // (2)
if (sa != null && !sa.isUnsubscribed()) {
sa.run(); // (3)
}
} while (wip.decrementAndGet() > 0); // (4)
}
漏的逻辑比较直观:
- 我们先检查
worker
是否已经被取消请阅,如果已经取消,那我们就清空队列,并且返回。 - 我们从队列中取出一个任务。
- 由于在
run()
函数执行期间可能会删除任务,或者由于取消订阅而清空队列,所以我们需要检查是否为 null,以及是否已经被取消订阅。如果都没有,那我们就执行这个任务。 - 我们递减
wip
,直到为 0 就退出循环,此时我们就可以安全地重新调度这个 worker,并在Executor
上执行漏任务了。
最后,最复杂的就是延迟调度的实现了 schedule()
:
@Override
public Subscription schedule(
Action0 action,
long delayTime,
TimeUnit unit) {
if (delayTime <= 0) {
return schedule(action); // (1)
}
if (isUnsubscribed()) {
return Subscriptions.unsubscribed(); // (2)
}
ScheduledAction sa =
new ScheduledAction(action);
tracking.add(sa);
sa.add(Subscriptions.create(
() -> tracking.remove(sa))); // (3)
ScheduledExecutorService schedex;
if (exec instanceof ScheduledExecutorService) {
schedex = (ScheduledExecutorService) exec; // (4)
} else {
schedex = CustomWorker.genericScheduler; // (5)
}
Future<?> f = schedex.schedule(() -> { // (6)
queue.offer(sa); // (7)
sa.add(Subscriptions.create(
() -> queue.remove(sa)));
if (wip.getAndIncrement() == 0) {
exec.execute(this);
}
}, delayTime, unit);
sa.add(Subscriptions.create(
() -> f.cancel(false))); // (8)
return sa;
}
我们可以看到这里取的顺序只与插入的顺序相关,而与延迟的时间无关