flink并行度与kafa分区(partition)设置

flink并行度与kafa分区(partition)设置

flink Connector-kafka

//TODO文章 connector系统文章,等待编写

原理

采用取模运算;平衡 kafka partition与并行度关系。

取模运算原理见取模运算理解

计算公式

kafkaPartition mod 并行度总数 = 分配到并行度中的partition

例子:partition 个数为 6;并行度为 3

partition 取模 取模值 并行度分配
partition-0 partition-0 mod 3 = 0 0 parallel-0
partition-1 partition-1 mod 3 = 1 1 parallel-1
partition-2 partition-2 mod 3 = 2 2 parallel-2
partition-3 partition-3 mod 3 = 0 0 parallel-0
partition-4 partition-4 mod 3 = 1 1 parallel-1
partition-5 partition-5 mod 3 = 2 2 parallel-2

图示如下:

partition取模.png

如上分析,如果并行度 大于 partition总数,那么多余的并行度分配不到 partition,该并行度也就不会有数据
如下图:3个kafka partition,flink设置4个并行度为例,编号为3的并行度将获取不到数据

partition取模1.png

源码分析

由于源码比较多,为了代码便于阅读,只抽取关键的代码

FlinkKafkaConsumerBase

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction {
            
    /** 
      * The partition discoverer, used to find new partitions.
      * 分区 discover
      */
    private transient volatile AbstractPartitionDiscoverer partitionDiscoverer;
            
    /** 
      * Describes whether we are discovering partitions for fixed topics or a topic pattern.
      * topic 描述
      */
    private final KafkaTopicsDescriptor topicsDescriptor;
    
    //构造器
    public FlinkKafkaConsumerBase(
            List<String> topics,
            Pattern topicPattern,
            KafkaDeserializationSchema<T> deserializer,
            long discoveryIntervalMillis,
            boolean useMetrics) {
         // topicsDescriptor 创建
        this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
        //...
    }
    
    @Override
    public void open(Configuration configuration) throws Exception {
        // create the partition discoverer
        this.partitionDiscoverer = createPartitionDiscoverer(
                topicsDescriptor,
                getRuntimeContext().getIndexOfThisSubtask(),//当前并行度 id
                getRuntimeContext().getNumberOfParallelSubtasks());//所有并行度总数
        this.partitionDiscoverer.open();
         //获取当前并行度 分配的 kafka partitions
         final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
         //...
    }
            
    /**
     * Creates the partition discoverer that is used to find new partitions for this subtask.
     *
     * @param topicsDescriptor Descriptor that describes whether we are discovering partitions for fixed topics or a topic pattern.
     * @param indexOfThisSubtask The index of this consumer subtask.
     * @param numParallelSubtasks The total number of parallel consumer subtasks.
     *
     * @return The instantiated partition discoverer
     */
    protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
            KafkaTopicsDescriptor topicsDescriptor,
            int indexOfThisSubtask,
            int numParallelSubtasks);
        
}

AbstractPartitionDiscoverer : 该类为抽象类,有些方法实现在各个版本的kafka实现类中

public abstract class AbstractPartitionDiscoverer {
    
    /**
      * 当前并行度 id
      * Index of the consumer subtask that this partition discoverer belongs to.
      */
    private final int indexOfThisSubtask;

    /** 
      * 所有并行度总数
      * The total number of consumer subtasks.
      */
    private final int numParallelSubtasks;
    
    public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    
        //获取 所有 partitions
        List<KafkaTopicPartition> newDiscoveredPartitions = 
            //各版本的kafka实现类中
            getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
        //移除掉不属于该并行度 中的 partition
        Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
        KafkaTopicPartition nextPartition;
        while (iter.hasNext()) {
            nextPartition = iter.next();
            if (!setAndCheckDiscoveredPartition(nextPartition)) {
                iter.remove();
            }
        }
        
    }
    
    //判断是否是当前并行度的 任务
    public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
        if (isUndiscoveredPartition(partition)) {
            discoveredPartitions.add(partition);

            return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
        }

        return false;
    }
}

KafkaTopicPartitionAssigner

public class KafkaTopicPartitionAssigner {
    //取模算法
    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 一。 概述 在流计算场景中,数据没有边界源源不断的流入的,每条数据流入都可能会触发计算,比如在进行count或su...
    神奇的考拉阅读 15,810评论 0 8
  • Flink从外部数据源持续接收数据,每接收一条数据就会触发相应的计算操作。当Flink对数据进行聚合操作时,不可能...
    todd5167阅读 1,695评论 0 4
  • 当要拒绝做某件事时,最好使用客观的原因,而不是主观的原因。一来拒绝的原因是客观事实,明白人也就心知肚明了,二来不会...
    窦哲阅读 221评论 0 1
  • 不知不觉,又到了一年一度运动会的季节,幼儿园、小学、中学、大学都在轰轰烈烈的组织着各种类型特色运动会,仿佛像个淘气...
    b5b7ec306ec8阅读 515评论 0 1
  • 杨光柱 道长是名思维贩子(转) 01 你所经受的批评、干预和否定,更多是来自哪里呢? 我猜一定是来自你亲密关系的人...
    杨Sir杨光柱阅读 1,471评论 0 1

友情链接更多精彩内容