一、Kafka简介
Kafka是一个分布式消息队列。★Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
二、Maven依赖
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
三、配置
application.properties
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.77.132:9092
# 生产者序列化
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.linger.ms=1
# 消费者配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100ms
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.session.timeout.ms=15000
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-hello-group
四、简单测试
- 控制器
/**
* Kafka消息队列测试
*/
@RestController
@Slf4j
public class KafkaController {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 生成消息
*/
@GetMapping("send")
public void produce() {
kafkaTemplate.send("test_topic", "kafka消费测试");
log.info("生产者消息发送成功");
}
/**
* 消费消息
* @param record
*/
@KafkaListener(topics = "test_topic")
public void listen(ConsumerRecord<?, ?> record) {
log.info("topic = {}, offset = {}, value = {} \n", record.topic(), record.offset(), record.value());
}
}
- 测试
http://localhost:8081/send
通过Kafka管理器查看:
http://192.168.77.132:8080
常见问题:
- kafka启动报错[could not be established. Broker may not be available.]
将配置中的localhost改为IP地址。
listeners=PLAINTEXT://192.168.77.132:9092