Flink state is divided into two kinds, depending on whether it is keyed:
- KeyedState
ValueState, ListState, ReducingState, AggregatingState, MapState
- OperatorState
ListState
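Keyed state is redistributed by key group, while operator ListState is redistributed by splitting its entries across the new subtasks. The sketch below is a plain-Java model of that even-split redistribution, assuming all entries of the old subtasks are collected and dealt out in near-equal contiguous chunks. It is not Flink code: `EvenSplitRedistribution` and `redistribute` are hypothetical names, and Flink's own repartitioner may order or group the entries differently.

```java
import java.util.ArrayList;
import java.util.List;

public class EvenSplitRedistribution {

    // Hypothetical model of even-split redistribution for operator ListState:
    // concatenate the entries of all old subtasks, then hand them out to the
    // new subtasks in contiguous chunks whose sizes differ by at most one.
    public static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<T> all = new ArrayList<>();
        for (List<T> state : oldStates) {
            all.addAll(state);
        }
        int base = all.size() / newParallelism;  // every subtask gets at least this many
        int extra = all.size() % newParallelism; // the first `extra` subtasks get one more
        List<List<T>> result = new ArrayList<>();
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(pos, pos + size)));
            pos += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks holding 3 and 2 entries, rescaled to 3 subtasks.
        List<List<String>> old = List.of(List.of("a", "b", "c"), List.of("d", "e"));
        System.out.println(redistribute(old, 3)); // [[a, b], [c, d], [e]]
    }
}
```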
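keyGroup and state redistribution
To dynamically scale Flink operators that use partitioned (key-value) state, Flink groups keys into key groups. Every key belongs to exactly one key group, and the key group (not the individual key) is the unit in which keyed state is redistributed between subtasks on rescale.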
AbstractKeyedStateBackend
- Parameters for creating a keyed state backend
StateBackend.java
Factory methods:
createOperatorStateBackend
createKeyedStateBackend
Implementations:
FsStateBackend
MemoryStateBackend
RocksDBStateBackend
Where it is called:
StreamTaskStateInitializerImpl.java
BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =
    new BackendRestorerProcedure<>(
        () -> stateBackend.createKeyedStateBackend(
            environment,
            environment.getJobID(),
            operatorIdentifierText,
            keySerializer,
            taskInfo.getMaxNumberOfParallelSubtasks(),
            keyGroupRange,
            environment.getTaskKvStateRegistry(),
            TtlTimeProvider.DEFAULT,
            metricGroup),
        backendCloseableRegistry,
        logDescription);
// StateBackend.java: the factory method called above
<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup) throws Exception;
- numberOfKeyGroups = the subtask's max parallelism (taskInfo.getMaxNumberOfParallelSubtasks()), set per operator with operator.setMaxParallelism(int maxParallelism)
StreamTransformation.java:
public void setMaxParallelism(int maxParallelism) {
    Preconditions.checkArgument(maxParallelism > 0
            && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
        "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
            + ". Found: " + maxParallelism);
    this.maxParallelism = maxParallelism;
}
ExecutionJobVertex.java:
final int configuredMaxParallelism = jobVertex.getMaxParallelism();
this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism); // VALUE_NOT_SET = -1
// If no max parallelism was configured, compute the default one
setMaxParallelismInternal(maxParallelismConfigured ?
    configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));
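// Default max parallelism (KeyGroupRangeAssignment.computeDefaultMaxParallelism):
// 1.5 * parallelism rounded up to a power of two, clamped to [128, 32768]
Math.min(
    Math.max(
        MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
        DEFAULT_LOWER_BOUND_MAX_PARALLELISM), // 128
    UPPER_BOUND_MAX_PARALLELISM);             // 32768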
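A standalone sketch of the default max parallelism computation, with the two bounds inlined as constants (128 and 32768) and a local `roundUpToPowerOfTwo` assumed to behave like `MathUtils.roundUpToPowerOfTwo`. `DefaultMaxParallelism` is a hypothetical class name; it prints the default for a few parallelism values.

```java
public class DefaultMaxParallelism {

    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; // 128
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;        // 32768

    // Smallest power of two >= x (valid for 1 <= x <= 2^30):
    // smear the highest set bit of x - 1 into all lower bits, then add one.
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        return x + 1;
    }

    // 1.5 * parallelism, rounded up to a power of two, clamped to [128, 32768].
    static int computeDefaultMaxParallelism(int operatorParallelism) {
        if (operatorParallelism <= 0 || operatorParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            throw new IllegalArgumentException(
                    "parallelism must be in [1, " + UPPER_BOUND_MAX_PARALLELISM + "]");
        }
        return Math.min(
                Math.max(
                        roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 100, 1000, 30000}) {
            System.out.println(p + " -> " + computeDefaultMaxParallelism(p));
        }
        // 1 -> 128, 100 -> 256, 1000 -> 2048, 30000 -> 32768
    }
}
```

For example, parallelism 100 gives 150, rounded up to 256 key groups, while any parallelism up to 85 stays at the floor of 128.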
- keyGroupRange
KeyGroupRangeAssignment.java
// call site (StreamTaskStateInitializerImpl)
final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
    taskInfo.getMaxNumberOfParallelSubtasks(),
    taskInfo.getNumberOfParallelSubtasks(),
    taskInfo.getIndexOfThisSubtask()); // index of this subtask, in [0, parallelism - 1]
Compute which key group a single key belongs to ("Assigns the given key to a key-group index."):
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}
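To see the key-to-key-group step end to end, here is a self-contained sketch. The `murmurHash` below is a re-implementation of a murmur3-style integer hash, assumed (not verified against every Flink version) to match `MathUtils.murmurHash`; `assignToKeyGroup` follows the shape of `KeyGroupRangeAssignment.assignToKeyGroup`, which hashes `key.hashCode()`. The exact group numbers depend on that assumption, but the important property does not: the key group depends only on the key and maxParallelism, never on the current parallelism.

```java
public class KeyGroupAssignment {

    // Assumption: a murmur3-style 32-bit hash modeled on Flink's MathUtils.murmurHash(int).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // murmur3 finalization mix (fmix32)
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Make the result non-negative so the modulo below yields a valid index.
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Same formula as computeKeyGroupForKeyHash quoted above.
    static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return murmurHash(keyHash) % maxParallelism;
    }

    // Key -> key group: re-hash the key's hashCode(), then take it modulo maxParallelism.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + " -> key group " + assignToKeyGroup(key, maxParallelism));
        }
    }
}
```

Re-hashing `hashCode()` spreads keys whose raw hash codes are poorly distributed (for example, small consecutive integers) more evenly over the key groups.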
Compute the key-group range boundaries of a single subtask:
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
        int maxParallelism,
        int parallelism,
        int operatorIndex) {
    int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
    int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    return new KeyGroupRange(start, end);
}
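Worked example of the range formula: with maxParallelism = 128 and parallelism = 3, subtask 0 gets [0, 42], subtask 1 gets [43, 85] and subtask 2 gets [86, 127]. The sketch below reproduces that with plain ints and adds the inverse lookup (key group -> subtask index), which `KeyGroupRangeAssignment` provides as `computeOperatorIndexForKeyGroup` using `keyGroupId * parallelism / maxParallelism`. `KeyGroupRanges` and its method names are hypothetical stand-ins.

```java
public class KeyGroupRanges {

    // Same formula as computeKeyGroupRangeForOperatorIndex quoted above;
    // returns {start, end}, both inclusive.
    static int[] rangeForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
        if (parallelism <= 0 || maxParallelism < parallelism) {
            throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
        }
        if (operatorIndex < 0 || operatorIndex >= parallelism) {
            throw new IllegalArgumentException("operatorIndex must be in [0, parallelism - 1]");
        }
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism; // ceil(i * M / p)
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;            // ceil((i + 1) * M / p) - 1
        return new int[] {start, end};
    }

    // Inverse lookup: which subtask owns a given key group.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            int[] r = rangeForOperatorIndex(128, 3, i);
            System.out.println("subtask " + i + ": [" + r[0] + ", " + r[1] + "]");
        }
        // subtask 0: [0, 42]
        // subtask 1: [43, 85]
        // subtask 2: [86, 127]
        System.out.println(operatorIndexForKeyGroup(128, 3, 43)); // 1
    }
}
```

The ranges are contiguous, cover all key groups exactly once, and differ in size by at most one key group.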
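What is the difference between parallelism and max parallelism?
parallelism: the number of parallel subtasks an operator actually runs with; if not set, the system default parallelism is used
maxParallelism: set per operator with operator.setMaxParallelism(int maxParallelism); it equals the number of key groups and is the upper limit to which the operator can be rescaled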
DataStream.java setMaxParallelism()
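Why the distinction matters on rescale: key group ids stay fixed because they depend only on maxParallelism, so a restored subtask only needs the old snapshots whose key-group ranges overlap its new range. The sketch below computes, for each new subtask, which old subtasks it would read from. It is a simplified model of that assignment, not Flink's restore code; `RescaleDemo` and `sources` are hypothetical names. It also rejects a parallelism above maxParallelism, since some subtasks would otherwise get an empty key-group range.

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleDemo {

    // Key-group range bounds (inclusive), same formula as computeKeyGroupRangeForOperatorIndex.
    static int start(int maxP, int p, int i) { return (i * maxP + p - 1) / p; }
    static int end(int maxP, int p, int i) { return ((i + 1) * maxP - 1) / p; }

    // For each new subtask, list the old subtasks whose key-group ranges overlap
    // its new range, i.e. whose snapshots it must read on restore.
    static List<List<Integer>> sources(int maxP, int oldP, int newP) {
        if (oldP < 1 || newP < 1 || oldP > maxP || newP > maxP) {
            throw new IllegalArgumentException("parallelism must be in [1, maxParallelism]");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int j = 0; j < newP; j++) {
            List<Integer> src = new ArrayList<>();
            for (int i = 0; i < oldP; i++) {
                boolean overlap = start(maxP, oldP, i) <= end(maxP, newP, j)
                        && start(maxP, newP, j) <= end(maxP, oldP, i);
                if (overlap) {
                    src.add(i);
                }
            }
            result.add(src);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sources(128, 2, 3)); // [[0], [0, 1], [1]]
        System.out.println(sources(128, 2, 4)); // [[0], [0], [1], [1]]
    }
}
```

Going from 2 to 4 subtasks, each new subtask reads exactly one old snapshot; going from 2 to 3, the middle subtask reads parts of both.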
References
RocksDB concepts
https://cloud.tencent.com/developer/article/1403939
namespace is the operator uid; see the StateTable concept
State best practices
https://www.cnblogs.com/rossiXYZ/p/12594315.html
State Processor API: namespace -> operator.setUid(), i.e. the operator uid
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html