我自己集成的时候发现没有一篇较为完整的例子,我根据查阅资料和我自己碰到的坑,做了一个合集,希望能帮到大家,每部分的原文链接都附在标题之后,有意向看的可以点击查看。
在Windows上搭建Kafka(知乎原文链接)
一、下载、安装Kafka
访问Kafka的主页:Apache Kafka
进入其下载页面,截图如下:
选择相应的版本,这里选择 kafka_2.11-2.4.0.tgz,进入下面的页面:
选择清华的镜像站点进行下载。
下载到本地后,将文件解压到 D:\kafka_2.11-2.4.0,该文件夹包括了所有相关的运行文件及配置文件,其子文件夹bin\windows 下放的是在Windows系统启动zookeeper和kafka的可执行文件,子文件夹config下放的是zookeeper和kafka的配置文件。
二、配置Kafka(原文链接)
kafka的配置文件在config/server.properties文件中,主要修改参数如下,更具体的参数说明以后再整理下。
broker.id是kafka broker的编号,集群里每个broker的id需不同。我是从0开始。
listeners是监听地址,设置本地的IP地址,本机就设置127.0.0.1
log.dirs是日志目录,需要设置
设置Zookeeper集群地址,我是在同一环境搭建了kafka和Zookeeper,所以填的本地地址,这里最好也使用127.0.0.1
注:仅设置2181即可,如果有冲突,将server.properties和zookeeper.properties中的zookeeper端口换成另外一个不冲突的端口即可(这两个文件中设置的端口要一样)。
三、启动kafka服务
我们需要先后启动zookeeper和kafka服务。
它们都需要进入 D:\kafka_2.11-2.4.0 目录,然后再启动相应的命令。
更换D目录执行D:
然后执行cd D:\kafka_2.11-2.4.0
启动zookeeper服务,运行命令:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动kafka服务,运行命令:
bin\windows\kafka-server-start.bat config\server.properties
四、集成代码(原文链接)
我们在项目中新建一个配置类专门用来初始化topic,如下,
@Configuration
public class KafkaInitialConfiguration {
// 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
@Bean
public NewTopic initialTopic() {
return new NewTopic("testtopic",8, (short) 2 );
}
// 如果要修改分区数,只需修改配置值重启项目即可
// 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
@Bean
public NewTopic updateTopic() {
return new NewTopic("testtopic",10, (short) 2 );
}
}
1、新建SpringBoot项目
① 引入pom依赖
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
② application.propertise配置(本文用到的配置项这里全列了出来)
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
如果使用的配置文件是yml,可以对标更换
例如
spring.kafka.producer.retries=0
换成
kafka:
producer:
retries: 0
2、Hello Kafka
这部分private KafkaTemplate<String, Object> kafkaTemplate;
代码中kafkaTemplate可能报错,Could not autowire. No beans of 'xxxx' type found
。
解决办法:降低Autowired检测的级别,将Severity的级别由之前的error改成warning或其它可以忽略的级别。
①简单生产者
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/kafka/normal/{message}")
public void sendMessage1(@PathVariable("message") String normalMessage) {
kafkaTemplate.send("topic1", normalMessage);
}
}
②简单消费
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
上面示例创建了一个生产者,发送消息到topic1,消费者监听topic1消费消息。监听器用@KafkaListener注解,topics表示监听的topic,支持同时监听多个,用英文逗号分隔。启动项目,postman调接口触发生产者发送消息。
3、生产者
带回调的生产者
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}