1、Consumer工作过程
(1)、在启动时或者协调节点故障转移时,消费者发送ConsumerMetadataRequest给bootstrap brokers列表中的任意一个brokers。在ConsumerMetadataResponse中,它接收消费者对应的消费组所属的协调节点GroupCoordinator的位置信息。
(2)、消费者连接协调节点GroupCoordinator,并发送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration错误码,说明协调节点GroupCoordinator已经在初始化平衡。消费者就会停止抓取数据,提交offsets,发送JoinGroupRequest给协调节点。在JoinGroupResponse,它接收消费者应该拥有的topic-partitions列表以及当前消费组的新的generation编号。这个时候消费组管理已经完成,消费者就可以开始抓取数据,并为它拥有的partitions提交offsets。
(3)、如果HeartbeatResponse没有错误返回,消费者会从它上次拥有的partitions列表继续抓取数据,这个过程是不会被中断的。
交互数据格式:
对于每个消费者组,GroupCoordinator存储的数据:
- 对每个存在的topic,可以有多个消费组订阅同一个topic(对应消息系统中的广播)
- 对每个消费组,元数据如下:
消费组订阅的topics列表
Group配置信息,包括session timeout等
组中每个消费者的元数据。消费者元数据包括主机名,consumer id
每个正在消费的topic partition的当前offsets
Partition的ownership元数据,包括consumer到分配给消费者的partitions映射
GroupCoordinator工作过程参加上篇:
Consumer id的分配
消费者启动后,从协调者接收到的第一次JoinGroupResponse中有consumer id。从这里开始,消费者的每次心跳以及提交offset请求都必须要包含这个consumer id,作为消费者的唯一标识。协调者在成功rebalance时,会为消费者分配一个consumer id。(rebalance之前,协调者也会根据JoinGroupRequest中consumer id判断是否消费者都重新申请入组)
如果消费者发送的JoinGroupRequest带了consumer id,但是不匹配当前组成员的ids,协调者会在JoinGroupResponse中返回UnknownConsumer错误码,避免这个消费者加入到不认识的消费组中。这也不会触发组中其他消费者的rebalance操作。
2、KafkaConsumer示例和offset提交
消费者可以定时自动地提交offset,或者手动控制什么时候提交offset。
手动提交时,使用commitSync手动提交commitOffset,会阻塞调用线程,直到offsets成功被提交,或者在提交过程中发生错误。使用commitAsync则是非阻塞方式,会在成功提交或者失败时,触发OffsetCommitCallback回调函数的执行。
手动提交适合的场景
手动提交适合消息消费和业务处理逻辑耦合的场景。比如我们消费了一批记录,并且在内存中暂时保存,当有足够的记录时插入到数据库中,插入数据库成功,才允许提交offset。这样可以保存数据插入数据库成功才算正常消费消息。但是也会出现另外一种情况,插入数据库成功后,应用挂掉,导致提交offset失败,应用恢复是就会重新消费该消息。就是说,对于kafka而言,只能保证消息”至少发送一次”,但不能保证”正好一次”(交给了客户端自己实现,如可以将offset存储到kafka外部,保存offset和消费结果放在同一个事务里)。
自动提交
// 配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
// 创建消费者实例, 并且订阅topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
// 消费者消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
手动提交
props.put("enable.auto.commit", "false"); // 设置autoCommit为false
int commitInterval = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
if (buffer.size() >= commitInterval) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
}
消费者订阅指定分区(静态订阅)
在动态分配partition的场景下,消费者的加入和删除,都会导致partition的重新分配给其他的消费者。而静态分配partition下,如果消费者挂掉后,分配给这个消费者的partition并不会负载给其他消费者。静态分配partition的模式,消费者不是订阅主题,而是订阅指定的partition。
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
KafkaConsumer consumer = new KafkaConsumer(props);
// subscribe to some partitions of topic foo
TopicPartition partition0 = new TopicPartition("foo", 0);
TopicPartition partition1 = new TopicPartition("foo", 1);
TopicPartition[] partitions = new TopicPartition[2];
partitions[0] = partition0;
partitions[1] = partition1;
consumer.subscribe(partitions);
// seek to the last committed offsets to avoid duplicates
Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
consumer.seek(lastCommittedOffsets);
// find the offsets of the latest available messages to know where to stop consumption
Map<TopicPartition, Long> latestAvailableOffsets =
consumer.offsetsBeforeTime(-2, partition0, partition1);
boolean isRunning = true;
Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
while(isRunning) {
Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
Map<TopicPartition, Long> lastConsumedOffsets = process(records);
consumedOffsets.putAll(lastConsumedOffsets);
// commit offsets for partitions 0,1 for topic foo to custom store,offset存储到kafka外部
commitOffsetsToCustomStore(consumedOffsets);
for(TopicPartition partition : partitions) {
if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
isRunning = false;
else isRunning = true;
}
}
commitOffsetsToCustomStore(consumedOffsets);
consumer.close();
多线程消费
kafka的消费者(KafkaConsumer对象)并不是线程安全的。客户端代码需要自己确保多线程的访问是同步的。 唯一例外的是wakeup方法(是线程安全的):它可以被外部线程用来安全地中断一个进行中的操作。对于阻塞在wakeup方法上的线程会抛出WakeupException。可以被另外的线程用来作为关闭consumer的钩子。
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe("topic");
while (!closed.get()) {
ConsumerRecords records = consumer.poll(10000);
// 处理新的记录
}
} catch (WakeupException e) {
if (!closed.get()) throw e; //如果关闭了忽略异常
} finally {
consumer.close();
}
}
// 关闭钩子,可以在另一个线程中调用
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
- 一个线程一个消费者
每个线程都有自己的消费者实例,消息消费逻辑和消息处理逻辑都在消费者线程中完成。这种方式的利弊:
优点:很容易实现,执行很快,因为没有线程之间的交互和协调。
优点:对于每个partition要保证顺序处理比较容易实现。每个线程只需要按照顺序处理它接收到的消息即可。
缺点:更多的消费者意味着集群的TCP连接也很多。不过kafka处理连接是很高效的,所以这个代价并不是很大。
缺点:多个消费者意味着发送更多的请求给服务器,每一批发送的数据变少(发送更多批),就会降低I/O吞吐量。
缺点:所有进程之间的线程数量会被partitions的数量所限制。
- 解耦消费和处理逻辑,共享消费者线程
另一种方式是有一个或多个消费者线程用来消费消息,并将消费结果ConsumerRecords转移一个阻塞队列中,
它会被消息处理线程池消费,消息处理线程顾名思义就是处理消息的线程。这种方式的利弊:
优点:可以相互独立地扩展消费者数量和处理器数量。可以只用一个消费者线程服务于多个处理线程,避免partitions的限制。
缺点:在处理器线程之间保证消息处理的顺序是比较困难的。因为线程之间是独立的,线程之间的顺序是无法保证的。所以即使是比较早的数据块也有可能比靠后面的数据块更晚被处理到。如果要求消息的处理是无序的,当然是没有问题的。
缺点:手动提交offset变得困难,因为它需要所有的线程协调起来确保这个partition的消息已经被处理完毕。
解决上面的缺点有多种方式。比如每个处理线程都可以有自己的队列,消费者可以对TopicPartition的hash结果放入不同处理线程的队列中,这样也可以确保消息被顺序地消费,并且简化提交offset的逻辑。
ConsumerRebalanceListener
ConsumerRebalanceListener用于在Rebalance之后,添加回调逻辑。
onPartitionsAssigned监听分区分配事件
onPartitionsRevoked监听分区撤销事件
KafkaConsumer consumer = new KafkaConsumer(props,
new ConsumerRebalanceListener() {
boolean rewindOffsets = true; // should be retrieved from external application config
public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
if(rewindOffsets)
Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
consumer.seek(newOffsets);
}
public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
consumer.commit();
}
// this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
long numberOfMessagesToRewindBackTo) {
Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet())
newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
return newOffsets;
}
});
consumer.subscribe("foo", "bar");
//...同上调用了process消费消息,并保存到consumedOffsets内存中
consumer.close();
控制消费者的position
kafka允许通过seek(TopicPartition,long)指定新的位置,或者seekToBeginning,seekToEnd定位到最早或最近的offset。
int commitInterval = 100;
int numRecords = 0;
boolean isRunning = true;
Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
while(isRunning) {
Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
try {
Map<TopicPartition, Long> lastConsumedOffsets = process(records);
consumedOffsets.putAll(lastConsumedOffsets);
numRecords += records.size();
// commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
if(numRecords % commitInterval == 0) consumer.commit();
} catch(Exception e) {
try {
// rewind consumer's offsets for failed partitions
// assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
List<TopicPartition> failedPartitions = failedPartitions();
Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
for(TopicPartition failedPartition : failedPartitions) {
// rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
// should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
}
// seek to new offsets only for partitions that failed the last process()
consumer.seek(offsetsToRewindTo);
} catch(Exception e) { break; } // rewind failed
}
}
consumer.close();
private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
for(int i = 0;i < recordsPerTopic.size();i++) {
ConsumerRecord record = recordsPerTopic.get(i);
// process record
processedOffsets.put(record.partition(), record.offset());
}
}
return processedOffsets;
}
refer:
http://blog.csdn.net/u014393917/article/details/52043317
http://www.cnblogs.com/huxi2b/p/6124937.html
http://blog.csdn.net/louisliaoxh/article/details/51577117
http://blog.csdn.net/chunlongyu/article/details/52663090
http://blog.csdn.net/u014393917/article/details/52043317
http://zqhxuyuan.github.io/2016/02/22/2016-02-22-Kafka-Consumer-new/