第一章 初始kafka
参考书籍: 朱小厮--深入理解Kafka 核心设计与实践原理
Kafka体系结构
-
Kafka体系架构包含若干Producer, 若干Broker , 若干Consumer,以及一个Zookeeper集群。
- Zookeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作。
- Producer:生产者,即发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中
- Broker:一个独立的Kafka服务节点。 一个或多个Broker组成了一个Kafka集群
- Consumer: 消费者,也就是接收消息的一方。消费者连接到Kafka上并拉取消息,进行相应的业务逻辑处理
主题和分区
Kafka的每条消息都属于一个主题,生产者负责将消息发送到特定的主题,而消费者负责订阅主题并消费
-
一个主题可以细分为多个分区,一个分区属于单个主题。 分区可以看成是一个可追加的日志文件, 消息在被追加到分区日志文件时会分配一个
偏移量
, 偏移量是消息在分区中的唯一标识,Kafka保证了 偏移量在分区中是有序的。 每一条消息发送时会根据分区规则选择存储到哪一个分区,在主题创建之后可以通过修改分区的数量实现水平扩展。
多副本(Replica机制)
- 多副本机制是通过增加副本数量进行数据冗余,从而提高容灾能力。 副本之间是“一主多从”的关系。其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步 (区别于读写分离) , 当leader副本宕机时通过leader选举和失效转移,保证了Kafka的高可用性。
- folower副本的消息相对于leader副本具有一定的滞后性
几个重要名词概念
AR (Assigned Replicas): 分区中的所有副本
ISR (In-Sync-Replicas): 与leader副本保持一定程度同步的副本
-
OSR(Out-of-Sync-Replicas): 与leader副本滞后过多的副本
即 AR = ISR + OR;
HW(High WaterMark): 高水位, 用来标记一个特定的消息偏移量,消费者只能拉取到这个offset之前的消息(可见性)
LEO( Log End Offset) : 标志着当前日志文件中下一条待写入消息的offset 。 分区ISR集合中的每个副本都维护自身的LEO,而ISR集合中的最小LEO为分区的HW,对消费者而言只能消费HW之前的消息。
第二章 生产者
KafkaProducer是线程安全的
public class KafkaProducerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
producer.send(record);
}
}
- 生产逻辑的几个步骤
- 配置生产者客户端参数并创建生产者实例
- 构建待发送消息
- 发送消息
- 关闭生产者实例
发送消息的三种模式
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
- 发后即忘
- 只管发送消息,不关心信息是否正确到达。
- 优点:性能最高,吞吐量大 缺点:会造成数据丢失,可靠性低
- 同步
- 发送消息后返回Future对象,调用get()方法时阻塞等待,直到发送成功或出现异常
- 优点:可靠性高,如有异常可处理或进行消息重发 缺点:性能低,造成阻塞
- 异步
- 发送消息时指定回调函数,Kafka在返回响应时会调用该函数实现异步的发送确认。
- 在同一个分区中,如果消息record1比record2先发送,那么它会保证callback1在callback2之前调用。
序列化器
- 生产者使用序列化器将对象转换为字节数组,才能通过网络发送给Kafka
- 消费者使用反序列化其把Kafka中收到的字节数组转换为相应的对象。
- 因此生产者的序列化器和消费者使用的反序列化器要一一对应。
分区器
分区器 是根据key这个字段来计算partition值。它的作用是为消息分配分区
生产者拦截器
生产者拦截器既可用来在消息发送前做一些准备工作如 按照某个规则过滤掉不符合要求的消息,修改消息内容等。也可以用来在发送回调逻辑前做一些定制化需求,如统计工作。 还可以指定多个拦截器形成拦截器链
生产者整体架构
- 整个生产者客户端由 主线程和Sender线程构成
- 在④中,是用于缓存消息,以便Sender线程进行批量发送,进而减少网络传输
- 在⑤中,是将 <分区,消息集合> 转化为 <brokerId, 消息集合>。 即逻辑地址到物理地址的转化
- 在⑦中,用于缓存尚未收到回应的消息,以便异常时可进行重发
- 重要参数 max.in.flight.requests.per 默认值为5,即每个连接最多只能缓存5个未响应的请求。 可类比于TCP连接中的滑动窗口大小
元数据的更新
元数据是指Kafka集群中的元数据,这些元数据记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follwer副本分配在哪些节点上,哪些副本在AR,ISR集合中,集群有哪些节点,控制节点又是哪一个等信息。
元数据更新会挑选 InFlightRequests中当前负载最小的节点发送更新元数据请求。 由于Sender线程需要更新,而主线程需要读取。因此数据同步问题也要考虑。使用synchronized和final保证。
几个重要的参数
- acks : 用来指定分区中必须要有多少个副本收到这条消息,这样生产者才认为消息写入成功
- 取值为1 : 只要leader副本成功写入消息,就会收到kafka的成功响应
- 取值为0: 不需要等待任何服务器响应,写入就认为成功
- 取值为-1或all:需要等待ISR中的所有副本都成功写入消息,才会收到kafka的成功响应
- max.request.size
- 限制生产者客户端能发送消息最大值
- retires 、retry.backoff.ms
- 配置生产者重试次数 、 两次重试的时间间隔
- max.in.flight.requests.per.connection
- 默认值为5,即每个连接最多只能缓存5个未响应的请求。
- 当此参数 > 1 ,则会因为重发而出现错序的问题
第三章 消费者
kafkaConsumer是线程不安全的
消费者和消费者组
每个消费者只能消费所分配到的分区中的消息,即每一个分区只能被一个消费者组中的一个消费者所消费
-
当消费组内的消费者个数变化时对应的分区分配演变如下:(默认的RangeAssinor为例)
消费者与消费者组的模型让整体消费能力具有了伸缩性。可以增加消费者个数来提高(或降低)整体消费能力
-
当消费者过多,出现消费者分配不到任何分区时,那么这些消费者将无法消费消息,造成浪费
-
Kafka基于消费者和消费者组模型 支持了 点对点和发布/订阅两种模式
- 如果所有消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,相当于点对点模式
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,相当于发布/订阅模式的应用
每一个消费者只隶属于一个消费组。消息发送时可指定消费者组, 消费者客户端通过group.id配置消费者组名称,默认为空字符串。
消费者客户端开发
KafkaConsumer是非线程安全的
public class KafkaConsumerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("client.id", "consumer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic()
+ ", partition = " + record.partition()
+ ", offset = " + record.offset());
System.out.println("key = " + record.key()
+ ", value = " + record.value());
//do something to process record.
}
}
} catch (Exception e) {
log.error("occur exception ", e);
} finally {
consumer.close();
}
}
}
-
Kafka的消费逻辑
- 配置消费者客户端参数及创建相应消费者实例
- 订阅主题
- 拉取消息并消费
- 提交消费位移(后面会讲)
- 关闭消费者实例
-
订阅主题和分区的细节
-
有三种订阅的方式。 集合订阅的方式subscribe(Collection)、正则表达式订阅方式subscribe(Pattern)、
指定分区的订阅方式assign(Collection)
subscribe订阅主题时具有再平衡(后面会讲)的功能,而assign没有。
-
反序列化器
将字节数组转化为对象,与生产者的序列化器要一一对应
消息消费
几个常用API
#KafkaConsumer
public ConsumerRecords<K, V> poll(Duration timeout);
#ConsumerRecords
public List<ConsumerRecord<K, V>> records(TopicPartition partition);
public Iterable<ConsumerRecord<K, V>> records(String topic);
- timeout参数用于控制阻塞时间,再消费者缓冲区里没有数据时会发生阻塞
- 按照分区的维度,获取拉取消息中 某个分区的所有记录
- 按照主题的维度,获取拉取消息中 某个主题的所有记录
位移提交
-
Consumer会记录上一次的消费位移,并进行持久化保存。 存储到Kafka的内部主题__consumer_offset中
-
位移的提交时机也有讲究,可能会造成重复消费和消息丢失的现象
拉取到消息之后就进行位移提交, 若消费到一半时宕机,则造成消失丢失现象
-
消费完所有消息后在进行位移提交, 若消费到一半时宕机,则造成重复消费现象
Kafka默认的消费位移提交方式是自动提交(定期)。
enable.auto.commit
默认为true;auto.commit.interval.ms
配置提交的周期。自动提交的动作是在poll()方法的逻辑中完成的,会在每次拉取请求之间检查是否可以进行位移提交。-
自动提交会造成重复消费和消息丢失的现象
重复消费: 消费到一半时宕机,而尚未提交,则造成重复消费
-
消息丢失:如图线程A进行拉取消息到缓存,线程B从缓存中处理逻辑。 若线程B处理到一半时宕机,那么下次恢复时又从【X+7】开始拉取,造成了【x+4】-【X+7】消息的丢失
可以看出自动提交编码简单但会出现消息丢失和重复消费现象,并且无法做到精确的位移管理,因此Kafka还提供了 手动提交的方式。通常不是拉取到消息就算消费完成了,而是当我们通过这条消息完成一系列业务处理后,才认为消息被成功消费。开启手动提交需要
enable.auto.commit
设置为false-
手动提交可分为 同步提交和异步提交。 即commitSync()和commitAsync()两种方式
以下是同步提交和异步提交的一些案例
#拉取所有消息并处理后进行同步提交 while (running.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { //do some logical processing. } consumer.commitSync(); } #批量处理+批量提交 int minBatchSize = 200; while (running.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { //do some logical processing with buffer. consumer.commitSync(); buffer.clear(); } } #带参数的同步位移提交,可控制提交的offset,该案例为每消费一条就提交一次 while (running.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { //do some logical processing. long offset = record.offset(); TopicPartition partition = new TopicPartition(record.topic(), record.partition()); consumer.commitSync(Collections .singletonMap(partition, new OffsetAndMetadata(offset + 1))); } } #按分区粒度同步提交消费位移,每处理完一个分区就提交一次 while (running.get()) { ConsumerRecords<String, String> records = consumer.poll(1000); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { //do some logical processing. } long lastConsumedOffset = partitionRecords .get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumedOffset + 1))); } } #异步提交,可指定提交完成后的回调函数 public void commitAsync(); public void commitAsync(OffsetCommitCallback callback); public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
异步提交也存在重复消费的问题。如果先提交了【X+2】,再提交【X+8】。如果后者提交成功而前者提交失败。 如果此时前者进行重试提交,那么成功后会造成数据的重复消费。
对于异步提交可以设置一个递增的序号维护异步提交的顺序,如当位移提交失败需要重试提交时,对比所提交的位移和维护的序号大小,如果前者小于后者,就不需要再重复提交了。
控制或关闭消费
- KafkaConsumer提供了暂停pause()和恢复resume()某些分区的消费;以及关闭close()的方法'
指定位移消费
- 如用一个新的消费者组来消费主题时,由于没有可查找的消费偏移,因此会按照
auto.offset.reset
配置来决定从何处开始消费消息。 - seek()方法为我们提供了从特定位置读取消息的能力,通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息
再均衡
- 再均衡是指分区所属权从一个消费者转移到另一个消费者的行为。它为消费者组具备高可用性和伸缩性提供了保障。
- 在再均衡期间,消费组内的消费者是无法读取消息的。即在再均衡发生期间的一段时间内,消费者会变得不可用
- 当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区的一部分信息但还没来得及提交消费位移就发生了再均衡操作
- 调用subscribe()方法时可以提供再均衡监听器来设置 消费者停止读取消息 和 开始读取消费 时的回调函数
消费者拦截器
- 在消费者poll()方法返回之前 、 提交完消息位移之后 、关闭消费者之前 调用拦截器中的方法
- 多个消费者拦截器也能组成拦截器链
多线程的实现
-
一个消费线程消费一个或多个分区
多个消费线程同时消费同一个分区 (会使得提交偏移量异常复杂)
-
类似IO多路复用,一个线程拉取消息,拉取消息后提交到线程池中。 (因为拉取消息会比处理业务逻辑快)
- 会出现消息丢失的情况,线程A消费0-99 ,线程B消费100-199 后提交。 线程A未提交而挂掉后,那么0-99这一段数据就丢失了