SpringBoot--实战开发--整合Kafka(六十三)

一、Kafka简介

  Kafka是一个分布式消息队列。★Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

二、Maven依赖

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

三、配置

application.properties

# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.77.132:9092
# 生产者序列化
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.linger.ms=1
# 消费者配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100ms
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.session.timeout.ms=15000
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-hello-group

四、简单测试

  1. 控制器
/**
 * Kafka消息队列测试
 */
@RestController
@Slf4j
public class KafkaController {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 生成消息
     */
    @GetMapping("send")
    public void produce() {
        kafkaTemplate.send("test_topic", "kafka消费测试");
        log.info("生产者消息发送成功");
    }

    /**
     * 消费消息
     * @param record
     */
    @KafkaListener(topics = "test_topic")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic = {}, offset = {}, value = {} \n", record.topic(), record.offset(), record.value());
    }
}

  1. 测试
    http://localhost:8081/send
    测试结果

    通过Kafka管理器查看:
    http://192.168.77.132:8080
    查看结果

    查看结果

常见问题:

  1. kafka启动报错[could not be established. Broker may not be available.]
    将配置中的localhost改为IP地址。
listeners=PLAINTEXT://192.168.77.132:9092
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容