Let's start with the basic usage: first instantiate a KafkaConsumer object, then subscribe to a topic through its subscribe method, fetch records with poll and process them, and finally call commitSync to commit the offsets of the records that were consumed.
// initialization: the consumer needs at least these configs to start
// (broker address and group id below are placeholder example values)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // we commit offsets manually below
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// subscribe
consumer.subscribe(Collections.singletonList("topic"));
while (Constant.MQTT_SERVICE_STARTED) { // keep polling while the service flag is on
    ConsumerRecords<String, String> records = consumer.poll(10000); // fetch data
    for (TopicPartition partition : records.partitions()) { // process data
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            if (record != null && record.value() != null) {
                //TODO: process the record
            }
        }
        // offset handling
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}
With that in mind, let's start from the KafkaConsumer constructor. Stepping into KafkaConsumer.java, you can see that all of the constructors eventually end up executing the one below.
To briefly summarize what this constructor does: it reads the relevant settings from the properties that were passed in and initializes metadata, client, subscriptions, coordinator, fetcher and so on. The constructor source is shown below; it is a fair amount of code, so for now just skim it, and we can come back to this method later.
private KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
try {
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
this.time = new SystemTime();
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
Map<String, String> metricsTags = new LinkedHashMap<>();
metricsTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
// load interceptors and make sure they get clientId
Map<String, Object> userProvidedConfigs = config.originals();
userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.valueDeserializer.configure(config.originals(), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer = valueDeserializer;
}
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "consumer";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,
clientId,
100, // a fixed large enough value will suffice
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
List<PartitionAssignor> assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
this.coordinator = new ConsumerCoordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
retryBackoffMs,
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
this.fetcher = new Fetcher<>(this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
this.retryBackoffMs);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka consumer created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
}
}
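One detail worth calling out from the code above: the constructor requires request.timeout.ms to be strictly greater than both session.timeout.ms and fetch.max.wait.ms, otherwise it throws a ConfigException. A minimal sketch of a configuration that passes this check (the concrete values are only illustrative assumptions, not recommendations):
Properties props = new Properties();
props.put("session.timeout.ms", "10000");   // group session timeout
props.put("fetch.max.wait.ms", "500");      // max time the broker may block a fetch
props.put("request.timeout.ms", "305000");  // must exceed both values above, or the constructor throws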
Now that initialization is done, the next step is to call KafkaConsumer's subscribe method to subscribe to a topic. Let's see what subscribe does.
@Override
public void subscribe(Collection<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquire();
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
} else if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();
} else {
for (String topic : topics) {
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
}
} finally {
release();
}
}
From the subscribe method you can see that what it actually calls is the subscribe method of the subscriptions object initialized in the KafkaConsumer constructor, so let's follow it straight into that method.
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
this.listener = listener;
changeSubscription(topics);
}
Look at the first call, setSubscriptionType(SubscriptionType.AUTO_TOPICS), which passes in SubscriptionType.AUTO_TOPICS. Let's briefly go over these subscription modes.
These modes are defined in the SubscriptionType enum. A KafkaConsumer has three subscription modes for deciding which partitions to consume; only AUTO_TOPICS and AUTO_PATTERN need to join a group to obtain the partitions to consume, while USER_ASSIGNED does not. Briefly:
- AUTO_TOPICS mode: the consumer subscribes to the given topics and joins the specified group, and the GroupCoordinator assigns the partitions to consume. AUTO_TOPICS is a topic-granularity subscription.
- AUTO_PATTERN mode: the user provides a pattern; the consumer fetches all topics, matches them against the pattern, and the matching topics are the ones to consume. After that it joins the group to get its partitions, just like AUTO_TOPICS. AUTO_PATTERN is also a topic-granularity subscription.
- USER_ASSIGNED mode: the user calls KafkaConsumer#assign() directly to specify the topic-partitions to consume. USER_ASSIGNED is a partition-granularity subscription.
In AUTO_TOPICS mode the consumer pulls the metadata of the specified topics from the broker; in AUTO_PATTERN mode it pulls the metadata of all topics and then matches against it to figure out which topics it actually needs to consume. A sketch of the three ways of subscribing follows below.
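A minimal sketch of the three modes at the API level, reusing the consumer instance from the usage example at the top (topic names here are made up):
// pick exactly ONE of the following per consumer instance; mixing them triggers the
// IllegalStateException thrown from setSubscriptionType shown below

// AUTO_TOPICS: subscribe to explicit topics, partitions are assigned by the group coordinator
consumer.subscribe(Arrays.asList("orders", "payments"));

// AUTO_PATTERN: subscribe by regex; the consumer matches the pattern against the cluster's topics
consumer.subscribe(Pattern.compile("metrics-.*"), new NoOpConsumerRebalanceListener());

// USER_ASSIGNED: assign topic-partitions directly, no group membership involved
consumer.assign(Arrays.asList(new TopicPartition("orders", 0), new TopicPartition("orders", 1)));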
At this point, all that happens is that the subscriptionType value is recorded inside subscriptions.
private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
}
private void setSubscriptionType(SubscriptionType type) {
if (this.subscriptionType == SubscriptionType.NONE)
this.subscriptionType = type;
else if (this.subscriptionType != type)
throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
}
Now look at the second call, changeSubscription. Roughly speaking, it just records the topics for this consumer and for the group subscription:
/* the list of topics the user has requested */
private Set<String> subscription;
/* the list of topics the group has subscribed to (set only for the leader on join group completion) */
private final Set<String> groupSubscription;
private void changeSubscription(Set<String> topicsToSubscribe) {
if (!this.subscription.equals(topicsToSubscribe)) {
this.subscription = topicsToSubscribe;
this.groupSubscription.addAll(topicsToSubscribe);
}
}
By now the KafkaConsumer object has been initialized and the topic subscription has been recorded, so the next step is connecting to the cluster and fetching data.
At the beginning of this article we walked through the usage flow, so let's follow the line ConsumerRecords<String, String> records = consumer.poll(10000) to see how data is actually fetched, starting with the poll method in KafkaConsumer.java.
@Override
public ConsumerRecords<K, V> poll(long timeout) {
acquire();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
fetcher.sendFetches();
client.pollNoWakeup();
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}
As you can see, there is another do...while loop here. Each iteration calls pollOnce; if records come back, it sends off the next round of fetch requests and returns the data to the caller for processing. On every iteration it also measures the elapsed time and compares it against timeout to handle expiry.
Steady, one step at a time. First, let's look at what happens inside pollOnce.
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
coordinator.poll(time.milliseconds());
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
});
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
return Collections.emptyMap();
return fetcher.fetchedRecords();
}
pollOnce again takes a timeout, at most the value passed to KafkaConsumer.poll(long timeout); the outer loop keeps subtracting the elapsed time from it, so the timeout handling happens here as well.
Roughly, this method does the following:
- 1. Through the poll method of the coordinator initialized in the KafkaConsumer constructor, it takes care of the connection handling and sends any requests that have not been sent yet.
- 2. It pulls data out with fetcher.fetchedRecords (the fetcher was also created in the constructor; feel free to look back at it).
- 3. If the fetched data is not empty, it returns the result directly.
- 4. If the fetched data is empty, it sends a fetch request via fetcher.sendFetches, calls fetcher.fetchedRecords once more, and then returns the result.
At this point it is pretty clear that the data is obtained through fetcher.fetchedRecords. But let's not rush into fetcher.fetchedRecords just yet; following the code in order, let's first see what the first line, coordinator.poll, does. Its source is as follows:
public void poll(long now) {
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
ensureCoordinatorReady();
now = time.milliseconds();
}
if (needRejoin()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();
ensureActiveGroup();
now = time.milliseconds();
}
pollHeartbeat(now);
maybeAutoCommitOffsetsAsync(now);
}
Again starting from the first line: what does invokeCompletedOffsetCommitCallbacks() do? As the name suggests, it invokes the callbacks for completed offset commits. Its source is as follows:
void invokeCompletedOffsetCommitCallbacks() {
while (true) {
OffsetCommitCompletion completion = completedOffsetCommits.poll();
if (completion == null)
break;
completion.invoke();
}
}
private static class OffsetCommitCompletion {
private final OffsetCommitCallback callback;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final Exception exception;
public OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
this.callback = callback;
this.offsets = offsets;
this.exception = exception;
}
public void invoke() {
if (callback != null)
callback.onComplete(offsets, exception);
}
}
completedOffsetCommits is just a ConcurrentLinkedQueue, initialized in the ConsumerCoordinator constructor. Calling completion.invoke simply executes the onComplete method of the callback that was added to this queue; in plain terms, all this method does is run callbacks.
Entries are added to this queue mainly in the following cases. We will see these three call sites later; for now it is enough to understand what the queue is for (a small usage sketch follows the list).
- 1. When committing offsets while the coordinator node is unavailable and the node lookup fails, an OffsetCommitCompletion carrying an exception is added.
- 2. When an offset commit succeeds, an OffsetCommitCompletion without exception information is added.
- 3. When an offset commit fails, an OffsetCommitCompletion carrying the exception information is added.
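The callback wrapped into an OffsetCommitCompletion is the OffsetCommitCallback handed in by the caller (or by the auto-commit path). A minimal sketch of where such a callback comes from in application code, assuming the consumer from the opening example:
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        // this is the onComplete that completion.invoke() ends up calling later
        if (exception != null) {
            // the commit (or the coordinator lookup) failed
        }
    }
});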
Now back to the coordinator.poll method; the second block of code it executes is the following:
if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
ensureCoordinatorReady();
now = time.milliseconds();
}
public boolean partitionsAutoAssigned() {
return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN;
}
Step into subscriptions.partitionsAutoAssigned first: it simply checks which subscription mode is in use. When we analyzed the subscribe method earlier, we saw that only AUTO_TOPICS and AUTO_PATTERN need to join a group to obtain the partitions to consume, while USER_ASSIGNED does not, so this check effectively rules out the USER_ASSIGNED mode.
When we subscribe with consumer.subscribe, setSubscriptionType is called with SubscriptionType.AUTO_TOPICS by default, so we do need to look at what ensureCoordinatorReady does. Its source is below, together with the methods it calls.
public synchronized void ensureCoordinatorReady() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = lookupCoordinator();
client.poll(future);
if (future.failed()) {
if (future.isRetriable())
client.awaitMetadataUpdate();
else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
coordinatorDead();
time.sleep(retryBackoffMs);
}
}
}
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// find a node to ask about the coordinator
Node node = this.client.leastLoadedNode();
if (node == null) {
// TODO: If there are no brokers left, perhaps we should use the bootstrap set
// from configuration?
return RequestFuture.noBrokersAvailable();
} else
findCoordinatorFuture = sendGroupCoordinatorRequest(node);
}
return findCoordinatorFuture;
}
private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
// initiate the group metadata request
log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
.compose(new GroupCoordinatorResponseHandler());
}
private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
clearFindCoordinatorFuture();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.coordinator = new Node(
Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());
log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
client.tryConnect(coordinator);
heartbeat.resetTimeouts(time.milliseconds());
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
future.raise(error);
}
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
clearFindCoordinatorFuture();
super.onFailure(e, future);
}
}
To summarize, its rough logic is:
- 1. Call lookupCoordinator, use client.leastLoadedNode() to find the node with the fewest in-flight requests, send it an ApiKeys.GROUP_COORDINATOR request, and attach a GroupCoordinatorResponseHandler callback.
- 2. Send the request out via client.poll.
- 3. Handle the result in GroupCoordinatorResponseHandler's onSuccess; if the response carries no error, call client.tryConnect to connect to the coordinator.
Here you can see that all requests go through the client object: client.leastLoadedNode, client.send, client.tryConnect, client.poll, client.awaitMetadataUpdate, and so on.
Where does this object come from? Look back at the KafkaConsumer constructor: client is initialized there and is passed into both coordinator and fetcher when they are created, so what is being used here is the client initialized inside KafkaConsumer.
The names of these client methods make it fairly obvious what they do, so let's just look at client.send. It mainly builds API requests: when sendGroupCoordinatorRequest above sends its request, it passes in ApiKeys.GROUP_COORDINATOR, and client.send puts the request into an unsent HashMap to wait for sending. Whenever trySend is called, all of the requests in unsent are looped over and sent out. The source of client.send is as follows:
private RequestFuture<ClientResponse> send(Node node,
ApiKeys api,
short version,
AbstractRequest request) {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
RequestHeader header = client.nextRequestHeader(api, version);
RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
put(node, new ClientRequest(now, true, send, completionHandler));
// wakeup the client in case it is blocking in poll so that we can send the queued request
client.wakeup();
return completionHandler.future;
}
private void put(Node node, ClientRequest request) {
synchronized (this) {
List<ClientRequest> nodeUnsent = unsent.get(node);
if (nodeUnsent == null) {
nodeUnsent = new ArrayList<>();
unsent.put(node, nodeUnsent);
}
nodeUnsent.add(request);
}
}
So where is trySend actually called? It turns out that only ConsumerNetworkClient.poll calls trySend to push these requests out. In step 2 of the ensureCoordinatorReady logic above, you can see that client.poll is called, which is what flushes the queued requests. Here is the source of ConsumerNetworkClient.poll.
public void poll(long timeout, long now, PollCondition pollCondition) {
// there may be handlers which need to be invoked if we woke up the previous call to poll
firePendingCompletedRequests();
synchronized (this) {
// send all the requests we can send now
trySend(now);
// check whether the poll is still needed by the caller. Note that if the expected completion
// condition becomes satisfied after the call to shouldBlock() (because of a fired completion
// handler), the client will be woken up.
if (pollCondition == null || pollCondition.shouldBlock()) {
// if there are no requests in flight, do not block longer than the retry backoff
if (client.inFlightRequestCount() == 0)
timeout = Math.min(timeout, retryBackoffMs);
client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
now = time.milliseconds();
} else {
client.poll(0, now);
}
// handle any disconnects by failing the active requests. note that disconnects must
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
checkDisconnects(now);
// trigger wakeups after checking for disconnects so that the callbacks will be ready
// to be fired on the next call to poll()
maybeTriggerWakeup();
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
trySend(now);
// fail requests that couldn't be sent if they have expired
failExpiredRequests(now);
}
// called without the lock to avoid deadlock potential if handlers need to acquire locks
firePendingCompletedRequests();
}
Calling trySend in turn ends up calling the send method in NetworkClient.java, which sends the request through selector.send. The relevant source is pasted below.
private boolean trySend(long now) {
// send any requests that can be sent now
boolean requestsSent = false;
for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
Node node = requestEntry.getKey();
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (client.ready(node, now)) {
client.send(request, now);
iterator.remove();
requestsSent = true;
}
}
}
return requestsSent;
}
@Override
public void send(ClientRequest request, long now) {
String nodeId = request.request().destination();
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
doSend(request, now);
}
private void doSend(ClientRequest request, long now) {
request.setSendTimeMs(now);
this.inFlightRequests.add(request);
selector.send(request.request());
}
//selector
public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination());
try {
channel.setSend(send);
} catch (CancelledKeyException e) {
this.failedSends.add(send.destination());
close(channel);
}
}
//KafkaChannel
private final TransportLayer transportLayer;
private Send send;
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
public Send write() throws IOException {
Send result = null;
if (send != null && send(send)) {
result = send;
send = null;
}
return result;
}
private boolean send(Send send) throws IOException {
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();
}
What happens here is that the data is stored in the send field of the KafkaChannel; then, going from selector.poll into selector.pollSelectionKeys, KafkaChannel.write calls the channel's send method, and send.writeTo finally writes the data out.
OK, at this point let's treat the request as sent. If you are interested you can dig deeper into KafkaChannel and Send; here we will stick to the KafkaConsumer data flow.
We have wandered through quite a lot of code by now, and so far we have only connected to the coordinator node; we have not joined the group yet. A quick recap:
- 1. Check that the mode is AUTO_TOPICS or AUTO_PATTERN, i.e. the consumer is going to join a group.
- 2. Find the node with the fewest in-flight requests, send it a GROUP_COORDINATOR request, and connect to the returned coordinator once the response arrives.
Now back to the coordinator.poll method, and on to the next block of code it executes.
if (needRejoin()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();
ensureActiveGroup();
now = time.milliseconds();
}
Here the join-group logic is plainly visible, so let's step straight into AbstractCoordinator's ensureActiveGroup method.
The first thing it does is start a heartbeat thread. This thread mainly sends heartbeat requests, and it also handles things like checking whether the coordinator is available and reconnecting, marking the coordinator dead, and reacting to the member joining or leaving the group. The heartbeat request is again sent through client.send, this time with ApiKeys.HEARTBEAT.
public void ensureActiveGroup() {
// always ensure that the coordinator is ready because we may have been disconnected
// when sending heartbeats and does not necessarily require us to rejoin the group.
ensureCoordinatorReady();
startHeartbeatThreadIfNeeded();
joinGroupIfNeeded();
}
private synchronized void startHeartbeatThreadIfNeeded() {
if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();
}
}
//HeartbeatThread
synchronized RequestFuture<Void> sendHeartbeatRequest() {
HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation.generationId, this.generation.memberId);
return client.send(coordinator, ApiKeys.HEARTBEAT, req)
.compose(new HeartbeatResponseHandler());
}
The main thing to look at here is joinGroupIfNeeded. It calls initiateJoinGroup, which sends an ApiKeys.JOIN_GROUP request via sendJoinGroupRequest. Notice that client.send is used once again; from the source analysis above we know that client.send only puts the request into the unsent collection to wait for trySend, and trySend is only invoked from client.poll. That is why, after initiateJoinGroup, client.poll is called to actually send the data out. The source is as follows:
void joinGroupIfNeeded() {
while (needRejoin() || rejoinIncomplete()) {
ensureCoordinatorReady();
// call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
// time if the client is woken up before a pending rebalance completes. This must be called
// on each iteration of the loop because an event requiring a rebalance (such as a metadata
// refresh which changes the matched subscription set) can occur while another rebalance is
// still in progress.
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future);
resetJoinGroupFuture();
if (future.succeeded()) {
needsJoinPrepare = true;
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
} else {
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException)
continue;
else if (!future.isRetriable())
throw exception;
time.sleep(retryBackoffMs);
}
}
}
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
// we store the join future in case we are woken up by the user after beginning the
// rebalance in the call to poll below. This ensures that we do not mistakenly attempt
// to rejoin before the pending rebalance has completed.
if (joinFuture == null) {
// fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
// Note that this must come after the call to onJoinPrepare since we must be able to continue
// sending heartbeats if that callback takes some time.
disableHeartbeatThread();
state = MemberState.REBALANCING;
joinFuture = sendJoinGroupRequest();
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
// handle join completion in the callback so that the callback will be invoked
// even if the consumer is woken up before finishing the rebalance
synchronized (AbstractCoordinator.this) {
log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
state = MemberState.STABLE;
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
@Override
public void onFailure(RuntimeException e) {
// we handle failures below after the request finishes. if the join completes
// after having been woken up, the exception is ignored and we will rejoin
synchronized (AbstractCoordinator.this) {
state = MemberState.UNJOINED;
}
}
});
}
return joinFuture;
}
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator
log.info("(Re-)joining group {}", groupId);
JoinGroupRequest request = new JoinGroupRequest(
groupId,
this.sessionTimeoutMs,
this.rebalanceTimeoutMs,
this.generation.memberId,
protocolType(),
metadata());
log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
.compose(new JoinGroupResponseHandler());
}
With that, the join-group logic is wrapped up.
Back to coordinator.poll once more. Assuming everything above went well, the consumer is now connected to the coordinator and has joined the group, but it still has not received any data to consume. So next let's follow the logic inside the last line, maybeAutoCommitOffsetsAsync.
maybeAutoCommitOffsetsAsync decides whether to auto-commit offsets based on the enable.auto.commit setting; in other words, if it is configured to false, offsets are not committed automatically. Since we are here anyway, let's step into this path and see how it works, not least because the invokeCompletedOffsetCommitCallbacks call on the first line of coordinator.poll still needs one more look in this context.
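As a reminder of the configuration side of auto-commit (the values below are only illustrative):
Properties props = new Properties();
props.put("enable.auto.commit", "true");      // default is true: offsets are committed automatically
props.put("auto.commit.interval.ms", "5000"); // how often the auto-commit is attempted
// with enable.auto.commit=false, the application must call commitSync()/commitAsync() itself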
Let's look at this path first: maybeAutoCommitOffsetsAsync actually goes through doAutoCommitOffsetsAsync, which in turn calls the commitOffsetsAsync method:
private void doAutoCommitOffsetsAsync() {
commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
if (exception instanceof RetriableException)
nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
} else {
log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
}
}
});
}
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
// out of order). Note that there may be multiple offset commits chained to the same
// coordinator lookup request. This is fine because the listeners will be invoked in
// the same order that they were added. Note also that AbstractCoordinator prevents
// multiple concurrent coordinator lookup requests.
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
doCommitOffsetsAsync(offsets, callback);
}
@Override
public void onFailure(RuntimeException e) {
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
}
});
}
// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
}
From commitOffsetsAsync you can see that when the coordinator is unknown, it looks the coordinator up again, and on failure it creates an OffsetCommitCompletion and adds it to the completedOffsetCommits queue.
Remember what invokeCompletedOffsetCommitCallbacks, on the first line of coordinator.poll, was said to do? It walks through completedOffsetCommits and executes each OffsetCommitCallback one by one; in the auto-commit case, that OffsetCommitCallback is the one created inside doAutoCommitOffsetsAsync.
When the coordinator is healthy, commitOffsetsAsync continues down into doCommitOffsetsAsync, which calls sendOffsetCommitRequest to send the commit request. You can also see here that, whether the commit succeeds or fails, an OffsetCommitCompletion is put onto the completedOffsetCommits queue, and the corresponding callback is handled the next time invokeCompletedOffsetCommitCallbacks runs.
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
this.subscriptions.needRefreshCommits();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}
@Override
public void onFailure(RuntimeException e) {
Exception commitException = e;
if (e instanceof RetriableException)
commitException = new RetriableCommitFailedException(e);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
}
});
}
OK, next let's see how the offset-commit request is actually sent. The method above calls sendOffsetCommitRequest, and the manual synchronous commit via consumer.commitSync also goes through this same place, so it is worth stepping in to see what it does.
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty())
return RequestFuture.voidSuccess();
Node coordinator = coordinator();
if (coordinator == null)
return RequestFuture.coordinatorNotAvailable();
// create the offset commit request
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
OffsetAndMetadata offsetAndMetadata = entry.getValue();
offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
}
final Generation generation;
if (subscriptions.partitionsAutoAssigned())
generation = generation();
else
generation = Generation.NO_GENERATION;
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll()
if (generation == null)
return RequestFuture.failure(new CommitFailedException());
OffsetCommitRequest req = new OffsetCommitRequest(
this.groupId,
generation.generationId,
generation.memberId,
OffsetCommitRequest.DEFAULT_RETENTION_TIME,
offsetData);
log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
.compose(new OffsetCommitResponseHandler(offsets));
}
In the end this method goes through client.send to send an ApiKeys.OFFSET_COMMIT request, with an OffsetCommitResponseHandler attached as the callback.
Now let's look at the OffsetCommitResponseHandler class: what exactly does it do, and how does it handle the response once the ApiKeys.OFFSET_COMMIT request succeeds?
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
this.offsets = offsets;
}
@Override
public OffsetCommitResponse parse(ClientResponse response) {
return new OffsetCommitResponse(response.responseBody());
}
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitLatency.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>();
for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
long offset = offsetAndMetadata.offset();
Errors error = Errors.forCode(entry.getValue());
if (error == Errors.NONE) {
log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
if (subscriptions.isAssigned(tp))
// update the local cache only if the partition is still assigned
subscriptions.committed(tp, offsetAndMetadata);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
log.error("Not authorized to commit offsets for group {}", groupId);
future.raise(new GroupAuthorizationException(groupId));
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
unauthorizedTopics.add(tp.topic());
} else if (error == Errors.OFFSET_METADATA_TOO_LARGE
|| error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
// raise the error to the user
log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
future.raise(error);
return;
} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
// just retry
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
future.raise(error);
return;
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP
|| error == Errors.REQUEST_TIMED_OUT) {
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
coordinatorDead();
future.raise(error);
return;
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION
|| error == Errors.REBALANCE_IN_PROGRESS) {
// need to re-join group
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
resetGeneration();
future.raise(new CommitFailedException());
return;
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
return;
} else {
log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
return;
}
}
if (!unauthorizedTopics.isEmpty()) {
log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
future.raise(new TopicAuthorizationException(unauthorizedTopics));
} else {
future.complete(null);
}
}
}
In the handle method you can see it retrieving the OffsetAndMetadata, which is the offset metadata of the committed data. Looking only at the error-free path, it ultimately calls subscriptions.committed(tp, offsetAndMetadata) to store the committed offset in the assignment. So what exactly is this subscriptions object?
Looking back at the KafkaConsumer constructor, that is where it is created.
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
Good, now we know what subscriptions is and that the offset metadata is stored in its assignment. At this point we have walked through the whole of coordinator.poll, so it is time to go back up one level to KafkaConsumer's pollOnce. Here is its source again so you don't have to scroll back up.
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
coordinator.poll(time.milliseconds());
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
});
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
return Collections.emptyMap();
return fetcher.fetchedRecords();
}
From this method you can see that the result it ultimately returns comes from fetcher.fetchedRecords(). After coordinator.poll finishes, it first calls fetcher.fetchedRecords() to grab any data; if there is a result it returns it directly, otherwise it calls fetcher.sendFetches() to send fetch requests and then returns fetcher.fetchedRecords() again. Send the request, then drain the data, that is the simple way to think about it.
Where does this fetcher come from? Again, it is created in the KafkaConsumer constructor. So first let's step into fetcher.sendFetches() and see what sending a fetch request actually does.
public void sendFetches() {
for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
final FetchRequest request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
client.send(fetchTarget, ApiKeys.FETCH, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = new FetchResponse(resp.responseBody());
if (!matchesRequestedPartitions(request, response)) {
// obviously we expect the broker to always send us valid responses, so this check
// is mainly for test cases where mock fetch responses must be manually crafted.
log.warn("Ignoring fetch response containing partitions {} since it does not match " +
"the requested partitions {}", response.responseData().keySet(),
request.fetchData().keySet());
return;
}
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request to {} failed", fetchTarget, e);
}
});
}
}
See, once again it calls client.send, this time with an ApiKeys.FETCH request, and then processes the data in onSuccess.
Inside the listener you can see that, in the for loop, the fetched data is stored into the completedFetches queue. Putting it into that queue is precisely what feeds the fetcher.fetchedRecords() method, so next let's look at the retrieval logic of fetcher.fetchedRecords().
private PartitionRecords<K, V> nextInLineRecords = null;
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;
nextInLineRecords = parseFetchedData(completedFetch);
} else {
TopicPartition partition = nextInLineRecords.partition;
List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
if (currentRecords == null) {
drained.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
drained.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
return drained;
}
This method first takes one entry out of the completedFetches queue and parses it with parseFetchedData to get its PartitionRecords.
private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
TopicPartition tp = completedFetch.partition;
FetchResponse.PartitionData partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
int bytes = 0;
int recordsCount = 0;
PartitionRecords<K, V> parsedRecords = null;
Errors error = Errors.forCode(partition.errorCode);
try {
if (!subscriptions.isFetchable(tp)) {
// this can happen when a rebalance happened or a partition consumption paused
// while fetch is still in-flight
log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
} else if (error == Errors.NONE) {
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long position = subscriptions.position(tp);
if (position == null || position != fetchOffset) {
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
return null;
}
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
for (LogEntry logEntry : records) {
// Skip the messages earlier than current position.
if (logEntry.offset() >= position) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}
}
recordsCount = parsed.size();
this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
if (!parsed.isEmpty()) {
log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
}
} else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
this.metadata.requestUpdate();
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
"may not exist or the user may not have Describe access to it", tp);
this.metadata.requestUpdate();
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
if (fetchOffset != subscriptions.position(tp)) {
log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
"does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
} else if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
subscriptions.needOffsetReset(tp);
} else {
throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
log.warn("Not authorized to read from topic {}.", tp.topic());
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
}
} finally {
completedFetch.metricAggregator.record(tp, bytes, recordsCount);
}
// we move the partition to the end if we received some bytes or if there was an error. This way, it's more
// likely that partitions for the same topic can remain together (allowing for more efficient serialization).
if (bytes > 0 || error != Errors.NONE)
subscriptions.movePartitionToEnd(tp);
return parsedRecords;
}
There is a check here, subscriptions.isFetchable(tp). Recall that the OffsetAndMetadata produced by sendOffsetCommitRequest is stored in the assignment inside subscriptions, saved in the commit callback; isFetchable and isAssigned are answered from that same assignment.
public boolean isFetchable(TopicPartition tp) {
return isAssigned(tp) && assignedState(tp).isFetchable();
}
public boolean isAssigned(TopicPartition tp) {
return assignment.contains(tp);
}
Next, follow the normal path where error == Errors.NONE: based on the current position of this TopicPartition in subscriptions, it builds a parsedRecords object and hands it back to the while loop in fetcher.fetchedRecords() for further processing. Back in fetcher.fetchedRecords(), here is its code once more:
private PartitionRecords<K, V> nextInLineRecords = null;
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;
nextInLineRecords = parseFetchedData(completedFetch);
} else {
TopicPartition partition = nextInLineRecords.partition;
List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
if (currentRecords == null) {
drained.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
drained.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
return drained;
}
Following the while loop: the if branch assigns nextInLineRecords, and when data has been fetched normally it will not be null, so look at the else branch. There, drainRecords extracts the records and adds them to the drained HashMap; once the loop finishes, the data is returned.
private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
if (partitionRecords.isDrained())
return Collections.emptyList();
if (!subscriptions.isAssigned(partitionRecords.partition)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
} else {
// note that the consumed position should always be available as long as the partition is still assigned
long position = subscriptions.position(partitionRecords.partition);
if (!subscriptions.isFetchable(partitionRecords.partition)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
} else if (partitionRecords.fetchOffset == position) {
// we are ensured to have at least one record since we already checked for emptiness
List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, partitionRecords.partition, nextOffset);
subscriptions.position(partitionRecords.partition, nextOffset);
return partRecords;
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
partitionRecords.partition, partitionRecords.fetchOffset, position);
}
}
partitionRecords.drain();
return Collections.emptyList();
}
OK, at this point we have walked through the whole data-fetching flow. As we saw, if enable.auto.commit is set to false in the properties passed to the KafkaConsumer constructor, offsets will not be committed automatically, so after fetching and processing the data you need to commit the offsets yourself by calling consumer.commitSync. To wrap up, let's step into the source of the manual offset commit.
public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
invokeCompletedOffsetCommitCallbacks();
if (offsets.isEmpty())
return;
while (true) {
ensureCoordinatorReady();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
client.poll(future);
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return;
}
if (!future.isRetriable())
throw future.exception();
time.sleep(retryBackoffMs);
}
}
It still ends up calling sendOffsetCommitRequest, the same logic as the maybeAutoCommitOffsetsAsync path invoked from coordinator.poll above. The only difference is that here the call is synchronous: it has to wait for the commit to complete and return a result before it can continue, so we won't repeat the analysis.
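To tie this back to the usage code at the top, here is a small, hedged sketch of the manual-commit variants that all end up on this path (commitSync goes through commitOffsetsSync, commitAsync through commitOffsetsAsync; the topic, partition and lastOffset echo the opening example):
// commit a specific offset for one partition and block until the coordinator confirms it
consumer.commitSync(Collections.singletonMap(
        new TopicPartition("topic", 0), new OffsetAndMetadata(lastOffset + 1)));

// no-argument variant: synchronously commit the latest consumed positions of all assigned partitions
consumer.commitSync();

// asynchronous variant: the outcome is delivered to the callback on a later poll()/commit call
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // commit failed; log it or decide whether to retry
    }
});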