一、基本概念
1、broker
broker
指的一个kafka
服务器,一个kafka集群是由多个 kafka broker 组成。
2、producer
producer
指的是消息生产者,即发送消息到 kafka broker
的客户端。
3、consumer
consumer
指的消息消费者,即从 kafka broker
获取消息的客户端。
4、cousumer group
consumer group
指的是消费者组,拥有相同的 group id
的消费者构成一个消费者组。
- 消费者组与消费者之间互不影响。
- 消费者组内的每个消费者负责消费不同的分区的数据。
-
一个分区只能有同一个分区中的一个消费者消费
。
5、topic
topic
指的是主题。生产者生产消息、消费者消费消息,都需要指定一个topic。
6、partition
partition
指的是分区。
- 一个
topic
可以存在多个分区。 - 一个分区,只能被同一个消息者组中的某一个消费者消费。
-
每个分区上的消息都是有序的
,但是主题(topic
)上的消息不是有序的。 - 每个分区可能存在多个
follower
,其中负责读/写
的是leader
。 - 每个
kafka broker
可以是当前分区的leader,也可以是其它分区的follower
。 - 多个分区可以提高程序的并发性,因为一个分区只能一个分区消息,多个分区可以多个消费者同时消费。
7、replicas
replicas
指的是副本。数据冗余,保证集群的可用性。
leader
指的是topic
主题的分区中,每个分区的主
。生产者发送数据、消费者消费数据都是从leader分区
中操作的。follower
指的是topic
主题的分区中,每个分区的从
。负责从leader分区
同步数据,当 leader 宕机时,follower 可能成为新的 leader。默认是从ISR
中进行选举。设置unclean.leader.election.enable = true
可以从非ISR
节点中进行选举,这样可以导致丢失数据,默认值是false
,建议是false
。-
ISR(in-sync replicas)
- 与leader保持同步的follower集合
- 如果follower在超过
replica.lag.time.max.ms
毫秒,没有与leader进行同步,则踢出ISR。
-
OSR(out-sync replicas)
- 落后leader太多的副本
-
AR
- ISR + OSR
是否允许非ISR的副本参与选举。
二、生产者分区策略
即生产者,发送发送消息后,该消息保存到kafka broker
上的topic
上的那个partition
上。
1、默认分区策略
默认的分区策略使用的是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
这个类。
即我们代码中使用org.apache.kafka.clients.producer.ProducerRecord
发送消息时。
- 在指定了
partition
的情况下,直接使用指定的 分区。 - 没有指定
partition
但指定了key
的情况下,将key的hash值与topic的partition
数进行取余得到partition值。 - 没有指
partition
和key
,在一个批次满的时候会发送到同一个分区,当一个新的批次创建时,会发送到另外的分区中。可以通过KIP-480
获取更多的粘性分区知识。
2、自定义分区策略
- 实现
org.apache.kafka.clients.producer.Partitioner
接口。 - 代码实现
Properties prop = new Properties();
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"自定义实现的分区器的全路径");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
三、生产者 acks 应答机制
1、acks = 0
producer
不等待broker
返回ack
信息,只是将消息加入到socket的缓冲中,可能还未发送出去。
- 重试(
retries
)不会生效。 - 为每个记录返回的偏移量始终是
-1
。 - 可能会存在
丢失数据
的情况。
2、acks = 1
producer
等待broker
返回ack
信息,只要这个partition
的leader
将消息成功保存后返回ack
。
- 如果
leader
返回ack
后,但是还未同步给follower
,此时leader
宕机了,可能会出现丢失数据
的情况。
3、acks = [all | -1]
producer
等待broker
返回ack
信息,partition
的leader
和follower
全部将消息保存成功后,才返回ack
信息。
如果在
leader
和follower
将消息都保存完毕,但是在返回ack
之前,leader
宕机了,此时可能导致消息重复
。-
此处指的是 所有的follower 都不同完,而不是半数以上同步完,才发送ack。
- 因为,当需要容忍 `n` 台节点故障时,`半数以上` 需要`2n+1`个副本,而`所有follower`都同步完,只需要`n+1`个副本。
四、日志文件的 HW 和 LEO
1、HW
HW
: 即 High Watermark,指的是该分区(partition
)中所有副本(leader+follower)中最小的LEO
。
- 上面中所有的副本:指的是 该分区的
ISR集合。
-
HW 之前的消息是可消费的
,比如 HW=5,那么可以消费的 offset 是 [0-4] 不包括5.
2、LEO
LEO
:即 Log End Offset,即当前日志文件中,下一条
消息待写入的 offset。
3、举例说明
比如,一个分区下有3个ISR,一个leader和2个follower。
leader 的leo 是 6
follower01 的leo是 5
follower02 的 leo 是 4
那么该分区的 hw 是 4,即最小的leo。
五、消费者的分区策略
我们知道,我们的消费者(consumer
)是隶属于消费者组(consumer group
)的,消费者组中可以存在多个消费者,每个消费者可以消费多个主题(topic
),每个主题又存在多个分区(partition
),每个分区只能由一个消费者来消费,每个消费者又可以消费多个分区。那么就必然涉及到 如何将某个分区分配给那个消费者消费。
1、RangeAssignor - 基于订阅的 topic来分配
默认分区策略
2、RoundRobinAssignor - 基于consumer group来分配,可能出现误消费别的主题的情况
eg:
topicA 存在 0,1,2 三个分区
topicB 也存在 0,1,2三个分区
consumerA 和 consumerB 同属于一个组
consumerA 订阅 topicA
consumerB 订阅 tocpiA 和 topicB
此时按照 RoundRobinAssignor 策略,会先将所有的分区进行排序,则会产生6个主题分区对象(`TopicPartition`),因为topicA和topicB 每个主题共有3个分区。此时就有可能将topicB的消息发送给了cousumerA,导致消费错误。
3、StickyAssignor -
4、CooperativeStickyAssignor -
六、消费者offset的维护
1、自动提交offset
Properties prop = new Properties();
// 设置自动提交offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 自动提交offset的间隔,单位ms
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);
2、手动提交offset
consumer.commitAsync()
: 异步提交
consumer.commitSync()
: 同步提交
Properties prop = new Properties();
// 设置手动提交offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("topicA", "topicB"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset: " + record.offset() + "value:" + record.value());
}
// 异步提交offset
consumer.commitAsync();
// 同步提交offset
// consumer.commitSync();
}
3、自定义提交offset
需要实现 org.apache.kafka.clients.consumer.ConsumerRebalanceListener
接口,在消费者发生 rebalance
时,保存或获取自定义的offset。
七、拦截器
1、生产者拦截器
实现org.apache.kafka.clients.producer.ProducerInterceptor
接口。可以实现消息发送到kafka broker
之前的消息拦截。
1、onSend(ProducerRecord<K, V> record)方法
该方法运行在主线程中,我们可以在该方法中对消费进行任何操作,但是最好不要修改topic
和partition
,否则可能影响消息目标分区的计算。
2、onAcknowledgement(RecordMetadata metadata, Exception exception)
该方法运行在producer 的 I/O
线程中,因此不要执行一些比较耗时的操作,否则会拖慢producer的发送消息的速度。该方法在消息发送kafka broker之后返回ack之后执行或发送到kafka broker的过程中发生异常执行。
需要自己保证实现的拦截器的线程安全问题。
Properties prop = new Properties();
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ProducerInterceptor类的全类名"));
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
2、消费者拦截器
实现org.apache.kafka.clients.consumer.ConsumerInterceptor
类,可以对从kafka broker
获取到的消息进行拦截。和consumer#poll
运行在同一个线程中。
Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ConsumerInterceptor类的全类名"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);