Kafka之Producer端如何更新MetaData(二)

前言

在上篇文章中,我们简单介绍了Producer端的消息发送过程,其中的很多的细节并没有讲解到,我们阅读源码的时候会发现,在消息发送的过程中,有很多地方会请求metadata数据,本篇文章将主要讲解Producer端如何更新metadata。

一、Cluster与MetaData数据结构

MetaData封装了一个关于元数据的逻辑。这个类是客户端线程和后台的Sender线程共享,MetaData维护了一个包含了部分topic的集合,当我们请求topic对应的metadata而不可得时,就会触发metadata的更新。

public class Metadata implements Closeable {

    private static final Logger log = LoggerFactory.getLogger(Metadata.class);

    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;

    private final long refreshBackoffMs;
    private final long metadataExpireMs;
    private int version;
    private long lastRefreshMs;
    private long lastSuccessfulRefreshMs;
    private AuthenticationException authenticationException;
    private MetadataCache cache = MetadataCache.empty();
    private boolean needUpdate;
    /* Topics with expiry time */
    private final Map<String, Long> topics;
    private final List<Listener> listeners;
    private final ClusterResourceListeners clusterResourceListeners;
    private boolean needMetadataForAllTopics;
    private final boolean allowAutoTopicCreation;
    private final boolean topicExpiryEnabled;
    private boolean isClosed;
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;

}

Cluster包含了kafka集群中部分的结点、Topics和partitions信息。

public final class Cluster {

    private final boolean isBootstrapConfigured;
    private final List<Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Set<String> invalidTopics;
    private final Set<String> internalTopics;
    private final Node controller;
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    private final Map<Integer, Node> nodesById;
    private final ClusterResource clusterResource;

}

二、MetaData的更新流程

在producer发送消息的过程中,首先就会确保topicPartition对应的metadata是否存在,其逻辑主要封装在waitOnMetadata方法中。

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        metadata.add(topic);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            int version = metadata.requestUpdate();
            sender.wakeup();
            try {
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            if (cluster.unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            if (cluster.invalidTopics().contains(topic))
                throw new InvalidTopicException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }

(1)、首先如果cluster中的invalidTopics列表包含了要发送的topic,方法抛出异常。如果缓存的cluster包含了对应的topic的patritions信息,就直接返回。
(2)、do while中的循环逻辑会不断的请求更新metadata数据,直到取到数据或者超时抛出异常。

  • metadata.requestUpdate()方法会把相关字段needUpdate置为true, 然后唤醒sender后台线程。
  • metadata.awaitUpdate(version, remainingWaitMs)方法等待更新结果,直到更新后版本号大与传进来的版本号,或者超时remainingWaitMs。 方法内部其实仍然是一个循环,跳出循环的条件就是版本号或者超时。

我们发现线程会一直堵塞在一个嵌套的两层循环中,直到超时或者取到想要的结果。其实更新metadata操作,主要是通过sender.wakeup()来唤醒 sender线程,间接唤醒NetworkClient线程,NetworkClient线程来负责发送Metadata请求,并处理Server端的响应。

2.1 NetworkClient中的poll()

在整个Producer端的发送过程中,真正发送请求、处理返回结果的请求封装在NetworkClient类中的poll()方法中,send()函数的方法名字很具有迷惑性,官方wiki给出的注释:"Queue up the given request for sending",send()方法只是将相关信息保存在了InFlightRequests的发送队列中和KafkaChannel中,InFlightRequests保存发送的消息内容、KafkaChannel保存发送的元信息。真正的相关操作实现封装在poll方法中。

public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);

    return responses;
}

poll()方法主要包含了四个步骤:

  • 如果有因为版本不一致或者失去连接的发送请求,就直接处理掉。
  • metadataUpdater.maybeUpdate(now)方法判断现在是否需要更新metadata,如果需要的话,选择负载最小的一个结点,建立连接。maybeUpdate(now, node)主要是将更新metadata请求进入请求到发送队列中去。
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;

    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);

    if (metadataTimeout > 0) {
        return metadataTimeout;
    }

    // Beware that the behavior of this method and the computation of timeouts for poll() are
    // highly dependent on the behavior of leastLoadedNode.
    Node node = leastLoadedNode(now);
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }
    return maybeUpdate(now, node);

}
private long maybeUpdate(long now, Node node) {
            String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId, now)) {
                this.metadataFetchInProgress = true;
                MetadataRequest.Builder metadataRequest;
                if (metadata.needMetadataForAllTopics())
                    metadataRequest = MetadataRequest.Builder.allTopics();
                else
                    metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
                            metadata.allowAutoTopicCreation());


                log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                return defaultRequestTimeoutMs;
            }

            // If there's any connection establishment underway, wait until it completes. This prevents
            // the client from unnecessarily connecting to additional nodes while a previous connection
            // attempt has not been completed.
            if (isAnyNodeConnecting()) {
                // Strictly the timeout we should return here is "connect timeout", but as we don't
                // have such application level configuration, using reconnect backoff instead.
                return reconnectBackoffMs;
            }

            if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node);
                initiateConnect(node, now);
                return reconnectBackoffMs;
            }

            // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network event to let us know the selected
            // connection might be usable again.
            return Long.MAX_VALUE;
        }
    }

在上一篇的文章中,我们直到每次请求更新metadata的时候,线程会阻塞在两层的循环中,直到超时或者成功更新。为了方法理解:我们考虑第一次发送数据的情景:
(1)第一次唤醒send线程时,调用poll方法: 尝试与node结点建立连接。
(2)第二次唤醒send线程时,调用poll方法: 发送metadata的更新请求。
(3)第二次唤醒send线程时,调用poll方法: 处理metadata的更新结果。
经过三次唤醒后,成功获取metadata数据,线程跳出循环继续下面的流程。

  • selector.poll()方法执行真正的I/O操作,包括发送请求、接受结果。该方法底层依赖了JAVA原生的NIO方法。
  • 处理请求后的结果,包括metadata返回的结果和数据请求的结果。在此我们之分析metadata的响应结果,逻辑主要封装在handleCompletedMetadataResponse()方法中:
   public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
            this.metadataFetchInProgress = false;

            // If any partition has leader with missing listeners, log a few for diagnosing broker configuration
            // issues. This could be a transient issue if listeners were added dynamically to brokers.
            List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
                topicMetadata.partitionMetadata().stream()
                    .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
                    .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
                .collect(Collectors.toList());
            if (!missingListenerPartitions.isEmpty()) {
                int count = missingListenerPartitions.size();
                log.warn("{} partitions have leader brokers without a matching listener, including {}",
                        count, missingListenerPartitions.subList(0, Math.min(10, count)));
            }

            // check if any topics metadata failed to get updated
            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty())
                log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);

            // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
            // created which means we will get errors and no nodes until it exists
            if (response.brokers().isEmpty()) {
                log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                this.metadata.failedUpdate(now, null);
            } else {
                this.metadata.update(response, now);
            }
        }

2.2 Metadata策略更新形式

通过总结发现,meta的更新主要有两种形式:

  • 强制更新:调用metadata.requestUpdate()强制更新,requestUpdate()函数里面其实什么都没做,就是把needUpdate置成了true。强制更新主要存在如下更几种情况:
    • 初始化连接,initConnect的时候
    • 处理超时请求,handleTimedOutRequests
    • 处理失连,handleDisconnections
    • topic对应的leader不存在的时候
    • 强制关闭Sender线程,forceClose时候
    • prodcuer端对应的prodcerId不存在,maybeWaitForProducerId的时候
    • 由于InvalidMetadata曝出异常InvalidMetadataException
  • 周期更新:根据lastSuccessfulRefreshMs、metadataExpireMs、lastRefreshMs、refreshBackoffMs等字段、周期性的更新metadata数据。

三、总结

本篇文章主要讲解了producer端如何更新metadata,只是介绍了主要的更新流程,还有很多的细节,目前我也没有搞懂,之后如果有新的理解的话,我会及时更新在文章中。下一篇将讲解为了保障partition消息的时序性,producer端做了哪些工作。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354