Kafka源码分析-Producer(6)-Sender分析(3)

一.MetadataUpdater

简介

MetadataUpdater接口是一个辅助NetworkClient更新的Metadata接口,有两个实现类:


image.png

DefaultMetadataUpdater是NetworkClient使用的默认实现,下面是它的三个字段:

  • metadata:指向集群元数据的Metadata对象。
  • metadataFetchInProgress:用来标识是否已经发送了MetadataRequest请求更新Metadata,如果已经发送,就不会重复发送。
  • lastNoNodeAvailableMs:监测到没有可用节点时,用这个字段记录时间戳。

maybeUpdate()方法:

maybeUpdate()方法是DefaultMetadataUpdater的核心方法,用来判断当前的Metadata中保存的集群元数据是否需要更新。首先会监测metadataFetchInProgress字段,如果没有发送,满足下面一个条件就能够更新:

  • Metadata.needUpdate字段被设置为true,且避让时间已到。
  • 长时间没有更新,默认5分钟一次。

MetadataRequest

如果需要更新,则发送MetadataRequest请求,MetadataRequest请求的请求头包含ApiKeys.MetaData标识,消息体中包含Topic集合表示需要获取元数据的Topic,如果Topic集合为Null则表示全部Topic的元数据。

MetadataResponse

名称 类型 含义
node_id int Node节点的Id
host String Node节点的Host名称
rack String 每个Broker的机架信息
controller_id int controller所在的Node节点的Id
topic_error_code short 错误码
topic String topic名称
is_internal boolean 是否为Kafka内部的topic
partition_error_code short 错误码
partition_id int 分区编号
leader int 分区的Leader Replica所在的Id
replicas int集合 此分区所有Replica所在的Node节点的Id的集合
isr int集合 此分区的ISR所在的Node节点的Id的集合

MetadataRequest请求发送之前,要将metadataFetchInProgress置为true,然后从所有Node中选择负载最小的Node节点,向其发送更新请求。负载大小通过每个Node在InFlightRequests队列中未确认的请求决定的,未确认的请求越多则认为负载越大。
发送过程:将请求添加到InFlightRequests队列中,然后设置到KafkaChannel的send字段中,通过KSelector.poll()方法将MetadataRequest请求发送出去。DefaultMetadataUpdater.maybeUpdate()方法的代码:

 public long maybeUpdate(long now) {
            /*
            调用metadata.timeToNextUpdate(now)方法,其中会检测needUpdate的值,退避时间,是否长时间未更新。
            最终得到一个下次更新集群元数据的实际戳。
             */
            // should we update our metadata?
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            //获取下次尝试重新连接服务器端的时间戳。
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            //检测是否已经发送了 MetadataRequest 请求。
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            //计算当前距离下次可以发送MetadataRequest请求的时间差。
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);

            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                //找到负载最小的 node,若没有可以用的node,则返回null。
                Node node = leastLoadedNode(now);
                //创建并缓存MetadataRequest,等待下次poll()方法才会真正发送。
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }


/**
         * Add a metadata request to the list of sends if we can make one
         */
        private void maybeUpdate(long now, Node node) {
            if (node == null) {//是否有node可用
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;//标记没有node可以连接的时间戳
                return;
            }
            String nodeConnectionId = node.idString();
            //检测是否允许向此Node发送请求。
            if (canSendRequest(nodeConnectionId)) {
                this.metadataFetchInProgress = true;
                MetadataRequest metadataRequest;
                if (metadata.needMetadataForAllTopics())//指定需要更新元数据的Topic
                    metadataRequest = MetadataRequest.allTopics();
                else
                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
                //将MetadataRequest封装成ClientRequest
                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                doSend(clientRequest, now);//缓存请求,下次poll()操作会将其发送出去
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                initiateConnect(node, now);//初始化连接
                // If initiateConnect failed immediately, this node will be put into blackout and we
                // should allow immediately retrying in case there is another candidate node. If it
                // is still connecting, the worst case is that we end up setting a longer timeout
                // on the next round and then wait for the response.
            } else { 
                // connected, but can't send more OR connecting
                // In either case, we just need to wait for a network event to let us know the selected
                // connection might be usable again.
                //已成功连接到指定的节点,但不能发送请求,则更新lastNoNodeAvailableMs后等待
                this.lastNoNodeAvailableMs = now;
            }
        }

在收到MetadataResponse之后,会先调用MetaUpdater.maybeHandleCompletedReceive()方法检测是否为MetadataResponse,如果是,就调用handleResponse()解析响应,并构造Cluster对象更新Metadata.cluster字段。
因为cluster是不可变字段,更新集群元数据的方式是:
创建新的Cluster对象,并覆盖Metadata.cluster字段,代码如下:
NetworkClient类里的方法:

@Override
        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
            short apiKey = req.request().header().apiKey();
            //检测是否为MetadataRequest请求。
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                handleResponse(req.request().header(), body, now);
                return true;
            }
            return false;
        }


private void handleResponse(RequestHeader header, Struct body, long now) {
            this.metadataFetchInProgress = false;//收到 MetadataResponse 了,于是修改metadataFetchInProgress=false。
            //解析MetadataResponse
            MetadataResponse response = new MetadataResponse(body);
            //创建Cluster对象
            Cluster cluster = response.cluster();
            // check if any topics metadata failed to get updated。检测 MetadataResponse 里的错误码。
            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty())
                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);

            // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
            // created which means we will get errors and no nodes until it exists
            if (cluster.nodes().size() > 0) {

                this.metadata.update(cluster, now);
            } else {
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                this.metadata.failedUpdate(now);
            }
        }

Metadata类里的方法:

    /**
     * Update the cluster metadata
     */
    public synchronized void update(Cluster cluster, long now) {
        this.needUpdate = false;
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        this.version += 1;
        //1.通知Metadata上的监听器。
        for (Listener listener: listeners)
            listener.onMetadataUpdate(cluster);
        //更新cluster字段。
        // Do this after notifying listeners as subscribed topics' list can be changed by listeners
        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

        notifyAll();
        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
    }

当连接断开或其他的异常导致无法获得响应时,由maybeHandleDisconnetion()方法处理,他会将metadataFetchInProgress字段置为false,这样就能顺利的发送下一次更新Metadata请求了。

@Override
        public boolean maybeHandleDisconnection(ClientRequest request) {
            ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());

            if (requestKey == ApiKeys.METADATA) {//是否为MetadataRequest请求
                Cluster cluster = metadata.fetch();
                if (cluster.isBootstrapConfigured()) {
                    int nodeId = Integer.parseInt(request.request().destination());
                    Node node = cluster.nodeById(nodeId);
                    if (node != null)
                        log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
                }

                metadataFetchInProgress = false;//更新metadataFetchInProgress = false
                return true;
            }

            return false;
        }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容