消费者
1. 消费方式
采取pull的模式从kafka集群获取消息。
push的模式很难适用于消费速率不同的消费者。pull模式的不足之处在于,如果kafka没有数据,消费者可能会陷入循环。
2. 分区分配策略
一个消费者组有多个消费者,一个topic有个多分区。那么哪个分区由哪个消费者消费。
kafka两种分配策略:roundrobin和range。
2.1 RoundRobin 轮询
订阅一个topic时,将各个partition轮询分配到各个消费者。
多个topic时,TopicAndPartition组合,轮询分配到各个消费者。
2.2 Range
按照单个主题来分配。缺点在于随着topic增加,会造成消费者负载不均衡。
2.3 分区分配策略的触发时机?
消费者个数发生变化的时候
3. offset的维护
消费者组,主题,分区 -> offset
Kafka 0.9版本之前,consumer默认将offset保存在zk中,从0.9版本开始,consumer默认将offset保存在kafka一个内置的topic中,该topic为__consumer_offsets
既然位移信息存放在这个topic中,那么我们可以开启一个消费者,订阅这个主题,看看里面的位移数据。
(1) 首先,开启一个生产者生产数据都某个topic例如“test”主题,开启一个消费者消费这个主题的数据,指定这个消费者的group为“group_test”。
$ kafka-console-producer --broker-list localhost:9092 --topic test
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test -group group_test
(2) 接着,更改consumer.properties,将属性设置为false。
/usr/local/etc/kafka/consumer.properties
加上一行 exclude.internal.topics=false
(3) 再开启一个消费者,消费kafka内部主题__consumer_offsets。由于内部主题__consumer_offsets默认有50个分区,可以通过“group_test”.hashCode()%50来计算出分区号,也就是group_test的位移消息都是保存在这个分区上的。所以可以通过命令行直接查看__consumer_offsets在这个分区上的消息。
$ kafka-console-consumer --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config consumer.properties --partition 38
[group_test,test,0]::OffsetAndMetadata(offset=104, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1616823705396, expireTimestamp=None)
[group_test,test,0]::OffsetAndMetadata(offset=104, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1616823710400, expireTimestamp=None)
[group_test,test,0]::OffsetAndMetadata(offset=104, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1616823715401, expireTimestamp=None)
[group_test,test,0]::OffsetAndMetadata(offset=104, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1616823720402, expireTimestamp=None)
[group_test,test,0]::OffsetAndMetadata(offset=104, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1616823725408, expireTimestamp=None)
[group_test,test,0]::OffsetAndMetadata(offset=105, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1616823730410, expireTimestamp=None)
[group_test,test,0]::OffsetAndMetadata(offset=105, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1616823735412, expireTimestamp=None)
其中[group_test,test,0],表示[消费者组,主题,分区号],那么这一条消息是group_test消费者组,消费test主题的0号分区,的消费位移。offset=104表示消费到104条消息了。
消费者Java API
1. 简单消费者
package com.examples.kafka_consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class MyConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//开启自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//自动提交offset的延迟时间,1000ms提交一次
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//订阅主题
consumer.subscribe(Arrays.asList("test"));
//获取数据,隔100ms拉取一次
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
//解析并打印consumerRecords
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
}
//break;
}
}
}
2. 重置消费者offset
启动消费者时,默认不会消费之前的数据。不过可以通过AUTO_OFFSET_RESET_CONFIG来重置offset,从头开始消费消息。这个属性有两个值,earliest和latest。
不过这个配置只在两种情况下生效:
(1). 消费者在kafka中没有初始化的offset,也就是消费者组第一次消费。或者-
(2). 如果当前的offset已经不存在了,例如消费者消费到offset为10,此时消费者挂掉了,过了八天,kafka上面的数据被清除掉了,消费者被修好,继续按照offset为10来消费,但是kafka上最小的数据都已经是位移为1000了。
/**
* <code>auto.offset.reset</code>
*/
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";
可以在Java 消费者中加上以下配置来试验:
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
3. 消费者offset是什么时候提交的?
笔者留意到一个情况,进行如下配置,也就是开启自动提交,此消费者组第一次消费(kafka上没有初始位移信息),重置offset为earliest。调用一次consumer.poll,程序结束。
第一次运行时,可以看到消费者从头消费了消息。
第二次运行时,依然是从头消费消息,这就有点奇怪了,第二次消费不是应该有上次的位移信息了,再继续上次的消费吗?难道上一次消费的位移信息没有被提交?
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata120");
ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
再继续研究,发现连续poll两次,第二次poll时,第一次poll之后的位移被提交了。
因此可以知道,一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。
4. 关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
消费者不会自动向kafka提交消费位移。虽然消费者自己运行期间会在本地内存维护自己的消费位移,下一次poll会继续上一次的位移来消费,但是如果消费者挂掉了或者重启了,位移信息就没有了。
5. 手动提交offset
5.1 自动提交的缺点
自动提交的延时比较短的话,容易丢失数据。例如消费者取下来的数据还没处理完,offset就被自动提交了,这时消费者挂了,重启后,一部分没处理完的数据就在消费者这边丢了。
自动提交的延时比较长的话,容易造成数据的重复。例如消费者取下来的数据,一部分处理好了,还有一部分正在处理,这时消费者挂了,而offset还没有被提交。那么下次消费者重启时,是根据之前的位移来拉取数据,会将上次的数据重新取下来,造成部分数据的重复。
也就是说自动提交难以控制提交的时机,容易造成数据丢失或者重复。
5.2 手动提交offset
两种方式,commitSync(同步提交)和commitAsync(异步提交)
(1)同步提交offset
在每次处理完数据之后,调用consumer.commitSync()
同步提交,当前线程会阻塞知道offset提交成功。有重试机制,所以更可靠,但由于是同步所以效率相对较低。
(2)异步提交offset
consumer.commitAsync()
开启新线程去提交offset,主线程依然往下执行。
手动提交的缺点在于,不管是同步还是异步,都有可能重复数据。
5.3 自定义存储offset
不管是自动还是手动提交,offset都是被提交到kafka。
为了解决数据丢失或者重复,可以自己维护offset。最好的应用场景是,通过kafka拉取的数据,做过计算之后,要放到mysql里。那么刚好可以将存到mysql的操作和提交offset的操作,放在一个事务里。
自己维护offset比较麻烦的一点在于,需要考虑消费者的rebalance。consumer rebalance指的是,新的消费者加入消费者组,或者已有的消费者退出消费者组,会触发分区的重新分配。这时需要借助ConsumerRebalanceListener来处理这种情况。
//消费者订阅主题
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
//该方法会在Rebalance之前调用
@Override
public void onPartitionsRevoked(Collection partitions) {
commitOffset(currentOffset);
}
//该方法会在Rebalance之后调用
@Override
public void onPartitionsAssigned(Collection partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));//定位到最近提交的offset位置继续消费
}
}
});
//获取某分区的最新offset
private static long getOffset(TopicPartition partition) {
return 0;
}
//提交该消费者所有分区的offset
private static void commitOffset(Map currentOffset) {
}