Configuration

原文链接


对于单节点设置,Flink已经准备就绪,不需要更改默认配置就可以启动。

开箱即用的配置会使用你默认安装的Java。如果你想手动覆盖Java运行环境,可以手动设置环境变量JAVA_HOME或者conf/flink-conf.yaml文件中的env.java.home属性。

本页列出了配置一个良好的运行(分布式的)环境,通常需要设置的最常见的选项。。此外,这里列出了所有可用配置参数的完整列表。

所有的配置都是在 conf/flink-conf.yaml中完成的,它期望是一个key: value格式的YAML键值对 的扁平集合。

系统和运行脚本在启动时解析配置。对配置文件的更改需要重新启动Flink JobManager和TaskManager。

任务管理器的配置文件可以不同,Flink不假设集群中有统一的机器。

通用选项

  • env.java.home: 使用的Java安装路径(默认:如果找到的话,系统默认的Java安装路径)。如果启动脚本未能自动解析java主目录,则需要指定。可以指定指向一个特定的Java安装路径或版本。如果没有指定此选项,启动脚本还好评估$JAVA_HOME环境变量。

  • env.java.opts: 设置自定义JVM选项。Flink的启动脚本,JobManager,TaskManager和Flink的YARN客户端关心这个值。它可以用来设置不同的垃圾收集器或者将远程调试添加到运行Flink服务的JVM中。包含双引号的选项,延迟参数替换,从而允许Flink的启动脚本访问变量。分别使用 env.java.opts.jobmanagerenv.java.opts.taskmanager 为JobManager和TaskManager制定选项。

  • env.java.opts.jobmanager: JobManager指定的JVM选项。JobManager-specific JVM options. 这些是除了常规 env.java.opts 之外的选项。

  • env.java.opts.taskmanager: TaskManager 指定的JVM选项。 这些是除了常规 env.java.opts 之外的选项。

  • jobmanager.rpc.address: JobManager的外部地址,它是分布式系统的master/协调者的地址(默认: localhost)。注意: 地址(主机名和IP)应该能被所有的节点和客户端访问。

  • jobmanager.rpc.port: JobManager的端口号(默认:6123)。

  • jobmanager.heap.mb: JobManager的JVM堆大小(以兆字节为单位)。如果您正在运行非常大的应用程序(有许多操作符),或者您保存了很长的历史,那么您可能需要增加JobManager的堆大小。

  • taskmanager.heap.mb: TaskManager的JVM堆大小,它是系统的并行worker。与Hadoop相比,Flink在TaskManager内(包括排序/哈希/缓存)运行操作符(例如,join,aggregate)和用户定义的函数(例如,Map,Reduce,CoGroup),因此这个值应该尽可能大。如果集群只是运行Flink,那么每个机器的可用内存总量减去一些操作系统的内存(可能1-2GB)是一个比较好的值。在YARN上,这个值自动设置成TaskManager的YARN容器的大小,减去一定的公差值。

  • taskmanager.numberOfTaskSlots: 一个TaskManager能够运行的并行的操作符或者用户函数实例的数量(默认:1)。如果这个值大于1,一个TaskManager将接受多个函数或操作符的实例。这样,TaskManager能利用多个CPU核,但是同时,可用内存被划分给不同的操作符或函数实例。这个值通常和TaskManager所在的机器的物理CPU核数成正比(例如,等于核心的数量,或者是核心数量的一半)。更多的关于slot.

  • parallelism.default: 对于没有指定并行性的程序,默认并行性为1。对于没有并发作业运行的系统,设置这个值为NumTaskManagers * NumSlotsPerTaskManager,这样使得系统使用所有可用的资源来执行程序。注意: 默认的并行性可以被作业重写,通过调用ExecutionEnvironment上的 setParallelism(int parallelism)方法或者通过传递 -p <parallelism>到Flink命令行前端。可以通过调用操作符上的setParallelism(int parallelism)方法重写单个转换的并行度。关于并行度的更多信息见并行执行

  • fs.default-scheme: 使用的默认文件系统的scheme,具有连接的必要权限,例如在HDFS的情况下,NameNode 的host:port(如果需要)。默认情况下,这个值被设为 file:///,它指向本地文件系统。这意味着将使用本地文件系统来搜索用户指定的文件,而不需要显示scheme定义。另外一个例子,如果这个值被设为hdfs://localhost:9000/,然后用户指定的文件路径没有scheme定义,例如/user/USERNAME/in.txt,将被转换为hdfs://localhost:9000/user/USERNAME/in.txt。这个scheme仅用于在用户提供的URI中没有指定其它scheme(显示的)。

  • fs.hdfs.hadoopconf: Hadoop文件系统(HDFS)配置目录的绝对路径(可选值)。指定这个值允许程序使用短URI(hdfs:///path/to/files,而不用在URI中包含NameNode的地址和端口)引用HDFS文件。如果没有这个选项,HDFS文件可以访问,但需要完全限定的URI,就像hdfs://address:port/path/to/files。这个选项还会导致文件写者获取HDFS的块大小和复制因子的默认值。Flink会在指定的目录中查找“core-site.xml”和“hdfs-site.xml”文件。

  • classloader.resolve-order: 当加载用户代码类时,Flink使用child-first的 ClassLoader还是parent-first ClassLoader。可以是parent-firstchild-first中的一个值。(默认:child-first)

  • classloader.parent-first-patterns: 一个(分号分割的)模式列表,它指定哪些类应该总是通过父 ClassLoader进行解析。模式是对类全限定名的简单前缀匹配。默认情况下,它被设置为java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback。如果你想更改此设置,并且希望保持默认行为,则必须确保在你的模式列表中包含默认模式。

高级选项

计算

  • taskmanager.compute.numa: When enabled a TaskManager is started on each NUMA node for each worker listed in conf/slaves (DEFAULT: false). Note: only supported when deploying Flink as a standalone cluster.

托管内存

默认情况下,Flink分配空闲内存(通过taskmanager.heap.mb配置的总内存减去用作网络缓冲的内存)的0.7倍作为托管内存。托管内存帮助Flink有效的运行批处理操作。它阻止OutOfMemoryException发生,因为Flink知道它有多少内存能用于执行操作。如果Flink耗尽了托管内存,它就利用磁盘空间。使用托管内存,一些操作可以直接在原始数据上执行,而不用将数据反序列化成Java对象。总之,托管内存提高了系统的健壮性和速度。

托管内存的默认分数可以通过taskmanager.memory.fraction参数进行调整。可以使用taskmanager.memory.size设置绝对值(覆盖分数参数)。如果需要,托内内存可以分配在JVM堆外。这可以提高具有大内存空间情况下的性能。

  • taskmanager.memory.size: TaskManager持有的用于排序,hash表和中间结果缓存的堆内或堆外(依赖于taskmanager.memory.off-heap)的内存大小(MB)。如果未指定(-1),内存管理器将使用TaskManager JVM堆大小的固定比率的内存,该比率由taskmanager.memory.fraction指定。(默认值:-1)

  • taskmanager.memory.fraction: 相对数量的内存(相对于taskmanager.heap.mb,再减去网络缓存使用的内存数量之后),TaskManager使用它来进行排序,hash表和缓存中间结果。例如,值为0.8意味着TaskManager将其内存(堆上或者堆外依赖于taskmanager.memory.off-heap)的80%用于内部数据缓冲,剩下的20%的内存用于用户定义的函数创建的对象。(默认:0.7) 这个参数仅用于没有设置taskmanager.memory.size时进行评估。

  • taskmanager.memory.off-heap: 如果设置为true,TaskManager分配用于排序,hash表和缓存中间结果的内存位于JVM堆外。对于具有较大内存的设置,这可以提高在内存上执行的操作的效率(默认为false)。

  • taskmanager.memory.segment-size: 内存管理和网络栈使用的内存缓冲块字节数大小。(默认: 32768 (= 32 KB))。

  • taskmanager.memory.preallocate: 可以是truefalse。指定TaskManager是否应该在启动时分配所有的托管内存。 (默认: false)。当 taskmanager.memory.off-heap设置为true时,建议这个配置也设为 true。如果这个配置设为false,那么只有配置的JVM参数MaxDirectMemorySize到达并触发一个full GC时才会清理分配的堆外内存。注意:对于流设置,我们强烈推荐设置这个值为false,因为核心的状态后端当前不使用托管内存。

内存和性能调试

这些选项对于调试Flink应用内存和垃圾收集相关问题时非常有用,例如性能和OOM导致死亡或异常。

  • taskmanager.debug.memory.startLogThread: 使TaskManager定期记录内存和垃圾收集统计信息。统计数据包括当前堆,堆外和其它内存池的利用率,以及堆内存池花费在垃圾收集上的时间。

  • taskmanager.debug.memory.logIntervalMs: TaskManager记录内存和垃圾收集统计信息的间隔(毫秒)。只有在taskmanager.debug.memory.startLogThread设置为true时才生效。

其它

  • taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system’s directory delimiter (for example ‘:’ (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system’s tmp dir).

  • taskmanager.log.path: The config parameter defining the taskmanager log file location

  • jobmanager.web.address: Address of the JobManager’s web interface (DEFAULT: anyLocalAddress()).

  • jobmanager.web.port: Port of the JobManager’s web interface (DEFAULT: 8081).

  • jobmanager.web.tmpdir: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface will copy its static files into the directory. Also uploaded job jars are stored in the directory if not overridden. By default, the temporary directory is used.

  • jobmanager.web.upload.dir: The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory will be used under the directory specified by jobmanager.web.tmpdir.

  • fs.overwrite-files: Specifies whether file output writers should overwrite existing files by default. Set to true to overwrite by default, false otherwise. (DEFAULT: false)

  • fs.output.always-create-directory: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to true, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to false, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false)

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)

  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)

  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)

  • state.backend: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends:

    • jobmanager: In-memory state, backup to JobManager’s/ZooKeeper’s memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
    • filesystem: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, …
  • state.backend.fs.checkpointdir: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use file:// only for local setups.

  • state.backend.rocksdb.checkpointdir: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is taskmanager.tmp.dirs)

  • state.checkpoints.dir: The target directory for meta data of externalized checkpoints.

  • state.checkpoints.num-retained: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoints if the latest checkpoint is corrupt. (Default: 1)

  • high-availability.zookeeper.storageDir: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named recovery.zookeeper.storageDir.

  • blob.storage.directory: Directory for storing blobs (such as user JARs) on the TaskManagers.

  • blob.service.cleanup.interval: Cleanup interval (in seconds) of transient blobs at server and caches as well as permanent blobs at the caches (DEFAULT: 1 hour). Whenever a job is not referenced at the cache anymore, we set a TTL for its permanent blob files and let the periodic cleanup task (executed every blob.service.cleanup.interval seconds) remove them after this TTL has passed. We do the same for transient blob files at both server and caches but immediately after accessing them, i.e. an put or get operation. This means that a blob will be retained at most <tt>2 * blob.service.cleanup.interval</tt> seconds after not being referenced anymore (permanent blobs) or their last access (transient blobs). For permanent blobs, this means that a recovery still has the chance to use existing files rather downloading them again.

  • blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.

  • blob.service.ssl.enabled: Flag to enable ssl for the blob client/server communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).

  • restart-strategy: Default restart strategy to use in case no restart strategy has been specified for the job. The options are:

    • fixed delay strategy: fixed-delay.
    • failure rate strategy: failure-rate.
    • no restarts: none

    Default value is none unless checkpointing is enabled for the job in which case the default is fixed-delay with Integer.MAX_VALUE restart attempts and 10s delay.

  • restart-strategy.fixed-delay.attempts: Number of restart attempts, used if the default restart strategy is set to “fixed-delay”. Default value is 1, unless “fixed-delay” was activated by enabling checkpoints, in which case the default is Integer.MAX_VALUE.

  • restart-strategy.fixed-delay.delay: Delay between restart attempts, used if the default restart strategy is set to “fixed-delay”. (default: 1 s)

  • restart-strategy.failure-rate.max-failures-per-interval: Maximum number of restarts in given time interval before failing a job in “failure-rate” strategy. Default value is 1.

  • restart-strategy.failure-rate.failure-rate-interval: Time interval for measuring failure rate in “failure-rate” strategy. Default value is 1 minute.

  • restart-strategy.failure-rate.delay: Delay between restart attempts, used if the default restart strategy is set to “failure-rate”. Default value is the akka.ask.timeout.

Full Reference

HDFS

These parameters configure the default HDFS used by Flink. Setups that do not specify a HDFS configuration have to specify the full path to HDFS files (hdfs://address:port/path/to/files) Files will also be written with default HDFS parameters (block size, replication factor).

  • fs.hdfs.hadoopconf: The absolute path to the Hadoop configuration directory. The system will look for the “core-site.xml” and “hdfs-site.xml” files in that directory (DEFAULT: null).

  • fs.hdfs.hdfsdefault: The absolute path of Hadoop’s own configuration file “hdfs-default.xml” (DEFAULT: null).

  • fs.hdfs.hdfssite: The absolute path of Hadoop’s own configuration file “hdfs-site.xml” (DEFAULT: null).

JobManager & TaskManager

The following parameters configure Flink’s JobManager and TaskManagers.

  • jobmanager.rpc.address: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). Note: The address (host name or IP) should be accessible by all nodes including the client.

  • jobmanager.rpc.port: The port number of the JobManager (DEFAULT: 6123).

  • taskmanager.hostname: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.

  • taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets the OS choose a free port). Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.

  • taskmanager.data.port: The task manager’s port used for data exchange operations (DEFAULT: 0, which lets the OS choose a free port).

  • taskmanager.data.ssl.enabled: Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true)

  • jobmanager.heap.mb: JVM heap size (in megabytes) for the JobManager (DEFAULT: 256).

  • taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: 512). On YARN setups, this value is automatically configured to the size of the TaskManager’s YARN container, minus a certain tolerance value.

  • taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager’s machine has (e.g., equal to the number of cores, or half the number of cores).

  • taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system’s directory delimiter (for example ‘:’ (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system’s tmp dir).

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that taskmanager.network.memory.min and taskmanager.network.memory.max may override this fraction. (DEFAULT: 0.1)

  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB). Previously, this was determined from taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.

  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB). Previously, this was determined from taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.

  • taskmanager.network.numberOfBuffers (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048). If set, it will be mapped to taskmanager.network.memory.min and taskmanager.network.memory.max based on taskmanager.memory.segment-size.

  • taskmanager.memory.size: The amount of memory (in megabytes) that the task manager reserves on the JVM’s heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by taskmanager.memory.fraction. (DEFAULT: -1)

  • taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that a task manager reserves 80% of its memory (on-heap or off-heap depending on taskmanager.memory.off-heap) for internal data buffers, leaving 20% of free memory for the task manager’s heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if taskmanager.memory.size is not set.

  • taskmanager.debug.memory.startLogThread: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.

  • taskmanager.debug.memory.logIntervalMs: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if taskmanager.debug.memory.startLogThread is set to true.

  • taskmanager.maxRegistrationDuration: Defines the maximum time it can take for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates. The max registration duration requires a time unit specifier (ms/s/min/h/d) (e.g. “10 min”). (DEFAULT: Inf)

  • taskmanager.initial-registration-pause: The initial registration pause between two consecutive registration attempts. The pause is doubled for each new registration attempt until it reaches the maximum registration pause. The initial registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 500 ms)

  • taskmanager.max-registration-pause: The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 30 s)

  • taskmanager.refused-registration-pause: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 10 s)

  • taskmanager.jvm-exit-on-oom: Indicates that the TaskManager should immediately terminate the JVM if the task thread throws an OutOfMemoryError (DEFAULT: false).

  • blob.fetch.retries: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: 50).

  • blob.fetch.num-concurrent: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: 50).

  • blob.fetch.backlog: The maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows (DEFAULT: 1000).

  • task.cancellation-interval: Time interval between two successive task cancellation attempts in milliseconds (DEFAULT: 30000).

  • taskmanager.exit-on-fatal-akka-error: Whether the TaskManager shall be terminated in case of a fatal Akka error (quarantining event). (DEFAULT: false)

  • jobmanager.tdd.offload.minsize: Maximum size of the TaskDeploymentDescriptor’s serialized task and job information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server. (DEFAULT: 1 KiB).

High Availability (HA)

  • high-availability: Defines the high availability mode used for the cluster execution. Currently, Flink supports the following modes:
    • none (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
    • zookeeper: Supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the ‘zookeeper’ mode, it is mandatory to also define the high-availability.zookeeper.quorum configuration value.
  • high-availability.cluster-id: (Default /default_ns in standalone cluster mode, or the <yarn-application-id>under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named recovery.zookeeper.path.namespace and high-availability.zookeeper.path.namespace.</yarn-application-id>

Previously this key was named recovery.mode and the default value was standalone.

ZooKeeper-based HA Mode

  • high-availability.zookeeper.quorum: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the ‘zookeeper’ HA mode is selected. Previously this key was named recovery.zookeeper.quorum.

  • high-availability.zookeeper.path.root: (Default /flink) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this ket was named recovery.zookeeper.path.root.

  • high-availability.zookeeper.path.latch: (Default /leaderlatch) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named recovery.zookeeper.path.latch.

  • high-availability.zookeeper.path.leader: (Default /leader) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named recovery.zookeeper.path.leader.

  • high-availability.storageDir: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named recovery.zookeeper.storageDir and high-availability.zookeeper.storageDir.

  • high-availability.zookeeper.client.session-timeout: (Default 60000) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named recovery.zookeeper.client.session-timeout

  • high-availability.zookeeper.client.connection-timeout: (Default 15000) Defines the connection timeout for ZooKeeper in ms. Previously this key was named recovery.zookeeper.client.connection-timeout.

  • high-availability.zookeeper.client.retry-wait: (Default 5000) Defines the pause between consecutive retries in ms. Previously this key was named recovery.zookeeper.client.retry-wait.

  • high-availability.zookeeper.client.max-retry-attempts: (Default 3) Defines the number of connection retries before the client gives up. Previously this key was named recovery.zookeeper.client.max-retry-attempts.

  • high-availability.job.delay: (Default akka.ask.timeout) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named recovery.job.delay.

  • high-availability.zookeeper.client.acl: (Default open) Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos). The ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes

ZooKeeper Security

  • zookeeper.sasl.disable: (Default: true) Defines if SASL based authentication needs to be enabled or disabled. The configuration value can be set to “true” if ZooKeeper cluster is running in secure mode (Kerberos).

  • zookeeper.sasl.service-name: (Default: zookeeper) If the ZooKeeper server is configured with a different service name (default:”zookeeper”) then it can be supplied using this configuration. A mismatch in service name between client and server configuration will cause the authentication to fail.

Environment

  • env.log.dir: (Defaults to the log directory under Flink’s home) Defines the directory where the Flink logs are saved. It has to be an absolute path.

  • env.log.max: (Default: 5) The maximum number of old log files to keep.

  • env.ssh.opts: Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh).

Configuring TaskManager processing slots

Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.

Each Flink TaskManager provides processing slots in the cluster. The number of slots is typically proportional to the number of available CPU cores of each TaskManager. As a general recommendation, the number of available CPU cores is a good default for taskmanager.numberOfTaskSlots.

When starting a Flink application, users can supply the default number of slots to use for that job. The command line value therefore is called -p (for parallelism). In addition, it is possible to set the number of slots in the programming APIs for the whole application and for individual operators.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容