Kafka源码学习 Producer

基于0.10.1版本

整体流程

发送消息流程.png

Producer.send()入口

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // 获取集群信息
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            //省略序列化serializedKey,serializedValue
            //获取分区
            int partition = partition(record, serializedKey, serializedValue, cluster);
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            //校验消息体大小
            ensureValidRecordSize(serializedSize);
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            //把消息写入消息收集器
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            //如果最后一个RecordBatch已经写满,或者Deque队列大小>1就唤醒发送线程
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        }
doSend.png

RecordAccumulator.append方法利用了分段锁,当并发出现时,前一个线程需要的内存空间比较大,不满足写入,后一个线程需要的内存空间比较小,满足写入。提高了并发,此处也可以看出,如果消息需要有发送顺序的保证,一个Producer实例不要在多处调用。

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        appendsInProgress.incrementAndGet();
        try {
            //从ConcurrentMap<TopicPartition, Deque<RecordBatch>>中换取分区对应的Deque队列
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                //再次尝试写入
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                //如果写入成功,说明别的线程new了一个新的RecordBatch
                if (appendResult != null) {
                    // 释放之前申请的buffer
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
消息写入收集器流程.png

RecordAccumulator.tryAppend方法,获取最后一个RecordBatch尝试写入

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        RecordBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            //写入失败,需要把MemoryRecords置为不可写状态,buffer.flip()切换到读
            if (future == null)
                last.records.close();
            else
                //判断是否可以唤醒Sender发送线程,deque.size() > 1 || last.records.isFull()
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
        }
        return null;
    }

RecordBatch.tryAppend写入

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        //判断是否需要新创建RecordBatch来存储消息
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            //如果需要回调把回调加入thunks
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

判断是否需要new RecordBatch

public boolean hasRoomFor(byte[] key, byte[] value) {
        //当前不是写模式,说明消息已经写满,正在发送,或者在发送中
        if (!this.writable)
            return false;
        /**
         * MemoryRecords是否写入过消息?
         * 没写过,申请Buffer初始化内存大小是否>=消息体大小
         * 写入过,batch.size是否>=已写入的内存大小+消息体大小
         * 考虑这两种情况是因为如果曾经写入了消息体大小大于batch.size,则该Buffer只会保存这一条消息,
         * 从BufferPool获取的Buffer大小都是batch.size,此时initialCapacity=writeLimit
         *
          */
        return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
    }

BufferPool源码解读

BufferPool是Buffer缓冲区,创建出来的Buffer大小都等于batch.size,如果控制每条消息都小于或者等于batch.size,充分利用了缓冲区,性能会更好。

BufferPool 类主要变量

public final class BufferPool {

    /**
     * BufferPool缓冲池最大的字节大小,也就是配置buffer.memory
     */
    private final long totalMemory;
    /**
     * 每个Buffer固定大小,也就是配置batch.size
     */
    private final int poolableSize;
    private final ReentrantLock lock;
    /**
     * 空闲的buffer队列
     */
    private final Deque<ByteBuffer> free;
    /**
     * 等待分配Buffer的队列
     */
    private final Deque<Condition> waiters;
    /**
     * 可用的内存大小
     */
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;
}

通过下面的代码获取当前需要申请的Buffer内存空间大小

int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));

如果超过batch.size,通过ByteBuffer.allocate(size)来申请空间,不从BufferPool中获取。

BufferPool主要方法就是申请空间allocate,释放空间deallocate,具体流程如下


申请空间流程.png
释放空间流程.png

Sender 发送线程

每次往消息收集器写完消息,都会检验下,是否需要唤醒发送线程Sender

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
 //如果最后一个RecordBatch已经写满,或者Deque队列大小>1就唤醒发送线程
 if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
 }

KafkaClient 分析

KafkaClient 关键字段

public class NetworkClient implements KafkaClient {

    /* the selector used to perform network i/o */
    //执行NIO的选择器
    private final Selectable selector;
    /* the state of each node's connection */
    //每个连接Node的状态
    private final ClusterConnectionStates connectionStates;
    /* the set of requests currently being sent or awaiting a response */
    //准备发送或者等待响应的消息
    private final InFlightRequests inFlightRequests;

}

Sender 分析

Sender 关键字段

public class Sender implements Runnable {

    /* the state of each nodes connection */
    //每个连接的状态
    private final KafkaClient client;

    /* the record accumulator that batches records */
    //消息收集器
    private final RecordAccumulator accumulator;

    /* the flag indicating whether the producer should guarantee the message order on the broker or not. */
    //是否需要保证一个topic正在发送的RecordBatch只有一个,max.in.flight.requests.per.connection 设置为1时会保证
    private final boolean guaranteeMessageOrder;

    /* the maximum request size to attempt to send to the server */
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    private final short acks;
}

Sender是一个线程类,看看它的核心方法run

    void run(long now) {
        Cluster cluster = metadata.fetch();
        // 获取满足发送条件的RecordBatch对应的nodes
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        // 如果有任何一个leader node未知,则强制更新标示
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // 移除没有连接的Node的,并且初始化网络连接,等待下次再调用
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        //把TopicPartition->List<RecordBatch> 转化为 NodeId(每个Broker节点Id)->List<RecordBatch>
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        //max.in.flight.requests.per.connection 设置为1时,保证一个topic只有一个RecordBatch在发送,保证有序性
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        //移除发送超时的RecordBatch,并执行RecordBatch对应的done,最后执行callback.onCompletion方法,可以根据自定义是否补发
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        //把NodeId(每个Broker节点Id)->List<RecordBatch>转化为List<ClientRequest>
        List<ClientRequest> requests = createProduceRequests(batches, now);
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        //写入待发送的KafkaChannel中的Send
        for (ClientRequest request : requests)
            client.send(request, now);
        this.client.poll(pollTimeout, now);
    }

client.send(request, now)需要满足下面的条件,才能写入待发送的Send中

private boolean canSendRequest(String node) {
        //该Node是否连接状态&&该Node是否已准备好发送&&是否可以发送更多
        return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

//inFlightRequests.canSendMore
public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

看看NetworkClient.poll方法

public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            //真正操作NIO的地方
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        //处理已发送的消息
        handleCompletedSends(responses, updatedNow);
        //处理已发送成功响应的消息
        handleCompletedReceives(responses, updatedNow);
        //处理已断开的连接,重新请求 meta
        handleDisconnections(responses, updatedNow);
        //处理新建立的连接,需要验证通过
        handleConnections();
        //处理超时请求
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    //回调Callback的onComplete方法
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
}

来看看真正操作NIO的

public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            //处理已经就绪的连接
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            //处理新建立的连接
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        //关闭空闲的连接,根据配置的最大空闲时间connections.max.idle.ms判断
        maybeCloseOldestConnection(endSelect);
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);
            sensors.maybeRegisterConnectionMetrics(channel.id());
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);
            try {
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                socketChannel.socket().getReceiveBufferSize(),
                                socketChannel.socket().getSendBufferSize(),
                                socketChannel.socket().getSoTimeout(),
                                channel.id());
                    } else
                        continue;
                }

                if (channel.isConnected() && !channel.ready())
                    channel.prepare();
                //读取消息操作,一次读完所有可读buffer
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }
                //写消息事件操作
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                //不是有效的事件操作
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容