SpringBoot 集成 Kafka 教程

版本

  • SpringBoot:2.7.2

pom.xml

集成 kafka 相关依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yaml

spring:
  kafka:
    template:
      default-topic: myTest
    bootstrap-servers: <Kafka地址>
    jaas:
      enabled: true
      loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
      options:
        username: <Username>
        password: <Password>
    consumer:
      ssl:
        truststoreLocation: file:/Users/cary/Documents/java/SpringBootTutorial/New/kafka-demo/src/main/resources/only.4096.client.truststore.jks
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
      group-id: myTestGroup
      max-poll-records: 2
    producer:
      ssl:
        truststoreLocation: file:/Users/cary/Documents/java/SpringBootTutorial/New/kafka-demo/src/main/resources/only.4096.client.truststore.jks
      retries: 3
      acks: 1
      compression-type: lz4
      buffer-memory: 33554432
      batch-size: 51200
      properties:
        send.buffer.bytes: 262144
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:

其中,default-topic 表示 topic 默认名字,bootstrap-servers 表示 kafka 连接地址,usernamepassword 是访问 kafka 的账号信息(注:作者使用的是阿里云 kafka 实例,可在阿里云后台查看该信息),truststoreLocation 表示证书所在路径。

Receiver 监听

@Component
public class Receiver {

    private Logger log = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = { "myTest" })
    public void receiveMessage(ConsumerRecord<String, String> record) {
        log.info("Receive Message, key = {}, value = {}", record.key(), record.value());
    }
}

Sender 发送

@Component
public class Sender {

    private Logger log = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String msg) {
        final String key = "my_msg";

        this.template.send("myTest", key, msg);
        log.info("send message, key: {}, dada: {}", key, msg);
    }
}

使用

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private Sender sender;

    @GetMapping("/send")
    public String send() {
        sender.send("Hello world from xxxx");

        return "Send success";
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容