聊聊flink的MemoryBackendCheckpointStorage

本文主要研究一下flink的MemoryBackendCheckpointStorage

CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {


    boolean supportsHighlyAvailableStorage();

    boolean hasDefaultSavepointLocation();

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

    CheckpointStorageLocation initializeLocationForSavepoint(
            long checkpointId,
            @Nullable String externalLocationPointer) throws IOException;

    CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException;

    CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
  • CheckpointStorage接口主要定义了持久化checkpoint data及metadata streams的基本方法;supportsHighlyAvailableStorage方法返回该backend是否支持highly available storage;hasDefaultSavepointLocation方法是否有默认的savepoint location;resolveCheckpoint方法用于解析checkpoint location返回CompletedCheckpointStorageLocation;initializeLocationForCheckpoint方法根据checkpointId来初始化storage location;initializeLocationForSavepoint方法用于根据checkpointId来初始化savepoint的storage location;resolveCheckpointStorageLocation方法解析CheckpointStorageLocationReference返回CheckpointStreamFactory;createTaskOwnedStateStream方法用于打开一个stream来持久化checkpoint state

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /** The prefix of the directory containing the data exclusive to a checkpoint. */
    public static final String CHECKPOINT_DIR_PREFIX = "chk-";

    /** The name of the directory for shared checkpoint state. */
    public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";

    /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
    public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";

    /** The name of the metadata files in checkpoints / savepoints. */
    public static final String METADATA_FILE_NAME = "_metadata";

    /** The magic number that is put in front of any reference. */
    private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };

    // ------------------------------------------------------------------------
    //  Fields and properties
    // ------------------------------------------------------------------------

    /** The jobId, written into the generated savepoint directories. */
    private final JobID jobId;

    /** The default location for savepoints. Null, if none is configured. */
    @Nullable
    private final Path defaultSavepointDirectory;

    @Override
    public boolean hasDefaultSavepointLocation() {
        return defaultSavepointDirectory != null;
    }

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
        return resolveCheckpointPointer(checkpointPointer);
    }

    /**
     * Creates a file system based storage location for a savepoint.
     *
     * <p>This methods implements the logic that decides which location to use (given optional
     * parameters for a configured location and a location passed for this specific savepoint)
     * and how to name and initialize the savepoint directory.
     *
     * @param externalLocationPointer    The target location pointer for the savepoint.
     *                                   Must be a valid URI. Null, if not supplied.
     * @param checkpointId               The checkpoint ID of the savepoint.
     *
     * @return The checkpoint storage location for the savepoint.
     *
     * @throws IOException Thrown if the target directory could not be created.
     */
    @Override
    public CheckpointStorageLocation initializeLocationForSavepoint(
            @SuppressWarnings("unused") long checkpointId,
            @Nullable String externalLocationPointer) throws IOException {

        // determine where to write the savepoint to

        final Path savepointBasePath;
        if (externalLocationPointer != null) {
            savepointBasePath = new Path(externalLocationPointer);
        }
        else if (defaultSavepointDirectory != null) {
            savepointBasePath = defaultSavepointDirectory;
        }
        else {
            throw new IllegalArgumentException("No savepoint location given and no default location configured.");
        }

        // generate the savepoint directory

        final FileSystem fs = savepointBasePath.getFileSystem();
        final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';

        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));

            try {
                if (fs.mkdirs(path)) {
                    // we make the path qualified, to make it independent of default schemes and authorities
                    final Path qp = path.makeQualified(fs);

                    return createSavepointLocation(fs, qp);
                }
            } catch (Exception e) {
                latestException = e;
            }
        }

        throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
    }

    protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;

    //......
}
  • AbstractFsCheckpointStorage主要是实现了CheckpointStorage接口的hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint方法
  • resolveCheckpoint方法主要做两件事情,一个是解析checkpoint/savepoint path,一个是解析checkpoint/savepoint的metadata path,获取他们的FileStatus,然后创建FsCompletedCheckpointStorageLocation
  • initializeLocationForSavepoint方法主要是给savepoint创建一个CheckpointStorageLocation,它可以根据externalLocationPointer来创建,该值为null的话则使用defaultSavepointDirectory,该方法里头调用了createSavepointLocation抽象方法,由子类去实现

MemoryBackendCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java

/**
 * An implementation of a checkpoint storage for the {@link MemoryStateBackend}.
 * Depending on whether this is created with a checkpoint location, the setup supports
 * durable checkpoints (durable metadata) or not.
 */
public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {

    /** The target directory for checkpoints (here metadata files only). Null, if not configured. */
    @Nullable
    private final Path checkpointsDirectory;

    /** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */
    @Nullable
    private final FileSystem fileSystem;

    /** The maximum size of state stored in a state handle. */
    private final int maxStateSize;

    /**
     * Creates a new MemoryBackendCheckpointStorage.
     *
     * @param jobId The ID of the job writing the checkpoints.
     * @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
     *                                 in which case this storage does not support durable persistence.
     * @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
     * @param maxStateSize The maximum size of each individual piece of state.
     *
     * @throws IOException Thrown if a checkpoint base directory is given configured and the
     *                     checkpoint directory cannot be created within that directory.
     */
    public MemoryBackendCheckpointStorage(
            JobID jobId,
            @Nullable Path checkpointsBaseDirectory,
            @Nullable Path defaultSavepointLocation,
            int maxStateSize) throws IOException {

        super(jobId, defaultSavepointLocation);

        checkArgument(maxStateSize > 0);
        this.maxStateSize = maxStateSize;

        if (checkpointsBaseDirectory == null) {
            checkpointsDirectory = null;
            fileSystem = null;
        }
        else {
            this.fileSystem = checkpointsBaseDirectory.getFileSystem();
            this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId);

            fileSystem.mkdirs(checkpointsDirectory);
        }
    }

    // ------------------------------------------------------------------------
    //  Properties
    // ------------------------------------------------------------------------

    /**
     * Gets the size (in bytes) that a individual chunk of state may have at most.
     */
    public int getMaxStateSize() {
        return maxStateSize;
    }

    // ------------------------------------------------------------------------
    //  Checkpoint Storage
    // ------------------------------------------------------------------------

    @Override
    public boolean supportsHighlyAvailableStorage() {
        return checkpointsDirectory != null;
    }

    @Override
    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
        checkArgument(checkpointId >= 0);

        if (checkpointsDirectory != null) {
            // configured for durable metadata
            // prepare all the paths needed for the checkpoints
            checkState(fileSystem != null);

            final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

            // create the checkpoint exclusive directory
            fileSystem.mkdirs(checkpointDir);

            return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize);
        }
        else {
            // no durable metadata - typical in IDE or test setup case
            return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize);
        }
    }

    @Override
    public CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException {

        // no matter where the checkpoint goes, we always return the storage location that stores
        // state inline with the state handles.
        return new MemCheckpointStreamFactory(maxStateSize);
    }

    @Override
    public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }

    @Override
    protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
        return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "MemoryBackendCheckpointStorage {" +
                "checkpointsDirectory=" + checkpointsDirectory +
                ", fileSystem=" + fileSystem +
                ", maxStateSize=" + maxStateSize +
                '}';
    }
}
  • MemoryBackendCheckpointStorage继承了AbstractFsCheckpointStorage,实现了它定义的createSavepointLocation方法,这里返回的是PersistentMetadataCheckpointStorageLocation
  • MemoryBackendCheckpointStorage还实现了CheckpointStorage接口定义的AbstractFsCheckpointStorage未实现的几个方法:supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream
  • supportsHighlyAvailableStorage是根据是否有配置checkpointsDirectory来判断;initializeLocationForCheckpoint这个根据checkpointsDirectory是否有设置来创建,为null的话,创建的是NonPersistentMetadataCheckpointStorageLocation,不为null创建的是PersistentMetadataCheckpointStorageLocation;resolveCheckpointStorageLocation这里创建的是MemCheckpointStreamFactory;而createTaskOwnedStateStream创建的是MemoryCheckpointOutputStream

小结

  • CheckpointStorage接口主要定义了持久化checkpoint data及metadata streams的基本方法;AbstractFsCheckpointStorage主要是实现了CheckpointStorage接口的hasDefaultSavepointLocation、resolveCheckpoint、initializeLocationForSavepoint方法,同时定义了一个抽象方法createSavepointLocation
  • MemoryBackendCheckpointStorage继承了AbstractFsCheckpointStorage,实现了它定义的createSavepointLocation方法,同时还实现了CheckpointStorage接口定义的AbstractFsCheckpointStorage未实现的几个方法:supportsHighlyAvailableStorage、initializeLocationForCheckpoint、resolveCheckpointStorageLocation、createTaskOwnedStateStream
  • 这里可以看到MemoryBackendCheckpointStorage虽然是memory的,但是如果有配置checkpointsDirectory(highly available storage),checkpoint location使用的是PersistentMetadataCheckpointStorageLocation,否则使用NonPersistentMetadataCheckpointStorageLocation;而savepoint location使用的是PersistentMetadataCheckpointStorageLocation(checkpiont可以选择是否使用文件存储,而savepoint只能使用文件存储)

doc

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容