KafkaProducer Flow Analysis

As we all know, Kafka processes messages extremely fast; a single machine can reach a TPS on the order of millions of messages. A major reason is that the producer merges many small messages and sends them as a single batch. This article analyzes the design of the KafkaProducer flow in detail from the perspective of the source code.

The code version used here is 0.10.1.
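
Before diving into the internals, here is a minimal usage sketch showing where batching enters the picture; the broker address, topic name, and the specific batch.size/linger.ms/buffer.memory values are illustrative assumptions, not taken from the source.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerBatchingExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Batching-related settings discussed in this article (values are illustrative)
            props.put("batch.size", 16384);       // bytes per RecordBatch
            props.put("linger.ms", 5);            // wait up to 5 ms to fill a batch
            props.put("buffer.memory", 33554432); // total memory available to the RecordAccumulator

            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                // send() only appends to the accumulator; the Sender thread ships batches asynchronously
                producer.send(new ProducerRecord<>("demo-topic", Integer.toString(i), "value-" + i));
            }
            producer.close(); // flushes remaining batches before shutting down
        }
    }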

  • Constructor: public KafkaProducer(Map<String, Object> configs) {...}
  • Member fields of KafkaProducer
    // If the user does not configure "client.id", "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement() is used as the clientId
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    // Used as the prefix for MBean names in JMX
    private static final String JMX_PREFIX = "kafka.producer";

    private String clientId;

    // The `partition` method of this class decides which partition of the target topic each message goes to
    // Users can plug in their own implementation via the "partitioner.class" config
    private final Partitioner partitioner;

    // Limits the size of a single message; configured via "max.request.size"
    private final int maxRequestSize;

    // Total memory the producer may use to buffer messages; if there is not enough memory when the next message requests space, the producer waits. Configured via "buffer.memory"
    // Used concretely by the BufferPool inside RecordAccumulator
    private final long totalMemorySize;

    // Metadata class used to obtain cluster information
    private final Metadata metadata;

    // Accumulator for messages; every message that is sent is first appended to the RecordAccumulator
    private final RecordAccumulator accumulator;

    // The send thread: pulls sendable messages from the accumulator and sends them
    // Runs inside ioThread, which is started when the producer is instantiated
    private final Sender sender;

    // Metrics monitoring class
    private final Metrics metrics;

    // The thread that runs the sender
    private final Thread ioThread;

    // Compression type used when transmitting data
    private final CompressionType compressionType;

    // Sensor that records failed message sends
    private final Sensor errors;

    private final Time time;

    // Serializes the message key into the byte[] that goes over the wire
    // Users can supply their own serializer via the "key.serializer" config
    private final Serializer<K> keySerializer;

    // Serializes the message value into the byte[] that goes over the wire
    // Likewise user-replaceable, via the "value.serializer" config
    private final Serializer<V> valueSerializer;

    // The user configuration passed in as input to the KafkaProducer; holds the producer config
    private final ProducerConfig producerConfig;

    // Maximum time send() may block when the buffer is full or metadata is unavailable; configured via "max.block.ms" (0.10.1)
    private final long maxBlockTimeMs;

    // Maximum timeout for a send request; configured via "request.timeout.ms" (0.10.1)
    private final int requestTimeoutMs;

    // List of interceptors that pre-process each ProducerRecord before it is sent
    private final ProducerInterceptors<K, V> interceptors; 
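
Several of these fields map one-to-one onto producer configs. As an illustration of the partitioner field, below is a sketch of a custom Partitioner; the class name and the round-robin strategy are my own assumptions, not part of the Kafka source, and it would be wired in via the "partitioner.class" config.

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    // Hypothetical example: a round-robin partitioner that ignores the key
    public class RoundRobinPartitioner implements Partitioner {
        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public void configure(Map<String, ?> configs) {
            // no extra configuration needed for this sketch
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            // mask the sign bit so the index stays non-negative even after counter overflow
            return (counter.getAndIncrement() & Integer.MAX_VALUE) % numPartitions;
        }

        @Override
        public void close() {
            // nothing to release
        }
    }

It would be enabled with props.put("partitioner.class", "your.package.RoundRobinPartitioner") on the producer configuration.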

  • The send method: KafkaProducer.send(ProducerRecord<K, V> record, Callback callback)
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // Make sure metadata is available for record.topic(), returning the cluster plus waitedOnMetadataMs (how long this call blocked)
            // Available means: the partitionsCount for this topic in the metadata's cluster is != null, and
            // 1> the user did not specify a partition, or
            // 2> the user specified a partition and partition < partitionsCount (partitions are numbered from 0)
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            // Compute the maximum time the remaining operations may block
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            // Serialize the key and value of the ProducerRecord into byte[] using the configured serializers
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            // If the user did not specify a partition (record.partition() == null),
            // the configured Partitioner decides which partition the message is assigned to
            int partition = partition(record, serializedKey, serializedValue, cluster);
            // Serialized size of the message plus the log overhead: SIZE_LENGTH (an int, 4 bytes) + OFFSET_LENGTH (a long, 8 bytes)
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            // Validate the size of this single message: it must not exceed maxRequestSize or totalMemorySize
            ensureValidRecordSize(serializedSize);
            // Build the TopicPartition the message will be sent to
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            // Append the message to the accumulator
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            // If the result indicates the batch is ready, wake up the sender thread to send it.
            // The batch is considered ready when either:
            // 1> the Deque<RecordBatch> for this TopicPartition in the accumulator's batches has size() > 1,
            //    or the current RecordBatch.isFull();
            // 2> the current RecordBatch was newly created (a newly created batch is guaranteed to contain data)
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        } catch (...) {
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        }
    }
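
The flow above is asynchronous: send() returns a Future, and the outcome is reported through the callback once the batch is acknowledged. A minimal sketch of using it follows; the topic name, key, and helper method are illustrative assumptions.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SendWithCallbackExample {
        // Hypothetical helper: sends one record and reports the result via the callback
        static void sendOne(Producer<String, String> producer) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key-1", "value-1");
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // API exceptions surface here (or via the returned Future) rather than being thrown by send()
                        exception.printStackTrace();
                    } else {
                        // metadata carries the partition and offset assigned once the batch is acknowledged
                        System.out.printf("sent to %s-%d at offset %d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                }
            });
        }
    }
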
  • The batching method: accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs)
    /**
     * Add a record to the accumulator, return the append result
     * <p>
     * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
     * <p>
     *
     * @param tp The topic/partition to which this record is being sent
     * @param timestamp The timestamp of the record
     * @param key The key for the record
     * @param value The value for the record
     * @param callback The user-supplied callback to execute when the request is complete
     * @param maxTimeToBlock The maximum time in milliseconds to block waiting for buffer memory to become available
     */
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // Track the number of appends currently in progress
        appendsInProgress.incrementAndGet();
        try {
            // If batches already holds a queue for this TopicPartition, return it; otherwise create one
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // Try to put the message into dq: take the last RecordBatch in dq; if there is none, return null.
                // If there is one, try to append the message to it; if the batch has no room left, return null.
                // If there is room, append it and create a FutureRecordMetadata,
                // then wrap callback + FutureRecordMetadata into a Thunk and add it to thunks, to be invoked when the response arrives.
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            // The append above did not succeed, so allocate new memory, create a new Records, and append to that.
            // The allocation size is the larger of batchSize and the size this message needs.
            // When free allocates memory:
            //   1: if size == poolableSize (i.e. batchSize), take a ByteBuffer from the Deque<ByteBuffer> free list;
            //      if that list is empty, allocate a new batchSize ByteBuffer, first checking that availableMemory
            //      is at least the requested size; if it is, allocate directly, otherwise wait for memory to be released
            //   2: if size != poolableSize, follow the same allocation strategy as the empty-free-list case in 1
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
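
The append method above follows a try-then-allocate-then-retry locking pattern: attempt the append under the deque lock, allocate a buffer outside the lock (allocation may block), then re-check under the lock before creating a new batch. Below is a simplified, self-contained sketch of that pattern; the class and method names are my own and only mirror the idea, not Kafka's actual implementation.

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    public class AppendPatternSketch {
        static class Batch {
            final ByteBuffer buffer;
            Batch(ByteBuffer buffer) { this.buffer = buffer; }
            // Returns false when the batch has no room left for the payload
            boolean tryAppend(byte[] payload) {
                if (buffer.remaining() < payload.length) return false;
                buffer.put(payload);
                return true;
            }
        }

        private final Deque<Batch> deque = new ArrayDeque<>();
        private final int batchSize = 16384;

        public void append(byte[] payload) {
            synchronized (deque) {
                Batch last = deque.peekLast();
                if (last != null && last.tryAppend(payload))
                    return; // appended to an existing batch
            }
            // Allocate outside the lock: in Kafka this is free.allocate(), which may block on the BufferPool
            ByteBuffer buffer = ByteBuffer.allocate(Math.max(batchSize, payload.length));
            synchronized (deque) {
                // Re-check: another thread may have created a batch with room while we were allocating
                Batch last = deque.peekLast();
                if (last != null && last.tryAppend(payload))
                    return; // Kafka would return the freshly allocated buffer to the pool here
                Batch batch = new Batch(buffer);
                batch.tryAppend(payload);
                deque.addLast(batch);
            }
        }
    }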

  • The Sender workflow inside ioThread
    public void run() {

        // Main call flow, executed in a loop while running
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // On a non-forced close, if the accumulator or the client still has unsent messages, keep running until they are sent
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        // On a forced close, the data left in the accumulator is simply aborted
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    /**
     * A few notes on this method:
     * 1: The guaranteeMessageOrder field decides whether the ordering of sends must be guaranteed.
     *    To preserve ordering, Kafka mutes a partition when a Record is sent to it and unmutes it once the
     *    response arrives, at which point the next record for that partition may be sent.
     * 2: For resends, RecordBatch carries the fields attempts + lastAttemptMs; attempts > 0 marks a record being resent,
     *    and it may only be sent again once batch.lastAttemptMs + retryBackoffMs <= nowMs (the retry backoff has elapsed).
     * 3: this.client.ready(node, now) requires:
     *    the connection is established, i.e. ConnectionState.CONNECTED;
     *    for requests that need authentication, authentication has completed;
     *    InFlightRequests.canSendMore(node): the node's request queue is empty,
     *        or the first request in the queue is complete and queue.size() < this.maxInFlightRequestsPerConnection
     */
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // Examine the data in the accumulator's batches and compute readyNodes + nextReadyCheckDelayMs + unknownLeaderTopics
        // readyNodes: nodes that satisfy both of the following
        //   1. There is data ready to send, which holds if any of these is true:
        //       a. a full batch exists: deque.size() > 1 (so an earlier batch must be full), or the first batch in the deque is full
        //       b. a batch has waited longer than its linger time (its wait has expired)
        //       c. the BufferPool has threads queued up waiting for memory to be released
        //       d. the accumulator is being flushed, i.e. the user called KafkaProducer.flush()
        //   2. If the data is a retry, the retry backoff time has already elapsed, so it may be resent
        // nextReadyCheckDelayMs: for partitions whose data is not yet sendable, how long to wait before the next readiness check
        // unknownLeaderTopics: TopicPartitions in batches whose leader cannot be found in the cluster and whose deque is not empty (there is data to send)
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        // If any returned topics have an unknown leader, add them to the metadata and request a metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // Remove from readyNodes any node the client cannot send to yet
        // notReadyTimeout bounds this.client.poll(pollTimeout, now), i.e. the maximum time that call may block
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // Abort RecordBatches that have been sitting longer than the request timeout
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);
    }
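
Because the Sender ships batches asynchronously in ioThread, an application that exits immediately after send() may lose buffered records. Below is a minimal shutdown sketch, assuming a producer built as in the earlier example; the topic and timeout values are illustrative.

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ShutdownExample {
        // Hypothetical helper: make sure buffered batches reach the broker before exiting
        static void sendAndShutdown(Producer<String, String> producer) {
            producer.send(new ProducerRecord<>("demo-topic", "last-key", "last-value"));

            // flush() puts the accumulator into flushing mode (condition d in the ready() check above)
            // and blocks until all buffered and in-flight batches complete
            producer.flush();

            // close(timeout) lets the Sender's non-forced shutdown path drain remaining records;
            // a zero timeout takes the forceClose path and aborts incomplete batches
            producer.close(30, TimeUnit.SECONDS);
        }
    }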
