自定义分区器
public class BananaPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
if (Objects.isNull(key) || (!(key instanceof String))) {
}
//特定值走指定分区
if (key.equals("special")) {
return partitionInfos.size() / 2;
}
return Math.abs(key.hashCode()) % partitionInfos.size();
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
自定义生产者拦截器
public class CountingPI implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
return null;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
1. 不要让消费者群组里的消费者数量超过 topic 内分区的数量,多余的消费者只会闲置
2. 新增一个消费群组,每个群组都会收到 topic 内的全部数据
分区所有权从一个消费者转移到另一个消费者的行为称为分区再均衡
-
主动再均衡分配期间,所有消费者将停止读取消息,放弃分区所有权,重新加入消费者群组,并获得重新分配到的分区
主动再均衡分两个阶段,第一个阶段. 所有消费者放弃分区所有权。第二个阶段,消费者重新加入消费者群组并开始读取消息
协作再均衡,指将消费者的部分分区重新分配给另一个消费者,其他消费者继续读取没有被重新分配的分区
消费者会向一个叫作 __consumer_offset 的主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么实际作用。但是,如果消费者发生崩溃或有新的消费者加入群组,则会触发再均衡。
如果使用自动提交或不指定提交的偏移量,那么将默认提交 poll() 返回的最后一个位置之后的偏移量。在进行手动提交或需要提交特定的偏移量时,一定要记住这一点。
最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设置为 true,那么每过5 秒,消费者就会自动提交 poll() 返回的最大偏移量。提交时间间隔通过auto.commit.interval.ms 来设定,默认是 5 秒
- 自动提交偏移量,默认间隔时间 5s
- 手动提交偏移量,enable.auto.commit = false,使用客户端提交偏移量,commitSync 同步提交 commitAsync 异步提交,在遇到无法恢复的错误时,sync 会一直重试,commitAsync 不会重试
再均衡监听器
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
这个方法会在重新分配分区之后以及消费者开始读取消息之前被调用。你可以在这个方法中准备或加载与分区相关的状态信息、找到正确的偏移量,等等
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
这个方法会在消费者放弃对分区的所有权时调用——可能是因为发生了再均衡或者消费者正在被关闭
public void onPartitionsLost(Collection<TopicPartition> partitions)
这个方法只会在使用了协作再均衡算法并且之前不是通过再均衡获得的分区被重新分配给其他消费者时调用(之前通过再均衡获得的分区被重新分配时会调用 onPartitions Revoked())
spring 最佳实践:关闭自动提交,处理完立刻提交,再均衡时 Spring 自动提交已确认的偏移量,不需要手动在 onPartitionsRevoked 里额外操作——这是 Spring Kafka 和原生 Kafka Client 的主要区别
kafka 可从指定偏移量读取消息,用于消费者消费缓慢,或消费者消息丢失,回溯偏移量从指定位置重新消费
独立的消费者,不需要订阅主题使用分区均衡,可以直接消费某个分区的数据
启用幂等生产者,会记录每条消息的 的生产者 id 和序列号,在重试过程中避免存储重复数据,但由于客户端逻辑错误产生的重复发送也无可避免
Kafka 事务引入了原子多分区写入的概念
事务性生产者,配置了transaction_id 应用 inittrasaction 初始化的生产者
在 spring 使用环境中,需要注意配置 transaction_id, 并且设置 acks = all 确保所有分区副本都写入,enable.idempotence=true 开启幂等性,防止重复写入。readcommit 级别的事务难以控制,在 spring 中事务管理器分别使用 kafka 事务管理器,db 事务管理器分开管理,但如果db 写入失败,卡夫卡的消息不会撤回,所以需要补偿机制。这里就涉及到了三阶段提交,如果纯纯为了系统解耦,可以直接使用 rabbitmq结合自己建表实现三阶段提交。rocketmq 天然强事务支持
spring 的链式kafka 事务管理在 spring boot 3.2 + 以后已被放弃
构建流式管道
可靠性,及时性,高吞吐量动态吞吐量,动态应对突发增长
构建数据管道有两种方式,即 ETL 和 ELT。ETL 表示提取–转换–加载(extract-transform-load)
KafkaConnect,连接器插件
提供了 api 和运行时,可开发针对特定生产者消费者的连接器,connect 支持集群部署
group.id 具有相同 group.id 的 connect 组成 connect 集群
plugin.path 可插拔架构,存储插件的位置
key.converter 和 value.converter 默认 jsonconverter
rest.host.name 和 rest.port 通过 connect rest api 监控和配置 connect
模式: mysql -> kafka topic -> es
Confluent Hub,连接器社区
单一消息转换single message transformation,SMT 可以在 Connect 中完成,也就是在复制消息时对消息进行转换,通常不需要编写额外的代码
Kafka 支持以下这些 SMT。
Cast -改变一个字段的数据类型。
MaskField - 将一个字段的内容替换成 null。这在移除敏感信息或个人识别数据时非常有用。
Filter - 消息(值为 null)。
Flatten
HeaderFrom - 丢弃或包含符合指定条件的记录。内置的条件包括匹配主题名称、匹配特定的标头、消息是否为墓碑
将嵌套的数据结构扁平化,也就是将所有字段的名字连接成路径的形式。将消息里的字段移动或复制到标头里。
InsertHeader - 在每一条消息的标头里加入一个固定的字符串。
InsertField - 在消息里添加一个字段,字段的值既可以来自元数据(如偏移量),也可以是一个固定的值。
RegexRouter - 使用正则表达式和替换字符串改变目标主题。
ReplaceField - 移除或重命名消息里的字段。
TimestampConverter - 修改一个字段的时间格式,比如将 Unix Epoch 转成字符串。
TimestampRouter - 根据消息的时间戳来改变主题。这在数据池连接器中很有用
kafka 支持跨集群镜像
灾备,合规,区域集群,中心集群
kafka 安全,认证
SSL - 带有可选 SSL 客户端身份验证的 SSL 传输层,适用于不安全网络,因为它支持客户端和服务器端身
份验证以及加密。
SASL_PLAINTEXT - 带有 SASL 客户端身份验证的 PLAINTEXT 传输层。一些 SASL 机制也支持服务器端身份验证。它不
支持加密,因此只适用于私有网络。
SASL_SSL - 带有 SASL 身份验证的 SSL 传输层,适用于不安全网络,因为它支持客户端和服务器端身份验证以
及加密。
TLS/SSL - TLS 是在公共互联网上使用最为广泛的加密协议之一。一些应用程序协议(比如 HTTP、SMTP 和
FTP)都依赖 TLS 提供数据传输的隐私性和完整性。TLS 用公钥(PKI)创建、管理和分发可用于非
对称加密的数字证书,避免在服务器端和客户端之间共享密钥。在 TLS 握手过程中生成的会话密钥
可用于对称加密,这样后续的数据传输就有了更高的性能。
可以通过 inter.broker.listener.name 或 security.inter.broker.protocol 来配置用于broker 间通信的监听器
客户端认证
SASL/PLAIN — 用户名密码认证
最简单的认证方式,客户端发送明文用户名和密码到 Broker
SASL/SCRAM — 挑战响应认证(生产首选)
SCRAM(Salted Challenge Response Authentication Mechanism)不会将密码发送到 Broker,而是发送基于 SHA-256/SHA-512 的挑战响应哈希。
SASL/GSSAPI — Kerberos 认证
企业环境集成 Active Directory / LDAP 的首选方案
SASL/OAUTHBEARER — JWT Token 认证
云原生场景的最佳选择,与 OIDC(如 Keycloak、Auth0)集成
mTLS — 双向 TLS 证书认证
不依赖密码,通过客户端证书验证身份。金融、政务等高安全场景首选
kafka stream
- Kafka Streams 是库不是框架 — 嵌入 Spring Boot 即可,无需额外集群
- KStream 是流、KTable 是表 — 流表二元性(duality)是核心思维模型
- 有状态操作依赖 RocksDB + Changelog — 生产环境必须用持久化存储
- Interactive Queries — 可以把 Streams 状态暴露为 REST API,实现近实时查询
- Exactly-once_v2 — 生产首选处理语义
- 实例数 >= partition 数 — 才能充分利用所有 partition
DSL API — 常用操作
1. filter / map / flatMap — 无状态操作
KStream<String, OrderEvent> orders = builder.stream("orders");
// filter: 过滤大额订单
KStream<String, OrderEvent> bigOrders = orders
.filter((key, order) -> order.getAmount() > 1000);
// map: 转换值
KStream<String, String> orderSummaries = orders
.map((key, order) -> new KeyValue<>(key, order.getSummary()));
// flatMap: 一条拆多条
KStream<String, OrderItem> orderItems = orders
.flatMap((key, order) -> order.getItems().stream()
.map(item -> KeyValue.pair(item.getId(), item))
.collect(Collectors.toList()));
2. groupBy + aggregate / count / reduce — 有状态聚合
// 按用户统计订单总金额
KTable<String, Double> userTotalAmount = orders
.groupBy((key, order) -> order.getUserId(),
Grouped.with(Serdes.String(), new JsonSerde<>(OrderEvent.class)))
.aggregate(
() -> 0.0, // 初始值
(userId, order, currentTotal) -> // 聚合逻辑
currentTotal + order.getAmount(),
Materialized.as("user-total-store") // 状态存储名称(RocksDB)
.with(Serdes.String(), Serdes.Double())
);
// 简单计数
KTable<String, Long> orderCountByUser = orders
.groupBy((key, order) -> order.getUserId())
.count(Materialized.as("order-count-store"));
// reduce: 合并两条同 key 记录
KTable<String, OrderEvent> latestOrder = orders
.groupByKey()
.reduce((oldVal, newVal) -> newVal, // 新值覆盖旧值
Materialized.as("latest-order-store"));
3. Windowed 聚合 — 时间窗口
// 5分钟滑动窗口统计每用户的订单数
KTable<Windowed<String>, Long> windowedCount = orders
.groupBy((key, order) -> order.getUserId())
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
.count(Materialized.as("windowed-count-store"));
// 会话窗口(按活跃间隔自动划分)
KTable<Windowed<String>, Long> sessionCount = orders
.groupBy((key, order) -> order.getUserId())
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count();
4. Join — 流与表关联
// KStream + KTable: 每条订单事件 enrich 用户信息
KStream<String, EnrichedOrder> enriched = orders
.join(profiles, // KTable
(order, profile) -> new EnrichedOrder(order, profile),
Joined.with(Serdes.String(), new JsonSerde<>(OrderEvent.class), new JsonSerde<>(UserProfile.class))
);
// KStream + KStream: 两条流时间窗口内关联
KStream<String, PaymentMatched> matched = orderStream
.join(paymentStream,
(order, payment) -> new PaymentMatched(order, payment),
JoinWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)),
Joined.with(Serdes.String(), new JsonSerde<>(OrderEvent.class), new JsonSerde<>(PaymentEvent.class))
);
// KStream + GlobalKTable: 跨分区 enrich 产品信息
KStream<String, EnrichedOrder> enriched2 = orders
.join(products, // GlobalKTable
(order) -> order.getProductId(), // 从 order 取出 join key
(order, product) -> new EnrichedOrder(order, product)
);
5. branch — 分流
// 按条件分流到不同 Topic
KStream<String, OrderEvent>[] branches = orders
.branch(
(key, order) -> order.getAmount() > 10000, // 大额
(key, order) -> order.getAmount() > 1000, // 中额
(key, order) -> true // 小额(兜底)
);
branches[0].to("orders-large");
branches[1].to("orders-medium");
branches[2].to("orders-small");
名称:Samza
描述:Apache Samza 是专门为 Kafka 设计的流式处理框架。它早于 Streams 问世,是由同一批人开发的,
因此它们共享了很多概念。但与 Streams 不同,Samza 运行在 Yarn 上,并为应用程序提供了一个完整的运
行框架。
名称:Spark
描述:Spark 是另一个面向数据批处理的 Apache 项目。它通过微批次来处理数据流,所以延迟会高一
些,但它可以通过重新处理批次来实现容错,并且可以很容易地实现 Lambda 架构。广泛的社区支持也是
它的优势之一。
名称:Flink
描述:Apache Flink 专门面向流式处理,具有非常低的延迟。与 Samza 一样,它支持 Yarn,但也可以运行
在 Mesos、Kubernetes 或独立集群中。它的高级 API 支持 Python 和 R 语言。
名称:Beam
描述:Apache Beam 并不直接提供流式处理,它是一种集批处理和流式处理于一身的统一编程模型。它将
Samza、Spark 和 Flink 作为管道组件的运行器。