版本选择
- 0.7版本:
只有基础消息队列功能,无副本;打死也不使用 - 0.8版本:
增加了副本机制,新的producer API;建议使用0.8.2.2版本;不建议使用0.8.2.0之后的producer API - 0.9版本:
增加权限和认证,新的consumer API,Kafka Connect功能;不建议使用consumer API; - 0.10版本:
引入Kafka Streams功能,bug修复;建议版本0.10.2.2;建议使用新版consumer API - 0.11版本:
producer API幂等,事物API,消息格式重构;建议版本0.11.0.3;谨慎对待消息格式变化 - 1.0和2.0版本:
Kafka Streams改进;建议版本2.0;
部署需要注意

重要参数
- Broker 端参数
- log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。
提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量
Zookeeper
zookeeper.connect:zk1:2181,zk2:2181,zk3:2181/kafka1listeners:学名叫监听器
其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名
- topic:
- auto.create.topics.enable:是否允许自动创建 Topic。推荐设置为false
规避线上自动创建topic问题 - unclean.leader.election.enable:是否允许 Unclean Leader 选举。推荐为false
规避落后的副本自动选为leader,导致数据丢失. - auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。推荐false
规避自动切换leader造成不必要的性能开销
- 消息保存三兄弟
- log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
- message.max.bytes:控制 Broker 能够接收的最大消息大小。
- 创建topic时带参数
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
- 修改topic参数
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
Kafka jvm设置推荐参数
- KAFKA_HEAP_OPTS:指定堆大小。
- KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。
kafka 零拷贝
- 数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输
分区策略
- 轮询策略
生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一 - 随机策略
我们随意地将消息放置到任意一个分区上
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
- 按消息键保序策略[官方没有该策略]
Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
kafka生产者优化
- 开启压缩
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启GZIP压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
-
压缩参数
image.png
结论:
- 吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
- 压缩比方面: zstd > LZ4 > GZIP > snappy
kafka消息不丢失保证策略
- 生产者 选择合适的api
Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback) - 消费者维护好偏移量
维持先消费消息(阅读),再更新位移(书签)的顺序
如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移
3.最佳实践
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。
- 推荐设置成 replication.factor = min.insync.replicas + 1。确保消息消费完成再提交。
- Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

kafka中的拦截器
- 参数设置
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
- 生产者
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略Jedis初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
- 消费者
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; //省略Jedis初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
java生产者是如何管理tcp连接的?
Apache Kafka的所有通信都是基于TCP的,而不是于HTTP或其他协议的
1 为什采用TCP?
(1)TCP拥有一些高级功能,如多路复用请求和同时轮询多个连接的能力。
(2)很多编程语言的HTTP库功能相对的比较简陋。
名词解释:
多路复用请求:multiplexing request,是将两个或多个数据合并到底层—物理连接中的过程。TCP的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。严格讲:TCP并不能多路复用,只是提供可靠的消息交付语义保证,如自动重传丢失的报文。
2 何时创建TCP连接?
(1)在创建KafkaProducer实例时,
A:生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程开始运行时,首先会创建与Broker的连接。
B:此时不知道要连接哪个Broker,kafka会通过METADATA请求获取集群的元数据,连接所有的Broker。
(2)还可能在更新元数据后,或在消息发送时
3 何时关闭TCP连接
(1)Producer端关闭TCP连接的方式有两种:用户主动关闭,或kafka自动关闭。
A:用户主动关闭,通过调用producer.close()方关闭,也包括kill -9暴力关闭。
B:Kafka自动关闭,这与Producer端参数connection.max.idles.ms的值有关,默认为9分钟,9分钟内没有任何请求流过,就会被自动关闭。这个参数可以调整。
C:第二种方式中,TCP连接是在Broker端被关闭的,但这个连接请求是客户端发起的,对TCP而言这是被动的关闭,被动关闭会产生大量的CLOSE_WAIT连接。
代码实现幂等性
props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
代码开启生产者事务
开启 enable.idempotence = true。
设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
消费组平衡问题
- 组成员数量发生变化。
- 订阅主题数量发生变化。
- 订阅主题的分区数发生变化。

kafka位移
Committing Offsets:Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移
- 自动提交位移
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 开启自动提交位移
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
- 手动提交
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
- 精细提交位移
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
count++;
}
}
对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。

CommitFailedException异常怎么处理?
- 场景一
当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常
四种处理方式
- 缩短单条消息处理的时间
- 增加 Consumer 端允许下游系统消费一批消息的最大时长
- 减少下游系统一次性消费的消息总数
-
下游系统使用多线程来加速消费
image.png
- 场景二
Kafka Java Consumer 端还提供了一个名为 Standalone Consumer 的独立消费者。它没有消费者组的概念,每个消费者实例都是独立工作的,彼此之间毫无联系。不过,你需要注意的是,独立消费者的位移提交机制和消费者组是一样的,因此独立消费者的位移提交也必须遵守之前说的那些规定,比如独立消费者也要指定 group.id 参数才能提交位移.

多线程开发消费者实例
- 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(10000));
// 执行消息处理逻辑
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
- 2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {
executors.submit(new Worker(record));
}
}
..

kafka副本机制

kafka如何控制请求?
https://www.processon.com/view/link/5d481e6be4b07c4cf3031755
消费组重平衡
- heartbeat.interval.ms
这个参数的真正作用是控制重平衡通知的频率。如果你想要消费者实例更迅速地得到通知,那么就可以给这个参数设置一个非常小的值,这样消费者就能更快地感知到重平衡已经开启了
费者组状态机


Broker 端重平衡场景剖析
场景一:新成员入组。

场景二:组成员主动离组

场景四:重平衡时协调者对组内成员提交位移的处理。

控制器
1作用:
控制器组件(Controller),是Apache Kafka的核心组件。它的主要作用是Apache Zookeeper的帮助下管理和协调整个Kafka集群。
集群中任意一台Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器。
2 特点:控制器是重度依赖Zookeeper。
3 产生:
控制器是被选出来的,Broker在启动时,会尝试去Zookeeper中创建/controller节点。Kafka当前选举控制器的规则是:第一个成功创建/controller节点的Broker会被指定为控制器。
4 功能:
A :主题管理(创建,删除,增加分区)
当执行kafka-topics脚本时,大部分的后台工作都是控制器来完成的。
B :分区重分配
Kafka-reassign-partitions脚本提供的对已有主题分区进行细粒度的分配功能。
C :Preferred领导者选举
Preferred领导者选举主要是Kafka为了避免部分Broker负载过重而提供的一种换Leade的方案。
D :集群成员管理(新增Broker,Broker主动关闭,Broker宕机)
控制器组件会利用watch机制检查Zookeeper的/brokers/ids节点下的子节点数量变更。当有新Broker启动后,它会在/brokers下创建专属的znode节点。一旦创建完毕,Zookeeper会通过Watch机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化。进而开启后续新增Broker作业。
侦测Broker存活性则是依赖于刚刚提到的另一个机制:临时节点。每个Broker启动后,会在/brokers/ids下创建一个临时的znode。当Broker宕机或主机关闭后,该Broker与Zookeeper的会话结束,这个znode会被自动删除。同理,Zookeeper的Watch机制将这一变更推送给控制器,这样控制器就能知道有Broker关闭或宕机了,从而进行善后。
E :数据服务
控制器上保存了最全的集群元数据信息,其他所有Broker会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
5 控制器保存的数据
控制器中保存的这些数据在Zookeeper中也保存了一份。每当控制器初始化时,它都会从Zookeeper上读取对应的元数据并填充到自己的缓存中。
6 控制器故障转移(Failover)
故障转移是指:当运行中的控制器突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用控制器来替代之前失败的控制器。
7 内部设计原理
A :控制器的内部设计相当复杂
控制器是多线程的设计,会在内部创建很多线程。如:
(1)为每个Broker创建一个对应的Socket连接,然后在创建一个专属的线程,用于向这些Broker发送特定的请求。
(2)控制连接zookeeper,也会创建单独的线程来处理Watch机制通知回调。
(3)控制器还会为主题删除创建额外的I/O线程。
这些线程还会访问共享的控制器缓存数据,为了维护数据安全性,控制在代码中大量使用ReetrantLock同步机制,进一步拖慢了整个控制器的处理速度。
B :在0.11版对控制器的低沉设计进了重构。
(1)最大的改进是:把多线程的方案改成了单线程加事件对列的方案。
a. 单线程+队列的实现方式:社区引入了一个事件处理线程,统一处理各种控制器事件,然后控制器将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费。
b. 单线程不代表之前提到的所有线程都被干掉了,控制器只是把缓存状态变更方面的工作委托给了这个线程而已。
(2)第二个改进:将之前同步操作Zookeeper全部改为异步操作。
a. Zookeeper本身的API提供了同步写和异步写两种方式。同步操作zk,在有大量主题分区发生变更时,Zookeeper容易成为系统的瓶颈。
高水位的讨论
Leader 副本保持同步条件
- 该远程 Follower 副本在 ISR 中。
- 该远程 Follower 副本 LEO 值落后于 Leader 副本 LEO 值的时间,不超过 Broker 端参数 replica.lag.time.max.ms 的值。如果使用默认值的话,就是不超过 10 秒。

kafka调优
1.操作系统调优
系统时禁掉 atime 更新
至少选择 ext4 或 XFS
ulimit -n 和 vm.max_map_count
操作系统页缓存
2.JVM 层调优
设置堆大小 6-8G
GC 收集器 G1
- Broker 端调优
Producer -> Broker -> Consumer三端kafka版本要保持一致
4.应用层调优
不要频繁地创建 Producer 和 Consumer 对象实例
用完及时关闭
合理利用多线程来改善性能

