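Changes
- Supports SSL/SASL.
- Supports the group management protocol, which lets consumer groups scale as the number of brokers grows.
- Lighter dependencies: the client no longer depends on Kafka core.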
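Concepts
- Consumer group
A set of consumers that consume the same topic. Whenever a consumer joins or leaves the group, the partitions are rebalanced across the remaining consumers.
One of the brokers acts as the coordinator; it stores the partition assignment and the group's membership.
- Offset Management
The starting offset position (earliest or latest) is read from the configuration. Offsets can be committed in automatic or manual mode; in automatic mode the consumer commits at a fixed interval.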
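To make the rebalance idea concrete, here is a toy sketch of how partitions might be redistributed when a member joins. It is an illustration only: `RebalanceSketch` and `assign` are hypothetical names, and the round-robin rule is an assumption chosen for simplicity, not the assignment logic Kafka itself uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a group rebalance (hypothetical, not Kafka's assignor).
final class RebalanceSketch {
    // Spreads partitions 0..partitionCount-1 over the members in round-robin order.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>(Arrays.asList("c1", "c2"));
        System.out.println(assign(members, 4)); // two members share four partitions
        members.add("c3");                      // a consumer joins: assignment is recomputed
        System.out.println(assign(members, 4));
    }
}
```

Every membership change recomputes the whole assignment, which is why frequent joins and leaves are costly for a group.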
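Configuration
- Core Configuration
  - Always set bootstrap.servers, and give the consumer a client.id as well.
- Group Configuration
  - Set group.id.
  - session.timeout.ms: defaults to 30s. If the application polls and processes records in the same thread, consider raising this value so that slow processing does not trigger a rebalance too readily. The only drawback is that a failed consumer takes longer to detect, so its partitions are consumed slowly in the meantime. In the normal case, however, a consumer that exits notifies the coordinator immediately.
  - heartbeat.interval.ms: raise it to reduce rebalancing (a lower interval generally means rebalances are noticed sooner).
- Offset Management
  - enable.auto.commit
  - auto.commit.interval.ms
  - auto.offset.reset (earliest/latest)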
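The settings above can be collected in a plain `java.util.Properties` object, which is what the `KafkaConsumer` constructor in the examples of these notes accepts. The sketch below is a minimal illustration: `ConsumerConfigSketch` and `build` are hypothetical names, and every value is a placeholder chosen for the example rather than a recommendation.

```java
import java.util.Properties;

// Hypothetical helper that gathers the consumer settings discussed above.
final class ConsumerConfigSketch {
    static Properties build(String bootstrapServers, String groupId, String clientId) {
        Properties props = new Properties();
        // Core configuration
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("client.id", clientId);
        // Group configuration (illustrative values)
        props.setProperty("group.id", groupId);
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("heartbeat.interval.ms", "3000");
        // Offset management: manual commits, start from the earliest offset
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.commit.interval.ms", "5000"); // only used when auto commit is on
        props.setProperty("auto.offset.reset", "earliest");
        // Deserializers are also required by the consumer; String is assumed here.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        build("host:9092", "foo", "notes-client").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```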
Administration
- List Groups
    bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list
- Describe Group
    bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo
Examples
Basic Poll Loop
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public abstract class BasicConsumeLoop<K, V> implements Runnable {
        private final KafkaConsumer<K, V> consumer;
        private final List<String> topics;
        private final AtomicBoolean shutdown;
        private final CountDownLatch shutdownLatch;

        public BasicConsumeLoop(Properties config, List<String> topics) {
            this.consumer = new KafkaConsumer<>(config);
            this.topics = topics;
            this.shutdown = new AtomicBoolean(false);
            this.shutdownLatch = new CountDownLatch(1);
        }

        // Subclasses implement the per-record handling.
        public abstract void process(ConsumerRecord<K, V> record);

        public void run() {
            try {
                consumer.subscribe(topics);
                while (!shutdown.get()) {
                    // Alternative: the wakeup pattern. Block indefinitely here with
                    //   ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
                    // and call consumer.wakeup() from another thread to break out.
                    ConsumerRecords<K, V> records = consumer.poll(500);
                    records.forEach(record -> process(record));
                }
            } finally {
                consumer.close();
                shutdownLatch.countDown();
            }
        }

        public void shutdown() throws InterruptedException {
            shutdown.set(true);
            // Return only after consumer.close() has completed.
            shutdownLatch.await();
        }
    }
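Committing offsets
The problem with auto-commit is that a restart may cause some records to be processed again. Shortening the commit interval reduces the amount of duplication.
Synchronous commit is the safest option: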
    private void doCommitSync() {
        try {
            consumer.commitSync();
        } catch (WakeupException e) {
            // we're shutting down, but finish the commit first and then
            // rethrow the exception so that the main loop can exit
            doCommitSync();
            throw e;
        } catch (CommitFailedException e) {
            // the commit failed with an unrecoverable error. if there is any
            // internal state which depended on the commit, you can clean it
            // up here. otherwise it's reasonable to ignore the error and go on
            log.debug("Commit failed", e);
        }
    }

    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
                records.forEach(record -> process(record));
                doCommitSync();
            }
        } catch (WakeupException e) {
            // ignore, we're closing
        } catch (Exception e) {
            log.error("Unexpected error", e);
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }
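If the group has already been rebalanced, the commit fails with a CommitFailedException. This typically happens when the session times out while events are being processed. There are two ways to deal with it:
- Set session.timeout.ms large enough, and lower max.partition.fetch.bytes to reduce the number of events returned per batch.
- Move event processing to another thread, for example by putting events into a BlockingQueue.
The second option has a catch: in this consumer, heartbeat requests are only sent from within poll() calls, so if offer() blocks for a long time between two polls, the consumer is kicked out of the group.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
Option 1 is recommended.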
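For the queue hand-off option, one way to keep the polling thread from stalling is to bound how long each `offer` may wait. The sketch below uses only the JDK; `HandOffSketch` and `handOff` are hypothetical names, and what to do with the records that were not accepted is left to the caller (the consumer's `pause()` and `resume()` methods are one possible tool, which this sketch does not exercise).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical hand-off helper: never blocks longer than maxWaitMs per record.
final class HandOffSketch {
    // Returns how many records from the batch were accepted by the queue.
    // Stops at the first record that cannot be enqueued in time, so order is preserved.
    static <T> int handOff(BlockingQueue<T> queue, List<T> batch, long maxWaitMs)
            throws InterruptedException {
        int accepted = 0;
        for (T record : batch) {
            if (!queue.offer(record, maxWaitMs, TimeUnit.MILLISECONDS)) {
                break; // queue is full: give control back so the caller can poll again
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        int accepted = handOff(queue, Arrays.asList("a", "b", "c"), 10);
        System.out.println("accepted " + accepted + " of 3");
    }
}
```

With a queue capacity of 2 and no processor draining it, the third record is rejected after the short wait instead of blocking the polling thread indefinitely.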
Delivery Guarantees
- At least once: auto-commit
- At most once:
    private boolean doCommitSync() {
        try {
            consumer.commitSync();
            return true;
        } catch (CommitFailedException e) {
            // the commit failed with an unrecoverable error. if there is any
            // internal state which depended on the commit, you can clean it
            // up here. otherwise it's reasonable to ignore the error and go on
            log.debug("Commit failed", e);
            return false;
        }
    }

    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
                // commit first; process only if the commit succeeded
                if (doCommitSync())
                    records.forEach(record -> process(record));
            }
        } catch (WakeupException e) {
            // ignore, we're closing
        } catch (Exception e) {
            log.error("Unexpected error", e);
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }
- Exactly-once delivery
Not supported.
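The difference between the first two guarantees comes down to whether the offset is committed before or after processing. The toy simulation below shows this without a broker; `DeliverySketch`, `consume`, and the single simulated crash are assumptions made for the illustration, and `batchSize` is assumed to be positive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model: a log of records, a committed offset, and one crash between commit and process.
final class DeliverySketch {
    // commitFirst = true models at-most-once, false models at-least-once.
    // The crash hits batch number crashBatch, between the two steps; the consumer
    // then restarts from the committed offset.
    static List<Integer> consume(List<Integer> log, int batchSize, boolean commitFirst, int crashBatch) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;
        int batchNo = 0;
        while (committed < log.size()) {
            int end = Math.min(committed + batchSize, log.size());
            List<Integer> batch = log.subList(committed, end);
            boolean crash = (batchNo == crashBatch);
            batchNo++;
            if (commitFirst) {
                committed = end;          // offset is committed before processing
                if (crash) continue;      // crash: the batch is never processed
                processed.addAll(batch);
            } else {
                processed.addAll(batch);  // processing happens before the commit
                if (crash) continue;      // crash: the batch is fetched again after restart
                committed = end;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Integer> log = Arrays.asList(0, 1, 2, 3, 4, 5);
        System.out.println(consume(log, 2, true, 1));  // [0, 1, 4, 5]
        System.out.println(consume(log, 2, false, 1)); // [0, 1, 2, 3, 2, 3, 4, 5]
    }
}
```

Committing first loses records 2 and 3 (at most once); processing first handles them twice (at least once).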
Asynchronous offset commits
Asynchronous commits improve throughput but carry a risk: a failed commit is not retried.
Record the failed offsets yourself in the callback:
    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
                records.forEach(record -> process(record));
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        // exception is null when the commit succeeded
                        if (exception != null)
                            log.debug("Commit failed for offsets {}", offsets, exception);
                    }
                });
            }
        } catch (WakeupException e) {
            // ignore, we're closing
        } catch (Exception e) {
            log.error("Unexpected error", e);
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }
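A failed offset commit usually does not cause much trouble, because by itself it does not make the consumer re-read data. A common pattern is to commit asynchronously inside the poll loop and synchronously on rebalance and on shutdown: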
    private void doCommitSync() {
        try {
            consumer.commitSync();
        } catch (WakeupException e) {
            // we're shutting down, but finish the commit first and then
            // rethrow the exception so that the main loop can exit
            doCommitSync();
            throw e;
        } catch (CommitFailedException e) {
            // the commit failed with an unrecoverable error. if there is any
            // internal state which depended on the commit, you can clean it
            // up here. otherwise it's reasonable to ignore the error and go on
            log.debug("Commit failed", e);
        }
    }

    public void run() {
        try {
            consumer.subscribe(topics, new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // commit synchronously before the partitions are taken away
                    doCommitSync();
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
            });
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
                records.forEach(record -> process(record));
                consumer.commitAsync();
            }
        } catch (WakeupException e) {
            // ignore, we're closing
        } catch (Exception e) {
            log.error("Unexpected error", e);
        } finally {
            try {
                // final synchronous commit on shutdown
                doCommitSync();
            } finally {
                consumer.close();
                shutdownLatch.countDown();
            }
        }
    }
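Asynchronous commits only fit the at-least-once case. For at-most-once, the consumer would have to know whether the commit succeeded before processing the data, which an asynchronous commit cannot tell it, unless there were an "unread" semantic for pushing the data back.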
Multithreading
Multi-threaded Processing
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.
The only exception to this rule is wakeup(), which can safely be used from an external thread to interrupt an active operation. In this case, a WakeupException will be thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. The following snippet shows the typical pattern:
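    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;

        public KafkaConsumerRunner(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        public void run() {
            try {
                consumer.subscribe(Arrays.asList("topic"));
                while (!closed.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(10000);
                    // Handle new records
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) throw e;
            } finally {
                consumer.close();
            }
        }

        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }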
Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
    closed.set(true);
    consumer.wakeup();
We have intentionally avoided implementing a particular threading model for processing. This leaves several options for implementing multi-threaded processing of records.
One Consumer Per Thread
A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
- PRO: It is the easiest to implement.
- PRO: It is often the fastest as no inter-thread co-ordination is needed.
- PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just processes messages in the order it receives them).
- CON: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles connections very efficiently so this is generally a small cost.
- CON: Multiple consumers means more requests being sent to the server and slightly less batching of data, which can cause some drop in I/O throughput.
- CON: The number of total threads across all processes will be limited by the total number of partitions.
Decouple Consumption and Processing
Another alternative is to have one or more consumer threads that do all data consumption and hand off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads that actually handle the record processing. This option likewise has pros and cons:
- PRO: This option allows independently scaling the number of consumers and processors. This makes it possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
- CON: Guaranteeing order across the processors requires particular care: as the threads execute independently, an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of thread execution timing. For processing that has no ordering requirements this is not a problem.
- CON: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure that processing is complete for that partition.
There are many possible variations on this approach. For example, each processor thread can have its own queue, and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commit.
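The per-thread-queue variation can be sketched as follows. This is a minimal illustration using only the JDK: `PartitionRouterSketch`, `queueIndex`, and `route` are hypothetical names, the topic and partition are passed as plain values instead of a real TopicPartition, and the hash formula is an assumption chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical router: every (topic, partition) pair always maps to the same queue,
// so a single processor thread sees that partition's records in order.
final class PartitionRouterSketch {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    PartitionRouterSketch(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    // Stable, non-negative index derived from topic name and partition number.
    static int queueIndex(String topic, int partition, int queueCount) {
        return Math.floorMod(31 * topic.hashCode() + partition, queueCount);
    }

    // Called by the consumer thread for each record value it has polled.
    void route(String topic, int partition, String value) throws InterruptedException {
        queues.get(queueIndex(topic, partition, queues.size())).put(value);
    }

    BlockingQueue<String> queue(int index) {
        return queues.get(index);
    }

    public static void main(String[] args) throws InterruptedException {
        PartitionRouterSketch router = new PartitionRouterSketch(4);
        router.route("topic", 0, "first");
        router.route("topic", 0, "second");
        int idx = queueIndex("topic", 0, 4);
        System.out.println("queue " + idx + " holds " + router.queue(idx));
    }
}
```

Because one partition never spreads over several queues, each processor thread can also track the last offset it finished per partition, which is what simplifies committing.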
http://docs.confluent.io/2.0.1/clients/consumer.html#asynchronous-commits