Let's talk about the design of the Kafka consumer. In recent versions of Kafka, consumer group management relies on a group coordinator running on one of the brokers: the coordinator tracks the group's consumer instances and pushes the partition assignment down to each of them. The assignment itself is computed by one member acting as the group leader, and because the coordinator distributes it to every member, the whole group ends up with a consistent view of who owns which partitions. Whenever a consumer joins or leaves the group, a rebalance takes place, following Kafka's own rebalance protocol. Let's look at how the Kafka source code implements this.
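Before diving into the source, here is a minimal sketch of how an application drives all of this; the broker address, group id and topic name below are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "demo-group");                 // joining a group is what triggers the coordinator/rebalance logic
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "500");                 // cap used by Fetcher.fetchedRecords, discussed below

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {   // loops forever for the sketch; a wakeup-based shutdown pattern is shown later
                // poll(Duration) is the public entry point into the private poll() analyzed below
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}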
First, let's look at KafkaConsumer's poll method:
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
// poll for new data until the timeout expires
do {
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
}
}
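For context, this private poll is reached through two public overloads, paraphrased below (the exact code may differ slightly between releases). The deprecated long-based overload passes includeMetadataInTimeout = false, which is why the loop above may spin on updateAssignmentMetadataIfNeeded with an effectively unbounded timer:

@Deprecated
public ConsumerRecords<K, V> poll(final long timeoutMs) {
    // old behaviour: the timeout does not bound metadata updates or rebalancing
    return poll(time.timer(timeoutMs), false);
}

public ConsumerRecords<K, V> poll(final Duration timeout) {
    // the Duration bounds the whole call, including metadata and rebalance work
    return poll(time.timer(timeout), true);
}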
Next, the pollForFetches method:
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
// if data is available already, return it immediately
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
timer.update(pollTimer.currentTimeMs());
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator != null && coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}
return fetcher.fetchedRecords();
}
Kafka's wire protocol defines quite a few request types; the three that matter most here are PRODUCE (a producer writing messages to a broker), FETCH (pulling messages from a broker), and METADATA (retrieving cluster metadata such as topics, partitions and their leaders). What pollForFetches issues are FETCH requests, and the fetching itself is driven by the Fetcher.
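For reference, the client enumerates the protocol's request types in org.apache.kafka.common.protocol.ApiKeys; a tiny snippet (illustrative only, ApiKeys is an internal class) that prints the three mentioned above:

import org.apache.kafka.common.protocol.ApiKeys;

public class ApiKeysDemo {
    public static void main(String[] args) {
        // the wire protocol has many more request types than these three
        System.out.println(ApiKeys.PRODUCE.id + " " + ApiKeys.PRODUCE.name);   // 0 Produce
        System.out.println(ApiKeys.FETCH.id + " " + ApiKeys.FETCH.name);       // 1 Fetch
        System.out.println(ApiKeys.METADATA.id + " " + ApiKeys.METADATA.name); // 3 Metadata
    }
}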
Let's look at Fetcher's fetchedRecords method:
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;
try {
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
try {
nextInLineRecords = parseCompletedFetch(completedFetch);
} catch (Exception e) {
// Remove a completedFetch upon a parse with exception if (1) it contains no records, and
// (2) there are no fetched records with actual content preceding this exception.
// The first condition ensures that the completedFetches is not stuck with the same completedFetch
// in cases such as the TopicAuthorizationException, and the second condition ensures that no
// potential data loss due to an exception in a following record.
FetchResponse.PartitionData partition = completedFetch.partitionData;
if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
completedFetches.poll();
}
throw e;
}
completedFetches.poll();
} else {
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
TopicPartition partition = nextInLineRecords.partition;
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
} catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
}
return fetched;
}
Essentially it just drains already-fetched messages from completedFetches. Let's look at that field:
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
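completedFetches is where the network side and the poll loop meet: the fetch-response listener (shown below in sendFetches) appends CompletedFetch objects to it, and fetchedRecords drains them on the application thread. Here is a minimal, self-contained sketch of the same hand-off pattern; every name in it is made up, not Kafka's:

import java.util.concurrent.ConcurrentLinkedQueue;

public class HandoffSketch {
    // one thread enqueues completed work, another drains it, with no explicit locking
    private static final ConcurrentLinkedQueue<String> completed = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws InterruptedException {
        Thread network = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                completed.add("fetch-" + i);          // like onSuccess adding a CompletedFetch
            }
        });
        network.start();
        network.join();

        String next;
        while ((next = completed.poll()) != null) {   // like fetchedRecords draining the queue
            System.out.println("processing " + next);
        }
    }
}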
This queue is the hand-off point between the network side of the client and the poll loop. Now let's look at the sendFetches method, which is what fills it:
public synchronized int sendFetches() {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget());
if (log.isDebugEnabled()) {
log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
}
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
synchronized (Fetcher.this) {
@SuppressWarnings("unchecked")
FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler == null) {
log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
fetchTarget.id());
return;
}
if (!handler.handleResponse(response)) {
return;
}
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
FetchResponse.PartitionData<Records> fetchData = entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset, partition, fetchData);
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (Fetcher.this) {
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler != null) {
handler.handleError(e);
}
}
}
});
}
return fetchRequestMap.size();
}
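The maxWaitMs, minBytes, maxBytes and isolationLevel fed into FetchRequest.Builder all come from consumer configuration. A minimal example of setting them (the values shown are the defaults, for illustration):

Properties props = new Properties();
props.put("fetch.min.bytes", "1");                  // minBytes: broker replies as soon as this much data is ready
props.put("fetch.max.wait.ms", "500");              // maxWaitMs: or after waiting at most this long
props.put("fetch.max.bytes", "52428800");           // maxBytes: upper bound for one fetch response
props.put("max.partition.fetch.bytes", "1048576");  // per-partition cap within a response
props.put("isolation.level", "read_uncommitted");   // or read_committed, for transactional reads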
The request is handed to ConsumerNetworkClient to be sent. Let's look at ConsumerNetworkClient's send method:
public RequestFuture<ClientResponse> send(Node node,
AbstractRequest.Builder<?> requestBuilder,
int requestTimeoutMs) {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
requestTimeoutMs, completionHandler);
unsent.put(node, clientRequest);
// wakeup the client in case it is blocking in poll so that we can send the queued request
client.wakeup();
return completionHandler.future;
}
The outgoing request is wrapped in a ClientRequest object and placed into unsent. To see what unsent is, let's look at the UnsentRequests class:
private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
private UnsentRequests() {
unsent = new ConcurrentHashMap<>();
}
public void put(Node node, ClientRequest request) {
// the lock protects the put from a concurrent removal of the queue for the node
synchronized (unsent) {
ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
if (requests == null) {
requests = new ConcurrentLinkedQueue<>();
unsent.put(node, requests);
}
requests.add(request);
}
}
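For comparison, the same per-node queue structure could be built with computeIfAbsent, as in the sketch below (a hypothetical class, not Kafka's code). Kafka keeps the explicit synchronized(unsent) block because, as its comment notes, another thread may concurrently remove a node's whole queue, and a request must not be added to a queue that has just been detached from the map:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

public class PerNodeQueues<N, R> {
    private final ConcurrentMap<N, ConcurrentLinkedQueue<R>> unsent = new ConcurrentHashMap<>();

    // simpler lock-free variant: safe only if queues are never removed while puts are in flight
    public void put(N node, R request) {
        unsent.computeIfAbsent(node, n -> new ConcurrentLinkedQueue<>()).add(request);
    }
}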
So unsent is essentially a ConcurrentHashMap whose keys are Nodes and whose values are ConcurrentLinkedQueues of the ClientRequests waiting to be sent to that node. Next, ConsumerNetworkClient's poll method:
public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) {
// there may be handlers which need to be invoked if we woke up the previous call to poll
firePendingCompletedRequests();
lock.lock();
try {
// Handle async disconnects prior to attempting any sends
handlePendingDisconnects();
// send all the requests we can send now
long pollDelayMs = trySend(timer.currentTimeMs());
// check whether the poll is still needed by the caller. Note that if the expected completion
// condition becomes satisfied after the call to shouldBlock() (because of a fired completion
// handler), the client will be woken up.
if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
// if there are no requests in flight, do not block longer than the retry backoff
long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
if (client.inFlightRequestCount() == 0)
pollTimeout = Math.min(pollTimeout, retryBackoffMs);
client.poll(pollTimeout, timer.currentTimeMs());
} else {
client.poll(0, timer.currentTimeMs());
}
timer.update();
// handle any disconnects by failing the active requests. note that disconnects must
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
checkDisconnects(timer.currentTimeMs());
if (!disableWakeup) {
// trigger wakeups after checking for disconnects so that the callbacks will be ready
// to be fired on the next call to poll()
maybeTriggerWakeup();
}
// throw InterruptException if this thread is interrupted
maybeThrowInterruptException();
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
trySend(timer.currentTimeMs());
// fail requests that couldn't be sent if they have expired
failExpiredRequests(timer.currentTimeMs());
// clean unsent requests collection to keep the map from growing indefinitely
unsent.clean();
} finally {
lock.unlock();
}
// called without the lock to avoid deadlock potential if handlers need to acquire locks
firePendingCompletedRequests();
}
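The maybeTriggerWakeup call above is what backs KafkaConsumer.wakeup(): another thread can ask a consumer blocked in poll to bail out, and the blocked thread then sees a WakeupException. A typical shutdown pattern built on this, sketched as a fragment (running is a hypothetical volatile flag, consumer is constructed as in the first example, and WakeupException comes from org.apache.kafka.common.errors):

// in some other thread, e.g. when shutting the application down:
consumer.wakeup();   // safe to call from any thread

// in the polling thread:
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // ... process records ...
    }
} catch (WakeupException e) {
    // expected when wakeup() was called; fall through to close
} finally {
    consumer.close();
}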
On to the trySend method:
private long trySend(long now) {
long pollDelayMs = maxPollTimeoutMs;
// send any requests that can be sent now
for (Node node : unsent.nodes()) {
Iterator<ClientRequest> iterator = unsent.requestIterator(node);
if (iterator.hasNext())
pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (client.ready(node, now)) {
client.send(request, now);
iterator.remove();
}
}
}
return pollDelayMs;
}
trySend simply takes ClientRequest objects out of unsent and hands them to the underlying client for the actual network send. On the way back, every fetched message is eventually returned to the application as a ConsumerRecord; let's look at that class's fields:
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private final Optional<Integer> leaderEpoch;
private volatile Long checksum;
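All of these fields are exposed through accessors; a small sketch of reading them inside the poll loop of the first example:

for (ConsumerRecord<String, String> record : records) {
    System.out.printf("topic=%s partition=%d offset=%d ts=%d (%s) keySize=%d valueSize=%d%n",
            record.topic(), record.partition(), record.offset(),
            record.timestamp(), record.timestampType(),
            record.serializedKeySize(), record.serializedValueSize());
    record.headers().forEach(h ->
            System.out.println("header " + h.key() + " = " + new String(h.value())));   // header values are byte[]
    record.leaderEpoch().ifPresent(epoch -> System.out.println("leaderEpoch=" + epoch));
    String key = record.key();     // already deserialized by the configured key.deserializer
    String value = record.value(); // already deserialized by the configured value.deserializer
}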
Basically everything that describes a consumed message is here: topic, partition, offset, timestamp, the serialized key and value sizes, headers, the deserialized key and value, and the leader epoch. During a fetch, the results are wrapped into ConsumerRecord objects before being returned. Now let's look at Fetcher's fetchRecords method:
private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
if (!subscriptions.isAssigned(partitionRecords.partition)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it is no longer assigned",
partitionRecords.partition);
} else if (!subscriptions.isFetchable(partitionRecords.partition)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's
// poll call or if the offset is being reset
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
partitionRecords.partition);
} else {
long position = subscriptions.position(partitionRecords.partition);
if (partitionRecords.nextFetchOffset == position) {
List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, partitionRecords.partition, nextOffset);
subscriptions.position(partitionRecords.partition, nextOffset);
Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
if (partitionLag != null)
this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
Long lead = subscriptions.partitionLead(partitionRecords.partition);
if (lead != null) {
this.sensors.recordPartitionLead(partitionRecords.partition, lead);
}
return partRecords;
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
partitionRecords.partition, partitionRecords.nextFetchOffset, position);
}
}
partitionRecords.drain();
return emptyList();
}
fetchRecords does not go to the broker at all: provided the partition is still assigned and fetchable and the buffer's nextFetchOffset matches the consumer's current position, it drains up to maxRecords records from the already-fetched PartitionRecords buffer, advances the position, and records the partition's lag and lead metrics. Otherwise the buffered data is dropped as obsolete and an empty list is returned.
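These checks line up with public consumer APIs: pause() makes an assigned partition non-fetchable, resume() makes it fetchable again, and seek() moves the position so that data already fetched for the old offset is discarded as obsolete. A quick sketch, assuming consumer is assigned as in the earlier example, with a placeholder topic and partition (TopicPartition is org.apache.kafka.common.TopicPartition):

TopicPartition tp = new TopicPartition("demo-topic", 0);

consumer.pause(Collections.singleton(tp));    // fetched data for tp is held back; poll() returns nothing for it
consumer.resume(Collections.singleton(tp));   // tp becomes fetchable again

consumer.seek(tp, 42L);                       // move the consumer's position for tp
long position = consumer.position(tp);        // 42; buffered records at the old offset will be ignored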
Back in fetchedRecords: as long as there is still room for more records (bounded by max.poll.records), it peeks a CompletedFetch from completedFetches, parses it with parseCompletedFetch into a PartitionRecords object, and drains records from that buffer through fetchRecords. Each raw record is converted into a ConsumerRecord by the parseRecord method:
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
RecordBatch batch,
Record record) {
try {
long offset = record.offset();
long timestamp = record.timestamp();
Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
TimestampType timestampType = batch.timestampType();
Headers headers = new RecordHeaders(record.headers());
ByteBuffer keyBytes = record.key();
byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
ByteBuffer valueBytes = record.value();
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
timestamp, timestampType, record.checksumOrNull(),
keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
key, value, headers, leaderEpoch);
} catch (RuntimeException e) {
throw new SerializationException("Error deserializing key/value for partition " + partition +
" at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
}
}
It decodes a single raw Record of the given TopicPartition: offset, timestamp, headers, and the key and value bytes, which are run through the configured deserializers, and wraps the result in a ConsumerRecord.
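The keyDeserializer and valueDeserializer used here are the ones configured via key.deserializer and value.deserializer; if they throw, the failure is surfaced as a SerializationException naming the partition and offset to seek past. As an illustration, a custom value deserializer only needs to implement the two-argument deserialize, which the headers-aware call above falls back to in this client generation (a hypothetical class, for demonstration only):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Example custom deserializer: parseRecord ends up invoking it for every record's value bytes.
public class UpperCaseDeserializer implements Deserializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8).toUpperCase();
    }

    @Override
    public void close() { }
}

It would then be registered with props.put("value.deserializer", UpperCaseDeserializer.class.getName()) in the consumer configuration.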
To sum up the consumer's fetch flow: the network client sends FETCH requests to the brokers, completed responses are queued as CompletedFetch objects in the ConcurrentLinkedQueue, and when the application polls, records are drained from that queue, deserialized, wrapped as ConsumerRecords, and returned.
That concludes this look at the Kafka consumer's design.