聊聊flink的NetworkEnvironmentConfiguration

本文主要研究一下flink的NetworkEnvironmentConfiguration

NetworkEnvironmentConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

public class NetworkEnvironmentConfiguration {

    private final float networkBufFraction;

    private final long networkBufMin;

    private final long networkBufMax;

    private final int networkBufferSize;

    private final IOMode ioMode;

    private final int partitionRequestInitialBackoff;

    private final int partitionRequestMaxBackoff;

    private final int networkBuffersPerChannel;

    private final int floatingNetworkBuffersPerGate;

    private final NettyConfig nettyConfig;

    /**
     * Constructor for a setup with purely local communication (no netty).
     */
    public NetworkEnvironmentConfiguration(
            float networkBufFraction,
            long networkBufMin,
            long networkBufMax,
            int networkBufferSize,
            IOMode ioMode,
            int partitionRequestInitialBackoff,
            int partitionRequestMaxBackoff,
            int networkBuffersPerChannel,
            int floatingNetworkBuffersPerGate) {

        this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
                ioMode,
                partitionRequestInitialBackoff, partitionRequestMaxBackoff,
                networkBuffersPerChannel, floatingNetworkBuffersPerGate,
                null);
        
    }

    public NetworkEnvironmentConfiguration(
            float networkBufFraction,
            long networkBufMin,
            long networkBufMax,
            int networkBufferSize,
            IOMode ioMode,
            int partitionRequestInitialBackoff,
            int partitionRequestMaxBackoff,
            int networkBuffersPerChannel,
            int floatingNetworkBuffersPerGate,
            @Nullable NettyConfig nettyConfig) {

        this.networkBufFraction = networkBufFraction;
        this.networkBufMin = networkBufMin;
        this.networkBufMax = networkBufMax;
        this.networkBufferSize = networkBufferSize;
        this.ioMode = ioMode;
        this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
        this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
        this.networkBuffersPerChannel = networkBuffersPerChannel;
        this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
        this.nettyConfig = nettyConfig;
    }

    // ------------------------------------------------------------------------

    public float networkBufFraction() {
        return networkBufFraction;
    }

    public long networkBufMin() {
        return networkBufMin;
    }

    public long networkBufMax() {
        return networkBufMax;
    }

    public int networkBufferSize() {
        return networkBufferSize;
    }

    public IOMode ioMode() {
        return ioMode;
    }

    public int partitionRequestInitialBackoff() {
        return partitionRequestInitialBackoff;
    }

    public int partitionRequestMaxBackoff() {
        return partitionRequestMaxBackoff;
    }

    public int networkBuffersPerChannel() {
        return networkBuffersPerChannel;
    }

    public int floatingNetworkBuffersPerGate() {
        return floatingNetworkBuffersPerGate;
    }

    public NettyConfig nettyConfig() {
        return nettyConfig;
    }

    // ------------------------------------------------------------------------

    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + networkBufferSize;
        result = 31 * result + ioMode.hashCode();
        result = 31 * result + partitionRequestInitialBackoff;
        result = 31 * result + partitionRequestMaxBackoff;
        result = 31 * result + networkBuffersPerChannel;
        result = 31 * result + floatingNetworkBuffersPerGate;
        result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        else if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        else {
            final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;

            return this.networkBufFraction == that.networkBufFraction &&
                    this.networkBufMin == that.networkBufMin &&
                    this.networkBufMax == that.networkBufMax &&
                    this.networkBufferSize == that.networkBufferSize &&
                    this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
                    this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
                    this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
                    this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
                    this.ioMode == that.ioMode && 
                    (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
        }
    }

    @Override
    public String toString() {
        return "NetworkEnvironmentConfiguration{" +
                "networkBufFraction=" + networkBufFraction +
                ", networkBufMin=" + networkBufMin +
                ", networkBufMax=" + networkBufMax +
                ", networkBufferSize=" + networkBufferSize +
                ", ioMode=" + ioMode +
                ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
                ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
                ", networkBuffersPerChannel=" + networkBuffersPerChannel +
                ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
                ", nettyConfig=" + nettyConfig +
                '}';
    }
}
  • NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性

TaskManagerServicesConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {

    //......

    /**
     * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
     *
     * @param configuration to create the network environment configuration from
     * @param localTaskManagerCommunication true if task manager communication is local
     * @param taskManagerAddress address of the task manager
     * @param slots to start the task manager with
     * @return Network environment configuration
     */
    @SuppressWarnings("deprecation")
    private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
        Configuration configuration,
        boolean localTaskManagerCommunication,
        InetAddress taskManagerAddress,
        int slots) throws Exception {

        // ----> hosts / ports for communication and data exchange

        int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);

        checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
            "Leave config parameter empty or use 0 to let the system choose a port automatically.");

        checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
            "Number of task slots must be at least one.");

        final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

        // check page size of for minimum size
        checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
            TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
            "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);

        // check page size for power of two
        checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
            TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
            "Memory segment size must be a power of 2.");

        // network buffer memory fraction

        float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
        long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
        long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
        checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);

        // fallback: number of network buffers
        final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
        checkNetworkConfigOld(numNetworkBuffers);

        if (!hasNewNetworkBufConf(configuration)) {
            // map old config to new one:
            networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
        } else {
            if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
                LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
                    TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
            }
        }

        final NettyConfig nettyConfig;
        if (!localTaskManagerCommunication) {
            final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);

            nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
                taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
        } else {
            nettyConfig = null;
        }

        // Default spill I/O mode for intermediate results
        final String syncOrAsync = configuration.getString(
            ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);

        final IOManager.IOMode ioMode;
        if (syncOrAsync.equals("async")) {
            ioMode = IOManager.IOMode.ASYNC;
        } else {
            ioMode = IOManager.IOMode.SYNC;
        }

        int initialRequestBackoff = configuration.getInteger(
            TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
        int maxRequestBackoff = configuration.getInteger(
            TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);

        int buffersPerChannel = configuration.getInteger(
            TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
        int extraBuffersPerGate = configuration.getInteger(
            TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);

        return new NetworkEnvironmentConfiguration(
            networkBufFraction,
            networkBufMin,
            networkBufMax,
            pageSize,
            ioMode,
            initialRequestBackoff,
            maxRequestBackoff,
            buffersPerChannel,
            extraBuffersPerGate,
            nettyConfig);
    }

    //......
}
  • TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置

TaskManagerOptions

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * Size of memory buffers used by the network stack and the memory manager.
     */
    public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
            key("taskmanager.memory.segment-size")
            .defaultValue("32kb")
            .withDescription("Size of memory buffers used by the network stack and the memory manager.");

    /**
     * Fraction of JVM memory to use for network buffers.
     */
    public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
            key("taskmanager.network.memory.fraction")
            .defaultValue(0.1f)
            .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
                " data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
                " are. If a job is rejected or you get a warning that the system has not enough buffers available," +
                " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
                "` and \"taskmanager.network.memory.max\" may override this fraction.");

    /**
     * Minimum memory size for network buffers.
     */
    public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
            key("taskmanager.network.memory.min")
            .defaultValue("64mb")
            .withDescription("Minimum memory size for network buffers.");

    /**
     * Maximum memory size for network buffers.
     */
    public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
            key("taskmanager.network.memory.max")
            .defaultValue("1gb")
            .withDescription("Maximum memory size for network buffers.");

    /**
     * Number of buffers used in the network stack. This defines the number of possible tasks and
     * shuffles.
     *
     * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
     * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
     */
    @Deprecated
    public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
            key("taskmanager.network.numberOfBuffers")
            .defaultValue(2048);

    /**
     * Minimum backoff for partition requests of input channels.
     */
    public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
            key("taskmanager.network.request-backoff.initial")
            .defaultValue(100)
            .withDeprecatedKeys("taskmanager.net.request-backoff.initial")
            .withDescription("Minimum backoff in milliseconds for partition requests of input channels.");

    /**
     * Maximum backoff for partition requests of input channels.
     */
    public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
            key("taskmanager.network.request-backoff.max")
            .defaultValue(10000)
            .withDeprecatedKeys("taskmanager.net.request-backoff.max")
            .withDescription("Maximum backoff in milliseconds for partition requests of input channels.");

    /**
     * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
     *
     * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
     */
    public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
            key("taskmanager.network.memory.buffers-per-channel")
            .defaultValue(2)
            .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
                "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
                " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
                " for parallel serialization.");

    /**
     * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
     */
    public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
            key("taskmanager.network.memory.floating-buffers-per-gate")
            .defaultValue(8)
            .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
                " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
                " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
                " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
                " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");                                                                                                        
    //......
}
  • taskmanager.memory.segment-size指定memory segment的大小,默认为32kb;taskmanager.network.memory.fraction指定network buffers使用的memory的比例,默认为0.1;taskmanager.network.memory.min指定network buffers使用的最小内存,默认为64mb;taskmanager.network.memory.max指定network buffers使用的最大内存,默认为1gb;taskmanager.network.numberOfBuffers指定network使用的buffers数量,默认为2048,该配置已经被废弃,使用taskmanager.network.memory.fraction、taskmanager.network.memory.min、taskmanager.network.memory.max这几个配置来替代
  • taskmanager.network.request-backoff.initial指定input channels的partition requests的最小backoff时间(毫秒),默认为100;taskmanager.network.request-backoff.max指定input channels的partition requests的最大backoff时间(毫秒),默认为10000
  • taskmanager.network.memory.buffers-per-channel指定每个outgoing/incoming channel使用buffers数量,默认为2;taskmanager.network.memory.floating-buffers-per-gate指定每个outgoing/incoming gate使用buffers数量,默认为8

NettyConfig

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java

public class NettyConfig {

    private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);

    // - Config keys ----------------------------------------------------------

    public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
            .key("taskmanager.network.netty.num-arenas")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.num-arenas")
            .withDescription("The number of Netty arenas.");

    public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
            .key("taskmanager.network.netty.server.numThreads")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.server.numThreads")
            .withDescription("The number of Netty server threads.");

    public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
            .key("taskmanager.network.netty.client.numThreads")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.client.numThreads")
            .withDescription("The number of Netty client threads.");

    public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
            .key("taskmanager.network.netty.server.backlog")
            .defaultValue(0) // default: 0 => Netty's default
            .withDeprecatedKeys("taskmanager.net.server.backlog")
            .withDescription("The netty server connection backlog.");

    public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
            .key("taskmanager.network.netty.client.connectTimeoutSec")
            .defaultValue(120) // default: 120s = 2min
            .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
            .withDescription("The Netty client connection timeout.");

    public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
            .key("taskmanager.network.netty.sendReceiveBufferSize")
            .defaultValue(0) // default: 0 => Netty's default
            .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
            .withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
                " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");

    public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
            .key("taskmanager.network.netty.transport")
            .defaultValue("nio")
            .withDeprecatedKeys("taskmanager.net.transport")
            .withDescription("The Netty transport type, either \"nio\" or \"epoll\"");

    // ------------------------------------------------------------------------

    enum TransportType {
        NIO, EPOLL, AUTO
    }

    static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";

    static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";

    private final InetAddress serverAddress;

    private final int serverPort;

    private final int memorySegmentSize;

    private final int numberOfSlots;

    private final Configuration config; // optional configuration

    public NettyConfig(
            InetAddress serverAddress,
            int serverPort,
            int memorySegmentSize,
            int numberOfSlots,
            Configuration config) {

        this.serverAddress = checkNotNull(serverAddress);

        checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number.");
        this.serverPort = serverPort;

        checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
        this.memorySegmentSize = memorySegmentSize;

        checkArgument(numberOfSlots > 0, "Number of slots");
        this.numberOfSlots = numberOfSlots;

        this.config = checkNotNull(config);

        LOG.info(this.toString());
    }

    InetAddress getServerAddress() {
        return serverAddress;
    }

    int getServerPort() {
        return serverPort;
    }

    int getMemorySegmentSize() {
        return memorySegmentSize;
    }

    public int getNumberOfSlots() {
        return numberOfSlots;
    }

    // ------------------------------------------------------------------------
    // Getters
    // ------------------------------------------------------------------------

    public int getServerConnectBacklog() {
        return config.getInteger(CONNECT_BACKLOG);
    }

    public int getNumberOfArenas() {
        // default: number of slots
        final int configValue = config.getInteger(NUM_ARENAS);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getServerNumThreads() {
        // default: number of task slots
        final int configValue = config.getInteger(NUM_THREADS_SERVER);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getClientNumThreads() {
        // default: number of task slots
        final int configValue = config.getInteger(NUM_THREADS_CLIENT);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getClientConnectTimeoutSeconds() {
        return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
    }

    public int getSendAndReceiveBufferSize() {
        return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
    }

    public TransportType getTransportType() {
        String transport = config.getString(TRANSPORT_TYPE);

        switch (transport) {
            case "nio":
                return TransportType.NIO;
            case "epoll":
                return TransportType.EPOLL;
            default:
                return TransportType.AUTO;
        }
    }

    @Nullable
    public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
        return getSSLEnabled() ?
                SSLUtils.createInternalClientSSLEngineFactory(config) :
                null;
    }

    @Nullable
    public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
        return getSSLEnabled() ?
                SSLUtils.createInternalServerSSLEngineFactory(config) :
                null;
    }

    public boolean getSSLEnabled() {
        return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
            && SSLUtils.isInternalSSLEnabled(config);
    }

    public boolean isCreditBasedEnabled() {
        return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
    }

    public Configuration getConfig() {
        return config;
    }

    @Override
    public String toString() {
        String format = "NettyConfig [" +
                "server address: %s, " +
                "server port: %d, " +
                "ssl enabled: %s, " +
                "memory segment size (bytes): %d, " +
                "transport type: %s, " +
                "number of server threads: %d (%s), " +
                "number of client threads: %d (%s), " +
                "server connect backlog: %d (%s), " +
                "client connect timeout (sec): %d, " +
                "send/receive buffer size (bytes): %d (%s)]";

        String def = "use Netty's default";
        String man = "manual";

        return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
                memorySegmentSize, getTransportType(), getServerNumThreads(),
                getServerNumThreads() == 0 ? def : man,
                getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
                getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
                getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
                getSendAndReceiveBufferSize() == 0 ? def : man);
    }
}
  • NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置
  • taskmanager.network.netty.server.backlog用于指定netty server的connection backlog,默认值为0即使用netty默认的配置;taskmanager.network.netty.client.connectTimeoutSec指定netty client的connection timeout,默认为120(单位秒);taskmanager.network.netty.sendReceiveBufferSize指定netty send/receive buffer大小,默认为0即使用netty的默认配置,默认是使用system buffer size,即/proc/sys/net/ipv4/tcp_[rw]mem的配置;taskmanager.network.netty.transport指定的是netty transport的类型,默认是nio
  • taskmanager.network.netty.num-arenas指定的是netty arenas的数量,默认为-1;taskmanager.network.netty.server.numThreads指定的是netty server的threads数量,默认为-1;taskmanager.network.netty.client.numThreads指定的是netty client的threads数量,默认为-1;这几个配置当配置值为-1的时候,对应get方法返回的是numberOfSlots值

小结

  • NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
  • TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
  • NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容