Kafka 源码解析之 Consumer Poll 模型

Consumer 示例

这里以一个 Consumer 的实例代码作为开始,一个比较常见的 Consumer 示例代码如下所示,其主要包含一下几个步骤:

  • 构造 Propertity,进行 consumer 相关的配置;
  • 创建 KafkaConsumer 的对象 consumer;
  • 订阅相应的 topic 列表;
  • 调用 consumer 的 poll 方法拉取订阅的消息。

前面两步在 Consumer 底层上只是创建了一个 consumer 对象,第三步只有记录一下订阅的 topic 信息,consumer 实际的操作都是第四步,也就是在 poll 方法中实现的,这也是 poll 模型对于理解 consumer 设计非常重要的原因。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * 自动 commit 的情况
 */
public class ConsumerAutoOffsetCommit {
    private static String topic;
    private static String group;
    public static void main(String[] args) {
        Properties props = new Properties();
        topic=args[0];
        group=args[1]; // auto-offset-commit
        props.put("bootstrap.servers", "XXX:9092,XXX:9092");
        props.put("group.id", group);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true"); // 自动commit
        props.put("auto.commit.interval.ms", "1000"); // 自动commit的间隔
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic)); // 可消费多个topic,组成一个list
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Poll 模型综述

当一个 consumer 对象创建之后,只有 poll 方法调用时,consumer 才会真正去连接 kafka 集群,进行相关的操作,其 poll 方法具体实现如下:

//timeout(ms): buffer 中的数据未就绪情况下,等待的最长时间,如果设置为0,立即返回 buffer 中已经就绪的数据
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            //note: 从订阅的 partition 中拉取数据,pollOnce() 才是对 Consumer 客户端拉取数据的核心实现
            if (!records.isEmpty()) {
                // 在返回数据之前,发送下次的 fetch 请求,避免用户在下次获取数据时线程 block
                if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0)
                    client.pollNoWakeup();

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}

consumer poll 方法主要做了以下几件事情:

检查这个 consumer 是否订阅的相应的 topic-partition;
调用 pollOnce() 方法获取相应的 records;
在返回获取的 records 前,发送下一次的 fetch 请求,避免用户在下次请求时线程 block 在 pollOnce() 方法中;
如果在给定的时间(timeout)内获取不到可用的 records,返回空数据。
这里可以看出,poll 方法的真正实现是在 pollOnce 方法中,poll 方法通过 pollOnce 方法获取可用的数据。

pollOnce 方法

// note: 一次 poll 过程,除了获取新数据外,还会做一些必要的 offset-commit 核 reset-offset  的操作
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // note: 1. 获取 GroupCoordinator 地址并连接、加入 Group、sync Group、自动 commit, join 及 sync 期间 group 会进行 rebalance
    coordinator.poll(time.milliseconds());
    // note: 2. 更新订阅的 topic-partition 的 offset(如果订阅的 topic-partition list 没有有效的 offset 的情况下)
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // note: 3. 获取 fetcher 已经拉取到的数据
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;
    // note: 说明上次 fetch 到是的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据

    // note: 4. 发送 fetch 请求,会从多个 topic-partition 拉取数据(只要对应的 topic-partition 没有未完成的请求)
    fetcher.sendFetches();

    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

    //note: 5. 调用 poll 方法发送请求(底层发送请求的接口)
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            return !fetcher.hasCompletedFetches();//note: 有完成的 fetcher 请求的话,这里就不会 block,但是 block 也是有最大时间限制
        }
    });

    //note: 6. 如果 group 需要 rebalance,直接返回空数据,这样更快地让 group 进行稳定状态
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}

pollOnce 可以简单分为6步来看,其作用分别如下:

  • coordinator.poll():获取 GroupCoordinator 的地址,并建立相应 tcp 连接,发送 join-group、sync-group,之后才真正加入到了一个 group 中,这时会获取其要消费的 topic-partition 列表,如果设置了自动 commit,也会在这一步进行 commit,
  • updateFetchPositions(): 在上一步中已经获取到了这个 consumer 实例要订阅的 topic-partition list,这一步更新其 fetch-position offset,以便进行拉取;
  • fetcher.fetchedRecords():返回其 fetched records,并更新其 fetch-position offset,只有在 offset-commit 时(自动 commit 时,是在第一步实现的),才会更新其 committed offset;
  • fetcher.sendFetches():只要订阅的 topic-partition list 没有未处理的 fetch 请求,就发送对这个 topic-partition 的 fetch 请求,在真正发送时,还是会按 node 级别去发送,leader 是同一个 node 的 topic-partition 会合成一个请求去发送;
  • client.poll():调用底层 NetworkClient 提供的接口去发送相应的请求;
  • coordinator.needRejoin():如果当前实例分配的 topic-partition 列表发送了变化,那么这个 consumer group 就需要进行 rebalance。

PollOnce 整体流程

image.png

从上图可以看出,Consumer 在实现上,其调用还是比较复杂,不过复杂的地方都封装在底层了,Consumer 的网络模型如下图所示


image.png

coordinator.poll()

一个 consumer 实例在这一步实现的内容是:

  • 获取 GroupCoordinator 的地址,并建立相应 tcp 连接;
  • 发送 join-group 请求,然后 group 将会进行 rebalance;
  • 发送 sync-group 请求,之后才正在加入到了一个 group 中,这时会通过请求获取其要消费的 topic-partition 列表;
  • 如果设置了自动 commit,也会在这一步进行 commit offset。

updateFetchPositions()

这个方法主要是用来更新这个 consumer 实例订阅的 topic-partition 列表的 fetch-offset 信息。

在 Fetcher 中,这个 consumer 实例订阅的每个 topic-partition 都会有一个对应的 TopicPartitionState 对象,在这个对象中会记录以下这些内容:

//note: 记录 tp 的一些 offset 信息
private static class TopicPartitionState {
    private Long position; // last consumed position
    private Long highWatermark; // the high watermark from last fetch
    private OffsetAndMetadata committed;  // last committed position
    private boolean paused;  // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
}
  • position:Fetcher 下次去拉取时的 offset,Fecher 在拉取时需要知道这个值;
  • committed:consumer 已经处理完的最新一条消息的 offset,consumer 主动调用 offset-commit 时会更新这个值;
  • resetStrategy:这 topic-partition offset 重置的策略,重置之后,这个策略就会改为 null,防止再次操作。

updateFetchPositions() 这个方法的目的就是为了获取其订阅的每个 topic-partition 对应的 position,这样 Fetcher 才知道从哪个 offset 开始去拉取这个 topic-partition 的数据。

//将 the fetch position 设置为 the committed position(如果有 committed offset 的话),否则就使用配置的重置策略去设置 offset
private void updateFetchPositions(Set<TopicPartition> partitions) {
    //note: 先重置那些调用 seekToBegin 和 seekToEnd 的 offset 的 tp,设置其  the fetch position 的 offset
    fetcher.resetOffsetsIfNeeded(partitions);

    if (!subscriptions.hasAllFetchPositions(partitions)) {
        //note: 获取所有分配 tp 的 offset, 即 committed offset, 更新到 TopicPartitionState 中的 committed offset 中
        coordinator.refreshCommittedOffsetsIfNeeded();

        //note: 如果 the fetch position 值无效,则将上步获取的 committed offset 设置为 the fetch position
        fetcher.updateFetchPositions(partitions);
    }
}

上述过程主要分为三步,可以结合前面的流程来看,这里就不再详细去介绍其下面几层调用的实现了:

  • fetcher.resetOffsetsIfNeeded():处理那些 resetStrategy 不为 null 的 topic-partition(一般是使用了 seekToBegin() 和 seekToEnd() 方法的 topic-partition),Fetcher 会发送 list-offset 请求去获取相应的 offset,实际上在获取时,是根据时间戳(earliest:-2, latest:-1)去查找的相应的 offset,因为从 0.10.2 之后系统在保存 topic 数据时,会保存相应的 timestrap 信息;
  • coordinator.refreshCommittedOffsetsIfNeeded():发送 offset-fetch 请求获取其所订阅的所有 topic-partition 的 commited offset,如果这个 group 没有关于这个 topic-partition 的 offset 就会根据其默认的 auto.offset.reset 信息返回 -1或-2,并将获取到的信息更新到 committed offset 中;
  • fetcher.updateFetchPositions():如果 the fetch position 还没有有效值(第一步处理的那些 topic-partition 已经有了有效值),那么就将 the fetch position 设置为 committed offset。

到这一步,这个 consumer 订阅的 topic-partition list 都有了相应的 the fetch position,Fetcher 在发送 fetch 请求就知道应该从哪个 offset 开始去拉取这个 topic-partition,自此,发送 fetch 请求前的准备都已经完成。

fetcher.sendFetches()

这个虽然是 pollOnce 的第四步,但我们这里放在第三步来讲,只有在发送 fetch 请求后,才能调用 fetcher.fetchedRecords() 获取到其拉取的数据,所以这里先介绍这个方法,其具体实现如下:

//note: 向订阅的所有 partition (只要该 leader 暂时没有拉取请求)所在 leader 发送 fetch 请求
public int sendFetches() {
    //note: 1 创建 Fetch Request
    Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
    for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
        final FetchRequest.Builder request = fetchEntry.getValue();
        final Node fetchTarget = fetchEntry.getKey();

        log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget);
        //note: 2 发送 Fetch Request
        client.send(fetchTarget, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        if (!matchesRequestedPartitions(request, response)) {
                            // obviously we expect the broker to always send us valid responses, so this check
                            // is mainly for test cases where mock fetch responses must be manually crafted.
                            log.warn("Ignoring fetch response containing partitions {} since it does not match " +
                                    "the requested partitions {}", response.responseData().keySet(),
                                    request.fetchData().keySet());
                            return;
                        }

                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            TopicPartition partition = entry.getKey();
                            long fetchOffset = request.fetchData().get(partition).offset;
                            FetchResponse.PartitionData fetchData = entry.getValue();
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                    request.version()));//note: 成功后加入 CompletedFetch
                        }

                        sensors.fetchLatency.record(resp.requestLatencyMs());
                        sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        log.debug("Fetch request to {} for partitions {} failed", fetchTarget, request.fetchData().keySet(), e);
                    }
                });
    }
    return fetchRequestMap.size();
}

在发送的 fetch 的过程中,总共分为以下两步:

  • createFetchRequests():为订阅的所有 topic-partition list 创建 fetch 请求(只要该topic-partition 没有还在处理的请求),创建的 fetch 请求依然是按照 node 级别创建的;
  • client.send():发送 fetch 请求,并设置相应的 Listener,请求处理成功的话,就加入到 completedFetches 中,在加入这个 completedFetches 集合时,是按照 topic-partition 级别去加入,这样也就方便了后续的处理。

从这里可以看出,在每次发送 fetch 请求时,都会向所有可发送的 topic-partition 发送 fetch 请求,调用一次 fetcher.sendFetches,拉取到的数据,可需要多次 pollOnce 循环才能处理完,因为 Fetcher 线程是在后台运行,这也保证了尽可能少地阻塞用户的处理线程,因为如果 Fetcher 中没有可处理的数据,用户的线程是会阻塞在 poll 方法中的。

fetcher.fetchedRecords()
这个方法的作用就获取已经从 Server 拉取到的 Records,其源码实现如下所示:

// 返回获取到的 the fetched records, 并更新 the consumed position
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
    int recordsRemaining = maxPollRecords;//NOTE: 在 max.poll.records 中设置单词最大的拉取条数

    while (recordsRemaining > 0) {
        if (nextInLineRecords == null || nextInLineRecords.isDrained()) { //note: nextInLineRecords 为空时
            CompletedFetch completedFetch = completedFetches.poll();//note: 当一个 nextInLineRecords 处理完,就从 completedFetches 处理下一个完成的 Fetch 请求
            if (completedFetch == null)
                break;

            nextInLineRecords = parseCompletedFetch(completedFetch);//note: 获取下一个要处理的 nextInLineRecords
        } else {
            TopicPartition partition = nextInLineRecords.partition;
            List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);//note:拉取records,更新 position
            if (!records.isEmpty()) {
                List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
                if (currentRecords == null) { //note: 正常情况下,一个 node 只会发送一个 request,一般只会有一个
                    drained.put(partition, records);
                } else {
                    List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                    newRecords.addAll(currentRecords);
                    newRecords.addAll(records);
                    drained.put(partition, newRecords);
                }
                recordsRemaining -= records.size();
            }
        }
    }

    return drained;
}


private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
    if (!subscriptions.isAssigned(partitionRecords.partition)) {
        // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
        log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
    } else {
        // note that the consumed position should always be available as long as the partition is still assigned
        long position = subscriptions.position(partitionRecords.partition);
        if (!subscriptions.isFetchable(partitionRecords.partition)) {//note: 这个 tp 不能来消费了,比如调用 pause
            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
        } else if (partitionRecords.fetchOffset == position) {//note: offset 对的上,也就是拉取是按顺序拉的
            //note: 获取该 tp 对应的records,并更新 partitionRecords 的 fetchOffset(用于判断是否顺序)
            List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
            if (!partRecords.isEmpty()) {
                long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
                log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                        "position to {}", position, partitionRecords.partition, nextOffset);

                subscriptions.position(partitionRecords.partition, nextOffset);//note: 更新消费的到 offset( the fetch position)
            }

            //note: 获取 Lag(即 position与 hw 之间差值),hw 为 null 时,才返回 null
            Long partitionLag = subscriptions.partitionLag(partitionRecords.partition);
            if (partitionLag != null)
                this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);

            return partRecords;
        } else {
            // these records aren't next in line based on the last consumed position, ignore them
            // they must be from an obsolete request
            log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                    partitionRecords.partition, partitionRecords.fetchOffset, position);
        }
    }

    partitionRecords.drain();
    return Collections.emptyList();
}

PartitionRecords 是 parseCompletedFetch() 处理后的类型,其成员变量如下所示:

private static class PartitionRecords<K, V> {
    private long fetchOffset;
    private TopicPartition partition;
    private List<ConsumerRecord<K, V>> records;
    private int position = 0;
}

consumer 的 Fetcher 处理从 server 获取的 fetch response 大致分为以下几个过程:

  • 通过 completedFetches.poll() 获取已经成功的 fetch response(在 sendFetches() 方法中会把成功的结果放在这个集合中,是拆分为 topic-partition 的粒度放进去的)
  • parseCompletedFetch() 处理上面获取的 completedFetch,构造成 PartitionRecords 类型;
  • 通过 drainRecords() 方法处理 PartitionRecords 对象,在这个里面会去验证 fetchOffset 是否能对得上,只有 fetchOffset 是一致的情况下才会去处理相应的数据,并更新 the fetch offset 的信息,如果 fetchOffset 不一致,这里就不会处理,the fetch offset 就不会更新,下次 fetch 请求时是会接着 the fetch offset 的位置去请求相应的数据。
  • 返回相应的 Records 数据。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352