环境配置
安装
#上传kafka /usr/local/kafka
#解压
tar -zxvf kafka.tar.gz -C/usr/local/kafka
修改属性
#修改config⽬录内,修改server.properties
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号
listeners=PLAINTEXT://localhost:9092
#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=localhost:2181
属性列表
Property | Default | Description |
---|---|---|
broker.id | 0 | 每个broker都可以⽤⼀个唯⼀的⾮负整数id进⾏标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯⼀的即可。 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯⼀的,可以是多个,路径之间只需要使⽤逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进⾏。 |
listeners | PLAINTEXT://localhost:9092 | server接受客户端连接的端⼝,ip配置kafka本机ip即可 |
zookeeper.connect | localhost:2181 | zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接⽅式为hostname1:port1, hostname2:port2,hostname3:port3 |
log.retention.hours | 168 | 每个⽇志⽂件删除之前保存的时间。默认数据保存时间对所有topic都⼀样。 |
num.partitions | 1 | 创建topic的默认分区数 |
default.replication.factor | 1 | ⾃动创建topic的默认副本数量,建议设置为⼤于等于2 |
min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最⼩数⽬(必须确认每⼀个repica的写数据都是成功的),如果这个数⽬没有达到,producer发送消息会产⽣异常 |
delete.topic.enable | false | 是否允许删除主题 |
启动kafka
#后台启动kafka 需要先启动zookeeper 还需要进入bin目录启动命令
./kafka-server-start.sh -daemon ../config/server.properties
#后台启动
nohup ./kafka-server-start.sh ../config/server.properties &
使用
#创建topic
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test1
#同一主题创建多个partitions
./kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 2 --topic test1
#生产消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1
#消费消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
集群实战
三个properties
broker.id= 0
listeners=PLAINTEXT://localhost:9090
log.dir=/usr/local/data/kafka-logs-0
broker.id= 1
listeners=PLAINTEXT://localhost:9091
log.dir=/usr/local/data/kafka-logs-1
broker.id= 2
listeners=PLAINTEXT://localhost:9092
log.dir=/usr/local/data/kafka-logs-2
启动三个集群
./kafka-server-start.sh -daemon../config/server0.properties
./kafka-server-start.sh -daemon../config/server1.properties
./kafka-server-start.sh -daemon../config/server2.properties
副本
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 2 --topic my-replicated-topic
replicas:当前副本存在的broker节点
leader:副本里的概念
每个partition都有一个broker作为leader。
消息发送方要把消息发给哪个broker?就看副本的leader是在哪个broker上面。副本里的leader专⻔用来接收消息。
接收到消息,其他follower通过poll的方式来同步数据。
follower:leader处理所有针对这个partition的读写请求,而follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果leader所在的broker挂掉,那么就会进行新leader的选举,至于怎么选,在之后的controller的概念中介绍。
isr: 可以同步的broker节点和已同步的broker节点,存放在isr集合中。
java实战
导入maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
生产消息
public class MyProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException,InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:端口号");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String,String>(props);
Order order = new Order((long) i, i);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));
RecordMetadata metadata = producer.send(producerRecord).get();
//=====阻塞=======
System.out.println("同步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());
如未指定分区
//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));
异步发送
//要发送 5 条消息
Order order = new Order((long) i, i);
//指定发送分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(),JSON.toJSONString(order));
//异步回调方式发送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息失败:" +
exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
参数ack详解
0:表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
1:至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
2:需要等待 min.insync.replicas(默认为 1 ,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
消费消息
public class MyConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:端口号");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//创建一个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
// 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis( 1000 ));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
}
}
}
}
Kafka中消息的自动提交和手动提交
自动提交
自动提交可以很大程度上降低Kafka服务端的压力,并且减少客户端的网络开销。通常情况下都会设置开启自动提交模式。
使用自动提交时要注意的地方:
消费者的消费逻辑要做好业务幂等;
自动提交时间设置并不是非常严格,可能会大于设置的时间间隔;自动提交并不是严格地每间隔一段时间提交一次偏移量(旧版的客户端是有一个AutoCommitTask进行轮询提交),而是每次在调用KafkaConsumer.poll()时判断当前时间距离上次提交时间是否超过了配置了提交间隔,如果超过了就进行提交,所以实际上的提交时间会超过配置的提交间隔。
自动提交
手动提交
设置手动提交时需要主动调用提交方法,具体方法根据使用的客户端而定。当消息量较大时使用手动提交会给Kafka服务端带来压力,并增加客户端的网络开销。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
• commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
• commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
手动提交
消费者原则
1、每个消费者的offset由消费者提交到系统主题保存。
2、每个分区的数据只能由消费者组中一个消费者消费。
3、一个消费者可以消费多个分区数据。
4、消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
offsets
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
SpringBoot实战kafka
配置
server:
port: 8080
spring:
kafka:
bootstrap-servers: ip:端口
producer: # 生产者
retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
redis:
host: ip:端口
生产
@RestController
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send() {
kafkaTemplate.send(TOPIC_NAME, 0 , "key", "this is a msg");
}
}
消费
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record,Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手动提交offset
ack.acknowledge();
}
常见面试题
如何防止消息丢失
发送方: ack是 1 或者-1/all 可以防止消息丢失,如果要做到99.9999%,ack设成all,把min.insync.replicas配置成分区备份数
消费方:把自动提交改为手动提交。
重复消费
一条消息被消费者消费多次。如果为了消息的不重复消费,而把生产端的重试机制关闭、消费端的手动提交改成自动提交,这样反而会出现消息丢失,那么可以直接在防治消息丢失的手段上再加上消费消息时的幂等性保证,就能解决消息的重复消费问题。
幂等性如何保证
- mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条
- 使用redis或zk的分布式锁(主流的方案)
消息积压问题
消息积压会导致很多问题,比如磁盘被打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:
方案一:在一个消费者中启动多个线程,让多个线程同时消费。——提升一个消费者的消费能力(增加分区增加消费者)。
方案二:如果方案一还不够的话,这个时候可以启动多个消费者,多个消费者部署在不同的服务器上。其实多个消费者部署在同一服务器上也可以提高消费能力——充分利用服务器的cpu资源。
方案三:让一个消费者去把收到的消息往另外一个topic上发,另一个topic设置多个分区和多个消费者 ,进行具体的业务消费。
延迟队列
创建多个topic,每个topic表示延时的间隔
topic_5s: 延时5s执行的队列
topic_1m: 延时 1 分钟执行的队列
topic_30m: 延时 30 分钟执行的队列
消息发送者发送消息到相应的topic,并带上消息的发送时间
消费者订阅相应的topic,消费时轮询消费整个topic中的消息
如果消息的发送时间,和消费的当前时间超过预设的值,比如 30 分钟
如果消息的发送时间,和消费的当前时间没有超过预设的值,则不消费当前的offset及之后的offset的所有消息都消费
下次继续消费该offset处的消息,判断时间是否已满足预设值
使用kafka解决的一些场景问题
削峰/缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
异步通信
允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。