KafkaConsumer (0.10.1.1) Source Code Analysis

Let's start with the basic usage: instantiate a KafkaConsumer, subscribe to topics via its subscribe method, fetch data with poll and process it, and once processing is done, call commitSync to commit the offsets of the records that were fetched.


    // Initialization. Note: the Properties passed to the constructor must at least
    // contain bootstrap.servers and the key/value deserializers, otherwise
    // construction fails with a ConfigException (the values below are illustrative).
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe
    consumer.subscribe(Collections.singletonList("topic"));
    while (Constant.MQTT_SERVICE_STARTED) { // application-level flag: keep polling in a loop
        ConsumerRecords<String, String> records = consumer.poll(10000); // fetch data
        for (TopicPartition partition : records.partitions()) { // process data
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                if (record != null && record.value() != null) {
                    // TODO: process the record
                }
            }
            // Offset handling: commit the offset of the last record processed, plus one
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }

With that in mind, let's start from the KafkaConsumer constructor. Looking into KafkaConsumer.java, you will find that all of the constructors eventually delegate to the one below.

To briefly summarize what this constructor does: it reads the relevant settings from the supplied properties and initializes `metadata`, `client`, `subscriptions`, `coordinator`, `fetcher`, and so on. The source is shown below; it is fairly long, so a quick skim is enough for now — we will come back to it later.


    private KafkaConsumer(ConsumerConfig config,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
        try {
            this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
            int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
            if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
                throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
            this.time = new SystemTime();

            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;
            Map<String, String> metricsTags = new LinkedHashMap<>();
            metricsTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricsTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

            // load interceptors and make sure they get clientId
            Map<String, Object> userProvidedConfigs = config.originals();
            userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ConsumerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
            if (keyDeserializer == null) {
                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        Deserializer.class);
                this.keyDeserializer.configure(config.originals(), true);
            } else {
                config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                this.keyDeserializer = keyDeserializer;
            }
            if (valueDeserializer == null) {
                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        Deserializer.class);
                this.valueDeserializer.configure(config.originals(), false);
            } else {
                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                this.valueDeserializer = valueDeserializer;
            }
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), 0);
            String metricGrpPrefix = "consumer";
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient netClient = new NetworkClient(
                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                    this.metadata,
                    clientId,
                    100, // a fixed large enough value will suffice
                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
            this.subscriptions = new SubscriptionState(offsetResetStrategy);
            List<PartitionAssignor> assignors = config.getConfiguredInstances(
                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    PartitionAssignor.class);
            this.coordinator = new ConsumerCoordinator(this.client,
                    config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                    assignors,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    retryBackoffMs,
                    new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                    this.interceptors,
                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
            this.fetcher = new Fetcher<>(this.client,
                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                    this.keyDeserializer,
                    this.valueDeserializer,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    this.retryBackoffMs);

            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);

            log.debug("Kafka consumer created");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka consumer", t);
        }
    }

With initialization done, the next step is to call KafkaConsumer's subscribe method to subscribe to topics. Let's see what subscribe does.


    @Override
    public void subscribe(Collection<String> topics) {
        subscribe(topics, new NoOpConsumerRebalanceListener());
    }
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        acquire();
        try {
            if (topics == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            } else if (topics.isEmpty()) {
                // treat subscribing to empty topic list as the same as unsubscribing
                this.unsubscribe();
            } else {
                for (String topic : topics) {
                    if (topic == null || topic.trim().isEmpty())
                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }
                log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
                this.subscriptions.subscribe(new HashSet<>(topics), listener);
                metadata.setTopics(subscriptions.groupSubscription());
            }
        } finally {
            release();
        }
    }

As we can see, subscribe simply delegates to the subscribe method of the `subscriptions` object that was initialized in the KafkaConsumer constructor, so let's step straight into that method.


    public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null)
            throw new IllegalArgumentException("RebalanceListener cannot be null");

        setSubscriptionType(SubscriptionType.AUTO_TOPICS);

        this.listener = listener;

        changeSubscription(topics);
    }

Look at the first call, setSubscriptionType(SubscriptionType.AUTO_TOPICS), which passes in SubscriptionType.AUTO_TOPICS. Let's briefly go over these subscription modes.

These modes are defined in the SubscriptionType enum. KafkaConsumer has three subscription modes for obtaining the partitions to consume; only AUTO_TOPICS and AUTO_PATTERN require joining a group to get the partitions, while USER_ASSIGNED does not. In brief:

  • AUTO_TOPICS: subscribe to the given topics and join the specified group; the GroupCoordinator then assigns the partitions to consume. This is a topic-granularity subscription.
  • AUTO_PATTERN: the user supplies a pattern; the consumer fetches the metadata of all topics and matches them against the pattern, and the matching topics are the ones to consume. It then joins the group to obtain its partitions, just as in AUTO_TOPICS. This is also a topic-granularity subscription.
  • USER_ASSIGNED: the topic-partitions to consume are specified directly via KafkaConsumer#assign(). This is a partition-granularity subscription.

In AUTO_TOPICS mode, the consumer fetches the metadata of the specified topics from the brokers. In AUTO_PATTERN mode, it pulls the metadata of all topics and matches them to determine which topics to actually consume (a short sketch of the three styles follows).
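
As an aside, here is a minimal, hedged sketch of the three styles from the application side (topic names and partition numbers are illustrative, and `consumer` is the instance from the opening example). Note that the styles are mutually exclusive on one instance — mixing them triggers the IllegalStateException thrown by setSubscriptionType, shown below:

    // AUTO_TOPICS: topic-level subscription; the GroupCoordinator assigns partitions
    consumer.subscribe(Arrays.asList("topic-a", "topic-b"));

    // AUTO_PATTERN: all topic metadata is fetched and matched against the pattern
    // consumer.subscribe(Pattern.compile("topic-.*"), new NoOpConsumerRebalanceListener());

    // USER_ASSIGNED: partition-level assignment; no group membership involved
    // consumer.assign(Arrays.asList(new TopicPartition("topic-a", 0)));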

All setSubscriptionType does here is record the subscriptionType value inside `subscriptions`.


    private enum SubscriptionType {
        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
    }

    private void setSubscriptionType(SubscriptionType type) {
        if (this.subscriptionType == SubscriptionType.NONE)
            this.subscriptionType = type;
        else if (this.subscriptionType != type)
            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
    }

Now look at the second call, changeSubscription. Simply put, it records the topics for both the member's own subscription and the group subscription:


    /* the list of topics the user has requested */
    private Set<String> subscription;

    /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
    private final Set<String> groupSubscription;

    private void changeSubscription(Set<String> topicsToSubscribe) {
        if (!this.subscription.equals(topicsToSubscribe)) {
            this.subscription = topicsToSubscribe;
            this.groupSubscription.addAll(topicsToSubscribe);
        }
    }

At this point the KafkaConsumer has been initialized, and the topics have been subscribed and recorded, so the next step is connecting to the cluster and fetching data.

The usage flow was sketched at the beginning of this article, so let's now follow `ConsumerRecords<String, String> records = consumer.poll(10000)` to see how data is fetched, starting with the poll method in KafkaConsumer.java.


    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

            // poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    fetcher.sendFetches();
                    client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

As you can see, there is a do...while loop here: each iteration calls pollOnce, and if records come back it sends off the next round of fetches and returns the data to the caller for processing; each iteration also measures the elapsed time and compares it against the timeout to decide when to give up.

Hold on, one step at a time — let's first look at the logic inside pollOnce.


    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.poll(time.milliseconds());

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();

        return fetcher.fetchedRecords();
    }

pollOnce again takes a timeout — at most the value passed to KafkaConsumer.poll(long timeout), decremented by the elapsed time on each loop iteration — and the timeout handling happens here as well.

Roughly, this method does the following:

  • 1. Call the poll method of the `coordinator` initialized in the KafkaConsumer constructor, which sets up the necessary connections and sends any pending requests.
  • 2. Fetch records with fetcher.fetchedRecords (the `fetcher` is also created in the constructor; feel free to look back at it).
  • 3. If the fetched records are not empty, return them directly.
  • 4. If the fetched records are empty, send one fetch request via fetcher.sendFetches, then call fetcher.fetchedRecords once more and return the result.

By now we can roughly see that the data comes from fetcher.fetchedRecords. But rather than jumping straight into that method, let's follow the code in order and first see what the opening call, coordinator.poll, does. Its source is as follows:


    public void poll(long now) {
        invokeCompletedOffsetCommitCallbacks();

        if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
            ensureCoordinatorReady();
            now = time.milliseconds();
        }

        if (needRejoin()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription())
                client.ensureFreshMetadata();

            ensureActiveGroup();
            now = time.milliseconds();
        }

        pollHeartbeat(now);
        maybeAutoCommitOffsetsAsync(now);
    }

Again starting from the first line: invokeCompletedOffsetCommitCallbacks(), as the name suggests, invokes the callbacks of completed offset commits. Its source:


    void invokeCompletedOffsetCommitCallbacks() {
        while (true) {
            OffsetCommitCompletion completion = completedOffsetCommits.poll();
            if (completion == null)
                break;
            completion.invoke();
        }
    }

    private static class OffsetCommitCompletion {
        private final OffsetCommitCallback callback;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        private final Exception exception;

        public OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            this.callback = callback;
            this.offsets = offsets;
            this.exception = exception;
        }

        public void invoke() {
            if (callback != null)
                callback.onComplete(offsets, exception);
        }
    }

`completedOffsetCommits` is simply a ConcurrentLinkedQueue, initialized in the ConsumerCoordinator constructor, and calling completion.invoke just executes the onComplete method of the callback that was added to the queue. Interesting — in plain terms, all this does is dispatch callbacks.

Entries are added to this queue mainly in the following cases. We will run into all three enqueue points later; for now it is enough to understand what they do (a usage sketch follows the list):

  • 1. An offset commit fails because the coordinator node is unavailable and the lookup fails: an OffsetCommitCompletion carrying the exception is enqueued.
  • 2. An offset commit succeeds: an OffsetCommitCompletion without an exception is enqueued.
  • 3. An offset commit fails: an OffsetCommitCompletion carrying the exception is enqueued.
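
Seen from the application side, this queue is the machinery behind commitAsync callbacks — a hedged usage sketch: the callback is not fired from some background thread, but queued as an OffsetCommitCompletion and invoked on the consumer's own thread during a later poll or commit call:

    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            // executed later, from invokeCompletedOffsetCommitCallbacks()
            if (exception != null)
                System.err.println("async commit failed for " + offsets + ": " + exception);
        }
    });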

Now back to coordinator.poll; the second block of code it executes is the following:


    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        ensureCoordinatorReady();
        now = time.milliseconds();
    }

    public boolean partitionsAutoAssigned() {
        return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN;
    }

Stepping into subscriptions.partitionsAutoAssigned, we find that it checks which mode the topics were subscribed with. When we analyzed the subscribe method above, we learned that only AUTO_TOPICS and AUTO_PATTERN need to join a group to get the partitions to consume, while USER_ASSIGNED does not — so this check effectively filters out the USER_ASSIGNED mode.

When subscribing via consumer.subscribe, setSubscriptionType is called with SubscriptionType.AUTO_TOPICS by default, so we need to look at what ensureCoordinatorReady does. Its source follows, together with the methods it calls.


    public synchronized void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = lookupCoordinator();
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
        }
    }

    protected synchronized RequestFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            // find a node to ask about the coordinator
            Node node = this.client.leastLoadedNode();
            if (node == null) {
                // TODO: If there are no brokers left, perhaps we should use the bootstrap set
                // from configuration?
                return RequestFuture.noBrokersAvailable();
            } else
                findCoordinatorFuture = sendGroupCoordinatorRequest(node);
        }
        return findCoordinatorFuture;
    }

    private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
        // initiate the group metadata request
        log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
        GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
        return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                     .compose(new GroupCoordinatorResponseHandler());
    }

    private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {

        @Override
        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {

            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
            // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
            // for the coordinator in the underlying network client layer
            // TODO: this needs to be better handled in KAFKA-1935
            Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
            clearFindCoordinatorFuture();
            if (error == Errors.NONE) {
                synchronized (AbstractCoordinator.this) {
                    AbstractCoordinator.this.coordinator = new Node(
                            Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                            groupCoordinatorResponse.node().host(),
                            groupCoordinatorResponse.node().port());
                    log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
                    client.tryConnect(coordinator);
                    heartbeat.resetTimeouts(time.milliseconds());
                }
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
                future.raise(error);
            }
        }

        @Override
        public void onFailure(RuntimeException e, RequestFuture<Void> future) {
            clearFindCoordinatorFuture();
            super.onFailure(e, future);
        }
    }

To sum up, the rough logic is:

  • 1. Call lookupCoordinator: find the node with the fewest in-flight requests via client.leastLoadedNode(), send it an ApiKeys.GROUP_COORDINATOR request, and attach a GroupCoordinatorResponseHandler callback to the returned future.
  • 2. Send the queued request via client.poll.
  • 3. Handle the result in GroupCoordinatorResponseHandler's onSuccess: if the response carries no error, connect to the discovered coordinator via client.tryConnect.

Notice that all requests go through this `client` object: client.leastLoadedNode, client.send, client.tryConnect, client.poll, client.awaitMetadataUpdate, and so on.

Where does this object come from? Looking back at the KafkaConsumer constructor, `client` is initialized there and passed into both `coordinator` and `fetcher` when they are created — in other words, the one used here is exactly the client initialized inside KafkaConsumer.

Most of these client methods do what their names suggest, so let's just look at client.send. It mainly builds API requests — in sendGroupCoordinatorRequest above, ApiKeys.GROUP_COORDINATOR was passed in. client.send puts the request into an `unsent` HashMap to await sending; whenever trySend is called, every request in `unsent` is sent out in a loop. The source of client.send:


    private RequestFuture<ClientResponse> send(Node node,
                                              ApiKeys api,
                                              short version,
                                              AbstractRequest request) {
        long now = time.milliseconds();
        RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
        RequestHeader header = client.nextRequestHeader(api, version);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        put(node, new ClientRequest(now, true, send, completionHandler));

        // wakeup the client in case it is blocking in poll so that we can send the queued request
        client.wakeup();
        return completionHandler.future;
    }

    private void put(Node node, ClientRequest request) {
        synchronized (this) {
            List<ClientRequest> nodeUnsent = unsent.get(node);
            if (nodeUnsent == null) {
                nodeUnsent = new ArrayList<>();
                unsent.put(node, nodeUnsent);
            }
            nodeUnsent.add(request);
        }
    }
    

So where is trySend actually called? It turns out it is only invoked from ConsumerNetworkClient.poll, which flushes the queued requests out. In step 2 of the ensureCoordinatorReady logic above, client.poll was called, sending the data along with it. Below is the source of ConsumerNetworkClient.poll.


    public void poll(long timeout, long now, PollCondition pollCondition) {
        // there may be handlers which need to be invoked if we woke up the previous call to poll
        firePendingCompletedRequests();

        synchronized (this) {
            // send all the requests we can send now
            trySend(now);

            // check whether the poll is still needed by the caller. Note that if the expected completion
            // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
            // handler), the client will be woken up.
            if (pollCondition == null || pollCondition.shouldBlock()) {
                // if there are no requests in flight, do not block longer than the retry backoff
                if (client.inFlightRequestCount() == 0)
                    timeout = Math.min(timeout, retryBackoffMs);
                client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
                now = time.milliseconds();
            } else {
                client.poll(0, now);
            }

            // handle any disconnects by failing the active requests. note that disconnects must
            // be checked immediately following poll since any subsequent call to client.ready()
            // will reset the disconnect status
            checkDisconnects(now);

            // trigger wakeups after checking for disconnects so that the callbacks will be ready
            // to be fired on the next call to poll()
            maybeTriggerWakeup();

            // try again to send requests since buffer space may have been
            // cleared or a connect finished in the poll
            trySend(now);

            // fail requests that couldn't be sent if they have expired
            failExpiredRequests(now);
        }

        // called without the lock to avoid deadlock potential if handlers need to acquire locks
        firePendingCompletedRequests();
    }

Calling trySend in turn calls the send method of NetworkClient.java, which hands the request to selector.send. The relevant source is below.


    private boolean trySend(long now) {
        // send any requests that can be sent now
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                if (client.ready(node, now)) {
                    client.send(request, now);
                    iterator.remove();
                    requestsSent = true;
                }
            }
        }
        return requestsSent;
    }

    @Override
    public void send(ClientRequest request, long now) {
        String nodeId = request.request().destination();
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        doSend(request, now);
    }

    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request);
        selector.send(request.request());
    }

    //selector
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }
    
    //KafkaChannel

    private final TransportLayer transportLayer;
    private Send send;

    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        return send.completed();
    }

So the data is stored in KafkaChannel's `send` field; then selector.poll and selector.pollSelectionKeys call KafkaChannel.write and KafkaChannel.send, and send.writeTo finally writes the data out.

At this point we will assume the request has gone out; if you are interested, dig further into KafkaChannel and Send — here we stick to KafkaConsumer's data flow.
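
For reference, the interest-ops pattern KafkaChannel follows is plain java.nio: setSend registers interest in OP_WRITE, and the selector loop writes the buffered data when the channel becomes writable, clearing OP_WRITE once the buffer is drained. A stripped-down sketch of that pattern (using java.nio's own Selector, not Kafka's; all names here are illustrative):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;

    static void writePending(Selector selector) throws IOException {
        selector.select();
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isWritable()) {
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer pending = (ByteBuffer) key.attachment(); // the queued "send"
                channel.write(pending);                             // like send.writeTo(...)
                if (!pending.hasRemaining())                        // like send.completed()
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            }
        }
        selector.selectedKeys().clear();
    }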

We have wandered through quite a lot of code by now, yet following it we have only connected to a node — the group has not been joined yet. A quick recap:

  • 1. Check that the mode is AUTO_TOPICS or AUTO_PATTERN, i.e. the consumer should join a group.
  • 2. Find the node with the fewest in-flight requests, send it a GROUP_COORDINATOR request, and connect to the returned coordinator node once the response arrives.

Then back to coordinator.poll again, to the next block of code it executes.


    if (needRejoin()) {
        // due to a race condition between the initial metadata fetch and the initial rebalance,
        // we need to ensure that the metadata is fresh before joining initially. This ensures
        // that we have matched the pattern against the cluster's topics at least once before joining.
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();

        ensureActiveGroup();
        now = time.milliseconds();
    }

Here the join-group logic is plain to see, so let's go straight into AbstractCoordinator's ensureActiveGroup method.

The first thing it does is start a heartbeat thread. This thread mainly sends heartbeat requests, and also handles checks such as coordinator availability and reconnection, marking the coordinator dead, and group join/leave state. Heartbeats, too, go through client.send, this time with ApiKeys.HEARTBEAT.
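
As a configuration side note (a hedged sketch, values illustrative): heartbeat.interval.ms must be smaller than session.timeout.ms — a common rule of thumb is no more than one third of it — since a member whose heartbeats go missing for a whole session timeout is evicted from the group by the coordinator.

    Properties props = new Properties();
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");   // evicted after 10s of silence
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // heartbeat every 3s

With that noted, here is the relevant source: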


    public void ensureActiveGroup() {
        // always ensure that the coordinator is ready because we may have been disconnected
        // when sending heartbeats and does not necessarily require us to rejoin the group.
        ensureCoordinatorReady();
        startHeartbeatThreadIfNeeded();
        joinGroupIfNeeded();
    }

    private synchronized void startHeartbeatThreadIfNeeded() {
        if (heartbeatThread == null) {
            heartbeatThread = new HeartbeatThread();
            heartbeatThread.start();
        }
    }


    //HeartbeatThread
    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation.generationId, this.generation.memberId);
        return client.send(coordinator, ApiKeys.HEARTBEAT, req)
                .compose(new HeartbeatResponseHandler());
    }

The main thing to look at is the logic of joinGroupIfNeeded. It calls initiateJoinGroup, which sends an ApiKeys.JOIN_GROUP request through sendJoinGroupRequest — and notice that once again we call client.send. From the source analysis above we know client.send merely places the request into the `unsent` collection, waiting for trySend, and trySend is only invoked from client.poll; that is why, after initiateJoinGroup, client.poll is called to actually send the data out. Source:


    void joinGroupIfNeeded() {
        while (needRejoin() || rejoinIncomplete()) {
            ensureCoordinatorReady();

            // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
            // time if the client is woken up before a pending rebalance completes. This must be called
            // on each iteration of the loop because an event requiring a rebalance (such as a metadata
            // refresh which changes the matched subscription set) can occur while another rebalance is
            // still in progress.
            if (needsJoinPrepare) {
                onJoinPrepare(generation.generationId, generation.memberId);
                needsJoinPrepare = false;
            }

            RequestFuture<ByteBuffer> future = initiateJoinGroup();
            client.poll(future);
            resetJoinGroupFuture();

            if (future.succeeded()) {
                needsJoinPrepare = true;
                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
            } else {
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;
                time.sleep(retryBackoffMs);
            }
        }
    }   

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        // we store the join future in case we are woken up by the user after beginning the
        // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
        // to rejoin before the pending rebalance has completed.
        if (joinFuture == null) {
            // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
            // Note that this must come after the call to onJoinPrepare since we must be able to continue
            // sending heartbeats if that callback takes some time.
            disableHeartbeatThread();

            state = MemberState.REBALANCING;
            joinFuture = sendJoinGroupRequest();
            joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    synchronized (AbstractCoordinator.this) {
                        log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
                        state = MemberState.STABLE;

                        if (heartbeatThread != null)
                            heartbeatThread.enable();
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                    synchronized (AbstractCoordinator.this) {
                        state = MemberState.UNJOINED;
                    }
                }
            });
        }
        return joinFuture;
    }

    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown())
            return RequestFuture.coordinatorNotAvailable();

        // send a join group request to the coordinator
        log.info("(Re-)joining group {}", groupId);
        JoinGroupRequest request = new JoinGroupRequest(
                groupId,
                this.sessionTimeoutMs,
                this.rebalanceTimeoutMs,
                this.generation.memberId,
                protocolType(),
                metadata());

        log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                .compose(new JoinGroupResponseHandler());
    }

And with that, the join-group logic is covered.

Back to coordinator.poll once more. Assuming everything above went well, the consumer has connected to the node and joined the group, but it has not consumed any data yet. Next, let's follow the last line, maybeAutoCommitOffsetsAsync.

maybeAutoCommitOffsetsAsync decides, based on the enable.auto.commit setting, whether to commit offsets automatically — in other words, with it set to false offsets are never auto-committed. Since we are here, let's step into it and see how it works; doing so will also close the loop on the invokeCompletedOffsetCommitCallbacks call we saw on the first line of coordinator.poll.
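
For illustration (a hedged sketch; property values are examples):

    Properties props = new Properties();
    // disable auto commit: offsets must then be committed manually, e.g. via
    // commitSync as in the usage example at the top of this article
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // when auto commit is enabled, this interval drives maybeAutoCommitOffsetsAsync
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");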

maybeAutoCommitOffsetsAsync delegates to doAutoCommitOffsetsAsync, which in turn calls commitOffsetsAsync:

    private void doAutoCommitOffsetsAsync() {
        commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                    if (exception instanceof RetriableException)
                        nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
                } else {
                    log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
                }
            }
        });
    }

    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        invokeCompletedOffsetCommitCallbacks();

        if (!coordinatorUnknown()) {
            doCommitOffsetsAsync(offsets, callback);
        } else {
            // we don't know the current coordinator, so try to find it and then send the commit
            // or fail (we don't want recursive retries which can cause offset commits to arrive
            // out of order). Note that there may be multiple offset commits chained to the same
            // coordinator lookup request. This is fine because the listeners will be invoked in
            // the same order that they were added. Note also that AbstractCoordinator prevents
            // multiple concurrent coordinator lookup requests.
            lookupCoordinator().addListener(new RequestFutureListener<Void>() {
                @Override
                public void onSuccess(Void value) {
                    doCommitOffsetsAsync(offsets, callback);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
                }
            });
        }

        // ensure the commit has a chance to be transmitted (without blocking on its completion).
        // Note that commits are treated as heartbeats by the coordinator, so there is no need to
        // explicitly allow heartbeats through delayed task execution.
        client.pollNoWakeup();
    }

In commitOffsetsAsync you can see that when the coordinator is unknown, it looks the coordinator up again, and on lookup failure it creates an OffsetCommitCompletion and adds it to the completedOffsetCommits queue.

Remember what invokeCompletedOffsetCommitCallbacks on the first line of coordinator.poll does? It drains completedOffsetCommits and runs each OffsetCommitCallback one by one — and for the auto-commit path, that callback is the one created in doAutoCommitOffsetsAsync.

When the coordinator is healthy, commitOffsetsAsync continues into doCommitOffsetsAsync, which calls sendOffsetCommitRequest to send the commit request. Note that, success or failure, it always enqueues an OffsetCommitCompletion into completedOffsetCommits, and the corresponding callback is handled on the next invokeCompletedOffsetCommitCallbacks.


    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        this.subscriptions.needRefreshCommits();
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
        future.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                if (interceptors != null)
                    interceptors.onCommit(offsets);

                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
            }

            @Override
            public void onFailure(RuntimeException e) {
                Exception commitException = e;

                if (e instanceof RetriableException)
                    commitException = new RetriableCommitFailedException(e);

                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
            }
        });
    }

Now let's see how the offset commit request is actually sent. As the method above shows, it calls sendOffsetCommitRequest — and the manual consumer.commitSync path goes through the same method, so it is worth stepping inside.

    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty())
            return RequestFuture.voidSuccess();

        Node coordinator = coordinator();
        if (coordinator == null)
            return RequestFuture.coordinatorNotAvailable();

        // create the offset commit request
        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                    offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
        }

        final Generation generation;
        if (subscriptions.partitionsAutoAssigned())
            generation = generation();
        else
            generation = Generation.NO_GENERATION;

        // if the generation is null, we are not part of an active group (and we expect to be).
        // the only thing we can do is fail the commit and let the user rejoin the group in poll()
        if (generation == null)
            return RequestFuture.failure(new CommitFailedException());

        OffsetCommitRequest req = new OffsetCommitRequest(
                this.groupId,
                generation.generationId,
                generation.memberId,
                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                offsetData);

        log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
                .compose(new OffsetCommitResponseHandler(offsets));
    }

This method ends with client.send sending ApiKeys.OFFSET_COMMIT, with an OffsetCommitResponseHandler attached as the response callback.

So what exactly does the OffsetCommitResponseHandler class do, and how does it process the response once the ApiKeys.OFFSET_COMMIT request succeeds?


    private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {

        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public OffsetCommitResponse parse(ClientResponse response) {
            return new OffsetCommitResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            sensors.commitLatency.record(response.requestLatencyMs());
            Set<String> unauthorizedTopics = new HashSet<>();

            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                long offset = offsetAndMetadata.offset();

                Errors error = Errors.forCode(entry.getValue());
                if (error == Errors.NONE) {
                    log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
                    if (subscriptions.isAssigned(tp))
                        // update the local cache only if the partition is still assigned
                        subscriptions.committed(tp, offsetAndMetadata);
                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    log.error("Not authorized to commit offsets for group {}", groupId);
                    future.raise(new GroupAuthorizationException(groupId));
                    return;
                } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    unauthorizedTopics.add(tp.topic());
                } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                        || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                    // raise the error to the user
                    log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
                    future.raise(error);
                    return;
                } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                    // just retry
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    future.raise(error);
                    return;
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                        || error == Errors.NOT_COORDINATOR_FOR_GROUP
                        || error == Errors.REQUEST_TIMED_OUT) {
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    coordinatorDead();
                    future.raise(error);
                    return;
                } else if (error == Errors.UNKNOWN_MEMBER_ID
                        || error == Errors.ILLEGAL_GENERATION
                        || error == Errors.REBALANCE_IN_PROGRESS) {
                    // need to re-join group
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    resetGeneration();
                    future.raise(new CommitFailedException());
                    return;
                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
                    future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
                    return;
                } else {
                    log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
                    future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                    return;
                }
            }

            if (!unauthorizedTopics.isEmpty()) {
                log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                future.complete(null);
            }
        }
    }

In handle we see it obtain the OffsetAndMetadata — the offset metadata of the records. Considering only the error-free path, it ends up calling subscriptions.committed(tp, offsetAndMetadata) to store the data in `assignment`. So what exactly is this `subscriptions`?

Looking back at the KafkaConsumer constructor once more — this is where it is created:

    
    OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
    this.subscriptions = new SubscriptionState(offsetResetStrategy);

Good — now we know what `subscriptions` is, and that the offset metadata lives in its `assignment`. With that, the whole coordinator.poll method has been walked through, and it is time to go back up one level to KafkaConsumer's pollOnce. Its source is pasted again below so you don't have to scroll back up.


    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.poll(time.milliseconds());

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();

        return fetcher.fetchedRecords();
    }

This method's final result comes from fetcher.fetchedRecords(): after coordinator.poll it first tries fetcher.fetchedRecords() and returns immediately if there is data; otherwise it calls fetcher.sendFetches() to send a fetch request and then returns fetcher.fetchedRecords() again. Send a request, then drain the data — that is the simple mental model.

Where does this `fetcher` come from? Again, look back at the KafkaConsumer constructor — it is created there. Let's first step into fetcher.sendFetches() to see what sending a fetch request involves.


    public void sendFetches() {
        for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
            final FetchRequest request = fetchEntry.getValue();
            final Node fetchTarget = fetchEntry.getKey();

            client.send(fetchTarget, ApiKeys.FETCH, request)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            FetchResponse response = new FetchResponse(resp.responseBody());
                            if (!matchesRequestedPartitions(request, response)) {
                                // obviously we expect the broker to always send us valid responses, so this check
                                // is mainly for test cases where mock fetch responses must be manually crafted.
                                log.warn("Ignoring fetch response containing partitions {} since it does not match " +
                                        "the requested partitions {}", response.responseData().keySet(),
                                        request.fetchData().keySet());
                                return;
                            }

                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                TopicPartition partition = entry.getKey();
                                long fetchOffset = request.fetchData().get(partition).offset;
                                FetchResponse.PartitionData fetchData = entry.getValue();
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
                            }

                            sensors.fetchLatency.record(resp.requestLatencyMs());
                            sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                        }

                        @Override
                        public void onFailure(RuntimeException e) {
                            log.debug("Fetch request to {} failed", fetchTarget, e);
                        }
                    });
        }
    }

See? Once again it calls client.send, this time with an ApiKeys.FETCH request, and processes the data in onSuccess.

Inside the listener, the for loop stores the fetched data into the completedFetches queue — and that queue is exactly where fetcher.fetchedRecords() reads from. So next, let's look at the retrieval logic of fetcher.fetchedRecords().


    private PartitionRecords<K, V> nextInLineRecords = null;

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        int recordsRemaining = maxPollRecords;

        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
                CompletedFetch completedFetch = completedFetches.poll();
                if (completedFetch == null)
                    break;

                nextInLineRecords = parseFetchedData(completedFetch);
            } else {
                TopicPartition partition = nextInLineRecords.partition;

                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
                if (!records.isEmpty()) {
                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
                    if (currentRecords == null) {
                        drained.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        drained.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }

        return drained;
    }
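
The `maxPollRecords` bound in the loop above comes from the max.poll.records config, i.e. the cap on how many records a single poll() hands back (the value below is illustrative):

    Properties props = new Properties();
    // at most 500 records are drained from completedFetches per poll()
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");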

This method first polls one CompletedFetch off the completedFetches queue and parses it into PartitionRecords via parseFetchedData:


    private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
        TopicPartition tp = completedFetch.partition;
        FetchResponse.PartitionData partition = completedFetch.partitionData;
        long fetchOffset = completedFetch.fetchedOffset;
        int bytes = 0;
        int recordsCount = 0;
        PartitionRecords<K, V> parsedRecords = null;
        Errors error = Errors.forCode(partition.errorCode);

        try {
            if (!subscriptions.isFetchable(tp)) {
                // this can happen when a rebalance happened or a partition consumption paused
                // while fetch is still in-flight
                log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
            } else if (error == Errors.NONE) {
                // we are interested in this fetch only if the beginning offset matches the
                // current consumed position
                Long position = subscriptions.position(tp);
                if (position == null || position != fetchOffset) {
                    log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                            "the expected offset {}", tp, fetchOffset, position);
                    return null;
                }

                ByteBuffer buffer = partition.recordSet;
                MemoryRecords records = MemoryRecords.readableRecords(buffer);
                List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                for (LogEntry logEntry : records) {
                    // Skip the messages earlier than current position.
                    if (logEntry.offset() >= position) {
                        parsed.add(parseRecord(tp, logEntry));
                        bytes += logEntry.size();
                    }
                }

                recordsCount = parsed.size();
                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);

                if (!parsed.isEmpty()) {
                    log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
                    parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                    ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                    this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
                }
            } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                this.metadata.requestUpdate();
            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
                        "may not exist or the user may not have Describe access to it", tp);
                this.metadata.requestUpdate();
            } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
                if (fetchOffset != subscriptions.position(tp)) {
                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
                            "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                    subscriptions.needOffsetReset(tp);
                } else {
                    throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
                }
            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                log.warn("Not authorized to read from topic {}.", tp.topic());
                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
            } else if (error == Errors.UNKNOWN) {
                log.warn("Unknown error fetching data for topic-partition {}", tp);
            } else {
                throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
            }
        } finally {
            completedFetch.metricAggregator.record(tp, bytes, recordsCount);
        }

        // we move the partition to the end if we received some bytes or if there was an error. This way, it's more
        // likely that partitions for the same topic can remain together (allowing for more efficient serialization).
        if (bytes > 0 || error != Errors.NONE)
            subscriptions.movePartitionToEnd(tp);

        return parsedRecords;
    }
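
One branch worth pausing on is Errors.OFFSET_OUT_OF_RANGE: whether subscriptions.hasDefaultOffsetResetPolicy() returns true is controlled by the auto.offset.reset setting. A minimal configuration sketch (the values shown are the standard ones; "none" is what lets the OffsetOutOfRangeException above reach the caller):


    Properties props = new Properties();
    // "earliest" or "latest" (the default): an out-of-range fetch triggers
    // subscriptions.needOffsetReset(tp) and the position is reset automatically
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // "none": no default reset policy, so poll() surfaces the
    // OffsetOutOfRangeException thrown in the branch above and the
    // application must seek() to a valid offset itself
    // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");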

Inside this method there is a subscriptions.isFetchable(tp) check. The assignment it consults inside subscriptions is the same state we saw being updated earlier: after offsets are committed via sendOffsetCommitRequest, the resulting OffsetAndMetadata is saved back into the subscriptions assignment in the callback. So isFetchable is simply asking whether this partition is still assigned and whether its state currently allows fetching:

    public boolean isFetchable(TopicPartition tp) {
        return isAssigned(tp) && assignedState(tp).isFetchable();
    }

    public boolean isAssigned(TopicPartition tp) {
        return assignment.contains(tp);
    }
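
The assignedState(tp).isFetchable() call bottoms out in the per-partition state object. Paraphrased from SubscriptionState.TopicPartitionState in this version (a sketch, not a verbatim copy; the real class tracks more fields), the check reduces to:


    private static class TopicPartitionState {
        private Long position;   // last consumed position; null until initialized
        private boolean paused;  // toggled by consumer.pause()/resume()

        private boolean hasValidPosition() {
            return position != null;
        }

        private boolean isFetchable() {
            // fetchable = not paused and the consume position is known
            return !paused && hasValidPosition();
        }
    }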

Next, follow the normal path where error == Errors.NONE: using this TopicPartition's current position in subscriptions, it builds a parsedRecords and returns it, handing control back to the while loop in fetcher.fetchedRecords() for further processing. Here is fetcher.fetchedRecords() again for reference:


    private PartitionRecords<K, V> nextInLineRecords = null;

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        int recordsRemaining = maxPollRecords;

        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
                CompletedFetch completedFetch = completedFetches.poll();
                if (completedFetch == null)
                    break;

                nextInLineRecords = parseFetchedData(completedFetch);
            } else {
                TopicPartition partition = nextInLineRecords.partition;

                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
                if (!records.isEmpty()) {
                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
                    if (currentRecords == null) {
                        drained.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        drained.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }

        return drained;
    }

Now follow the while loop. On the if branch, a successful parse leaves nextInLineRecords non-null, so the next iteration falls into the else block: drainRecords extracts the records and adds them to the drained HashMap; once recordsRemaining is used up or the completedFetches queue is empty, the loop exits and the data is returned (see the max.poll.records note after the code below).


    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
        if (partitionRecords.isDrained())
            return Collections.emptyList();

        if (!subscriptions.isAssigned(partitionRecords.partition)) {
            // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
            log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
        } else {
            // note that the consumed position should always be available as long as the partition is still assigned
            long position = subscriptions.position(partitionRecords.partition);
            if (!subscriptions.isFetchable(partitionRecords.partition)) {
                // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
            } else if (partitionRecords.fetchOffset == position) {
                // we are ensured to have at least one record since we already checked for emptiness
                List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
                long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;

                log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                        "position to {}", position, partitionRecords.partition, nextOffset);

                subscriptions.position(partitionRecords.partition, nextOffset);
                return partRecords;
            } else {
                // these records aren't next in line based on the last consumed position, ignore them
                // they must be from an obsolete request
                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                        partitionRecords.partition, partitionRecords.fetchOffset, position);
            }
        }

        partitionRecords.drain();
        return Collections.emptyList();
    }
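
Note that recordsRemaining in fetchedRecords() is initialized from maxPollRecords, which comes from the max.poll.records config, so a single poll() hands back at most that many records across all partitions; whatever is left over in nextInLineRecords is drained by the next poll(). A minimal configuration sketch (the value 100 is just an example):


    Properties props = new Properties();
    // cap every poll() at 100 records in total; Fetcher.fetchedRecords()
    // counts recordsRemaining down from this value and stops draining at zero
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);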

Good — at this point the whole data-fetch flow has been traced through. As mentioned, if enable.auto.commit is set to false in the Properties passed to new KafkaConsumer<>(new Properties()) (note that it defaults to true), offsets are not committed automatically, so after the data has been fetched and processed you need to commit the offsets manually by calling consumer.commitSync (a minimal sketch of the two commit modes follows). To finish, let's trace into the source behind the manual commit.
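
As a quick illustration of the two commit modes (property names are the standard ConsumerConfig keys; the interval value is only an example):


    Properties props = new Properties();
    // auto-commit (the default): coordinator.poll() periodically fires
    // maybeAutoCommitOffsetsAsync at the configured interval
    // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

    // manual commit: disable auto-commit and call consumer.commitSync
    // yourself once the fetched records have been processed
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

consumer.commitSync ultimately delegates to ConsumerCoordinator.commitOffsetsSync: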


    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        invokeCompletedOffsetCommitCallbacks();

        if (offsets.isEmpty())
            return;

        while (true) {
            ensureCoordinatorReady();

            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
            client.poll(future);

            if (future.succeeded()) {
                if (interceptors != null)
                    interceptors.onCommit(offsets);
                return;
            }

            if (!future.isRetriable())
                throw future.exception();

            time.sleep(retryBackoffMs);
        }
    }

Under the hood this still calls sendOffsetCommitRequest, the same path taken by maybeAutoCommitOffsetsAsync from coordinator.poll() above. The difference is that here the call is synchronous: it waits for the commit to finish and return a result before continuing, so we won't walk through that path again.
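
One practical consequence of the synchronous loop above: only retriable failures are retried, while a non-retriable one is rethrown from future.exception(). A minimal sketch of how a caller might handle that case (the offsets map and the log variable are placeholders, not part of the Kafka API):


    try {
        consumer.commitSync(offsets);
    } catch (CommitFailedException e) {
        // non-retriable: the group has already rebalanced (for example, this
        // consumer spent too long between poll() calls), so these partitions
        // may now be owned by another member; the records will be redelivered
        // from the last successfully committed offset
        log.warn("Offset commit failed, records will be redelivered", e);
    }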
