1. MetadataUpdater
Overview
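The MetadataUpdater interface helps NetworkClient keep its Metadata up to date. It has two implementations, DefaultMetadataUpdater and ManualMetadataUpdater.
DefaultMetadataUpdater is the default implementation used by NetworkClient. It has three fields:
- metadata: the Metadata object that holds the cluster metadata.
- metadataFetchInProgress: whether a MetadataRequest to update Metadata has already been sent. If one has, no duplicate request is sent.
- lastNoNodeAvailableMs: the timestamp recorded when no node was found to be available.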
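The maybeUpdate() method:
maybeUpdate() is the core method of DefaultMetadataUpdater. It decides whether the cluster metadata stored in Metadata needs to be updated. It first checks metadataFetchInProgress; if no request is in flight, an update is triggered when either of the following conditions holds:
- Metadata.needUpdate is true and the retry backoff has elapsed.
- The metadata has not been refreshed for a long time (by default it is refreshed every 5 minutes).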
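A minimal sketch of this decision, assuming a simplified stand-in class. The class name MetadataRefreshTimer is hypothetical; the field names follow the Metadata fields that appear later in this section (needUpdate, lastRefreshMs, lastSuccessfulRefreshMs), and the formula is our reading of how Metadata.timeToNextUpdate() combines expiry and backoff. It returns 0 when an update may be sent now, otherwise the number of milliseconds to wait:

```java
/**
 * Hypothetical stand-in for the refresh-timing part of Kafka's Metadata class.
 * Simplified for illustration; not the real implementation.
 */
class MetadataRefreshTimer {
    private final long refreshBackoffMs;  // minimum gap between two refresh attempts
    private final long metadataExpireMs;  // maximum age before a forced refresh (5 minutes by default)
    private long lastRefreshMs;           // time of the last refresh attempt, successful or not
    private long lastSuccessfulRefreshMs; // time of the last successful refresh
    private boolean needUpdate;           // set when a caller explicitly asks for fresh metadata

    MetadataRefreshTimer(long refreshBackoffMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    /** Condition 1: mark the metadata as needing an update. */
    void requestUpdate() {
        needUpdate = true;
    }

    /** Record a successful refresh at nowMs. */
    void update(long nowMs) {
        needUpdate = false;
        lastRefreshMs = nowMs;
        lastSuccessfulRefreshMs = nowMs;
    }

    /** Milliseconds until an update may be sent; 0 means "send one now". */
    long timeToNextUpdate(long nowMs) {
        // Due now if explicitly requested (condition 1); otherwise due once the data is
        // older than metadataExpireMs (condition 2).
        long timeToExpire = needUpdate ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        // Even when due, wait out the backoff since the last attempt.
        long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
}
```

Taking the maximum of the two terms is what makes the backoff apply to both conditions: even an explicit requestUpdate() waits until refreshBackoffMs has passed since the last attempt.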
MetadataRequest
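If an update is needed, a MetadataRequest is sent. Its request header carries the ApiKeys.METADATA identifier, and its body contains the set of topics whose metadata is requested; a null topic set means that metadata for all topics is requested.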
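A minimal sketch of the topic-set convention, assuming a hypothetical class MetadataRequestTopics (this is not Kafka's MetadataRequest). It mirrors the allTopics() versus explicit-topic-list branch in maybeUpdate(now, node) in the code that follows, with null standing for "all topics" as described above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical model of the MetadataRequest body: a topic list in which
 * null stands for "all topics". Not Kafka's MetadataRequest class.
 */
final class MetadataRequestTopics {
    private final List<String> topics; // null => metadata for every topic

    private MetadataRequestTopics(List<String> topics) {
        this.topics = topics;
    }

    static MetadataRequestTopics allTopics() {
        return new MetadataRequestTopics(null);
    }

    static MetadataRequestTopics forTopics(Set<String> topics) {
        return new MetadataRequestTopics(new ArrayList<>(topics)); // defensive copy
    }

    boolean isAllTopics() {
        return topics == null;
    }

    /** Topics the response should describe, given the topics that exist in the cluster. */
    Set<String> topicsToDescribe(Set<String> existingTopics) {
        return isAllTopics() ? new TreeSet<>(existingTopics) : new TreeSet<>(topics);
    }
}
```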
MetadataResponse
Name | Type | Meaning |
---|---|---|
node_id | int | ID of the node (broker) |
host | String | Host name of the node |
rack | String | Rack of the broker |
controller_id | int | ID of the node hosting the controller |
topic_error_code | short | Topic-level error code |
topic | String | Topic name |
is_internal | boolean | Whether the topic is a Kafka-internal topic |
partition_error_code | short | Partition-level error code |
partition_id | int | Partition number |
leader | int | ID of the node hosting the partition's leader replica |
replicas | int array | IDs of the nodes hosting all replicas of the partition |
isr | int array | IDs of the nodes in the partition's ISR |
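The table lists the fields flat, but the response is nested: brokers and controller_id at the top level, then topics, each with its own partitions. The sketch below models that nesting with hypothetical classes (not Kafka's MetadataResponse) and shows the kind of lookup a Cluster built from it supports, resolving a partition's leader id to a broker:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, trimmed-down model of the nested MetadataResponse layout.
 * Not Kafka's MetadataResponse class.
 */
final class MetadataResponseModel {
    static final class Broker {
        final int nodeId;
        final String host;
        final String rack; // may be null when no rack is configured
        Broker(int nodeId, String host, String rack) {
            this.nodeId = nodeId; this.host = host; this.rack = rack;
        }
    }

    static final class PartitionMetadata {
        final short errorCode;
        final int partitionId;
        final int leader;     // node id of the leader replica
        final int[] replicas; // node ids of all replicas
        final int[] isr;      // node ids of the in-sync replicas
        PartitionMetadata(short errorCode, int partitionId, int leader, int[] replicas, int[] isr) {
            this.errorCode = errorCode; this.partitionId = partitionId; this.leader = leader;
            this.replicas = replicas; this.isr = isr;
        }
    }

    static final class TopicMetadata {
        final short errorCode;
        final String topic;
        final boolean isInternal;
        final List<PartitionMetadata> partitions;
        TopicMetadata(short errorCode, String topic, boolean isInternal, List<PartitionMetadata> partitions) {
            this.errorCode = errorCode; this.topic = topic; this.isInternal = isInternal;
            this.partitions = partitions;
        }
    }

    final List<Broker> brokers = new ArrayList<>();
    final List<TopicMetadata> topics = new ArrayList<>();
    int controllerId = -1;

    /** Resolve a partition's leader id to a Broker; null if the topic, partition, or leader broker is unknown. */
    Broker leaderFor(String topic, int partitionId) {
        for (TopicMetadata t : topics) {
            if (!t.topic.equals(topic)) continue;
            for (PartitionMetadata p : t.partitions) {
                if (p.partitionId != partitionId) continue;
                for (Broker b : brokers) {
                    if (b.nodeId == p.leader) return b;
                }
            }
        }
        return null;
    }
}
```

A leader id that matches no broker (for example -1 for a partition without a leader) resolves to null rather than throwing, so callers can treat it as "no leader yet".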
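To send the request, DefaultMetadataUpdater picks the least-loaded node among all nodes and, once it is allowed to send to that node, sets metadataFetchInProgress to true just before sending. A node's load is measured by its number of unacknowledged requests in the InFlightRequests queue: the more unacknowledged requests, the higher the load.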
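The selection rule can be sketched as a scan for the node with the fewest in-flight requests. This is a simplified, hypothetical version: node ids are plain integers and the in-flight counts come from a map. As we understand it, Kafka's real leastLoadedNode() also takes connection state into account and skips nodes that are in reconnect backoff; that part is omitted here.

```java
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical sketch of least-loaded node selection: pick the node with the
 * fewest unacknowledged (in-flight) requests. Connection state is ignored.
 */
final class LeastLoaded {
    /** Returns the chosen node id, or null when there are no nodes at all. */
    static Integer leastLoadedNode(List<Integer> nodes, Map<Integer, Integer> inFlight, Random random) {
        if (nodes.isEmpty()) return null;
        // Start at a random offset so ties are not always broken in favor of the same node.
        int offset = random.nextInt(nodes.size());
        Integer best = null;
        int bestCount = Integer.MAX_VALUE;
        for (int i = 0; i < nodes.size(); i++) {
            Integer node = nodes.get((offset + i) % nodes.size());
            int count = inFlight.getOrDefault(node, 0); // unacknowledged requests on this node
            if (count < bestCount) {
                best = node;
                bestCount = count;
            }
        }
        return best;
    }
}
```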
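Sending works as follows: the request is added to the InFlightRequests queue and set as the send field of the KafkaChannel, and KSelector.poll() then actually writes the MetadataRequest to the network. The code of DefaultMetadataUpdater.maybeUpdate() is: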
```java
public long maybeUpdate(long now) {
    /*
     * metadata.timeToNextUpdate(now) checks needUpdate, the retry backoff, and whether the
     * metadata has gone too long without a refresh. It returns the time (in ms) remaining
     * until the next cluster metadata update.
     */
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    // Time (in ms) remaining until we may try again after finding no available node.
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    // If a MetadataRequest is already in flight, wait for its response instead of sending another.
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    // Time remaining until a MetadataRequest may be sent.
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
            waitForMetadataFetch);
    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        // Find the least-loaded node; returns null if no node is available.
        Node node = leastLoadedNode(now);
        // Build and queue a MetadataRequest; it is actually sent by the next poll().
        maybeUpdate(now, node);
    }
    return metadataTimeout;
}

/**
 * Add a metadata request to the list of sends if we can make one
 */
private void maybeUpdate(long now, Node node) {
    if (node == null) { // is any node available?
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();
    // Check whether a request may be sent to this node right now.
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        // Choose which topics to request metadata for.
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        // Wrap the MetadataRequest in a ClientRequest.
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(clientRequest, now); // queue the request; the next poll() sends it
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now); // start connecting
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else {
        // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        // Record the time in lastNoNodeAvailableMs, which backs off the next attempt.
        this.lastNoNodeAvailableMs = now;
    }
}
```
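The send path described before the code (add to InFlightRequests, stage on the channel, write on poll()) can be modeled as below. This is a hypothetical, single-threaded sketch: SendPipeline, stagedSend, and written() are invented names, requests are plain strings, and canSendRequest() is simplified to "nothing staged for this node and fewer than maxInFlightPerNode requests outstanding". The add-first / poll-last ordering follows our understanding of Kafka's InFlightRequests.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of doSend()/poll(); names echo Kafka's, the logic is simplified. */
final class SendPipeline {
    private final int maxInFlightPerNode;
    private final Map<String, Deque<String>> inFlight = new HashMap<>(); // node -> unacknowledged requests
    private final Map<String, String> stagedSend = new HashMap<>();      // node -> request waiting for poll()
    private final List<String> wire = new ArrayList<>();                 // what poll() "wrote" to the network

    SendPipeline(int maxInFlightPerNode) {
        this.maxInFlightPerNode = maxInFlightPerNode;
    }

    int inFlightCount(String node) {
        Deque<String> q = inFlight.get(node);
        return q == null ? 0 : q.size();
    }

    boolean canSendRequest(String node) {
        return !stagedSend.containsKey(node) && inFlightCount(node) < maxInFlightPerNode;
    }

    /** Like doSend(): record the request as in flight and stage it on the node's channel. */
    void doSend(String node, String request) {
        if (!canSendRequest(node)) throw new IllegalStateException("cannot send to node " + node);
        inFlight.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
        stagedSend.put(node, request);
    }

    /** Like Selector.poll(): actually write every staged request. */
    void poll() {
        wire.addAll(stagedSend.values());
        stagedSend.clear();
    }

    /** A response acknowledges the oldest in-flight request for that node. */
    String completeNext(String node) {
        Deque<String> q = inFlight.get(node);
        return q == null ? null : q.pollLast();
    }

    List<String> written() {
        return wire;
    }
}
```

The point the sketch makes is that doSend() only stages the request; nothing reaches the network until poll() runs, which is why maybeUpdate() returns a timeout that the caller then passes to poll().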
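When a response arrives, MetadataUpdater.maybeHandleCompletedReceive() first checks whether it is a MetadataResponse. If so, it calls handleResponse() to parse the response and build a Cluster object, which is then used to update the Metadata.cluster field.
Because Cluster objects are immutable, the cluster metadata is updated by creating a new Cluster object and overwriting the Metadata.cluster field with it. The code is as follows:
Methods in NetworkClient (in its inner class DefaultMetadataUpdater):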
```java
@Override
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    // Is this the response to a MetadataRequest initiated by NetworkClient itself?
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}

private void handleResponse(RequestHeader header, Struct body, long now) {
    // The MetadataResponse has arrived, so clear metadataFetchInProgress.
    this.metadataFetchInProgress = false;
    // Parse the MetadataResponse.
    MetadataResponse response = new MetadataResponse(body);
    // Build a new Cluster object from it.
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated (inspect the error codes in the response)
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}
```
Method in the Metadata class:
```java
/**
 * Update the cluster metadata
 */
public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;
    // 1. Notify the listeners registered on this Metadata.
    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);
    // 2. Replace the cluster field with the new Cluster.
    // Do this after notifying listeners as subscribed topics' list can be changed by listeners
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
    notifyAll(); // wake up threads waiting for a newer metadata version
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
```
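update() never mutates the existing Cluster: it swaps in a new immutable object, bumps version, and calls notifyAll(). A minimal sketch of this copy-on-write pattern, under simplifying assumptions: SnapshotHolder and ClusterSnapshot are hypothetical names, a snapshot holds only a set of topic names, and awaitUpdate() is our illustration of the waiting side that notifyAll() wakes up (to our understanding, Kafka's producer waits for a newer metadata version in a similar way).

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the copy-on-write update used by Metadata.update(). */
final class SnapshotHolder {
    /** Immutable snapshot: a defensive copy wrapped in an unmodifiable view. */
    static final class ClusterSnapshot {
        private final Set<String> topics;
        ClusterSnapshot(Collection<String> topics) {
            this.topics = Collections.unmodifiableSet(new TreeSet<>(topics));
        }
        Set<String> topics() { return topics; }
    }

    private ClusterSnapshot cluster = new ClusterSnapshot(Collections.emptySet());
    private int version = 0;

    synchronized ClusterSnapshot fetch() { return cluster; }
    synchronized int version() { return version; }

    /** Replace, never mutate: readers holding the old snapshot keep a consistent view. */
    synchronized void update(ClusterSnapshot newCluster) {
        version += 1;
        cluster = newCluster;
        notifyAll(); // wake threads blocked in awaitUpdate()
    }

    /** Block until version moves past lastVersion; false on timeout or interrupt. */
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) return false;
            try {
                wait(remaining); // releases the monitor until notifyAll() or timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

Because a snapshot never changes after construction, a thread that called fetch() earlier can keep reading it without locking, while new readers see the replacement.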
When the connection is closed, or some other failure means no response will arrive, maybeHandleDisconnection() handles it. It resets metadataFetchInProgress to false, so that the next metadata update request can be sent:
```java
@Override
public boolean maybeHandleDisconnection(ClientRequest request) {
    ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
    if (requestKey == ApiKeys.METADATA) { // was the lost request a MetadataRequest?
        Cluster cluster = metadata.fetch();
        if (cluster.isBootstrapConfigured()) {
            int nodeId = Integer.parseInt(request.request().destination());
            Node node = cluster.nodeById(nodeId);
            if (node != null)
                log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
        }
        metadataFetchInProgress = false; // allow the next MetadataRequest to be sent
        return true;
    }
    return false;
}
```
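Taken together, metadataFetchInProgress acts as a gate that allows at most one outstanding MetadataRequest: it is set when a request is sent and cleared by either a response or a disconnection. The hypothetical MetadataFetchGate below (not a Kafka class) isolates that lifecycle and shows why the disconnection path matters: without it, a lost request would leave the flag set and block every later update.

```java
/**
 * Hypothetical sketch of the metadataFetchInProgress lifecycle: at most one
 * MetadataRequest outstanding; the flag is cleared by a response or a disconnect.
 */
final class MetadataFetchGate {
    private boolean metadataFetchInProgress = false;
    private int requestsSent = 0;

    /** Sends (here: counts) a MetadataRequest unless one is already outstanding. */
    boolean maybeSend() {
        if (metadataFetchInProgress) return false;
        metadataFetchInProgress = true;
        requestsSent++;
        return true;
    }

    /** Corresponds to handleResponse(): the outstanding request completed. */
    void onResponse() { metadataFetchInProgress = false; }

    /** Corresponds to maybeHandleDisconnection(): the outstanding request was lost. */
    void onDisconnect() { metadataFetchInProgress = false; }

    int requestsSent() { return requestsSent; }
}
```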