Case:
Suppose that in our business scenario the keys are mapped into the range [0, 100) by a modulo operation:
stream.keyBy((KeySelector<Msg, Integer>) value -> value.key%100);
We then find that a few subtasks receive far more data than the others: the records are unevenly distributed after keyBy, i.e. we have a data skew problem.
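For reference, here is a minimal sketch of the Msg type assumed by the snippets below; only the class name and the key field come from the keyBy call above, the rest is illustrative:
// Assumed shape of the keyed messages (illustrative, not the real business class).
public class Msg {
    public int key;   // business key; key % 100 falls in [0, 100)
    // ... other payload fields omitted
}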
Cause:
The logic that decides which subTask a given key's records are sent to is implemented as follows:
// Returns the logical channel index to which the given record should be written.
KeyGroupStreamPartitioner.selectChannel(SerializationDelegate<StreamRecord<T>> record);

// record -> key
key = keySelector.getKey(record.getInstance().getValue());

// Assigns the given key to a parallel operator index.
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    return computeOperatorIndexForKeyGroup(
            maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}
// Compute the key group that the key belongs to.
public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}
// Compute the operator index (i.e. the target subTask) that owns the key group.
public static int computeOperatorIndexForKeyGroup(
        int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}
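Putting the three steps together, the target subtask of a single key can be traced as below (a sketch, assuming parallelism = 20, the default maxParallelism = 128, and an arbitrary key value of 42):
int key = 42;                                                       // e.g. value.key % 100
int keyGroup = MathUtils.murmurHash(Integer.hashCode(key)) % 128;   // assignToKeyGroup
int subtask  = keyGroup * 20 / 128;                                 // computeOperatorIndexForKeyGroup
// Equivalent to KeyGroupRangeAssignment.assignKeyToParallelOperator(key, 128, 20).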
When the keys only span a small range, we can simulate how evenly this assignment algorithm spreads them:
// Assume keys in [0, 100), operator parallelism = 20, and maxParallelism left at its default of 128.
Map<Integer, Integer> counts = new TreeMap<>();
for (int i = 0; i < 100; i++) {
    int subtask = MathUtils.murmurHash(String.valueOf(i).hashCode()) % 128 * 20 / 128;
    counts.merge(subtask, 1, Integer::sum);
}
// The number of keys landing on each of subtasks 0 ~ 19 is shown below; the counts range from 3 to 12, a 4x difference between the least and most loaded subtask.
{0=3, 1=7, 2=7, 3=5, 4=6, 5=12, 6=4, 7=8, 8=7, 9=6, 10=4, 11=6, 12=7, 13=5, 14=5, 15=9, 16=5, 17=4, 18=6, 19=4}
This shows that when the keys span only a small range, a Flink keyBy can easily produce data skew.
Solution:
public static Integer[] createRebalanceKeys(int parallelism) {
    // Compute the max parallelism; if a custom max parallelism is configured on the operator,
    // pass that value in here instead.
    int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
    // subTaskIndex -> the set of candidate keys that Flink's key-group assignment routes to that subtask
    HashMap<Integer, Set<Integer>> groupRanges = new HashMap<>();
    // Candidate keys are drawn from [0, parallelism * 20), so each subtask is expected to collect several of them.
    int maxRandomKey = parallelism * 20;
    for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
        // Use Flink's internal key-group assignment (shown above) to find the parallel operator index
        // (i.e. the subtask) that this candidate key is routed to.
        int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, maxParallelism, parallelism);
        Set<Integer> randomKeys = groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
        randomKeys.add(randomKey);
    }
    // Fill rebalanceKeys: for each subtask, pick any one of the candidate keys collected for it.
    Integer[] rebalanceKeys = new Integer[parallelism];
    for (int i = 0; i < parallelism; i++) {
        rebalanceKeys[i] = groupRanges.get(i).stream().findFirst().get();
    }
    return rebalanceKeys;
}
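For context, KeyGroupRangeAssignment.computeDefaultMaxParallelism derives the default max parallelism from the operator parallelism. In the Flink versions I have checked it is roughly the calculation sketched below (verify against your own Flink version), which is why parallelism = 20 yields the default maxParallelism of 128 used earlier:
// Rough sketch of the default max-parallelism rule: round (parallelism * 1.5) up to the
// next power of two, then clamp the result to the range [128, 32768].
int maxParallelism =
        Math.min(Math.max(MathUtils.roundUpToPowerOfTwo(parallelism + parallelism / 2), 1 << 7), 1 << 15);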
// Use the rebalanceKeys array to map the original business key to a rebalanced key.
stream.keyBy((KeySelector<Msg, Integer>) value -> rebalanceKeys[value.key%parallelism]);
Because rebalanceKeys[i] is, by construction, a key that Flink routes to subtask i, the records now land on exactly the parallelism subtasks and are spread as evenly as value.key % parallelism itself is, provided the operator runs with the same parallelism and maxParallelism that were used to build the array.
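As a quick sanity check (a sketch, assuming parallelism = 20 and the default maxParallelism of 128), every entry of the array should map back to its own subtask:
Integer[] rebalanceKeys = createRebalanceKeys(20);
for (int i = 0; i < 20; i++) {
    int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(rebalanceKeys[i], 128, 20);
    System.out.println(i + " -> subtask " + subtask);   // expect i -> subtask i for every i
}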