SpringBoot+Kafka实现发布订阅

pom.xml文件

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
   </dependencies>
  • 生产者(发布消息)

application.yml文件

server:
  port: 8091
spring:
  application:
    name: producer
  kafka:
    # kafka 配置
    bootstrap-servers: 192.168.75.149:9092 # 本地利用centOs7搭建kafka的地址
    producer:
      acks: all
      client-id: producer-demo
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

KafkaConfig

/**
 * 启动默认创建topic,分区,副本数量,topic存在则自动忽略
 * @author wushiyi
 **/
@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public NewTopic initNewTopic(){
        return new NewTopic("userInfo",1,(short) 1);
    }
}

发布消息

/**
 * @author wushiyi
 **/
@Service
@Slf4j
public class UserService {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    /**
     * 发送消息
     */
    public void send() {
        User user = new User("ZhangSan","男","12345678901");
        log.info("向kafka推送用户信息:{}",gson.toJson(user));
        this.kafkaTemplate.send("userInfo",gson.toJson(user));
    }
}

调用接口控制台打印

  • 消费者(订阅消息)

application.yml文件

server:
  port: 8092
spring:
  application:
    name: consumer
  kafka:
    # kafka配置
    bootstrap-servers: 192.168.75.149:9092
    consumer:
      # 指定默认消费者group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 5000
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

消费

/**
 * @author wushiyi
 **/
@Slf4j
@Component
public class UserConsumer {
    private Gson gson = new GsonBuilder().create();

    @KafkaListener(topics = "userInfo")
    public void userConsumer(String message) {
        log.info("receive msg {}" , message);
    }
}

订阅结果

  • 补充

producer启动出现以下报错检查防火墙是否开启,把防火墙关闭即可

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容