聊聊flink TaskManager的memory大小设置

本文主要研究一下flink TaskManager的memory大小设置

flink-conf.yaml

flink-release-1.7.2/flink-dist/src/main/resources/flink-conf.yaml

# The heap size for the TaskManager JVM

taskmanager.heap.size: 1024m


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1

# Specify whether TaskManager's managed memory should be allocated when starting
# up (true) or when memory is requested.
#
# We recommend to set this value to 'true' only in setups for pure batch
# processing (DataSet API). Streaming setups currently do not use the TaskManager's
# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
# while the 'memory' and 'filesystem' backends explicitly keep data as objects
# to save on serialization cost.
#
# taskmanager.memory.preallocate: false

# The amount of memory going to the network stack. These numbers usually need 
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, teh default max is 1GB.
# 
# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 64mb
# taskmanager.network.memory.max: 1gb
  • flink-conf.yaml提供了taskmanager.heap.size来设置taskmanager的memory(heap及offHeap)大小
  • 提供了taskmanager.memory相关配置(taskmanager.memory.fraction、taskmanager.memory.off-heap、taskmanager.memory.preallocate、taskmanager.memory.segment-size、taskmanager.memory.size)用于设置memory
  • 提供了taskmanager.network.memory相关配置(taskmanager.network.detailed-metrics、taskmanager.network.memory.buffers-per-channel、taskmanager.network.memory.floating-buffers-per-gate、taskmanager.network.memory.fraction、taskmanager.network.memory.max、taskmanager.network.memory.min)用于设置taskmanager的network stack的内存

config.sh

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/config.sh

#!/usr/bin/env bash

# WARNING !!! , these values are only used if there is nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

KEY_TASKM_MEM_SIZE="taskmanager.heap.size"
KEY_TASKM_MEM_MB="taskmanager.heap.mb"
KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"

KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

# Define FLINK_TM_HEAP if it is not already set
if [ -z "${FLINK_TM_HEAP}" ]; then
    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
if [ "${FLINK_TM_HEAP}" == 0 ]; then
    FLINK_TM_HEAP_MB=$(readFromConfig ${KEY_TASKM_MEM_MB} 0 "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")

    if hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
    else
        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}"m"))
    fi
fi

# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
fi

# Define FLINK_TM_OFFHEAP if it is not already set
if [ -z "${FLINK_TM_OFFHEAP}" ]; then
    FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_PRE_ALLOCATE if it is not already set
if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
    FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
fi


# Define FLINK_TM_NET_BUF_FRACTION if it is not already set
if [ -z "${FLINK_TM_NET_BUF_FRACTION}" ]; then
    FLINK_TM_NET_BUF_FRACTION=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")
fi

# Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
    if [ $FLINK_TM_NET_BUF_MIN != -1 ]; then
        FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
        FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
    fi
fi

# Define FLINK_TM_NET_BUF_MIN if it is not already set
if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
    # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
    FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
fi

# Define FLINK_TM_NET_BUF_MAX if it is not already set
if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
    # default: 1GB = 1073741824 bytes
    FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
    FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
fi
  • config.sh在相关变量没有设置的前提下,初始化了FLINK_TM_HEAP、FLINK_TM_MEM_MANAGED_SIZE、FLINK_TM_MEM_MANAGED_FRACTION、FLINK_TM_OFFHEAP、FLINK_TM_MEM_PRE_ALLOCATE、FLINK_TM_NET_BUF_FRACTION等变量

taskmanager.sh

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/taskmanager.sh

#!/usr/bin/env bash
# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

    # if memory allocation mode is lazy and no other JVM options are set,
    # set the 'Concurrent Mark Sweep GC'
    if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
        echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
    else
        flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
        FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
    fi

    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then

        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
        TM_MAX_OFFHEAP_SIZE="8388607T"

        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"

    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    ARGS+=("--configDir" "${FLINK_CONF_DIR}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
        # Start a single TaskManager
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi
  • taskmanager.sh首先调用config.sh初始化相关变量,之后计算并export了JVM_ARGS及FLINK_ENV_JAVA_OPTS,最后调用flink-console.sh启动相关类
  • 如果FLINK_TM_MEM_PRE_ALLOCATE为false且FLINK_ENV_JAVA_OPTS及FLINK_ENV_JAVA_OPTS_TM都没有设置,则追加-XX:+UseG1GC到JVM_ARGS;之后读取FLINK_TM_HEAP到FLINK_TM_HEAP_MB;如果FLINK_TM_HEAP_MB大于0则通过calculateTaskManagerHeapSizeMB计算TM_HEAP_SIZE,然后以TM_HEAP_SIZE设置xms及Xmx,以TM_MAX_OFFHEAP_SIZE设置MaxDirectMemorySize,追加到JVM_ARGS中;而FLINK_ENV_JAVA_OPTS_TM则会追加到FLINK_ENV_JAVA_OPTS
  • calculateTaskManagerHeapSizeMB在config.sh中有定义,另外其对应的java代码在TaskManagerServices.calculateHeapSizeMB

TaskManagerServices

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    /**
     * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *      overall available memory to use (heap and off-heap)
     * @param config
     *      configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap

        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {

            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }

    /**
     * Calculates the amount of memory used for network buffers based on the total memory to use and
     * the according configuration parameters.
     *
     * <p>The following configuration parameters are involved:
     * <ul>
     *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
     *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
     *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
     *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
     * </ul>.
     *
     * @param totalJavaMemorySize
     *      overall available memory to use (heap and off-heap, in bytes)
     * @param config
     *      configuration object
     *
     * @return memory to use for network buffers (in bytes); at least one memory segment
     */
    @SuppressWarnings("deprecation")
    public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySize > 0);

        int segmentSize =
            checkedDownCast(MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

        final long networkBufBytes;
        if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
            // new configuration based on fractions of available memory with selectable min and max
            float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
            long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
            long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();


            TaskManagerServicesConfiguration
                .checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);

            networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
                (long) (networkBufFraction * totalJavaMemorySize)));

            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
                    "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
                    "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                    "Network buffer memory size too large: " + networkBufBytes + " >= " +
                        totalJavaMemorySize + " (total JVM memory size)");
            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes >= segmentSize,
                    "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
                    "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                    "Network buffer memory size too small: " + networkBufBytes + " < " +
                        segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
        } else {
            // use old (deprecated) network buffers parameter
            int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
            networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;

            TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers);

            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
                    networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                    "Network buffer memory size too large: " + networkBufBytes + " >= " +
                        totalJavaMemorySize + " (total JVM memory size)");
            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes >= segmentSize,
                    networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                    "Network buffer memory size too small: " + networkBufBytes + " < " +
                        segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
        }

        return networkBufBytes;
    }

    //......
}
  • FLINK_TM_HEAP设置的是taskmanager的memory(heap及offHeap)大小,而network buffers总是使用offHeap,因而这里首先要从FLINK_TM_HEAP扣减掉这部分offHeap然后重新计算Xms及Xmx
  • calculateHeapSizeMB先调用calculateNetworkBufferMemory计算networkBufMB,然后从totalJavaMemorySizeMB扣减掉networkBufMB得到remainingJavaMemorySizeMB
  • 之后读取taskmanager.memory.off-heap设置,默认为false,则直接以remainingJavaMemorySizeMB返回;如果为true,则需要计算offHeapSize的值,然后从remainingJavaMemorySizeMB扣减offHeapSize再返回

小结

  • flink-conf.yaml提供了taskmanager.heap.size来设置taskmanager的memory(heap及offHeap)大小;提供了taskmanager.memory相关配置(taskmanager.memory.fraction、taskmanager.memory.off-heap、taskmanager.memory.preallocate、taskmanager.memory.segment-size、taskmanager.memory.size)用于设置memory;提供了taskmanager.network.memory相关配置(taskmanager.network.detailed-metrics、taskmanager.network.memory.buffers-per-channel、taskmanager.network.memory.floating-buffers-per-gate、taskmanager.network.memory.fraction、taskmanager.network.memory.max、taskmanager.network.memory.min)用于设置taskmanager的network stack的内存
  • taskmanager.sh首先调用config.sh初始化相关变量,之后计算并export了JVM_ARGS及FLINK_ENV_JAVA_OPTS,最后调用flink-console.sh启动相关类;如果FLINK_TM_MEM_PRE_ALLOCATE为false且FLINK_ENV_JAVA_OPTS及FLINK_ENV_JAVA_OPTS_TM都没有设置,则追加-XX:+UseG1GC到JVM_ARGS;之后读取FLINK_TM_HEAP到FLINK_TM_HEAP_MB;如果FLINK_TM_HEAP_MB大于0则通过calculateTaskManagerHeapSizeMB计算TM_HEAP_SIZE,然后以TM_HEAP_SIZE设置xms及Xmx,以TM_MAX_OFFHEAP_SIZE设置MaxDirectMemorySize,追加到JVM_ARGS中;而FLINK_ENV_JAVA_OPTS_TM则会追加到FLINK_ENV_JAVA_OPTS;calculateTaskManagerHeapSizeMB在config.sh中有定义,另外其对应的java代码在TaskManagerServices.calculateHeapSizeMB
  • FLINK_TM_HEAP设置的是taskmanager的memory(heap及offHeap)大小,而network buffers总是使用offHeap,因而这里首先要从FLINK_TM_HEAP扣减掉这部分offHeap然后重新计算Xms及Xmx;calculateHeapSizeMB先调用calculateNetworkBufferMemory计算networkBufMB,然后从totalJavaMemorySizeMB扣减掉networkBufMB得到remainingJavaMemorySizeMB;之后读取taskmanager.memory.off-heap设置,默认为false,则直接以remainingJavaMemorySizeMB返回;如果为true,则需要计算offHeapSize的值,然后从remainingJavaMemorySizeMB扣减offHeapSize再返回

由此可见最后的jvm参数取决于JVM_ARGS及FLINK_ENV_JAVA_OPTS;其中注意不要设置内存相关参数到JVM_ARGS,因为taskmanager.sh在FLINK_TM_HEAP_MB大于0的时候,则使用该值计算TM_HEAP_SIZE设置Xms及Xmx追加到JVM_ARGS变量中,而FLINK_TM_HEAP_MB则取决于FLINK_TM_HEAP或者taskmanager.heap.size配置;FLINK_ENV_JAVA_OPTS的配置则取决于env.java.opts以及env.java.opts.taskmanager;因而要配置taskmanager的memory(heap及offHeap)大小,可以指定FLINK_TM_HEAP环境变量(比如FLINK_TM_HEAP=512m),或者在flink-conf.yaml中指定taskmanager.heap.size;而最终的Xms及Xmx则是FLINK_TM_HEAP扣减掉offHeap而来,确定使用offHeap为network buffers,其余的看是否开启taskmanager.memory.off-heap,默认为false

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容