Spring Boot 整合 Kafka 基础应用

一、引言

Kafka 是一个分布式的流处理平台,具有高吞吐量、可扩展性和容错性等特点,广泛应用于日志收集、消息系统、实时数据处理等场景。Spring Boot 是一个简化 Spring 应用开发的框架,通过自动配置和约定优于配置的原则,能够快速搭建应用。本文将介绍如何使用 Spring Boot 整合 Kafka,实现基础的消息生产和消费功能。

二、环境准备

2.1 安装 Kafka

首先,需要在本地或者服务器上安装 Kafka。可以从 Kafka 官方网站(https://kafka.apache.org/downloads)下载最新版本的 Kafka。解压下载的文件后,按照官方文档的步骤启动 ZooKeeper 和 Kafka 服务。

2.2 创建 Spring Boot 项目

使用 Spring Initializr(https://start.spring.io/)创建一个新的 Spring Boot 项目,添加以下依赖:

  • Spring for Apache Kafka
  • Spring Web

pom.xml 中可以看到添加的依赖如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

三、配置 Kafka

application.propertiesapplication.yml 中配置 Kafka 的相关信息。以下是使用 application.yml 的配置示例:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • bootstrap-servers:Kafka 服务器的地址。
  • producer:生产者配置,指定键和值的序列化器。
  • consumer:消费者配置,指定消费者组 ID、偏移量重置策略以及键和值的反序列化器。

四、创建 Kafka 生产者

创建一个 Kafka 生产者服务类,用于向 Kafka 主题发送消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String TOPIC = "test-topic";

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
        System.out.println("Message sent: " + message);
    }
}

在上述代码中,使用 KafkaTemplate 向名为 test-topic 的主题发送消息。

五、创建 Kafka 消费者

创建一个 Kafka 消费者服务类,用于从 Kafka 主题接收消息。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

使用 @KafkaListener 注解监听 test-topic 主题,当有新消息到达时,会调用 listen 方法处理消息。

六、创建控制器

创建一个 REST 控制器,用于调用生产者发送消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaProducerService.sendMessage(message);
        return "Message sent successfully";
    }
}

在上述代码中,通过 /send 接口接收消息参数,并调用生产者服务发送消息。

七、测试应用

启动 Spring Boot 应用程序,确保 Kafka 服务正常运行。打开浏览器或使用工具(如 Postman)访问以下 URL:

http://localhost:8080/send?message=Hello,Kafka!

如果一切正常,你将在控制台看到生产者发送消息的日志和消费者接收消息的日志。

八、总结

通过以上步骤,可以成功地使用 Spring Boot 整合了 Kafka,实现了基础的消息生产和消费功能。在实际应用中,可以根据需求进一步扩展和优化,例如处理消息的序列化和反序列化、错误处理、批量生产和消费等。


©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容