Uneven distribution after Flink keyBy: solving the data skew problem

Scenario:

Suppose the business keys are mapped into the range [0, 100) by a modulo operation:

   stream.keyBy((KeySelector<Msg, Integer>) value -> value.key%100);

We then find that a few subtasks receive far more records than the others: the data is unevenly distributed after keyBy, i.e. we have a data skew problem.

Cause:

The logic that determines which subtask the records of a given key are sent to:

// Returns the logical channel index to which the given record should be written.
KeyGroupStreamPartitioner.selectChannel(SerializationDelegate<StreamRecord<T>> record);
// extract the key from the record
key = keySelector.getKey(record.getInstance().getValue());

// assigns the given key to a parallel operator index.
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
     return computeOperatorIndexForKeyGroup(
               maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

// compute the key group for the given key
public static int assignToKeyGroup(Object key, int maxParallelism) {
       return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
     return MathUtils.murmurHash(keyHash) % maxParallelism;
}

// compute the operator index (i.e. the subtask) that owns the key group
public static int computeOperatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
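
Putting the chain together, here is a minimal sketch (assuming org.apache.flink.util.MathUtils and KeyGroupRangeAssignment are on the classpath; the key value and parallelism below are illustrative) of how a single key ends up on a subtask:

    // trace one key through the chain: keyHash -> keyGroup -> subtask index
    int maxParallelism = 128;   // default max parallelism for parallelism = 20
    int parallelism = 20;
    Integer key = 42;           // the keyBy key, e.g. value.key % 100 (illustrative value)
    int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism; // assignToKeyGroup
    int subtask = keyGroup * parallelism / maxParallelism;                // computeOperatorIndexForKeyGroup
    // equivalent to KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)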

With keys drawn from a small range, we can simulate how evenly this assignment algorithm spreads them across the subtasks:

// assume keys 0..99; operator parallelism = 20; maxParallelism left at its default of 128
Map<Integer, Integer> counts = new TreeMap<>();
for (int i = 0; i < 100; i++) {
    int subtask = MathUtils.murmurHash(String.valueOf(i).hashCode()) % 128 * 20 / 128;
    counts.merge(subtask, 1, Integer::sum);
}
// messages per subtask 0..19: counts swing between 3 and 12, a difference of roughly 4x
{0=3, 1=7, 2=7, 3=5, 4=6, 5=12, 6=4, 7=8, 8=7, 9=6, 10=4, 11=6, 12=7, 13=5, 14=5, 15=9, 16=5, 17=4, 18=6, 19=4}

This shows that when keys come from a small range, Flink keyBy can easily produce data skew.
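
Part of this skew is structural: with the default maxParallelism of 128 and a parallelism of 20, the 128 key groups cannot be split evenly across 20 subtasks, so 8 subtasks own 7 key groups while the other 12 own only 6. A minimal sketch (same values as the test above) that counts the key groups owned by each subtask:

    // count how many of the 128 key groups each of the 20 subtasks owns
    int maxParallelism = 128, parallelism = 20;
    int[] groupsPerSubtask = new int[parallelism];
    for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
        groupsPerSubtask[keyGroup * parallelism / maxParallelism]++;
    }
    // prints a mix of 6s and 7s: even with a perfectly uniform hash, a subtask owning
    // 7 key groups receives ~17% more keys than one owning 6, and with only ~100 distinct
    // keys scattered over 128 key groups, sampling noise adds much larger swings on top
    System.out.println(java.util.Arrays.toString(groupsPerSubtask));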

Solution:
    public static Integer[] createRebalanceKeys(int parallelism) {
        // compute the default max parallelism; if the operator's max parallelism
        // has been customized, pass that value in instead
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        // subTaskIndex -> set of candidate keys routed to that subtask
        HashMap<Integer, Set<Integer>> groupRanges = new HashMap<>();
        // candidate keys are drawn from the range [0, parallelism * 20)
        int maxRandomKey = parallelism * 20;
        for (int randomKey = 0; randomKey < maxRandomKey; randomKey++) {
            // assign each candidate key to its operator index (subtask) using Flink's
            // internal key-group assignment described above
            int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(randomKey, maxParallelism, parallelism);
            Set<Integer> randomKeys = groupRanges.computeIfAbsent(subtaskIndex, k -> new LinkedHashSet<>());
            randomKeys.add(randomKey);
        }
        // fill rebalanceKeys: pick any one candidate key that maps to each subtask
        Integer[] rebalanceKeys = new Integer[parallelism];
        for (int i = 0; i < parallelism; i++) {
            rebalanceKeys[i] = groupRanges.get(i).stream().findFirst().get();
        }
        return rebalanceKeys;
    }
  
    // use the rebalanceKeys array to map the original key to a rebalanced key
    stream.keyBy((KeySelector<Msg, Integer>) value -> rebalanceKeys[value.key % parallelism]);
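
To sanity-check the generated keys, here is a small sketch (reusing the createRebalanceKeys method above) that confirms each rebalanceKeys[i] is routed to subtask i:

    // verify that rebalanceKeys[i] lands on subtask i, so that value.key % parallelism
    // spreads the original keys across all subtasks
    int parallelism = 20;
    Integer[] rebalanceKeys = createRebalanceKeys(parallelism);
    int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
    for (int i = 0; i < parallelism; i++) {
        int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                rebalanceKeys[i], maxParallelism, parallelism);
        System.out.println("rebalanceKeys[" + i + "] = " + rebalanceKeys[i] + " -> subtask " + subtask); // expect i
    }

Two caveats: the operator's maxParallelism at runtime must be the same value used inside createRebalanceKeys, otherwise the key-to-subtask mapping no longer lines up; and this technique only evens out how many distinct keys each subtask handles, so a single hot key carrying a disproportionate share of the traffic will still overload one subtask.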