Scheduler/Worker 原理与实现

Scheduler/Worker 需要满足以下的要求:

  1. 所有的方法都需要是线程安全的;
  2. Worker 需要保证即时、串行提交的任务按照先进先出(FIFO)的顺序被执行;
  3. Worker 需要尽可能保证被取消订阅时要取消还未执行的任务;
  4. Worker 被取消订阅时不能影响同一个 Scheduler 的其他 Worker;
public final class ContextAwareScheduler extends Scheduler {
    public static final ContextAwareScheduler INSTANCE = 
            new ContextAwareScheduler();                       // (1)
    
    final NewThreadWorker worker;
    
    private ContextAwareScheduler() {
        this.worker = new NewThreadWorker(
                new RxThreadFactory("ContextAwareScheduler")); // (2)
    }
    @Override
    public Worker createWorker() {
        return new ContextAwareWorker(worker);                 // (3)
    }
    
    static final class ContextAwareWorker extends Worker {
        final CompositeDisposable tracking;                  // (4)
        final NewThreadWorker worker;

        public ContextAwareWorker(NewThreadWorker worker) {
            this.worker = worker;
            this.tracking = new CompositeSubscription();
        }
        @Override
        public Subscription schedule(Action0 action) {
            // implement
        }

        @Override
        public void dispose() {
            tracking.dispose();      // (5)
        }

        @Override
        public boolean isDisposed() {
            return tracking.isDisposed();
        }
    }
}

上面是我们定义的一个 Scheduler:

  1. 由于scheduler 不能存在多个实例,因此我们使用了一个静态的单例。
  2. scheduler 会把几乎所有的工作都转交给内部的一个 worker
  3. 我们不能对外只提供一个 worker 实例,否则一旦 worker 被取消订阅,所有人的 worker 都被取消订阅了。因此我们需要单独为每个 worker 实例记录提交过来的任务。以便我们可以检查是否已经取消了订阅,以及进行取消订阅操作

接下来,我们就需要前面提到的线程局部上下文了:

public final class ContextManager {
    static final ThreadLocal<Object> ctx = new ThreadLocal<>();
    
    private ContextManager() {
        throw new IllegalStateException();
    }
    
    public static Object get() {
        return ctx.get();
    }
    public static void set(Object context) {
        ctx.set(context);
    }
}

ContextManager 仅仅是包装了一个静态的 ThreadLocal 变量。在实际情况中,你可能要把 Object 替换成你需要的类型。

现在让我们继续看 schedule() 的实现:

// ...
        @Override
        public Disposable schedule(final Runnable run, long delay, TimeUnit unit) {
            if (isDisposed()) {                         // (2)
                return EmptyDisposable.INSTANCE;
            }

            final Object context = ContextManager.get();          // (3)
            Runnable a = new Runnable() {
                @Override
                public void run() {
                    ContextManager.set(context);                // (4)
                    run.run();
                }
            };

            return worker.scheduleActual(a, delay, unit, tracking);
        }
// ...

让我们尝试一下:

Worker w = INSTANCE.createWorker();
CountDownLatch cdl = new CountDownLatch(1);

ContextManager.set(1);
w.schedule(() -> {
    System.out.println(Thread.currentThread());
    System.out.println(ContextManager.get());
});

ContextManager.set(2);
w.schedule(() -> {
    System.out.println(Thread.currentThread());
    System.out.println(ContextManager.get());
    cdl.countDown();
});

cdl.await();

ContextManager.set(3);

Observable.timer(500, TimeUnit.MILLISECONDS, INSTANCE)
.doOnNext(v -> {
    System.out.println(Thread.currentThread());
    System.out.println(ContextManager.get());
}).blockingFirst();

w.dispose();

运行输出如下:

Thread[ContextAwareScheduler1,5,main]
1
Thread[ContextAwareScheduler1,5,main]
2
Thread[ContextAwareScheduler1,5,main]
3

在引入了 Scheduler/Worker 之后我们需要记录这些任务,并取消它们。一旦 Future 被记录了,那就需要在它们完成或者被取消时取消记录,否则就会发生内存泄漏。这就意味着我们不能直接把一个 Action0/Runnable 提交到 ExecutorService 上,我们需要包装一下它们,让它们可以在完成或者被取消时被取消记录。解决方案就是 ScheduledAction 类。

public final class ScheduledAction implements Runnable, Subscription {
    final Action0 action;                       // (1)
    final SubscriptionList slist;               // (2)
 
    public ScheduledAction(Action0 action) {
        this.action = action;
        this.slist = new SubscriptionList();
    }
    @Override
    public void run() {
        try {
            action.call();                      // (3)
        } finally {
            unsubscribe();                      // (4)
        }
    }
    @Override
    public boolean isUnsubscribed() {
        return slist.isUnsubscribed();
    }
    @Override
    public void unsubscribe() {
        slist.unsubscribe();
    }
     
    public void add(Subscription s) {           // (5)
        slist.add(s);
    }
}

接下来就是要在任务被提交到 ExecutorService 之前把所有的记录以及清理任务都串起来。为了简单起见,我们假设我们的 ExecutorService 是个单线程的 service。我们将在后面处理多线程的情况。

// ...
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (isUnsubscribed()) {                                // (1)
        return Subscriptions.unsubscribed();
    }
    ScheduledAction sa = new ScheduledAction(action);      // (2) 
     
    tracking.add(sa);                                      // (3)
    sa.add(Subscriptions.create(
        () -> tracking.remove(sa)));
     
    Future<?> f;
    if (delayTime <= 0) {                                  // (4)
        f = exec.submit(sa);
    } else if (exec instanceof ScheduledExecutorService) { // (5)
        f = ((ScheduledExecutorService)exec)
             .schedule(sa, delayTime, unit);
    } else {
        f = genericScheduler.schedule(() -> {              // (6)
            Future<?> g = exec.submit(sa);
            sa.add(Subscriptions.create(                   // (7)
                () -> g.cancel(false)));
        }, delayTime, unit);
    }
     
    sa.add(Subscriptions.create(                           // (8)
        () -> f.cancel(false)));
 
    return sa;                                             // (9)
}
// ...

代码工作机制如下:

  1. 如果 worker 已经被取消订阅,我们就返回一个表示已经取消的常量 subscription。注意,如果有 schedule 调用(由于多线程竞争)通过了这个检查,它将会收到一个来自底层线程池的 RejectedExecutionException。你可以把函数中后面的代码都用一个try-catch 包裹起来,并在异常发生时返回同样的表示已经取消的常量 subscription。
  2. 我们把任务包装为 ScheduledAction
  3. 在实际调度这个任务之前,我们把它加入到tracking 中,并且增加一个取消订阅的回调,以便在它执行完毕或者被取消时可以将其从 tracking中移除。注意,由于幂等性,remove() 不会调用 ScheduledActionunsubscribe(),从而不会导致死循环。
  4. 如果调度是没有延迟的,我们就立即将其提交,并且保存返回的 Future
  5. 如果我们的 ExecutorService 是 ScheduledExecutorService,我们就可以直接调用它的 schedule()函数了。
  6. 否则我们就需要借助 ScheduledExecutorService 来实现延迟调度了,但我们不能直接把任务调度给它,因为这样它会在错误的线程中执行。我们需要创建一个中间任务,它将在延迟结束之后,向正确的线程池调度一个即时的任务。
  7. 我们需要保证提交后返回的 Future 能在 unsubscribe() 调用时被取消。这里我们把内部的 Future 加入到了 ScheduledAction 中。
  8. 无论是立即调度,还是延迟调度,我们都需要在取消订阅时取消这个调度,所以我们把返回的 Future 加入到 ScheduledAction 中(通过把 Future#cancel() 包装到一个 Subscription 中)。在这里,你就可以控制是否需要强行(中断)取消了。(RxJava 会根据取消订阅时所处的线程来决定:如果取消订阅就是在执行任务的线程中,那就没必要中断了)
  9. ScheduledAction 也是任务发起方用来取消订阅的凭证(token)。

最后缺失的一点代码就是 genericScheduler 了。你可以为 worker 添加一个static final 成员,并像下面这样设置:

static final ScheduledExecutorService genericScheduler;
static {
    genericScheduler = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "GenericScheduler");
        t.setDaemon(true);
        return t;
    });
}

Worker最重要的一个规则就是有序提交的非延迟任务要按序执行,但是 Executor 的线程是随机取走任务,而且是并发乱序执行的。

解决办法就是使用我们以前介绍过的“队列漏”,并且对调度的任务进行一个中继操作.

首先是类结构

public final class ExecutorScheduler extends Scheduler {
    final Executor exec;
    public ExecutorScheduler(Executor exec) {
        this.exec = exec;
    }
    @Override
    public Worker createWorker() {
        return new ExecutorWorker();
    }
     
    final class ExecutorWorker extends Worker implements Runnable {                             // (1)
        // data fields here
        @Override
        public Subscription schedule( Action0 action) {
            // implement
        }
        @Override
        public Subscription schedule( Action0 action, long delayTime, TimeUnit unit) {
            // implement
        }
        @Override
        public void run() {
            // implement
        }
        @Override
        public boolean isUnsubscribed() {
            // implement
        }
        @Override
        public void unsubscribe() {
            // implement
        }
    }
}

接着是一些变量

// 漏逻辑需要
// 一个正在执行的标记、
final AtomicInteger wip = new AtomicInteger();
// 一个队列、
final Queue<ScheduledAction> queue = new ConcurrentLinkedQueue<>();
//以及一个 subscription 容器类型,以便集中取消订阅:
final CompositeSubscription tracking = new CompositeSubscription();

首先让我们看看无延迟的 schedule()

@Override 
public Subscription schedule(Action0 action) {
    if (isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    ScheduledAction sa = 
            new ScheduledAction(action);
    tracking.add(sa);
    sa.add(Subscriptions.create(
            () -> tracking.remove(sa)));        // (1)
        
    queue.offer(sa);                            // (2)
         
    sa.add(Subscriptions.create(
            () -> queue.remove(sa)));           // (3)
         
    if (wip.getAndIncrement() == 0) {           // (4)
        exec.execute(this);                     // (5)
    }
         
    return sa;
}

主要逻辑是:

  1. 创建一个 ScheduledAction,并且在其被取消订阅时将自己从 tracking 中移除。
  2. 把任务加入到队列中,队列会保证任务按照提交顺序先进先出(FIFO)执行。
  3. 在任务被取消时,将任务从队列中移除。注意这里的移除操作复杂度为 O(n),n 表示队列中在该任务之前等待执行的任务数。
  4. 我们只允许一个漏线程,也就是把 wip 从 0 增加到 1 的线程。
  5. 如果当前线程赢得了漏权利,那我们就把 worker 自己提交到 Executor 上,并在 run() 函数中从队列中取出任务执行。注意,这里的this

接下来是 ExecutorWorkerrun() 方法

@Override 
public void run() {
    do {
        if (isUnsubscribed()) {                   // (1)
            queue.clear();
            return;
        }
        ScheduledAction sa = queue.poll();        // (2)
        if (sa != null && !sa.isUnsubscribed()) {
            sa.run();                             // (3)
        }
    } while (wip.decrementAndGet() > 0);          // (4)
}

漏的逻辑比较直观:

  1. 我们先检查 worker 是否已经被取消请阅,如果已经取消,那我们就清空队列,并且返回。
  2. 我们从队列中取出一个任务。
  3. 由于在 run() 函数执行期间可能会删除任务,或者由于取消订阅而清空队列,所以我们需要检查是否为 null,以及是否已经被取消订阅。如果都没有,那我们就执行这个任务。
  4. 我们递减 wip,直到为 0 就退出循环,此时我们就可以安全地重新调度这个 worker,并在 Executor 上执行漏任务了。

最后,最复杂的就是延迟调度的实现了 schedule()

@Override
public Subscription schedule(
        Action0 action, 
        long delayTime,
        TimeUnit unit) {
 
    if (delayTime <= 0) {
        return schedule(action);                      // (1)
    }
    if (isUnsubscribed()) {
        return Subscriptions.unsubscribed();          // (2)
    }
     
    ScheduledAction sa = 
            new ScheduledAction(action);
    tracking.add(sa);
    sa.add(Subscriptions.create(
            () -> tracking.remove(sa)));              // (3)
     
    ScheduledExecutorService schedex;
    if (exec instanceof ScheduledExecutorService) {
        schedex = (ScheduledExecutorService) exec;    // (4)
    } else {
        schedex = CustomWorker.genericScheduler;      // (5)
    }
     
    Future<?> f = schedex.schedule(() -> {            // (6)
         
        queue.offer(sa);                              // (7)
         
        sa.add(Subscriptions.create(
                () -> queue.remove(sa)));
         
        if (wip.getAndIncrement() == 0) {
            exec.execute(this);
        }
         
    }, delayTime, unit);
     
    sa.add(Subscriptions.create(
            () -> f.cancel(false)));                  // (8)
     
    return sa;
}

我们可以看到这里取的顺序只与插入的顺序相关,而与延迟的时间无关

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352