客户端从kafka集群中消费数据,同时对于kafka broker的失败客户端可以自动进行处理,也可以自动的适应topic partition在集群间的迁移。允许使用consumer group来与broker进行交互以实现负载均衡。
consumer维护着到broker的TCP链接以便获取数据。在使用consumer之后如果没有关闭这些链接的话会导致资源泄露,consumer不是线程安全的,有关详细信息请参看: Multi-threaded Processing
一、Cross-Version Compatibility
该文章主要针对0.10.0或者更新的版本介绍,老版本的broker可能不支持这些特性,例如0.10.0不支持offsetsForTimew,因为这个特性是在0.10.1中添加的。当你执行不可用的API时,会收到一个 UnsupportedVersionException 异常。
二、Offsets and Consumer Position
kafka为partition中的每一条记录对应着一个数字的偏移量,我们称之为:offset。在一个partition中每一个记录的offset是该记录的唯一标识,即每一个offset唯一标识当前partition中的一条记录,同时offset也可以标识consumer在partition中的position,例如,一个consumer的position为5,其深层层含义就是该consumer已经消费了partition中offset为0到4之间的所有记录,下一次接收的将会是offset等于5的记录。实际上有两个概念与consumer position相关:
- 调用consumer的 position方法将返回该consumer下一条将要处理消息的offset。且这个position的值等于consumer在partition看到的最大的offset+1。每当consumer通过调用 poll(Duration).获取记录之后consumer 的position都会自动增加。
- 成功提交的position, 即 committed position 是最后一个成功存储的offset。如果consumer进程失败并且重启,那么这个offset就是consumer将要恢复的position。consumer可以定期自动提交offset,或者可以通过调用API (e.g. commitSync and commitAsync)来选择手动控制提交,这种设计使得consumer可以自己控制何时消费记录。
三、Consumer Groups and Topic Subscriptions
kafka 使用 consumer groups的概念来允许一个线程池来划分消费任务和处理记录。这些线程可以运行在同一台机器上也可以分布在多台机器上以便提供扩展性和容错能力。所有共享同一个 group.id的consumer属于同一个consumer group。
consumer group中的每一个consumer可以通过 subscribe API动态的获取他想要订阅的Topic列表。Kafka将把topic中的每一条消息分配给所有订阅该Topic的consumer group中的一个线程进行处理。 同时可以平衡consumer group 中所有成员之间消费的partition,consumer Group中的每个consumer之间消费的partition个数基本一致,这样Topic的每个partition都被精确地分配给consumer group中的一个consumer。 如果一个Topic有四个partition,一个consumer group 有两个进程,那么每个进程将消费两个partition。
consumer group中成员的关系是动态维护的:如果一个consumer进程失败,之前分配给该consumer进程的所有partition将会被重新分配给同一consumer group中的其他的consumer。如果一个新的consumer加入consumer group,该consumer group中其他的consumer将会把自身处理的一部分partition分配这个新的consumer。这就是consumer group中的rebalancing。当一个consumer group中添加订阅新的topic时,group rebalancing同样适用,group将定期刷新元数据以便检测新的partition并将他们分配给组中的成员。
从概念上讲,您可以将consumer group看作是由多个consumer进程组成的单个逻辑订阅者。作为一个多订阅系统( multi-subscriber system),kafka天然支持为一个Topic分配任意数量的consumer group,并且这不是通过数据复制实现的,因此对于Kafka而言,添加额外的consumer代价非常低。
本文是消息系统中常见功能的一个简单概括, 为了获得与传统消息交付系统中的队列类似的语义,所有consumer都将是单个consumer group的一部分,因此消息交付将会在group中进行平衡,就像使用队列一样。 但是,与传统的消息传递系统不同,您可以有多个这样的group。 要获得与传统消息传递系统中类似的 pub-sub的语义时,每个consumer进程可以拥有自己的consumer group,即所有的consumer进程都属于不同的consumer group,因此每个consumer进程将订阅发布到主题的所有记录。
此外,当组 rebalancing 分配自动发生时,consumers会通过 ConsumerRebalanceListener 被通知到(ConsumerRebalanceListener是一个回调接口,当分配给consumer的partition发生更改时,用户可以实现该回调接口来触发自定义操作),以便它们完成必要的应用程序级逻辑,如状态清理,手动提交offset 等等。详见 Offsets Outside Kafka。
开发人员也可以通过使用 assign(Collection) 为consumer手动分配partition,此时将禁用自动partition分配以及consumer group协调。
四、Detecting Consumer Failures
订阅一系列的topic之后,当consumer调用 poll(Duration) 时会自动加入相应的consumer group。 poll API的设计是为了确保consumer的活力,只要持续调用 poll,consumer将留在组中,并持续从分配给它的topic partitions中接收消息。consumer会在后台持续向服务发送心跳,如果consumer进程崩溃或者在 session.timeout.ms期间没有发送心跳,这个consumer将会被认为已经死掉了,他的分区将会被重新分配。
还有一种可能,消费者可能会遇到一种“ livelock”的情况,即它继续发送心跳,但没有取得任何进展(即处理消息的进程很缓慢)。 在这种情况下,为了防止consumer无限期地保留其分区,我们通过max.poll.interval.ms配置提供了一种活动检测机制。 基本上,如果consumer不在max.poll.interval.ms配置的最大间隔内频繁地调用poll,那么consumer将主动离开组,以便另一个consumer可以接管其分区, 发生这种情况时,您可能会看到offset提交失败( as indicated by a CommitFailedException thrown from a call to commitSync())。这是一种安全机制,它保证只有组中的活动成员才能提交offset。因此,要留在组中,必须继续调用poll。
consumer 提供了两个配置项来控制轮询poll的行为:
max.poll.interval.ms: 通过增加poll之间的间隔,您可以给consumer更多的时间来处理从 poll(Duration)返回的一批记录。 缺点是,增加这个值可能会延迟group rebalancing,因为consumer只会在调用 poll时参与rebalancing。 您可以使用此设置来限制完成rebalancing的时间,但是如果consumer实际上不能频繁地调用poll,那么你可能会面临进展缓慢的风险。
max.poll.records: 使用此设置可对单次调用poll返回的记录数进行限制。这样可以更容易地预测每个poll调用间隔内consumer所需处理的最大值。通过调优此值,你能够减少轮询间隔,这将降低group rebalancing的影响。
对于消息处理时间变化不可预测的用例, 这两种选择可能不够。 处理这些情况的推荐方法是将消息处理转移到另一个线程,这允许consumer在处理器仍在工作时继续调用poll。但是必须采取一些措施以确保提交的offset不会超出实际位置。 通常,你必须禁用自动提交,并仅在线程完成对记录的处理后手动提交处理过的offset(取决于您需要的交付语义)。 还请注意,你需要 pause 分区(即 暂停从请求的分区获取数据),以便在线程处理完上一次poll返回的记录之前,不会从poll中接收到任何新记录。
五、Usage Examples
consumer 的api提供了针对各种消费用例的灵活性。下面是一些示例来演示如何使用它们
5.1、Automatic Offset Committing
这个示例演示了Kafka的consumer api的简单用法,它依赖于自动提交offset。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
通过使用配置bootstrap.servers指定要连接的一个或多个broker的列表以便连接到集群,此列表仅用于发现集群中的其他broker,不需要是集群中服务器的详尽列表(尽管您可能希望指定多个服务器,以防客户机连接时服务器宕机)。
设置enable.auto.commit意味着将以配置项auto.commit.interval.ms指定的频率自动提交offset。
在本例中,consumer订阅了主题foo和bar,group.id配置为test,意味着该consumer属于一个名为test的consumer group。
deserializer设置指定如何将字节转换为对象。例如,通过指定StringDeserializer,我们可以认为记录的键和值将只是简单的字符串。
5.2、Manual Offset Control
与依赖consumer定期提交已使用的offset不同,用户还可以控制何时应将记录视为已使用,从而提交其offset。当消息的使用与某些处理逻辑结合在一起时,这非常有用,因此在完成处理之前,不应将消息视为已使用。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
在本例中,我们将在内存中缓存一批记录并对它们进行批处理。当我们批处理了足够多的记录时,我们将把它们插入数据库中。如果我们像前面的示例那样允许offset自动提交,那么在将记录返回给调用poll的consumer之后,它们将被视为已使用。 然而,在批处理完成之后,但在将记录插入数据库之前,我们的进程可能会失败。
为了避免这种情况,我们仅仅在相应的记录插入数据库之后手动提交offset。这使我们能够准确地控制记录何时被消费。同时,也存在另一种可能性:进程可能在插入数据库之后但在提交offset之前的间隔时间内失败(尽管这可能只有几毫秒,但这是一种可能性)。 在这种情况下,接管消费的进程将从上次提交的offset之后继续消费,并重复插入最后一批数据。 以这种方式就是Kafka中所提供的“ at-least-once”的交付保证, 因为每个记录至少只交付一次,但是在失败的情况下可能会重复。
注意:使用自动offset提交也可以提供“at-least-once”的交付, 但是要求您在调用 poll(Duration) 或者 closing consumer之前必须在消费掉所有返回的数据,。 如果这两种方法都失败,提交的offset就有可能超过消耗的位置,从而导致丢失记录。使用手动offset控制的优点是,您可以直接控制记录何时被认为“消费”。
上面的示例使用 commitSync 将所有接收到的记录标记为已提交。在某些情况下,您可能希望通过显式地指定offset来更好地控制已提交的记录。在下面的示例中,我们在处理完每个分区中的记录后提交offset。
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
提交的offset应该始终是应用程序将读取的下一条消息的offset。因此,在调用 commitSync(offsets)时,应该是最后处理的消息的offset+1。
5.3、Manual Partition Assignment
在前面的示例中,我们订阅了感兴趣的主题,并让Kafka根据consumer group中活跃的consumer将Topic的partition动态的分配给各个consumer。但是,在某些情况下,你可能需要手动的为某个consumer分配特定的partition。例如:
- 如果consumer进程维护这与该分区相关的某种本地状态(比如本地磁盘上的键值存储),那么它应该只获取它在磁盘上维护的分区中的记录。
- 如果进程本身是高可用的,并且在失败时将重新启动( 可能使用诸如YARN、Mesos或AWS设施之类的集群管理框架,或者作为流处理框架的一部分)。 在这种情况下,Kafka不需要检测故障并重新分配分区,因为消费进程将在另一台机器上重新启动。
要使用这种模式,不需要使用 subscribe 订阅topic,只需使用要使用的分区的完整列表调用 assign(Collection) 。
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
一旦分配完成,你可以循环调用 poll ,正如前面例子中所展示的那样。 使用者指定的组仍然用于提交偏移量,但是现在分区集只会随着另一个 assign 的调用而更改,不会自动进行rebalancing。 手动分区分配不使用组协调( group coordination),因此使用者失败不会导致分配的分区被重新 rebalanced, 即使与另一个consumer共享一个groupId,每个consumer的行为也是独立的。 为了避免offset提交冲突,你通常应该确保每个consumer实例都有唯一的groupId。
注意,手动的partition分配( i.e. using assign)与动态分区分配( i.e. using subscribe)混合使用是不可能的。
5.4、Storing Offsets Outside Kafka
消费者应用程序不需要使用Kafka内置的offset存储,它可以在自己选择的存储中存储offset。 这方面的主要用例是允许应用程序原子性的存储结果和offset。 这并非不可能的,但如果是这样,它将使消费完全原子化,并且可以实现比kafka自动提交offset所默认的“ at-least once”语义更强的“ exactly once”语义。 下面是一些这类用法的例子:
- 如果消费的结果存储在关系数据库中,那么在数据库中存储offset时,允许在单个事务中同时提交结果和对应的offset。 因此,要么事务成功,offset将根据实际情况进行更新,要么结果将不会存储,offset也不会更新
- 如果结果存储在本地存储中,那么也可以在那里存储offset。例如,可以通过订阅特定分区并同时存储offset和索引数据来构建搜索索引。如果以原子的方式执行此操作,通常可能会出现这样的情况,即发生了崩溃导致未同步的数据丢失,但是以存储的数据所对应的offset不会丢失。在这种情况下,可以从已保存的数据位置恢复数据,确保数据不会丢失。
每个记录都有自己的偏移量,所以要管理自己的偏移量,只需执行以下操作:
- 配置 enable.auto.commit=false
- 使用每个 ConsumerRecord 提供的偏移量来保存你的position。
- 在重新启动时,使用 seek(TopicPartition, long) )恢复consumer的position。
当分区分配也是手工完成时,这种类型的使用是最简单的(这可能在上面描述的搜索索引用例中)。 如果分区分配是自动完成的,则需要特别注意处理分区分配发生更改的情况。 这可以通过 This can be done by providing a ConsumerRebalanceListener instance in the call to subscribe(Collection, ConsumerRebalanceListener) and subscribe(Pattern, ConsumerRebalanceListener)。 例如,当从consumer获取分区时,consumer将希望通过实现 ConsumerRebalanceListener.onPartitionsRevoked(Collection)来提交这些分区的offset。当分区被分配一个consumer时 consumer将希望查找这些新分区的offset,并将consumer正确初始化到该位置,此时可以调用 ConsumerRebalanceListener.onPartitionsAssigned(Collection).
ConsumerRebalanceListener 的另一个常见用途是刷新应用程序为分区维护的缓存。
5.4、Controlling The Consumer's Position
在大多数用例中,consumer只是从头到尾地使用记录,定期提交其position(自动或手动)。然而Kafka允许consumer手动控制它的position,在分区中任意向前或向后移动。这意味着consumer可以重新使用旧记录,或者跳到最近的记录,而不需要实际使用中间记录。
在一些情况下,手动控制consumer的位置是有用的。
一种情况是时间敏感的记录处理,对于远远落后于处理所有记录的consumer来说,这可能是有意义的,因为他们不会试图赶上处理所有记录的速度,而只是跳到最近的记录
另一个用例用于维护上一节中描述的本地状态的系统。在这样的系统中,consumer将希望在启动时将其position初始化为本地存储的内容。同样,如果本地状态被破坏(比如磁盘丢失),则可以通过重新使用所有数据和重新创建状态(假设Kafka保留了足够的历史)在新机器上重新创建状态。
通过调用 seek(TopicPartition, long)Kafka允许指定一个新的 position , 也可以使用特殊的方法来查找服务器维护的最早的offset和最近的offset( seekToBeginning(Collection) and seekToEnd(Collection) )。
5.5、Consumption Flow Control
如果将多个partition分配给consumer以从中获取数据,则consumer将尝试同时从所有partition中进行消费,从而有效地为这些partition提供相同的消费优先级。但是,在某些情况下,使用者可能希望首先集中从指定partition的某个子集全速获取数据,并且只在这些partition只有很少或没有数据要使用时才开始获取其他partition
其中一种情况是流处理,处理器从两个topic 获取数据并在这两个流上执行连接。当其中一个topic远远滞后于另一个topic时,处理器希望暂停从前一个topic获取数据,以便让滞后的流赶上来。另一个例子是在用户启动时引导,在启动时需要跟踪大量历史数据,应用程序通常希望在考虑获取其他topic之前获得其中一些topic的最新数据。
通过使用 pause(Collection) 和 resume(Collection)kafka支持动态控制消费流,以便在 poll(Duration)调用中暂停消费和恢复消费。
六、Reading Transactional Messages
Kafka 0.11.0引入了事务,其中应用程序可以原子地写入多个topic和partition。为了使其工作,从这些topic读取数据的consumer应该配置为只读取已提交的数据。这可以通过在consumer的配置中设置** isolation.level=read_committed**
In read_committed mode, consumer将只读取那些已成功提交的事务消息。它将像以前一样继续读取非事务性消息。 read_committed模式下没有客户端缓冲。 相反,read_committed consumer 的分区的end offset 将是属于开放事务的分区中的第一个消息的offset。这个offset称为“最后稳定偏移量”(Last Stable offset, LSO)。
一个read_committed consumer 只读取到LSO并过滤掉任何已中止的事务消息。 LSO还影响read_committed consumer 的 seekToEnd(Collection) 和 endOffsets(Collection) 的行为,每个方法的文档中都有详细的说明。 最后,对于read_committed consumer,fetch滞后指标也被调整为相对于LSO.
带有事务消息的分区将包括提交或中止标记,这些标记指示事务的结果。 这些标记没有返回给应用程序,但是日志中有一个偏移量。 因此,应用程序从具有事务性消息的主题中读取消费的offset将看到空白。 这些丢失的消息将是事务标记,并在两个隔离级别中为consumer过滤掉它们。此外,使用read_committed consumer 的应用程序可能还会看到由于事务中止而造成的差距,因为这些消息不会被consumer返回,但是会有有效的offset。
七、Multi-threaded Processing
Kafka消费者不是线程安全的。所有网络I/O都发生在调用应用程序的线程中。确保多线程访问正确同步是用户的责任。非同步访问将导致ConcurrentModificationException异常。 此规则的唯一例外是 wakeup(),可以从外部线程安全地使用它来中断活动操作。在这种情况下,将从操作的线程阻塞中抛出 WakeupException。这可以用于从另一个线程关闭consumer。下面的代码片段展示了典型的模式:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
然后在单独的线程中,consumer可以被关闭通过设置closed 标值
closed.set(true);
consumer.wakeup();
请注意,虽然可以使用线程中断而不是 wakeup()来中止阻塞操作(在这种情况下,将引发 InterruptException),但是我们不建议使用它们,因为它们可能会导致consumer完全关闭,从而中止阻塞操作。中断主要支持那些不可能使用 wakeup()的情况,例如,当consumer线程由位置代码的Kafka客户机管理时。
我们有意避免为处理实现特定的线程模型。这为实现记录的多线程处理留下了几个选项
7.1、One Consumer Per Thread
一个简单的选项是给每个线程一个它自己的consumer实例。下面是这种方法的优缺点
优点:
- 它是最容易实现的
- 它通常是最快的,因为不需要线程间的协作
- 它使得在每个分区的基础上按顺序处理非常容易实现(每个线程只按照它接收到的顺序处理消息)
缺点:
- 更多的consumer意味着到集群的TCP连接更多(每个线程一个)。一般来说,Kafka非常有效地处理连接,所以这通常是一个小成本。
- 多个consumer意味着发送到服务器的请求更多,批处理的数据更少,这可能会导致I/O吞吐量下降。
- 所有进程的线程总数将受到分区总数的限制
7.2、 Decouple Consumption and Processing
另一种选择是让一个或多个consumer线程执行所有数据消费,并将 ConsumerRecords 实例传递给由实际处理记录处理的处理器线程池所消费的阻塞队列。这个选择也有优点和缺点:
优点:
- 这个选项允许独立地扩展消费者和处理器的数量。这样就可以有一个单一的consumer来提供许多处理器线程,从而避免分区上的任何限制
缺点:
- 确保处理器之间的顺序需要特别注意,因为线程将独立执行较早的数据块,由于线程执行计时的幸运,实际上可能在稍后的数据块之后处理较早的数据块。对于没有排序需求的处理,这不是问题。
- 手动提交位置变得更加困难,因为它要求所有线程协调以确保该分区的处理完成
这种方法有许多可能的变体。例如,每个处理器线程可以有自己的队列,而consumer线程可以使用TopicPartition散列到这些队列中,以确保按顺序使用和简化提交。