Kafka简介
Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比,有以下不同:
- 它被设计为一个分布式系统,易于向外扩展;
- 它同时为发布和订阅提供高吞吐量;
- 它支持多订阅者,当失败时能自动平衡消费者;
- 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。
SpringBoot集成Kafka
- 修改config/server.properties文件,在很靠前的位置有listeners和 advertised.listeners两处配置的注释,去掉这两个注释,并且根据当前服务器的IP修改如下:
listeners=PLAINTEXT://ip:9092 #当前服务器的IP
advertised.listeners=PLAINTEXT://ip:9092 #当前服务器的IP
- pom文件
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- application.yml
spring:
kafka:
bootstrap-servers: 192.168.0.197:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- KafkaProducerController
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping("send")
public String send(String msg){
logger.info("生产者生产的消息:"+msg);
kafkaTemplate.send("test_topic", msg);
return "success";
}
}
- TestConsumer
@Component
public class TestConsumer {
@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception {
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}
这里写图片描述
这里写图片描述