A look at managed memory in the Flink TaskManager

This post walks through how the Flink TaskManager sizes its managed memory, using the Flink 1.7.2 sources.

TaskManagerOptions

flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * JVM heap size for the TaskManagers with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
            key("taskmanager.heap.size")
            .defaultValue("1024m")
            .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                    " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                    " YARN container, minus a certain tolerance value.");

    /**
     * Amount of memory to be allocated by the task manager's memory manager. If not
     * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     */
    public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
            key("taskmanager.memory.size")
            .defaultValue("0")
            .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                " If not set, a relative fraction will be allocated.");

    /**
     * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
     * not set.
     */
    public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
            key("taskmanager.memory.fraction")
            .defaultValue(0.7f)
            .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
                " is not set.");

    /**
     * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
     * as well as the network buffers.
     **/
    public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
            key("taskmanager.memory.off-heap")
            .defaultValue(false)
            .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                " TaskManager as well as the network buffers.");

    /**
     * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
     */
    public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
            key("taskmanager.memory.preallocate")
            .defaultValue(false)
            .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

    //......
}
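  • taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (mainly used for sorting, hashing, and caching); it defaults to 0, which is treated as "not set". taskmanager.heap.size sets the TaskManager's overall memory, covering both heap and off-heap.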
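For quick reference, the keys and default values from the excerpt above can be collected in one place. This is a plain-Java sketch, not Flink API; the class name ManagedMemoryDefaults and its defaults() helper are made up for illustration, and the values are copied from the 1.7.2 source shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: lists the memory-related keys and their 1.7.2 defaults.
final class ManagedMemoryDefaults {

    static Map<String, String> defaults() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("taskmanager.heap.size", "1024m");        // TASK_MANAGER_HEAP_MEMORY
        m.put("taskmanager.memory.size", "0");          // MANAGED_MEMORY_SIZE, 0 = not set
        m.put("taskmanager.memory.fraction", "0.7");    // MANAGED_MEMORY_FRACTION
        m.put("taskmanager.memory.off-heap", "false");  // MEMORY_OFF_HEAP
        m.put("taskmanager.memory.preallocate", "false"); // MANAGED_MEMORY_PRE_ALLOCATE
        return m;
    }

    public static void main(String[] args) {
        defaults().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```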

TaskManagerServices.calculateHeapSizeMB

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    /**
     * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *      overall available memory to use (heap and off-heap)
     * @param config
     *      configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap

        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {

            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }

    //......
}
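  • If taskmanager.memory.size is less than or equal to 0 (the default), the managed memory size is derived from taskmanager.memory.fraction, which defaults to 0.7.
  • If taskmanager.memory.off-heap is enabled and no explicit size is set, offHeapSize, the off-heap memory managed by the memory manager, is taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB).
  • If taskmanager.memory.off-heap is enabled, the TaskManager's -Xmx is taskmanager.heap.size - networkBufMB - offHeapSize; otherwise it is taskmanager.heap.size - networkBufMB.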
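To make the arithmetic concrete, here is a minimal standalone sketch of the same calculation. It is not Flink code: the class HeapSizeSketch is hypothetical, the network buffer size is simply passed in rather than computed via calculateNetworkBufferMemory, and the input numbers (1024 MB total, 64 MB of network buffers) are assumed for illustration only.

```java
// Standalone sketch mirroring the arithmetic of calculateHeapSizeMB.
final class HeapSizeSketch {

    /**
     * @param totalMB         overall memory in MB (heap and off-heap)
     * @param networkBufMB    network buffer memory in MB, taken as an input here
     * @param useOffHeap      value of taskmanager.memory.off-heap
     * @param managedMemoryMB value of taskmanager.memory.size in MB, <= 0 if not set
     * @param fraction        value of taskmanager.memory.fraction
     * @return heap size in MB, i.e. what would be used for -Xmx/-Xms
     */
    static long heapSizeMB(long totalMB, long networkBufMB, boolean useOffHeap,
                           long managedMemoryMB, float fraction) {
        long remainingMB = totalMB - networkBufMB;
        if (!useOffHeap) {
            // on-heap managed memory lives inside the heap, so nothing more is subtracted
            return remainingMB;
        }
        long offHeapMB = managedMemoryMB;
        if (offHeapMB <= 0) {
            // widen the float to double first, as the original code does;
            // the cast to long truncates toward zero
            double fractionAsDouble = fraction;
            offHeapMB = (long) (fractionAsDouble * remainingMB);
        }
        if (offHeapMB >= remainingMB) {
            throw new IllegalArgumentException("Managed memory size too large");
        }
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        System.out.println(heapSizeMB(1024, 64, false, 0, 0.7f));  // on-heap
        System.out.println(heapSizeMB(1024, 64, true, 0, 0.7f));   // off-heap, fraction
        System.out.println(heapSizeMB(1024, 64, true, 256, 0.7f)); // off-heap, explicit size
    }
}
```

Because 0.7f is slightly less than 0.7 and the product is truncated, the fraction-based off-heap size can come out one megabyte smaller than a hand calculation with 0.7 would suggest.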

TaskManagerServices.createMemoryManager

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    /**
     * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
     *
     * @param taskManagerServicesConfiguration to create the memory manager from
     * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
     * @param maxJvmHeapMemory the maximum JVM heap size
     * @return Memory manager
     * @throws Exception
     */
    private static MemoryManager createMemoryManager(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {
        // computing the amount of memory to use depends on how much memory is available
        // it strictly needs to happen AFTER the network stack has been initialized

        // check if a value has been configured
        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();

        MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

        final long memorySize;

        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

        if (configuredMemory > 0) {
            if (preAllocateMemory) {
                LOG.info("Using {} MB for managed memory." , configuredMemory);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
            }
            memorySize = configuredMemory << 20; // megabytes to bytes
        } else {
            // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

            if (memType == MemoryType.HEAP) {
                // network buffers allocated off-heap -> use memoryFraction of the available heap:
                long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
                        memoryFraction , relativeMemSize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                        "memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
                }
                memorySize = relativeMemSize;
            } else if (memType == MemoryType.OFF_HEAP) {
                // The maximum heap memory has been adjusted according to the fraction (see
                // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                // directMemorySize = jvmTotalNoNet * memoryFraction
                long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
                        memoryFraction, directMemorySize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                        " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                }
                memorySize = directMemorySize;
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }

        // now start the memory manager
        final MemoryManager memoryManager;
        try {
            memoryManager = new MemoryManager(
                memorySize,
                taskManagerServicesConfiguration.getNumberOfSlots(),
                taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                memType,
                preAllocateMemory);
        } catch (OutOfMemoryError e) {
            if (memType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
            } else if (memType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager off-heap memory (" + memorySize +
                    " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            } else {
                throw e;
            }
        }
        return memoryManager;
    }

    //......
}
  • TaskManagerServices has a private static method, createMemoryManager, that creates a MemoryManager from the configuration. If taskmanager.memory.size is set (configuredMemory > 0), that value is used directly; otherwise memorySize is computed according to the MemoryType. Either way, memorySize is then passed to the MemoryManager constructor.
  • When memType is MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction).
  • When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction. Since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that jvmTotalNoNet = maxJvmHeap / (1 - memoryFraction), and therefore directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
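The three cases above can be sketched as a small standalone function. This is an illustration under stated assumptions, not the Flink implementation: ManagedMemorySizeSketch and its MemType enum are hypothetical stand-ins, and the heap sizes used in main are made-up inputs.

```java
// Standalone sketch of how createMemoryManager picks memorySize (in bytes).
final class ManagedMemorySizeSketch {

    enum MemType { HEAP, OFF_HEAP }

    static long memorySizeBytes(long configuredMemoryMB, MemType memType,
                                long freeHeapBytes, long maxJvmHeapBytes,
                                float memoryFraction) {
        if (configuredMemoryMB > 0) {
            // explicit taskmanager.memory.size wins: megabytes to bytes
            return configuredMemoryMB << 20;
        }
        if (memType == MemType.HEAP) {
            // fraction of the currently free heap
            return (long) (freeHeapBytes * memoryFraction);
        }
        // OFF_HEAP: the heap was already shrunk to jvmTotalNoNet * (1 - fraction),
        // so divide by (1 - fraction) to recover jvmTotalNoNet, then take the fraction
        return (long) (maxJvmHeapBytes / (1.0 - memoryFraction) * memoryFraction);
    }

    public static void main(String[] args) {
        long freeHeap = 800L << 20;   // assumed: 800 MB of free heap
        long maxHeap = 300L << 20;    // assumed: -Xmx of 300 MB
        System.out.println(memorySizeBytes(256, MemType.HEAP, freeHeap, maxHeap, 0.7f) >> 20);
        System.out.println(memorySizeBytes(0, MemType.HEAP, freeHeap, maxHeap, 0.7f) >> 20);
        System.out.println(memorySizeBytes(0, MemType.OFF_HEAP, freeHeap, maxHeap, 0.7f) >> 20);
    }
}
```

With a 300 MB heap and a fraction of 0.7, the off-heap branch recovers roughly 1000 MB as jvmTotalNoNet and assigns about 700 MB to managed memory, up to floating-point rounding.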

TaskManagerServicesConfiguration

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {
    //......

    /**
     * Utility method to extract TaskManager config parameters from the configuration and to
     * sanity check them.
     *
     * @param configuration The configuration.
     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
     * @param localCommunication True, to skip initializing the network stack.
     *                                      Use only in cases where only one task manager runs.
     * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            InetAddress remoteAddress,
            boolean localCommunication) throws Exception {

        // we need this because many configs have been written with a "-1" entry
        int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }

        final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
        String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

        if (localStateRootDir.length == 0) {
            // default to temp dirs.
            localStateRootDir = tmpDirs;
        }

        boolean localRecoveryMode = configuration.getBoolean(
            CheckpointingOptions.LOCAL_RECOVERY.key(),
            CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
            configuration,
            localCommunication,
            remoteAddress,
            slots);

        final QueryableStateConfiguration queryableStateConfig =
                parseQueryableStateConfiguration(configuration);

        // extract memory settings
        long configuredMemory;
        String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
        if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
            try {
                configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException(
                    "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
            }
        } else {
            configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
        }

        checkConfigParameter(
            configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                configuredMemory > 0, configuredMemory,
            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
            "MemoryManager needs at least one MB of memory. " +
                "If you leave this config parameter empty, the system automatically " +
                "pick a fraction of the available memory.");

        // check whether we use heap or off-heap memory
        final MemoryType memType;
        if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            memType = MemoryType.OFF_HEAP;
        } else {
            memType = MemoryType.HEAP;
        }

        boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

        float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
            TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
            "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

        long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

        return new TaskManagerServicesConfiguration(
            remoteAddress,
            tmpDirs,
            localStateRootDir,
            localRecoveryMode,
            networkConfig,
            queryableStateConfig,
            slots,
            configuredMemory,
            memType,
            preAllocateMemory,
            memoryFraction,
            timerServiceShutdownTimeout,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
    }

    //......
}
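  • TaskManagerServicesConfiguration has a static method, fromConfiguration, that builds a TaskManagerServicesConfiguration from a Configuration. memType follows taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise. The method also checks that an explicitly configured taskmanager.memory.size is greater than 0 and that taskmanager.memory.fraction lies strictly between 0.0 and 1.0.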
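The memory-related checks in fromConfiguration boil down to a few conditions, sketched below in plain Java. MemoryConfigSketch, its MemType enum, and the method names are hypothetical; the real code uses Flink's MemoryType and reports failures through checkConfigParameter rather than throwing IllegalArgumentException directly as done here.

```java
// Standalone sketch of the memory-related validation in fromConfiguration.
final class MemoryConfigSketch {

    enum MemType { HEAP, OFF_HEAP }

    // taskmanager.memory.off-heap decides the memory type
    static MemType memType(boolean offHeap) {
        return offHeap ? MemType.OFF_HEAP : MemType.HEAP;
    }

    // an explicitly configured size must be positive; the default "0" means "not set"
    static void checkManagedSize(String configured, String defaultValue, long configuredMB) {
        if (!(configured.equals(defaultValue) || configuredMB > 0)) {
            throw new IllegalArgumentException(
                "MemoryManager needs at least one MB of memory.");
        }
    }

    // the fraction must lie strictly between 0.0 and 1.0
    static void checkFraction(float fraction) {
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException(
                "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
        }
    }

    public static void main(String[] args) {
        System.out.println(memType(true));
        checkManagedSize("0", "0", 0);      // default value: accepted
        checkManagedSize("512m", "0", 512); // explicit positive size: accepted
        checkFraction(0.7f);                // default fraction: accepted
    }
}
```

The strict upper bound on the fraction matters for the off-heap formula in createMemoryManager, which divides by (1.0 - memoryFraction).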

Summary

  • TaskManager managed memory comes in two types, heap and off-heap. taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (mainly used for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's overall memory, heap plus off-heap. If taskmanager.memory.size is less than or equal to 0, the size is derived from taskmanager.memory.fraction, which defaults to 0.7. If taskmanager.memory.off-heap is enabled, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the offHeapSize managed by the memory manager, and the TaskManager's -Xmx is taskmanager.heap.size - networkBufMB - offHeapSize.
  • TaskManagerServices has a private static method, createMemoryManager, that creates a MemoryManager from the configuration. Unless taskmanager.memory.size is set explicitly, memorySize is computed according to the MemoryType and passed to the MemoryManager constructor. When memType is MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction). When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction; since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
  • TaskManagerServicesConfiguration has a static method, fromConfiguration, that builds a TaskManagerServicesConfiguration from a Configuration. memType follows taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise.

