聊聊flink的ScheduledExecutor

本文主要研究一下flink的ScheduledExecutor

Executor

java.base/java/util/concurrent/Executor.java

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
  • jdk的Executor接口定义了execute方法,接收参数类型为Runnable

ScheduledExecutor

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java

public interface ScheduledExecutor extends Executor {

    /**
     * Executes the given command after the given delay.
     *
     * @param command the task to execute in the future
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @return a ScheduledFuture representing the completion of the scheduled task
     */
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * Executes the given callable after the given delay. The result of the callable is returned
     * as a {@link ScheduledFuture}.
     *
     * @param callable the callable to execute
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @param <V> result type of the callable
     * @return a ScheduledFuture which holds the future value of the given callable
     */
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * Executes the given command periodically. The first execution is started after the
     * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
     * the third after {@code initialDelay + 2*period} and so on.
     * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
     * is cancelled.
     *
     * @param command the task to be executed periodically
     * @param initialDelay the time from now until the first execution is triggered
     * @param period the time after which the next execution is triggered
     * @param unit the time unit of the delay and period parameter
     * @return a ScheduledFuture representing the periodic task. This future never completes
     * unless an execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleAtFixedRate(
        Runnable command,
        long initialDelay,
        long period,
        TimeUnit unit);

    /**
     * Executed the given command repeatedly with the given delay between the end of an execution
     * and the start of the next execution.
     * The task is executed repeatedly until either an exception occurs or if the returned
     * {@link ScheduledFuture} is cancelled.
     *
     * @param command the task to execute repeatedly
     * @param initialDelay the time from now until the first execution is triggered
     * @param delay the time between the end of the current and the start of the next execution
     * @param unit the time unit of the initial delay and the delay parameter
     * @return a ScheduledFuture representing the repeatedly executed task. This future never
     * completes unless the execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleWithFixedDelay(
        Runnable command,
        long initialDelay,
        long delay,
        TimeUnit unit);
}
  • ScheduledExecutor接口继承了Executor,它定义了schedule、scheduleAtFixedRate、scheduleWithFixedDelay方法,其中schedule方法可以接收Runnable或者Callable,这些方法返回的都是ScheduledFuture;该接口有两个实现类,分别是ScheduledExecutorServiceAdapter及ActorSystemScheduledExecutorAdapter

ScheduledExecutorServiceAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java

public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {

    private final ScheduledExecutorService scheduledExecutorService;

    public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return scheduledExecutorService.schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    @Override
    public void execute(Runnable command) {
        scheduledExecutorService.execute(command);
    }
}
  • ScheduledExecutorServiceAdapter实现了ScheduledExecutor接口,它使用的是jdk的ScheduledExecutorService来实现,使用了scheduledExecutorService的schedule、scheduleAtFixedRate、
    scheduleWithFixedDelay、execute方法

ActorSystemScheduledExecutorAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java

public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {

    private final ActorSystem actorSystem;

    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);

        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);

        Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(period));

        Cancellable cancellable = actorSystem.scheduler().schedule(
            new FiniteDuration(initialDelay, unit),
            new FiniteDuration(period, unit),
            scheduledFutureTask,
            actorSystem.dispatcher());

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    @Nonnull
    public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
        ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
            command,
            triggerTime(unit.toNanos(initialDelay)),
            unit.toNanos(-delay));

        Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);

        scheduledFutureTask.setCancellable(cancellable);

        return scheduledFutureTask;
    }

    @Override
    public void execute(@Nonnull Runnable command) {
        actorSystem.dispatcher().execute(command);
    }

    private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
        return actorSystem.scheduler().scheduleOnce(
            new FiniteDuration(delay, unit),
            runnable,
            actorSystem.dispatcher());
    }

    private long now() {
        return System.nanoTime();
    }

    private long triggerTime(long delay) {
        return now() + delay;
    }

    private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

        private long time;

        private final long period;

        private volatile Cancellable cancellable;

        ScheduledFutureTask(Callable<V> callable, long time, long period) {
            super(callable);
            this.time = time;
            this.period = period;
        }

        ScheduledFutureTask(Runnable runnable, long time, long period) {
            super(runnable, null);
            this.time = time;
            this.period = period;
        }

        public void setCancellable(Cancellable newCancellable) {
            this.cancellable = newCancellable;
        }

        @Override
        public void run() {
            if (!isPeriodic()) {
                super.run();
            } else if (runAndReset()){
                if (period > 0L) {
                    time += period;
                } else {
                    cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);

                    // check whether we have been cancelled concurrently
                    if (isCancelled()) {
                        cancellable.cancel();
                    } else {
                        time = triggerTime(-period);
                    }
                }
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = super.cancel(mayInterruptIfRunning);

            return result && cancellable.cancel();
        }

        @Override
        public long getDelay(@Nonnull  TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(@Nonnull Delayed o) {
            if (o == this) {
                return 0;
            }

            long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
        }

        @Override
        public boolean isPeriodic() {
            return period != 0L;
        }
    }
}
  • ActorSystemScheduledExecutorAdapter实现了ScheduledExecutor接口,它使用的是actorSystem来实现;其中execute方法使用的是actorSystem.dispatcher().execute方法
  • schedule及scheduleWithFixedDelay方法调用的是internalSchedule方法,它使用的是actorSystem.scheduler().scheduleOnce方法,只是它们的ScheduledFutureTask不同,其中schedule方法的ScheduledFutureTask的period为0,而scheduleWithFixedDelay方法的ScheduledFutureTask的period为unit.toNanos(-delay);ScheduledFutureTask的run方法会对period进行判断,小于等于0的,会再次调用internalSchedule方法,来实现以FixedDelay进行调度的效果
  • scheduleAtFixedRate方法,它使用的是actorSystem.scheduler().schedule方法,其ScheduledFutureTask的period即为方法参数的period,没有像scheduleWithFixedDelay方法那样用unit.toNanos(-delay)作为period

小结

  • ScheduledExecutor接口继承了Executor,它定义了schedule、scheduleAtFixedRate、scheduleWithFixedDelay方法,其中schedule方法可以接收Runnable或者Callable,这些方法返回的都是ScheduledFuture;该接口有两个实现类,分别是ScheduledExecutorServiceAdapter及ActorSystemScheduledExecutorAdapter
  • ScheduledExecutorServiceAdapter实现了ScheduledExecutor接口,它使用的是jdk的ScheduledExecutorService来实现,使用了scheduledExecutorService的schedule、scheduleAtFixedRate、scheduleWithFixedDelay、execute方法
  • ActorSystemScheduledExecutorAdapter实现了ScheduledExecutor接口,它使用的是actorSystem来实现;其中execute方法使用的是actorSystem.dispatcher().execute方法;schedule及scheduleWithFixedDelay方法调用的是internalSchedule方法,它使用的是actorSystem.scheduler().scheduleOnce方法,只是它们的ScheduledFutureTask不同,其中schedule方法的ScheduledFutureTask的period为0,而scheduleWithFixedDelay方法的ScheduledFutureTask的period为unit.toNanos(-delay);ScheduledFutureTask的run方法会对period进行判断,小于等于0的,会再次调用internalSchedule方法,来实现以FixedDelay进行调度的效果;scheduleAtFixedRate方法,它使用的是actorSystem.scheduler().schedule方法,其ScheduledFutureTask的period即为方法参数的period,没有像scheduleWithFixedDelay方法那样用unit.toNanos(-delay)作为period

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容