Kafka Source Code Analysis - Producer (8) - Summary (1)

I. Overall Flow

[Figure: Overall architecture of the Kafka producer]

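Steps:
1. ProducerInterceptors intercept the message.
2. The Serializer serializes the message's key and value.
3. The Partitioner chooses a suitable partition for the message.
4. RecordAccumulator collects messages so that they can be sent in batches.
5. The Sender fetches messages from RecordAccumulator.
6. The Sender builds a ClientRequest.
7. The ClientRequest is handed to NetworkClient, ready to be sent.
8. NetworkClient puts the request into the KafkaChannel's buffer.
9. Network I/O is performed and the request is sent.
10. When the response arrives, the ClientRequest's callback is invoked.
11. The RecordBatch's callback is invoked, which in turn invokes the callback registered for each message.
Two threads cooperate while messages are sent. The main thread first wraps the business data in a ProducerRecord object and then calls send(), which places the message in RecordAccumulator (the record collector, which is also the buffer shared by the main thread and the Sender thread). The Sender thread builds requests from the buffered messages and performs the actual network I/O: it takes messages out of RecordAccumulator and sends them in batches.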
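To make the two-thread split concrete, here is a minimal sketch using only the JDK. It is not Kafka code: the class `TwoThreadProducerSketch` is hypothetical, a `LinkedBlockingQueue` stands in for RecordAccumulator, and a daemon thread stands in for the Sender, draining records in batches of at most `maxBatch`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Kafka code: the queue plays the role of
// RecordAccumulator and the background thread plays the role of Sender.
class TwoThreadProducerSketch {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final AtomicInteger sentCount = new AtomicInteger();
    private final AtomicInteger batchCount = new AtomicInteger();
    private volatile boolean running = true;
    private final Thread sender;

    TwoThreadProducerSketch(int maxBatch) {
        sender = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            // Keep going until close() was called AND the buffer is drained.
            while (running || !buffer.isEmpty()) {
                try {
                    String first = buffer.poll(10, TimeUnit.MILLISECONDS);
                    if (first == null)
                        continue;
                    batch.add(first);
                    buffer.drainTo(batch, maxBatch - 1);  // grab whatever else is waiting
                    sentCount.addAndGet(batch.size());    // stands in for network I/O
                    batchCount.incrementAndGet();
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "sketch-sender");
        sender.setDaemon(true);
        sender.start();
    }

    // Main thread: enqueue and return immediately, no network I/O here.
    void send(String record) {
        buffer.add(record);
    }

    // Stop the sender after it has flushed everything; returns records sent.
    int close() throws InterruptedException {
        running = false;
        sender.join();
        return sentCount.get();
    }

    int batches() {
        return batchCount.get();
    }
}
```

The main thread only enqueues, so `send()` returns without waiting for network I/O; batching falls out of `drainTo()` collecting everything that piled up since the sender's last iteration.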

II. Main Thread:

1. Metadata update flow:

[Figure: Metadata update flow]

Phase 1

This phase runs in the KafkaProducer constructor. Its main purpose is to initialize the Metadata object and pass it to the Sender.

1. Initialize Metadata:
 this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

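retryBackoffMs: the back-off time, i.e. the minimum interval between two metadata refresh attempts.
ProducerConfig.METADATA_MAX_AGE_CONFIG: even if nothing requests a Metadata update, the metadata is refreshed once it is older than this value (default 5 minutes).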
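The interplay between `retryBackoffMs` and `metadata.max.age.ms` can be sketched as a small timing rule. The class below is hypothetical; its `timeToNextUpdate()` is modeled on the method of the same name in the Kafka 0.10-era `Metadata` class as I understand it, and should be read as an approximation rather than the actual source.

```java
// Hypothetical sketch of the refresh-timing rule: refresh when an update was
// requested or when the metadata is older than metadata.max.age.ms, but never
// more often than once per retryBackoffMs.
class MetadataTimingSketch {
    private final long refreshBackoffMs;   // retryBackoffMs in the text
    private final long metadataExpireMs;   // metadata.max.age.ms (default 300000)
    private long lastRefreshMs;            // last attempt, successful or not
    private long lastSuccessfulRefreshMs;  // last successful update
    private boolean needUpdate;

    MetadataTimingSketch(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    synchronized void requestUpdate() {
        needUpdate = true;
    }

    synchronized void update(long nowMs) {
        needUpdate = false;
        lastRefreshMs = nowMs;
        lastSuccessfulRefreshMs = nowMs;
    }

    synchronized void failedUpdate(long nowMs) {
        lastRefreshMs = nowMs;             // only the back-off clock moves
    }

    // Milliseconds until a metadata request may be sent; 0 means "now".
    synchronized long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
}
```

Taking the maximum of the two delays is what makes the back-off win: even an explicit `requestUpdate()` cannot trigger a refresh sooner than `refreshBackoffMs` after the previous attempt.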

2. First call to this.metadata.update():

this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

This hands the bootstrap server nodes to the Cluster field of the metadata object.
3. Instantiate NetworkClient and Sender, passing metadata to their constructors.
4. Start the thread that runs the Sender task:

this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
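`KafkaThread` is Kafka's own `Thread` subclass; passing `true` makes the Sender's I/O thread a daemon thread. A rough JDK-only equivalent is sketched below. The helper `newDaemonThread` is hypothetical, and the uncaught-exception handler is an illustrative assumption, not something stated in this article.

```java
// Hypothetical helper approximating new KafkaThread(name, runnable, true):
// a named daemon thread. The exception handler is an illustrative assumption.
class DaemonThreadSketch {
    static Thread newDaemonThread(String name, Runnable task) {
        Thread t = new Thread(task, name);
        t.setDaemon(true);  // a daemon thread does not keep the JVM alive by itself
        t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println("Uncaught exception in " + thread.getName() + ": " + e));
        return t;
    }
}
```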

Phase 2

The main thread initiates the Metadata update from send().
1. KafkaProducer.send() calls waitOnMetadata(record.topic(), this.maxBlockTimeMs):

/**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The amount of time we waited in ms
     */
    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already.
        if (!this.metadata.containsTopic(topic))
            this.metadata.add(topic);

        if (metadata.fetch().partitionsForTopic(topic) != null)
            return 0;

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = metadata.requestUpdate();
            sender.wakeup();
            metadata.awaitUpdate(version, remainingWaitMs);
            long elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (metadata.fetch().unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
        }
        return time.milliseconds() - begin;
    }

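2. It first calls metadata.requestUpdate(), which sets the needUpdate field of metadata to true and returns the current metadata version.
3. It then calls sender.wakeup(), so that a Sender thread blocked in poll() picks up the request promptly, and calls metadata.awaitUpdate(version, remainingWaitMs), which blocks until the Sender thread calls metadata.update() and thereby releases the main thread.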

/**
     * Wait for metadata update until the current version is larger than the last version we know of
     */
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        if (maxWaitMs < 0) {
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
        }
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (this.version <= lastVersion) {
            if (remainingWaitMs != 0)
                wait(remainingWaitMs);
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }

Phase 3

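The Sender thread sends the Metadata update request to the server.
1. The Sender thread's run() method calls NetworkClient.poll() in a loop.
2. poll() calls maybeUpdate(long now), which checks whether the next update is due; a request is issued only when it is, and it is sent to the least loaded node.
3. maybeUpdate(now, node) builds a ClientRequest and passes it to doSend(clientRequest, now):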

...
ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
doSend(clientRequest, now); // buffer the request; a subsequent poll() will actually send it
...
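The "least loaded node" choice from step 2 can be sketched as picking the node with the fewest in-flight requests. The method `leastLoadedNode` below is a hypothetical simplification: node ids are plain strings, and the real NetworkClient additionally takes connection state into account (and, in the versions I have looked at, starts scanning from a random offset).

```java
import java.util.List;
import java.util.Map;

// Hypothetical simplification of "least loaded node": among known nodes,
// pick the one with the fewest in-flight requests. Connection state and the
// random starting offset of the real implementation are omitted.
class LeastLoadedNodeSketch {
    static String leastLoadedNode(List<String> nodes, Map<String, Integer> inFlight) {
        String found = null;
        int fewest = Integer.MAX_VALUE;
        for (String node : nodes) {
            int count = inFlight.getOrDefault(node, 0);
            if (count == 0)
                return node;                 // an idle node cannot be beaten
            if (count < fewest) {
                fewest = count;
                found = node;
            }
        }
        return found;                        // null when no nodes are known
    }
}
```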

4. doSend() records the send time and adds the request to this.inFlightRequests:

private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request);
        selector.send(request.request());
    }
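`inFlightRequests` keeps, per node, the requests that have been sent but not yet answered. Because a connection returns responses in request order, a per-node deque is enough to match each response to its request. The class below is a hypothetical sketch of that idea (the real class is `InFlightRequests`); its cap corresponds to the producer setting `max.in.flight.requests.per.connection`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-node in-flight queue: new requests go in at the
// head, and a response completes the oldest request at the tail.
class InFlightRequestsSketch<R> {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<R>> requests = new HashMap<>();

    InFlightRequestsSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    void add(String node, R request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    // A response arrived from `node`: its request is the oldest one still queued.
    R completeNext(String node) {
        Deque<R> dq = requests.get(node);
        if (dq == null || dq.isEmpty())
            throw new IllegalStateException("No in-flight request for node " + node);
        return dq.pollLast();
    }

    boolean canSendMore(String node) {
        Deque<R> dq = requests.get(node);
        return dq == null || dq.size() < maxInFlightPerConnection;
    }
}
```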

5. It then calls selector.send(request.request()):

/**
     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
     * @param send The request to send
     */
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

6. Selector.send() calls KafkaChannel.setSend(), which stores the request in the channel's send field and registers interest in the OP_WRITE event:

public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send; // set the send field
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); // register interest in OP_WRITE
    }
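The rule enforced by `setSend()` (at most one pending send per channel, plus interest in `OP_WRITE` while it is pending) can be modeled without a real socket. In this hypothetical sketch the interest set is a plain `int` built from `java.nio.channels.SelectionKey` constants, and `completeSend()` stands for the point where the whole send has been written and `OP_WRITE` is dropped again.

```java
import java.nio.channels.SelectionKey;

// Hypothetical model of the one-pending-send rule; no real channel or selector.
class ChannelSendSketch {
    private Object send;                            // pending send, null when idle
    private int interestOps = SelectionKey.OP_READ; // assumption: reads are always of interest

    void setSend(Object send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        interestOps |= SelectionKey.OP_WRITE;       // ask the selector to report writability
    }

    // Called once all bytes of the pending send have been written.
    Object completeSend() {
        Object done = send;
        send = null;
        interestOps &= ~SelectionKey.OP_WRITE;      // nothing left to write
        return done;
    }

    boolean wantsWrite() {
        return (interestOps & SelectionKey.OP_WRITE) != 0;
    }
}
```

Dropping `OP_WRITE` once the send is done matters because a socket is almost always writable; leaving the interest set would make `select()` return immediately on every poll.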

Phase 4

Processing the data the server returns for the Metadata update.
1. NetworkClient.poll() calls selector.poll(), which picks up a read event:

/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
    NetworkReceive networkReceive; // handle the OP_READ event
    while ((networkReceive = channel.read()) != null)
        /*
         * channel.read() returns a NetworkReceive only once a complete response has been
         * read, and that receive is then added to stagedReceives. If only part of a response
         * is available, read() returns null and keeps the partial data in the channel;
         * reading resumes on the next OP_READ event until a complete NetworkReceive is built.
         */
        addToStagedReceives(channel, networkReceive);
}
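Why can `channel.read()` return `null`? Kafka responses are size-delimited (a 4-byte length followed by the payload), and a single socket read may deliver only part of that. The hypothetical `SizeDelimitedReceiveSketch` below shows the idea behind `NetworkReceive` using only `ByteBuffer`: it returns the payload once it is complete and `null` otherwise. One instance handles exactly one response.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of size-delimited framing: 4-byte big-endian length,
// then that many payload bytes. Use one instance per response.
class SizeDelimitedReceiveSketch {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer payload;

    // Consumes what it can from `in`; returns the payload when complete, else null.
    ByteBuffer readFrom(ByteBuffer in) {
        if (size.hasRemaining()) {
            while (size.hasRemaining() && in.hasRemaining())
                size.put(in.get());
            if (size.hasRemaining())
                return null;                 // length prefix still incomplete
            size.flip();
            int length = size.getInt();
            if (length < 0)
                throw new IllegalStateException("Invalid receive size: " + length);
            payload = ByteBuffer.allocate(length);
        }
        while (payload.hasRemaining() && in.hasRemaining())
            payload.put(in.get());
        if (payload.hasRemaining())
            return null;                     // payload still incomplete
        payload.flip();
        return payload;
    }
}
```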

2. addToStagedReceives() appends each received NetworkReceive to the ArrayDeque kept for that channel in stagedReceives.

/**
     * adds a receive to staged receives
     */
    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        if (!stagedReceives.containsKey(channel))
            stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(receive);
    }
3. After selector.poll() returns, NetworkClient runs handleCompletedReceives():
/**
     * Takes each NetworkReceive returned by a broker from completedReceives, finds the matching ClientRequest
     * in inFlightRequests, builds a ClientResponse from the two, and adds it to responses.
     * Handle any completed receives and update the response list with the responses received.
     *
     * @param responses The list of responses to update
     * @param now The current time
     */
    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source(); // id of the node that sent the response
            // take the matching ClientRequest out of inFlightRequests
            ClientRequest req = inFlightRequests.completeNext(source);
            // parse the response
            Struct body = parseResponse(receive.payload(), req.request().header());
            // let MetadataUpdater.maybeHandleCompletedReceive() handle a MetadataResponse:
            // it updates the cluster metadata recorded in Metadata and wakes every thread
            // waiting for the Metadata update to finish.
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                // not a MetadataResponse: create a ClientResponse and add it to responses
                responses.add(new ClientResponse(req, now, false, body));
        }
    }

4. metadataUpdater.maybeHandleCompletedReceive(req, now, body) checks whether this is a Metadata response:

public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
            short apiKey = req.request().header().apiKey();
            // check whether this answers a MetadataRequest issued by NetworkClient itself
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                handleResponse(req.request().header(), body, now);
                return true;
            }
            return false;
        }

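5. If it is a Metadata response, handleResponse() processes it: it builds a MetadataResponse, takes response.cluster(), and calls metadata.update(cluster, now) to update the metadata. If the response contains no nodes, it calls metadata.failedUpdate(now) instead.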

private void handleResponse(RequestHeader header, Struct body, long now) {
            this.metadataFetchInProgress = false; // a MetadataResponse has arrived, so no fetch is in progress any more
            // parse the MetadataResponse
            MetadataResponse response = new MetadataResponse(body);
            // build the Cluster object
            Cluster cluster = response.cluster();
            // check if any topics metadata failed to get updated: inspect the error codes in the MetadataResponse
            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty())
                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);

            // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
            // created which means we will get errors and no nodes until it exists
            if (cluster.nodes().size() > 0) {

                this.metadata.update(cluster, now);
            } else {
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                this.metadata.failedUpdate(now);
            }
        }

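6. It calls metadata.update(Cluster cluster, long now), which:

  • Sets this.needUpdate to false.
  • Increments the version by 1, so the awaitUpdate() loop shown earlier sees a version newer than the one it recorded and exits its while loop.
  • Calls notifyAll(): awaitUpdate() calls wait() inside its while loop to give the Sender thread time to update the metadata; now that the update is done, notifyAll() wakes the main thread so it stops waiting.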
/**
     * Update the cluster metadata
     */
    public synchronized void update(Cluster cluster, long now) {
        this.needUpdate = false;
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        this.version += 1;
        // 1. notify the listeners registered on Metadata
        for (Listener listener: listeners)
            listener.onMetadataUpdate(cluster);
        // 2. update the cluster field
        // Do this after notifying listeners as subscribed topics' list can be changed by listeners
        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

        notifyAll();
        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
    }

In the end, the main thread sees the latest metadata, can look up the topic's partitions, and continues with send().
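The main-thread/Sender handshake from phases 2 and 4 is a classic versioned wait/notify. The sketch below condenses it into one hypothetical class: `requestUpdate()` returns the current version, `awaitUpdate()` waits until the version moves past it, and `update()` bumps the version and calls `notifyAll()`. Unlike the original `awaitUpdate()`, it re-checks the version before declaring a timeout, and it throws `IllegalStateException` where Kafka throws its own `TimeoutException`.

```java
// Hypothetical sketch of the version-based handshake between the main thread
// and the Sender thread; a String stands in for the Cluster object.
class VersionedMetadataSketch {
    private int version = 0;
    private boolean needUpdate = false;
    private String cluster = "bootstrap";

    // Main thread: flag that an update is needed and remember the current version.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    synchronized String fetch() {
        return cluster;
    }

    // Main thread: block until version > lastVersion, or give up after maxWaitMs.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        if (maxWaitMs < 0)
            throw new IllegalArgumentException("maxWaitMs must not be negative");
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {      // the loop also absorbs spurious wakeups
            long remainingWaitMs = deadline - System.currentTimeMillis();
            if (remainingWaitMs <= 0)
                throw new IllegalStateException("Failed to update metadata after " + maxWaitMs + " ms.");
            wait(remainingWaitMs);             // releases the monitor while waiting
        }
    }

    // Sender thread: publish the new metadata and wake every waiting thread.
    synchronized void update(String newCluster) {
        needUpdate = false;
        version += 1;
        cluster = newCluster;
        notifyAll();
    }
}
```

Capturing the version before waiting is what makes this safe: if the Sender finishes the update before `awaitUpdate()` even starts, the loop condition is already false and the caller does not block.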

2. RecordAccumulator flow:

The flow of RecordAccumulator.append():

[Figure: RecordAccumulator append workflow]
/**
     * Add a record to the accumulator, return the append result
     * <p>
     * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
     * <p>
     *
     * @param tp The topic/partition to which this record is being sent
     * @param timestamp The timestamp of the record
     * @param key The key for the record
     * @param value The value for the record
     * @param callback The user-supplied callback to execute when the request is complete
     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
     */
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        try {
            // check if we have an in-progress batch
            // 1. look up (or create) the Deque for this TopicPartition
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) { // 2. lock the Deque
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // 3. try to append the record to the last RecordBatch in the Deque
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult; // 4. append succeeded, return
            } // 5. unlock

            // we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // 6. append failed: allocate new space from the BufferPool (outside the lock, since this may block up to maxTimeToBlock)
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // 7. with the Deque locked again, call tryAppend() once more
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                // 8. if that succeeds, return and release the space allocated in step 6
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                // create a new RecordBatch
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                // 9. append the record to the new RecordBatch and add the batch to the Deque
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                // 10. also add the new RecordBatch to the incomplete set
                incomplete.add(batch);
                // 11. return the RecordAppendResult
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            } // 12. unlock
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
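The core of `append()` is a double-checked pattern: append under the deque lock if possible, allocate memory outside the lock (allocation may block), then re-check under the lock before creating a new batch. The sketch below reproduces only that pattern. `AccumulatorSketch` and its nested `Batch` are hypothetical; string lengths stand in for byte sizes, and allocation is a plain `new` instead of `BufferPool.allocate()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the double-checked append pattern, not Kafka's
// RecordAccumulator. String length stands in for serialized size.
class AccumulatorSketch {
    static final class Batch {
        final List<String> records = new ArrayList<>();
        final int capacity;
        int usedBytes = 0;

        Batch(int capacity) { this.capacity = capacity; }

        // Only ever called while holding the owning deque's lock.
        boolean tryAppend(String record) {
            if (usedBytes + record.length() > capacity)
                return false;                     // batch is full
            records.add(record);
            usedBytes += record.length();
            return true;
        }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchSize;

    AccumulatorSketch(int batchSize) { this.batchSize = batchSize; }

    // Try the last batch of the deque; caller must hold the deque lock.
    private static boolean tryAppend(Deque<Batch> dq, String record) {
        Batch last = dq.peekLast();
        return last != null && last.tryAppend(record);
    }

    void append(String partition, String record) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, k -> new ArrayDeque<>());
        synchronized (dq) {                       // fast path: reuse the open batch
            if (tryAppend(dq, record))
                return;
        }
        // Slow path: "allocate" outside the lock, as the text describes for BufferPool.
        Batch fresh = new Batch(Math.max(batchSize, record.length()));
        synchronized (dq) {
            if (tryAppend(dq, record))            // another thread created a batch meanwhile:
                return;                           // drop `fresh` (cf. free.deallocate)
            fresh.tryAppend(record);              // always fits: capacity >= record length
            dq.addLast(fresh);
        }
    }

    int batchCount(String partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null)
            return 0;
        synchronized (dq) { return dq.size(); }
    }

    int recordCount(String partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null)
            return 0;
        synchronized (dq) {
            int n = 0;
            for (Batch b : dq)
                n += b.records.size();
            return n;
        }
    }
}
```

Allocating outside the lock keeps a thread that is waiting for buffer memory from blocking other threads that could still append to the existing batch; the second `tryAppend()` covers the case where another thread created a batch in the meantime, which is the situation handled by `free.deallocate(buffer)` in step 8.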
© Copyright belongs to the author. For reprints or content collaboration, please contact the author.