Kafka Deep Dive (Part 2): Consumer Design

Now let's look at the design of the Kafka consumer. In recent versions of Kafka, consumer group management relies on a coordinator on the broker side, which manages the consumer instances in a group and pushes the partition assignment down to each instance. The assignment itself is computed by the group leader (one of the consumers), using an assignment strategy that every member of the group supports. When a new consumer joins the group, or an existing one leaves, a rebalance is triggered; Kafka defines its own rebalance protocol for this. Let's see how the Kafka source code implements it.
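Before diving into the internals, here is a minimal sketch of how an application participates in this protocol: it subscribes with a ConsumerRebalanceListener, and the coordinator-driven rebalance invokes the callbacks from inside poll() when partitions are revoked or assigned. The broker address, group id and topic name below are placeholders.

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "demo-group");                 // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // The listener callbacks fire inside poll() whenever the coordinator rebalances the group.
        consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked: " + partitions);   // e.g. commit offsets here
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Assigned: " + partitions);
            }
        });
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("%s-%d@%d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
        consumer.close();
    }
}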
With that picture in mind, let's start with the KafkaConsumer poll method:

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();

            if (includeMetadataInTimeout) {
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                    log.warn("Still waiting for metadata");
                }
            }

            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.pollNoWakeup();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}

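Note the client.maybeTriggerWakeup() call at the top of the loop: another thread may call KafkaConsumer.wakeup() to break the consumer out of a blocking poll, which is the standard shutdown pattern from the KafkaConsumer javadoc. A minimal sketch, assuming an already-subscribed consumer such as the one above:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ShutdownAwareLoop {
    // Assumes an already-subscribed consumer, e.g. the one from the first sketch.
    static void runUntilWoken(KafkaConsumer<String, String> consumer) {
        // wakeup() is thread-safe; it makes a blocked poll() throw WakeupException.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> System.out.println(record.value()));
            }
        } catch (WakeupException e) {
            // expected on shutdown: maybeTriggerWakeup() rethrows it out of poll()
        } finally {
            consumer.close();
        }
    }
}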
poll keeps looping until the timer expires: it first updates assignment metadata if needed (joining the group, fetching committed offsets), then delegates the actual work to the pollForFetches method:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    long pollTimeout = coordinator == null ? timer.remainingMs() :
            Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // if data is available already, return it immediately
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure

    // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
    // updateAssignmentMetadataIfNeeded before this method.
    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    Timer pollTimer = time.timer(pollTimeout);
    client.poll(pollTimer, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    });
    timer.update(pollTimer.currentTimeMs());

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator != null && coordinator.rejoinNeededOrPending()) {
        return Collections.emptyMap();
    }

    return fetcher.fetchedRecords();
}

pollForFetches first returns any records that are already buffered; otherwise it sends new fetch requests and blocks in the network client until data arrives or the timeout expires. Kafka's client protocol defines a number of request types; the three that matter most here are PRODUCE (a producer publishing messages to a broker), FETCH (pulling messages from a broker), and METADATA (fetching cluster metadata such as topics, partitions and their leaders). What gets sent here is a FETCH request, and the fetch work is delegated to the Fetcher.
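For reference, these request types correspond to entries of the ApiKeys enum in kafka-clients; a small hedged sketch (the numeric ids follow the public protocol documentation):

import org.apache.kafka.common.protocol.ApiKeys;

public class RequestTypeIds {
    public static void main(String[] args) {
        // Every wire request carries one of these api keys in its header.
        System.out.println(ApiKeys.PRODUCE.id);   // 0
        System.out.println(ApiKeys.FETCH.id);     // 1
        System.out.println(ApiKeys.METADATA.id);  // 3
    }
}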
Let's look at the Fetcher's fetchedRecords method:

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    int recordsRemaining = maxPollRecords;

    try {
        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                CompletedFetch completedFetch = completedFetches.peek();
                if (completedFetch == null) break;

                try {
                    nextInLineRecords = parseCompletedFetch(completedFetch);
                } catch (Exception e) {
                    // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
                    // (2) there are no fetched records with actual content preceding this exception.
                    // The first condition ensures that the completedFetches is not stuck with the same completedFetch
                    // in cases such as the TopicAuthorizationException, and the second condition ensures that no
                    // potential data loss due to an exception in a following record.
                    FetchResponse.PartitionData partition = completedFetch.partitionData;
                    if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
                        completedFetches.poll();
                    }
                    throw e;
                }
                completedFetches.poll();
            } else {
                List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
                TopicPartition partition = nextInLineRecords.partition;
                if (!records.isEmpty()) {
                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                    if (currentRecords == null) {
                        fetched.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        fetched.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }
    } catch (KafkaException e) {
        if (fetched.isEmpty())
            throw e;
    }
    return fetched;
}

In short, it drains messages that have already been fetched into the completedFetches buffer, returning at most maxPollRecords of them per call.
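maxPollRecords comes from the max.poll.records consumer config (default 500 in the 2.x clients). A minimal configuration sketch:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class PollBudget {
    public static Properties pollProps() {
        Properties props = new Properties();
        // Caps how many records one poll() may return; fetchedRecords stops
        // draining completedFetches once this budget is exhausted.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);   // "max.poll.records"
        return props;
    }
}

Now, the completedFetches field itself: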

private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;

This is a ConcurrentLinkedQueue: the network I/O path enqueues CompletedFetch objects as fetch responses arrive, while the user's poll thread drains them, so the handoff between the two threads needs no extra locking. Next, the sendFetches method, which actually issues the fetch requests:

public synchronized int sendFetches() {
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        final Node fetchTarget = entry.getKey();
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        final FetchRequest.Builder request = FetchRequest.Builder
                .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                .isolationLevel(isolationLevel)
                .setMaxBytes(this.maxBytes)
                .metadata(data.metadata())
                .toForget(data.toForget());
        if (log.isDebugEnabled()) {
            log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
        }
        client.send(fetchTarget, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        synchronized (Fetcher.this) {
                            @SuppressWarnings("unchecked")
                            FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                            if (handler == null) {
                                log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                                        fetchTarget.id());
                                return;
                            }
                            if (!handler.handleResponse(response)) {
                                return;
                            }

                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
                                TopicPartition partition = entry.getKey();
                                long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
                                FetchResponse.PartitionData<Records> fetchData = entry.getValue();

                                log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                                        isolationLevel, fetchOffset, partition, fetchData);
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                        resp.requestHeader().apiVersion()));
                            }

                            sensors.fetchLatency.record(resp.requestLatencyMs());
                        }
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        synchronized (Fetcher.this) {
                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                            if (handler != null) {
                                handler.handleError(e);
                            }
                        }
                    }
                });
    }
    return fetchRequestMap.size();
}

sendFetches builds one FetchRequest per target node (with maxWaitMs, minBytes and maxBytes taken from the consumer configuration) and hands it to the ConsumerNetworkClient; the success listener then pushes each partition's data into completedFetches.
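The maxWaitMs / minBytes / maxBytes fields map to standard consumer configs. A hedged tuning sketch, using the documented 2.x defaults:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchTuning {
    public static Properties fetchProps() {
        Properties props = new Properties();
        // How long the broker may wait for fetch.min.bytes to accumulate.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // maxWaitMs
        // Minimum bytes the broker should return for a fetch.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);                 // minBytes
        // Upper bound on the total bytes returned in one fetch response.
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);  // maxBytes
        return props;
    }
}

Now let's look at the ConsumerNetworkClient's send method: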

public RequestFuture<ClientResponse> send(Node node,
                                          AbstractRequest.Builder<?> requestBuilder,
                                          int requestTimeoutMs) {
    long now = time.milliseconds();
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
            requestTimeoutMs, completionHandler);
    unsent.put(node, clientRequest);

    // wakeup the client in case it is blocking in poll so that we can send the queued request
    client.wakeup();
    return completionHandler.future;
}

The request to be sent is wrapped into a ClientRequest and placed into unsent; note that send itself transmits nothing, it only queues the request and wakes up the client. Let's see what unsent is by looking at the UnsentRequests class:

private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;

    private UnsentRequests() {
        unsent = new ConcurrentHashMap<>();
    }

    public void put(Node node, ClientRequest request) {
        // the lock protects the put from a concurrent removal of the queue for the node
        synchronized (unsent) {
            ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
            if (requests == null) {
                requests = new ConcurrentLinkedQueue<>();
                unsent.put(node, requests);
            }
            requests.add(request);
        }
    }

So it is simply a ConcurrentHashMap whose keys are Nodes and whose values are ConcurrentLinkedQueues of ClientRequests destined for that node; the synchronized block protects put against a concurrent removal of a node's queue.
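As an aside, here is a minimal sketch of the same per-node queue pattern using computeIfAbsent. This is a hypothetical simplification: Kafka keeps the explicit synchronization because other code paths remove whole queues from the map.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

public class PerNodeQueues<N, R> {
    private final ConcurrentMap<N, ConcurrentLinkedQueue<R>> unsent = new ConcurrentHashMap<>();

    // Atomically creates the node's queue on first use, then enqueues the request.
    public void put(N node, R request) {
        unsent.computeIfAbsent(node, n -> new ConcurrentLinkedQueue<>()).add(request);
    }

    public R poll(N node) {
        ConcurrentLinkedQueue<R> queue = unsent.get(node);
        return queue == null ? null : queue.poll();
    }
}

Now let's look at the ConsumerNetworkClient's poll method: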

public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) {
    // there may be handlers which need to be invoked if we woke up the previous call to poll
    firePendingCompletedRequests();

    lock.lock();
    try {
        // Handle async disconnects prior to attempting any sends
        handlePendingDisconnects();

        // send all the requests we can send now
        long pollDelayMs = trySend(timer.currentTimeMs());

        // check whether the poll is still needed by the caller. Note that if the expected completion
        // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
        // handler), the client will be woken up.
        if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
            // if there are no requests in flight, do not block longer than the retry backoff
            long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
            if (client.inFlightRequestCount() == 0)
                pollTimeout = Math.min(pollTimeout, retryBackoffMs);
            client.poll(pollTimeout, timer.currentTimeMs());
        } else {
            client.poll(0, timer.currentTimeMs());
        }
        timer.update();

        // handle any disconnects by failing the active requests. note that disconnects must
        // be checked immediately following poll since any subsequent call to client.ready()
        // will reset the disconnect status
        checkDisconnects(timer.currentTimeMs());
        if (!disableWakeup) {
            // trigger wakeups after checking for disconnects so that the callbacks will be ready
            // to be fired on the next call to poll()
            maybeTriggerWakeup();
        }
        // throw InterruptException if this thread is interrupted
        maybeThrowInterruptException();

        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        trySend(timer.currentTimeMs());

        // fail requests that couldn't be sent if they have expired
        failExpiredRequests(timer.currentTimeMs());

        // clean unsent requests collection to keep the map from growing indefinitely
        unsent.clean();
    } finally {
        lock.unlock();
    }

    // called without the lock to avoid deadlock potential if handlers need to acquire locks
    firePendingCompletedRequests();
}

poll first fires any completed request handlers, sends whatever it can via trySend, blocks in the underlying NetworkClient for at most the computed timeout, and then handles disconnects, wakeups, interrupts and expired requests. On to the trySend method:

private long trySend(long now) {
    long pollDelayMs = maxPollTimeoutMs;

    // send any requests that can be sent now
    for (Node node : unsent.nodes()) {
        Iterator<ClientRequest> iterator = unsent.requestIterator(node);
        if (iterator.hasNext())
            pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));

        while (iterator.hasNext()) {
            ClientRequest request = iterator.next();
            if (client.ready(node, now)) {
                client.send(request, now);
                iterator.remove();
            }
        }
    }
    return pollDelayMs;
}

trySend simply walks the per-node queues in unsent and, for every node the client is ready to talk to, hands the ClientRequest to the NetworkClient for sending. On the receiving side, each fetched message is eventually exposed to the application as a ConsumerRecord; let's look at this class's fields:

private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private final Optional<Integer> leaderEpoch;

private volatile Long checksum;

Essentially all the metadata a consumer needs about a fetched message is here: topic, partition, offset, timestamp, the serialized key and value sizes, headers, and the deserialized key and value. During a fetch, the results are wrapped into ConsumerRecords and returned.
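A short, hedged usage sketch showing these accessors from the application side (it assumes a subscribed consumer like the one in the first sketch):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RecordInspector {
    // Assumes a subscribed consumer, as in the first sketch.
    static void dumpOnePoll(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic=%s partition=%d offset=%d ts=%d key=%s value=%s leaderEpoch=%s%n",
                    record.topic(), record.partition(), record.offset(), record.timestamp(),
                    record.key(), record.value(), record.leaderEpoch());
        }
    }
}

Now let's look at the Fetcher's fetchRecords method: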

private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
    if (!subscriptions.isAssigned(partitionRecords.partition)) {
        // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
        log.debug("Not returning fetched records for partition {} since it is no longer assigned",
                partitionRecords.partition);
    } else if (!subscriptions.isFetchable(partitionRecords.partition)) {
        // this can happen when a partition is paused before fetched records are returned to the consumer's
        // poll call or if the offset is being reset
        log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
                partitionRecords.partition);
    } else {
        long position = subscriptions.position(partitionRecords.partition);
        if (partitionRecords.nextFetchOffset == position) {
            List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);

            long nextOffset = partitionRecords.nextFetchOffset;
            log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                    "position to {}", position, partitionRecords.partition, nextOffset);
            subscriptions.position(partitionRecords.partition, nextOffset);

            Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
            if (partitionLag != null)
                this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);

            Long lead = subscriptions.partitionLead(partitionRecords.partition);
            if (lead != null) {
                this.sensors.recordPartitionLead(partitionRecords.partition, lead);
            }

            return partRecords;
        } else {
            // these records aren't next in line based on the last consumed position, ignore them
            // they must be from an obsolete request
            log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                    partitionRecords.partition, partitionRecords.nextFetchOffset, position);
        }
    }

    partitionRecords.drain();
    return emptyList();
}

Note that fetchRecords does not go to the broker at all: it drains up to maxRecords already-buffered records from the PartitionRecords, but only if the partition is still assigned and fetchable and the buffered nextFetchOffset matches the current consumed position. It then advances the subscription position to the next offset and records the lag and lead metrics; records from an obsolete request, whose offset no longer matches the position, are dropped.
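This position check is, for instance, what makes seek() between polls behave correctly. A hedged sketch (topic, partition and offset are placeholders, and the partition must currently be assigned to this consumer):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekExample {
    // Assumes the partition is currently assigned to this consumer.
    static void rewind(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("demo-topic", 0);   // placeholder topic
        // Moves the consumed position to offset 42. Any records already buffered
        // for this partition now fail the nextFetchOffset == position check above,
        // so they are dropped and the next poll() fetches from the new offset.
        consumer.seek(tp, 42L);
    }
}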

Back in fetchedRecords: as long as there is record budget left, it peeks a CompletedFetch off completedFetches, parses it with parseCompletedFetch into a PartitionRecords, and drains records from that. The per-record decoding happens in the parseRecord method:

private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType, record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers, leaderEpoch);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
    }
}

It deserializes the raw key and value bytes with the configured deserializers and wraps them, together with the record's offset, timestamp, headers and leader epoch, into a ConsumerRecord; a failure here surfaces as a SerializationException that asks you to seek past the bad record.
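keyDeserializer and valueDeserializer are whatever the application configured. As a minimal hedged sketch, here is a custom Deserializer (the class name is illustrative; Kafka ships an equivalent StringDeserializer):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative UTF-8 string deserializer; parseRecord calls deserialize()
// with the record's topic, headers and raw bytes.
public class Utf8Deserializer implements Deserializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public void close() { }
}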
To recap the consumer's fetch flow: the client sends FETCH requests to the brokers, the responses are buffered as CompletedFetch objects in the completedFetches ConcurrentLinkedQueue, and poll() drains that queue, decoding the raw records into ConsumerRecords for the application.
That wraps up this look at the Kafka consumer design.
