拦截器
- 客户端kafka拦截器 拦截器类似于spring等切面,可以对生产者和消费者收发消息的过程进行自动增强;分为生产者拦截器和消费者拦截器
- 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑;
- 具体的代码配置
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
// 拦截器 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor");
// 拦截器 2
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
- 生产者拦截器类需要继承 org.apache.kafka.clients.producer.ProducerInterceptor 类并实现其中的方法,其中OnSend是在消息发送之前执行的; onAcknowledgement是在消息提交成功或者消息提交失败的时候处理的,onAcknowledgement 的调用要早于 callback 的调用,处理的线程和callback不是一个线程;
- 消费者拦截器类需要继承的 org.apache.kafka.clients.consumer.ConsumerInterceptor ,并实现其中的OnConsumer 和 onCommit分别代表消息在消费之间进行了一侧过滤,onConsumer是是在提交位移之后处理的。
- kafka拦截器是一个低频使用的功能,但是可以作为一种监控等使用;
Kafka TCP连接的那些事
- kafka生产者创建tcp连接
- kafka在消息传输协议上使用的是TCP连接,关于生产者在什么时候创建tcp连接以及什么时候关闭tcp连接的
- 生产者在Producer被初始化的时候,创建tcp连接,这是最早进行的tcp连接;在使用的过程中如果尝试给一个不存在(当前存在的tcp连接中)topic发送消息的时候,或者如果设置定期更新tcp连接集群信息的时候,回你创建tcp连接(此时创建多少个?)
- 生产者什么时候关闭tcp连接:一种是用户主动关闭,一般是应用程序的关闭或者是调用producer.close进行关闭;一种是 Kafka 自动关闭,Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。如果设置成-1的话,会产生很多僵尸连接;
- 因此在创建生产者的时候,不需要指定整个集群的所有地址,如果指定的话,在初始化的时候就会创建很多tcp连接,造成资源的不必要浪费
- kafka消费者创建tcp连接与关闭连接
- 消费者也是使用tcp协议进行的,但是消费端不是在初始化的时候就创建tcp连接了,而是在poll的时候才会创建tcp连接,具体的过程分为以下三个步骤
- 创建连接 来 find coordinator
- 根据find coordinator获取到的真正broker的信息,创建与 coordinator 的tcp连接,coordinator负责对consumer的注册,管理,位移等操作
- 在消费不同分区的消息的时候,创建与该分区副本leader的broker建立连接,比如说现在有五个分区消息需要消费,但是只有两个broker的话,最多会创建 2 个tcp连接;这个连接创建之后,第一个连接会被后台在某个时间段进行关闭
- 什么时候关闭连接
- 主动关闭,直接kill或者调用close()方法进行关闭
- 被动关闭,消费者端参数 connection.max.idle.ms控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket 连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接,如果写了个程序,用循环poll的话,建立起一个伪长连接的话,socket一直不会断开
- 消费者也是使用tcp协议进行的,但是消费端不是在初始化的时候就创建tcp连接了,而是在poll的时候才会创建tcp连接,具体的过程分为以下三个步骤
- 关于消费者连接的问题,在第一个连接创建之后,到第三种连接创建的时候,需要把第一个连接关闭,因为第一个连接不知道具体的节点信息,没法复用,导致资源的浪费,需要重新创建新的连接,这个问题是否能够进行改善?
kafka的幂等生产者和事务生产者
- kafka消息交付的可靠性保障目前来说主要支持至少一次的发送,也就是如果生产者网络波动的话,允许多次发送,这样的话,可能会造成消息的重复;kafka因此还支持精确一次的发送,即消息既不会重复发送也不会丢失;
- 精确一次发送的主要支持是 幂等和事务型发送,幂等的话,直接在配置中设置enable.idempotence = true;如果是事务型的话,除了设置enable.idempotence = true之外还需要设置 transctional. id;幂等消息的话,只能够保障某个主题的分区一致性,不能够跨区,如果producer重新启动的话,丧失对上一次的幂等约束,这也就要求所有需要保障幂等消息的数据必须落在一个分区上;如果想在所有分区上都保持幂等,且不受重新启动的影响的话,就需要使用事务性生产者,能够保障一批消息同时成功和同时失败,且在消费端可以控制对整个事务性消息的隔离级别isolation.level = read_uncommitted/read_committed。具体的生产端代码
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
- 不管是幂等还是事务性发送者,对整个kafka的性能是影响的,而且也不能够保证消费端一定只送达一次,在异常情况下消费端可能会重复消费某些消息,因此在使用的过程中还是倾向于消费端进行幂等处理,这样既不会降低性能,也不会影响整个业务逻辑的处理;
kafka的消费组概念
- 消费组作为一个所有消费者的代理,用于管理订阅主题的所有分区,然后来根据consumer的数量来进行负载均衡,不同的消费组之间没有关系,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费;一个group中只能有一个consumer去消费某条消息,不能同时有多个进行消费,这个类似于传统的点对点发送的模型,只不过将粒度缩小,减小所有consumer的抢占;
- 同一个group下的consumer如何进行分配消费消息?
- kafka的group如何与topic进行关联,是在消费端监听的时候进行设置?如果在消费端进行设置的话,如何能够控制消费实例与分区数量的关系?
- consumer实例和分区数量:消费组订阅主题的分区数大于等于消费实例的数量,不会造成资源的浪费;比如如果有十个consumer实例,只有3个分区,那么有7个consumer是空闲状态
- 空闲状态的consumer不会进行消费,也就是说所有消息不会进入到这个consumer中?
- 理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。但是在微服务盛行的阶段,服务扩容很正常,会造成consumer实例的不定期变更,会导致重新的负载均衡,kafka在重新给consumer重新分配分区的时候,会进行类似于垃圾回收的Stop the world操作,可能有停顿,如果设置分区数量过大的话,kafka有一个对分区备份的功能,也就是如果有十个分区的话,消息会备份十份的大小,最后进行统一的发送,分区数量过大的话会导致内存占用过大,同时如果想提高吞吐量的话,分区数量和consumer client数量保持一致,这样的话又会增加实例的连接数量?这个问题怎么处理
- Rebalance: 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区;既然是平衡consumer和topic分区之间负载均衡的话,那么在以下三种情况下会发生变化
- consumer实例的数量发生了变化,重新分配; 那么consumer的集群重启过程中岂不是要不断的进行rebalance?
- group订阅的topic数量发生了变化,归根结底是自己管理的topic分区数量发生了变化,以及要剔除某些不适合的consumer而导致负载的变动,这个是运维操作,一般的话,可以进行避免
- 主题的分区数量发生了变动,导致consumer重新分配,这个是运维操作,一般的话,可以进行避免
- 那么如何避免reblance呢,大部分情况下是尽量的避免第一种情况,也就是实例数量的变更,实例数量的变更主要分以下几种情况
- (真的需要变化)实例真的需要增加或者减少,这种情况下无法避免
- (kafka认为你变化了,但是你没有变化,只不过是有些异常导致kafka的监控认为你挂了)心跳监控,通过所有consumer实例通过向Coordinator 发送心跳来实时的报告自己的存活状态,主要的参数
- session.timeout.ms 来控制,超过这个时间没有发送心跳,认为死亡) ;还需要设置
- heartbeat.interval.ms用来控制心跳上报的频率,也就是上个参数内,自己需要上报多少次心跳,这个需要根据自己的网络带宽来考虑,
- max.poll.interval.ms 来控制,Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance
- 推荐的参数
设置 session.timeout.ms = 6s。
设置 heartbeat.interval.ms = 2s。
- 推荐的参数
- 关于kafka如何管理consumer的
- coordinator 作为一个对consumer的注册,管理中心,是在broker创建的时候创建,作为broker的组件存在的,其主要作用是协助consumer group来管理负载均衡,位移,以及成员情况的管理,每个consumer启动的时候,都会向coordinator报备,然后被其注册,管理;consumer group是如何和coordinator进行关联的呢?一般是通过位移主题来确定的 gruopId的hashCode / 分区总数 获取到位移主题的分区id,然后根据这个id获取到当前分区的leader的副本所在的broker的coordiantor
Kafka 会计算该 Group 的 group.id 参数的哈希值。比如你有个 Group 的 group.id 设置成了“test-group”,那么它的 hashCode 值就应该是 627841412。其次,Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12。此时,我们就知道了位移主题的分区 12 负责保存这个 Group 的数据。有了分区号,算法的第 2 步就变得很简单了,我们只需要找出位移主题分区 12 的 Leader 副本在哪个 Broker 上就可以了。这个 Broker,就是我们要找的 Coordinator
kafka位移主题
- Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息
- kafka如何记录某个consumer group中consumer消费那些分区的消息的位移或者说作为标示,用于consumer之后消费的起始点呢,在老版本的kafka中是将这些数据保存到zk中的,但是zk不适合高频的读写等操作,因此在0.8.x版本之后,kafka自行进行管理这个记录,在内部自己创建一个消息主题,来进行维护;
- 消息主题的具体格式:消息主题的主要作用是记录定位consumer的消费记录,由于consumer group下的consumer可能会进行负载均衡,最好的办法就是一group为维度,来进行记录,那么首先记录 gourp-id, 然后是 topic 名称 ,最后是 分区号,这样的话,在使用的时候,不管group下的哪个consumer来消费的话,都可以直接获取现在应该消费哪个区域
- kafka在默认情况下创建50个分区,3个备份副本;可以手动指定分区数量;如果自动创建分区的数量的话,那么在日志中可能会有 _consumer_offsets-xxx 这个就是向位移主题中写的消息记录
- 关于设置自动提交和手动提交对位移主题的影响: 如果设置手动的话,那么每一次提交的话,会进行写入一个位移主题消息,但是如果使用自动的话,只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息,因为会定时的进行提交,即使没有消息的话,也会不断的进行提交,这样会导致消息队列占用的内存不断增加最终撑爆内存,这就需要在kafka上配置Compact(整理整个消息队列,保留某个消息中的最新的消息,进行压缩数据) 策略来删除位移主题中的过期消息,避免该主题无限期膨胀;
- Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的
kafka 位移提交
记录了 Consumer 要消费的下一条消息的位移。这可能和你以前了解的有些出入,不过切记是下一条消息的位移,而不是目前最新消费消息的位移;
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移,提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了
- 那位移主题的作用是作什么的,在什么情况下会被启用?仅仅是用于找到coordinator么?
- kafka 分为自动提交和手动提交,kafka会无脑的认为你的提交是正确的,如果随便提交的话,会导致最终消息紊乱
- 自动提交的话 auto.commit.interval.ms 默认值是5s,也就是kafka会每五秒进行一次批量的提交位移信息,如果在这5s期间发生了reblace的,可能没有提交成功,导致重复消费的问题;自动提交的逻辑是 在调用poll的时候,会先上报上一次poll的消费位移结果,然后在进行poll;没等到下次poll,就挂了,消息就要重复消费
- 手动提交的话,不仅仅是enable.auto.commit 为 false,还需要手动的进行代码调用
//同步的处理,导致consumer进程阻塞,影响整个系统的吞吐量
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
//使用 异步消息的话,不会影响吞吐量,但是不知道是否上报成功,如果没有成功的话,不会进行重试,
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
//终极代码
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch (Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
//但是如何解决每一次poll提交的消息数量过大问题,如果一次间隔处理了很多条消息,如果出错了整体都要再次处理一次,对于系统不是很友好
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
count++;
}
}
kafka 多线程消费问题
- Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程,但是从用户角度来看属于单线程,心跳线程和消息消费线程是隔离开的;Consumer 设计了单线程 + 轮询的机制。这种设计能够较好地实现非阻塞式的消息获取,用户可以在接到消息之后 进行自定义消息的处理;
- kafka实现多线程消费的方案
- 一个应用中创建多个consumer实例,相当于同时存在多个consumer,进行消费,这样的好处是业务上简单,并且能够在某个分区上实现消息的顺序执行;缺点资源浪费严重,容易造成reblance,影响整个kafka的吞吐量。
- 创建一个接受消息的程序,然后处理消息的时候,使用多线程进行消费,这样的好处是对于kafka来说,始终就是一个consumer实例,即使有一个出现异常了,也不会引发reblance的操作,但是缺点是如何处理异步消息处理中的位移提交以及失败重试问题
- 多线程版本的代码:
kafka 消息的监控指标以及参数
- Lag 或 Consumer Lag 用来描述当前消费者还有多少消息没有消费完成,也就是消息的滞后型,Lag的单位是消息条数,以分区为维度进行统计的,如果需要统计整个topic的话,需要自己进行求和计算
- Lead 这个是JMX监控的特有指标,这个是表示当前消费的消息的位移和第一条消息之间的差值,其实就是总消息数 - Lag 数的值,用这个指标来衡量的时候,更能够看出来整个消息的延迟有多大
- 有三种方式进行监控
-
使用kafka的脚本进行监控 $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test,如果consumer-id, host,client-id 是空的话,表示consumer实例不存在,如果执行完脚本根本没有反应的话,可能是版本不支持
- 使用java api进行程序上面上的调用监控
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { //设置groupId ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { //获取订阅分区的最新消息位移 Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); //执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象 return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 处理中断异常 // ... return Collections.emptyMap(); } catch (ExecutionException e) { // 处理 ExecutionException // ... return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); } } }
- 使用Kafka JMX 监控指标
-