序
本文主要研究一下flink的NetworkEnvironmentConfiguration
NetworkEnvironmentConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
public class NetworkEnvironmentConfiguration {
private final float networkBufFraction;
private final long networkBufMin;
private final long networkBufMax;
private final int networkBufferSize;
private final IOMode ioMode;
private final int partitionRequestInitialBackoff;
private final int partitionRequestMaxBackoff;
private final int networkBuffersPerChannel;
private final int floatingNetworkBuffersPerGate;
private final NettyConfig nettyConfig;
/**
* Constructor for a setup with purely local communication (no netty).
*/
public NetworkEnvironmentConfiguration(
float networkBufFraction,
long networkBufMin,
long networkBufMax,
int networkBufferSize,
IOMode ioMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate) {
this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
ioMode,
partitionRequestInitialBackoff, partitionRequestMaxBackoff,
networkBuffersPerChannel, floatingNetworkBuffersPerGate,
null);
}
public NetworkEnvironmentConfiguration(
float networkBufFraction,
long networkBufMin,
long networkBufMax,
int networkBufferSize,
IOMode ioMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
@Nullable NettyConfig nettyConfig) {
this.networkBufFraction = networkBufFraction;
this.networkBufMin = networkBufMin;
this.networkBufMax = networkBufMax;
this.networkBufferSize = networkBufferSize;
this.ioMode = ioMode;
this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
this.networkBuffersPerChannel = networkBuffersPerChannel;
this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
this.nettyConfig = nettyConfig;
}
// ------------------------------------------------------------------------
public float networkBufFraction() {
return networkBufFraction;
}
public long networkBufMin() {
return networkBufMin;
}
public long networkBufMax() {
return networkBufMax;
}
public int networkBufferSize() {
return networkBufferSize;
}
public IOMode ioMode() {
return ioMode;
}
public int partitionRequestInitialBackoff() {
return partitionRequestInitialBackoff;
}
public int partitionRequestMaxBackoff() {
return partitionRequestMaxBackoff;
}
public int networkBuffersPerChannel() {
return networkBuffersPerChannel;
}
public int floatingNetworkBuffersPerGate() {
return floatingNetworkBuffersPerGate;
}
public NettyConfig nettyConfig() {
return nettyConfig;
}
// ------------------------------------------------------------------------
@Override
public int hashCode() {
int result = 1;
result = 31 * result + networkBufferSize;
result = 31 * result + ioMode.hashCode();
result = 31 * result + partitionRequestInitialBackoff;
result = 31 * result + partitionRequestMaxBackoff;
result = 31 * result + networkBuffersPerChannel;
result = 31 * result + floatingNetworkBuffersPerGate;
result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
else if (obj == null || getClass() != obj.getClass()) {
return false;
}
else {
final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;
return this.networkBufFraction == that.networkBufFraction &&
this.networkBufMin == that.networkBufMin &&
this.networkBufMax == that.networkBufMax &&
this.networkBufferSize == that.networkBufferSize &&
this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
this.ioMode == that.ioMode &&
(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
}
}
@Override
public String toString() {
return "NetworkEnvironmentConfiguration{" +
"networkBufFraction=" + networkBufFraction +
", networkBufMin=" + networkBufMin +
", networkBufMax=" + networkBufMax +
", networkBufferSize=" + networkBufferSize +
", ioMode=" + ioMode +
", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
", networkBuffersPerChannel=" + networkBuffersPerChannel +
", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
", nettyConfig=" + nettyConfig +
'}';
}
}
- NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
TaskManagerServicesConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
public class TaskManagerServicesConfiguration {
//......
/**
* Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
*
* @param configuration to create the network environment configuration from
* @param localTaskManagerCommunication true if task manager communication is local
* @param taskManagerAddress address of the task manager
* @param slots to start the task manager with
* @return Network environment configuration
*/
@SuppressWarnings("deprecation")
private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
Configuration configuration,
boolean localTaskManagerCommunication,
InetAddress taskManagerAddress,
int slots) throws Exception {
// ----> hosts / ports for communication and data exchange
int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
"Number of task slots must be at least one.");
final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
// check page size of for minimum size
checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
// check page size for power of two
checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Memory segment size must be a power of 2.");
// network buffer memory fraction
float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);
// fallback: number of network buffers
final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
checkNetworkConfigOld(numNetworkBuffers);
if (!hasNewNetworkBufConf(configuration)) {
// map old config to new one:
networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
} else {
if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
}
}
final NettyConfig nettyConfig;
if (!localTaskManagerCommunication) {
final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
} else {
nettyConfig = null;
}
// Default spill I/O mode for intermediate results
final String syncOrAsync = configuration.getString(
ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
final IOManager.IOMode ioMode;
if (syncOrAsync.equals("async")) {
ioMode = IOManager.IOMode.ASYNC;
} else {
ioMode = IOManager.IOMode.SYNC;
}
int initialRequestBackoff = configuration.getInteger(
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
int maxRequestBackoff = configuration.getInteger(
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
int buffersPerChannel = configuration.getInteger(
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
int extraBuffersPerGate = configuration.getInteger(
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
return new NetworkEnvironmentConfiguration(
networkBufFraction,
networkBufMin,
networkBufMax,
pageSize,
ioMode,
initialRequestBackoff,
maxRequestBackoff,
buffersPerChannel,
extraBuffersPerGate,
nettyConfig);
}
//......
}
- TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
TaskManagerOptions
flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@PublicEvolving
public class TaskManagerOptions {
//......
/**
* Size of memory buffers used by the network stack and the memory manager.
*/
public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
.defaultValue("32kb")
.withDescription("Size of memory buffers used by the network stack and the memory manager.");
/**
* Fraction of JVM memory to use for network buffers.
*/
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key("taskmanager.network.memory.fraction")
.defaultValue(0.1f)
.withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
" data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
" are. If a job is rejected or you get a warning that the system has not enough buffers available," +
" increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
"` and \"taskmanager.network.memory.max\" may override this fraction.");
/**
* Minimum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
key("taskmanager.network.memory.min")
.defaultValue("64mb")
.withDescription("Minimum memory size for network buffers.");
/**
* Maximum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
key("taskmanager.network.memory.max")
.defaultValue("1gb")
.withDescription("Maximum memory size for network buffers.");
/**
* Number of buffers used in the network stack. This defines the number of possible tasks and
* shuffles.
*
* @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
* and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
key("taskmanager.network.numberOfBuffers")
.defaultValue(2048);
/**
* Minimum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.network.request-backoff.initial")
.defaultValue(100)
.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
.withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
/**
* Maximum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key("taskmanager.network.request-backoff.max")
.defaultValue(10000)
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
/**
* Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
*
* <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
*/
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
" for parallel serialization.");
/**
* Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
*/
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
key("taskmanager.network.memory.floating-buffers-per-gate")
.defaultValue(8)
.withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
" In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
" The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
" help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
" increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
//......
}
- taskmanager.memory.segment-size指定memory segment的大小,默认为32kb;taskmanager.network.memory.fraction指定network buffers使用的memory的比例,默认为0.1;taskmanager.network.memory.min指定network buffers使用的最小内存,默认为64mb;taskmanager.network.memory.max指定network buffers使用的最大内存,默认为1gb;taskmanager.network.numberOfBuffers指定network使用的buffers数量,默认为2048,该配置已经被废弃,使用taskmanager.network.memory.fraction、taskmanager.network.memory.min、taskmanager.network.memory.max这几个配置来替代
- taskmanager.network.request-backoff.initial指定input channels的partition requests的最小backoff时间(
毫秒
),默认为100;taskmanager.network.request-backoff.max指定input channels的partition requests的最大backoff时间(毫秒
),默认为10000 - taskmanager.network.memory.buffers-per-channel指定每个outgoing/incoming channel使用buffers数量,默认为2;taskmanager.network.memory.floating-buffers-per-gate指定每个outgoing/incoming gate使用buffers数量,默认为8
NettyConfig
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
public class NettyConfig {
private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
// - Config keys ----------------------------------------------------------
public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
.key("taskmanager.network.netty.num-arenas")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.num-arenas")
.withDescription("The number of Netty arenas.");
public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
.key("taskmanager.network.netty.server.numThreads")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.server.numThreads")
.withDescription("The number of Netty server threads.");
public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
.key("taskmanager.network.netty.client.numThreads")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.client.numThreads")
.withDescription("The number of Netty client threads.");
public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
.key("taskmanager.network.netty.server.backlog")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.server.backlog")
.withDescription("The netty server connection backlog.");
public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
.key("taskmanager.network.netty.client.connectTimeoutSec")
.defaultValue(120) // default: 120s = 2min
.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
.withDescription("The Netty client connection timeout.");
public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
.key("taskmanager.network.netty.sendReceiveBufferSize")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
.withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
" (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
.key("taskmanager.network.netty.transport")
.defaultValue("nio")
.withDeprecatedKeys("taskmanager.net.transport")
.withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
// ------------------------------------------------------------------------
enum TransportType {
NIO, EPOLL, AUTO
}
static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";
static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";
private final InetAddress serverAddress;
private final int serverPort;
private final int memorySegmentSize;
private final int numberOfSlots;
private final Configuration config; // optional configuration
public NettyConfig(
InetAddress serverAddress,
int serverPort,
int memorySegmentSize,
int numberOfSlots,
Configuration config) {
this.serverAddress = checkNotNull(serverAddress);
checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number.");
this.serverPort = serverPort;
checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
this.memorySegmentSize = memorySegmentSize;
checkArgument(numberOfSlots > 0, "Number of slots");
this.numberOfSlots = numberOfSlots;
this.config = checkNotNull(config);
LOG.info(this.toString());
}
InetAddress getServerAddress() {
return serverAddress;
}
int getServerPort() {
return serverPort;
}
int getMemorySegmentSize() {
return memorySegmentSize;
}
public int getNumberOfSlots() {
return numberOfSlots;
}
// ------------------------------------------------------------------------
// Getters
// ------------------------------------------------------------------------
public int getServerConnectBacklog() {
return config.getInteger(CONNECT_BACKLOG);
}
public int getNumberOfArenas() {
// default: number of slots
final int configValue = config.getInteger(NUM_ARENAS);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getServerNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NUM_THREADS_SERVER);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NUM_THREADS_CLIENT);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientConnectTimeoutSeconds() {
return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
}
public int getSendAndReceiveBufferSize() {
return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
}
public TransportType getTransportType() {
String transport = config.getString(TRANSPORT_TYPE);
switch (transport) {
case "nio":
return TransportType.NIO;
case "epoll":
return TransportType.EPOLL;
default:
return TransportType.AUTO;
}
}
@Nullable
public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
return getSSLEnabled() ?
SSLUtils.createInternalClientSSLEngineFactory(config) :
null;
}
@Nullable
public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
return getSSLEnabled() ?
SSLUtils.createInternalServerSSLEngineFactory(config) :
null;
}
public boolean getSSLEnabled() {
return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
&& SSLUtils.isInternalSSLEnabled(config);
}
public boolean isCreditBasedEnabled() {
return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
}
public Configuration getConfig() {
return config;
}
@Override
public String toString() {
String format = "NettyConfig [" +
"server address: %s, " +
"server port: %d, " +
"ssl enabled: %s, " +
"memory segment size (bytes): %d, " +
"transport type: %s, " +
"number of server threads: %d (%s), " +
"number of client threads: %d (%s), " +
"server connect backlog: %d (%s), " +
"client connect timeout (sec): %d, " +
"send/receive buffer size (bytes): %d (%s)]";
String def = "use Netty's default";
String man = "manual";
return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
memorySegmentSize, getTransportType(), getServerNumThreads(),
getServerNumThreads() == 0 ? def : man,
getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
getSendAndReceiveBufferSize() == 0 ? def : man);
}
}
- NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置
- taskmanager.network.netty.server.backlog用于指定netty server的connection backlog,默认值为0即使用netty默认的配置;taskmanager.network.netty.client.connectTimeoutSec指定netty client的connection timeout,默认为120(
单位秒
);taskmanager.network.netty.sendReceiveBufferSize指定netty send/receive buffer大小,默认为0即使用netty的默认配置,默认是使用system buffer size,即/proc/sys/net/ipv4/tcp_[rw]mem的配置;taskmanager.network.netty.transport指定的是netty transport的类型,默认是nio - taskmanager.network.netty.num-arenas指定的是netty arenas的数量,默认为-1;taskmanager.network.netty.server.numThreads指定的是netty server的threads数量,默认为-1;taskmanager.network.netty.client.numThreads指定的是netty client的threads数量,默认为-1;这几个配置当配置值为-1的时候,对应get方法返回的是numberOfSlots值
小结
- NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
- TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
- NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置