A Look at Flink's jobstore Configuration

This article examines Flink's jobstore configuration.

JobManagerOptions

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The job store cache size in bytes which is used to keep completed
     * jobs in memory.
     */
    public static final ConfigOption<Long> JOB_STORE_CACHE_SIZE =
        key("jobstore.cache-size")
        .defaultValue(50L * 1024L * 1024L)
        .withDescription("The job store cache size in bytes which is used to keep completed jobs in memory.");

    /**
     * The time in seconds after which a completed job expires and is purged from the job store.
     */
    public static final ConfigOption<Long> JOB_STORE_EXPIRATION_TIME =
        key("jobstore.expiration-time")
        .defaultValue(60L * 60L)
        .withDescription("The time in seconds after which a completed job expires and is purged from the job store.");

    //......
}
  • jobstore.cache-size defaults to 50 MiB (50L * 1024L * 1024L = 52,428,800 bytes); jobstore.expiration-time defaults to 1 hour (60L * 60L = 3,600 seconds)
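To make the defaults concrete, here is a minimal JDK-only sketch. It evaluates the two default expressions and looks up a key, falling back to its default when the key is absent. The class JobStoreDefaults and its getLong helper are hypothetical. A plain Map stands in for Flink's Configuration; that substitution is an assumption made only to keep the example self-contained.

```java
import java.util.Map;

// Hypothetical helper, not part of Flink: evaluates the default expressions from
// JobManagerOptions and resolves a key with a fall-back to its default.
public class JobStoreDefaults {

    static final long DEFAULT_CACHE_SIZE_BYTES = 50L * 1024L * 1024L; // 52,428,800 bytes (50 MiB)
    static final long DEFAULT_EXPIRATION_SECONDS = 60L * 60L;         // 3,600 seconds (1 hour)

    // Assumption: a plain Map<String, String> stands in for Flink's Configuration.
    static long getLong(Map<String, String> conf, String key, long defaultValue) {
        String raw = conf.get(key);
        return raw == null ? defaultValue : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        // Only the expiration time is overridden here (2 hours)
        Map<String, String> conf = Map.of("jobstore.expiration-time", "7200");
        long cacheSize = getLong(conf, "jobstore.cache-size", DEFAULT_CACHE_SIZE_BYTES);
        long expiration = getLong(conf, "jobstore.expiration-time", DEFAULT_EXPIRATION_SECONDS);
        System.out.println("cache-size=" + cacheSize + " bytes, expiration-time=" + expiration + " s");
        // prints: cache-size=52428800 bytes, expiration-time=7200 s
    }
}
```

In a real deployment these keys would normally be set in flink-conf.yaml. The sketch only illustrates that an absent key falls back to the default declared in JobManagerOptions.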

SessionClusterEntrypoint

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java

public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {

    public SessionClusterEntrypoint(Configuration configuration) {
        super(configuration);
    }

    @Override
    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
            Configuration configuration,
            ScheduledExecutor scheduledExecutor) throws IOException {
        final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);

        final Time expirationTime =  Time.seconds(configuration.getLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME));
        final long maximumCacheSizeBytes = configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE);

        return new FileArchivedExecutionGraphStore(
            tmpDir,
            expirationTime,
            maximumCacheSizeBytes,
            scheduledExecutor,
            Ticker.systemTicker());
    }
}
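  • SessionClusterEntrypoint's createSerializableExecutionGraphStore method reads the JobManagerOptions.JOB_STORE_EXPIRATION_TIME and JobManagerOptions.JOB_STORE_CACHE_SIZE options. It also takes the first configured temporary directory as the storage root, and then creates a FileArchivedExecutionGraphStore.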
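The wiring in createSerializableExecutionGraphStore boils down to three derived values. The following sketch reproduces them with plain JDK types. JobStoreSettings and from are hypothetical names. The rule that the temp-directory setting is split on ',' or File.pathSeparator is an assumption made for illustration, not something shown in the excerpt above.

```java
import java.io.File;
import java.util.concurrent.TimeUnit;

// Hypothetical, JDK-only sketch (not Flink's API) of the values that
// createSerializableExecutionGraphStore derives from the configuration.
public class JobStoreSettings {

    final File storageRoot;           // parent of the executionGraphStore-<UUID> directory
    final long expirationMillis;      // jobstore.expiration-time, converted from seconds
    final long maximumCacheSizeBytes; // jobstore.cache-size, passed through unchanged

    JobStoreSettings(File storageRoot, long expirationMillis, long maximumCacheSizeBytes) {
        this.storageRoot = storageRoot;
        this.expirationMillis = expirationMillis;
        this.maximumCacheSizeBytes = maximumCacheSizeBytes;
    }

    // Assumption: directories are separated by ',' or File.pathSeparator;
    // only the first one is used, mirroring the [0] index in the Flink code.
    static JobStoreSettings from(String tmpDirs, long expirationSeconds, long cacheSizeBytes) {
        if (tmpDirs.isEmpty()) {
            throw new IllegalArgumentException("no temp directory configured");
        }
        String firstDir = tmpDirs.split(",|" + File.pathSeparator)[0];
        return new JobStoreSettings(
            new File(firstDir),
            TimeUnit.SECONDS.toMillis(expirationSeconds),
            cacheSizeBytes);
    }

    public static void main(String[] args) {
        JobStoreSettings s = from("/tmp/flink-a,/tmp/flink-b", 60L * 60L, 50L * 1024L * 1024L);
        System.out.println(s.storageRoot + ", " + s.expirationMillis + " ms, "
            + s.maximumCacheSizeBytes + " bytes");
    }
}
```

Because only the first directory is used, all serialized execution graphs end up under that one directory even when several temp directories are configured.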

FileArchivedExecutionGraphStore

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java

public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {

    private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);

    private final File storageDir;

    private final Cache<JobID, JobDetails> jobDetailsCache;

    private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;

    private final ScheduledFuture<?> cleanupFuture;

    private final Thread shutdownHook;

    private int numFinishedJobs;

    private int numFailedJobs;

    private int numCanceledJobs;

    public FileArchivedExecutionGraphStore(
            File rootDir,
            Time expirationTime,
            long maximumCacheSizeBytes,
            ScheduledExecutor scheduledExecutor,
            Ticker ticker) throws IOException {

        final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);

        LOG.info(
            "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
            FileArchivedExecutionGraphStore.class.getSimpleName(),
            storageDirectory,
            expirationTime.toMilliseconds(),
            maximumCacheSizeBytes);

        this.storageDir = Preconditions.checkNotNull(storageDirectory);
        Preconditions.checkArgument(
            storageDirectory.exists() && storageDirectory.isDirectory(),
            "The storage directory must exist and be a directory.");
        this.jobDetailsCache = CacheBuilder.newBuilder()
            .expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS)
            .removalListener(
                (RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey()))
            .ticker(ticker)
            .build();

        this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
            .maximumWeight(maximumCacheSizeBytes)
            .weigher(this::calculateSize)
            .build(new CacheLoader<JobID, ArchivedExecutionGraph>() {
                @Override
                public ArchivedExecutionGraph load(JobID jobId) throws Exception {
                    return loadExecutionGraph(jobId);
                }});

        this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
            jobDetailsCache::cleanUp,
            expirationTime.toMilliseconds(),
            expirationTime.toMilliseconds(),
            TimeUnit.MILLISECONDS);

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

        this.numFinishedJobs = 0;
        this.numFailedJobs = 0;
        this.numCanceledJobs = 0;
    }

    @Override
    public int size() {
        return Math.toIntExact(jobDetailsCache.size());
    }

    @Override
    @Nullable
    public ArchivedExecutionGraph get(JobID jobId) {
        try {
            return archivedExecutionGraphCache.get(jobId);
        } catch (ExecutionException e) {
            LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
            return null;
        }
    }

    @Override
    public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
        final JobStatus jobStatus = archivedExecutionGraph.getState();
        final JobID jobId = archivedExecutionGraph.getJobID();
        final String jobName = archivedExecutionGraph.getJobName();

        Preconditions.checkArgument(
            jobStatus.isGloballyTerminalState(),
            "The job " + jobName + '(' + jobId +
                ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');

        switch (jobStatus) {
            case FINISHED:
                numFinishedJobs++;
                break;
            case CANCELED:
                numCanceledJobs++;
                break;
            case FAILED:
                numFailedJobs++;
                break;
            default:
                throw new IllegalStateException("The job " + jobName + '(' +
                    jobId + ") should have been in a globally terminal state. " +
                    "Instead it was in state " + jobStatus + '.');
        }

        // write the ArchivedExecutionGraph to disk
        storeArchivedExecutionGraph(archivedExecutionGraph);

        final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);

        jobDetailsCache.put(jobId, detailsForJob);
        archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
    }

    @Override
    public JobsOverview getStoredJobsOverview() {
        return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
    }

    @Override
    public Collection<JobDetails> getAvailableJobDetails() {
        return jobDetailsCache.asMap().values();
    }

    @Nullable
    @Override
    public JobDetails getAvailableJobDetails(JobID jobId) {
        return jobDetailsCache.getIfPresent(jobId);
    }

    @Override
    public void close() throws IOException {
        cleanupFuture.cancel(false);

        jobDetailsCache.invalidateAll();

        // clean up the storage directory
        FileUtils.deleteFileOrDirectory(storageDir);

        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    }

    // --------------------------------------------------------------
    // Internal methods
    // --------------------------------------------------------------

    private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        if (archivedExecutionGraphFile.exists()) {
            return Math.toIntExact(archivedExecutionGraphFile.length());
        } else {
            LOG.debug("Could not find archived execution graph file for {}. Estimating the size instead.", jobId);
            return serializableExecutionGraph.getAllVertices().size() * 1000 +
                serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
        }
    }

    private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        if (archivedExecutionGraphFile.exists()) {
            try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
                return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
            }
        } else {
            throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
                ". This indicates that the file either has been deleted or never written.");
        }
    }

    private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
        final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID());

        try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
            InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
        }
    }

    private File getExecutionGraphFile(JobID jobId) {
        return new File(storageDir, jobId.toString());
    }

    private void deleteExecutionGraphFile(JobID jobId) {
        Preconditions.checkNotNull(jobId);

        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        try {
            FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
        } catch (IOException e) {
            LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
        }

        archivedExecutionGraphCache.invalidate(jobId);
        jobDetailsCache.invalidate(jobId);
    }

    private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
        final int maxAttempts = 10;

        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            final File storageDirectory = new File(tmpDir, "executionGraphStore-" + UUID.randomUUID());

            if (storageDirectory.mkdir()) {
                return storageDirectory;
            }
        }

        throw new IOException("Could not create executionGraphStorage directory in " + tmpDir + '.');
    }

    // --------------------------------------------------------------
    // Testing methods
    // --------------------------------------------------------------

    @VisibleForTesting
    File getStorageDir() {
        return storageDir;
    }

    @VisibleForTesting
    LoadingCache<JobID, ArchivedExecutionGraph> getArchivedExecutionGraphCache() {
        return archivedExecutionGraphCache;
    }
}
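  • FileArchivedExecutionGraphStore implements the ArchivedExecutionGraphStore interface. Its constructor uses Guava caches to create jobDetailsCache and archivedExecutionGraphCache.
  • jobDetailsCache uses expirationTime, that is, the jobstore.expiration-time setting, for expireAfterWrite. Its removal listener deletes the job's file from the storage directory and invalidates the job's entries in both caches.
  • archivedExecutionGraphCache uses maximumCacheSizeBytes, that is, the jobstore.cache-size setting, for maximumWeight. Each entry is weighed by the size of its serialized file, or by an estimate if the file is missing. Graphs evicted from memory are reloaded from disk on demand through the CacheLoader.
  • FileArchivedExecutionGraphStore also schedules a task that calls jobDetailsCache's cleanUp method every expirationTime. This is needed because Guava caches do not evict expired entries on a background thread, only during cache operations or explicit cleanUp() calls.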
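The interplay of the two caches can be modelled without Guava. The following is a simplified, JDK-only sketch, not FileArchivedExecutionGraphStore itself:

* MiniJobStore is a hypothetical class.
* An in-memory map stands in for the files in the storage directory.
* Strict least-recently-used eviction approximates Guava's weight-based eviction.
* An injectable LongSupplier plays the role of the Ticker, so expiration can be exercised without waiting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of FileArchivedExecutionGraphStore's two caches.
// Not Guava: expiry, weighing and eviction are hand-written approximations.
public class MiniJobStore {
    private final long expirationNanos;   // plays the role of jobstore.expiration-time
    private final long maximumWeight;     // plays the role of jobstore.cache-size
    private final LongSupplier ticker;    // like Guava's Ticker: current time in nanoseconds

    // "jobDetailsCache": jobId -> write timestamp, expires after write
    private final Map<String, Long> writeTimes = new HashMap<>();
    // Stand-in for the serialized files in the storage directory
    private final Map<String, byte[]> files = new HashMap<>();
    // "archivedExecutionGraphCache": access-ordered, bounded by total weight
    private final LinkedHashMap<String, byte[]> graphCache = new LinkedHashMap<>(16, 0.75f, true);
    private long currentWeight;
    final List<String> removedJobs = new ArrayList<>();

    MiniJobStore(long expirationNanos, long maximumWeight, LongSupplier ticker) {
        this.expirationNanos = expirationNanos;
        this.maximumWeight = maximumWeight;
        this.ticker = ticker;
    }

    // Mirrors put(): write the "file" first, then populate both caches
    void put(String jobId, byte[] serializedGraph) {
        files.put(jobId, serializedGraph);
        writeTimes.put(jobId, ticker.getAsLong());
        cacheGraph(jobId, serializedGraph);
    }

    // Mirrors get(): serve from memory, otherwise reload from the "file" like the CacheLoader
    byte[] get(String jobId) {
        byte[] cached = graphCache.get(jobId);
        if (cached != null) return cached;
        byte[] loaded = files.get(jobId);
        if (loaded == null) return null; // file deleted or never written
        cacheGraph(jobId, loaded);
        return loaded;
    }

    // Weigher = byte length; evict least-recently-used graphs while over maximumWeight
    private void cacheGraph(String jobId, byte[] graph) {
        byte[] previous = graphCache.put(jobId, graph);
        currentWeight += graph.length - (previous == null ? 0 : previous.length);
        Iterator<Map.Entry<String, byte[]>> it = graphCache.entrySet().iterator();
        while (currentWeight > maximumWeight && it.hasNext()) {
            currentWeight -= it.next().getValue().length;
            it.remove();
        }
    }

    // Like jobDetailsCache.cleanUp(): drop expired entries and fire the removal listener
    void cleanUp() {
        long now = ticker.getAsLong();
        Iterator<Map.Entry<String, Long>> it = writeTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= expirationNanos) {
                it.remove();
                onRemoval(entry.getKey());
            }
        }
    }

    // Like deleteExecutionGraphFile(): delete the "file" and invalidate the graph cache
    private void onRemoval(String jobId) {
        files.remove(jobId);
        byte[] graph = graphCache.remove(jobId);
        if (graph != null) currentWeight -= graph.length;
        removedJobs.add(jobId);
    }

    int size() { return writeTimes.size(); }
    long weight() { return currentWeight; }

    public static void main(String[] args) {
        AtomicLong fakeNanos = new AtomicLong();
        MiniJobStore store = new MiniJobStore(TimeUnit.HOURS.toNanos(1), 100, fakeNanos::get);
        store.put("job-a", new byte[60]);
        store.put("job-b", new byte[60]); // weight 120 > 100: job-a's graph leaves memory
        System.out.println(store.get("job-a") != null); // reloaded from its "file"
        fakeNanos.addAndGet(TimeUnit.HOURS.toNanos(1));
        store.cleanUp(); // both jobs expired: files deleted, caches invalidated
        System.out.println(store.size() + " " + (store.get("job-a") == null));
    }
}
```

In the real store, cleanUp is not called by hand. It is run by scheduledExecutor.scheduleWithFixedDelay(jobDetailsCache::cleanUp, ...), with expirationTime as both the initial delay and the delay between runs.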

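Summary

  • Flink's jobstore has two options: jobstore.cache-size, which defaults to 50 MiB, and jobstore.expiration-time, which defaults to 1 hour.
  • SessionClusterEntrypoint's createSerializableExecutionGraphStore method reads JobManagerOptions.JOB_STORE_EXPIRATION_TIME and JobManagerOptions.JOB_STORE_CACHE_SIZE, then creates a FileArchivedExecutionGraphStore.
  • FileArchivedExecutionGraphStore implements the ArchivedExecutionGraphStore interface, and its constructor uses Guava caches to create jobDetailsCache and archivedExecutionGraphCache.
    - jobDetailsCache's expireAfterWrite uses expirationTime (jobstore.expiration-time).
    - archivedExecutionGraphCache's maximumWeight uses maximumCacheSizeBytes (jobstore.cache-size).
    - The store also schedules a task that runs jobDetailsCache's cleanUp method every expirationTime to purge expired entries.
    - As a result, jobstore.expiration-time governs how long a completed job stays in the store. jobstore.cache-size only bounds how much of the archived graph data is kept in memory.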
