三、Spring Cloud Stream+Kafka重复消费问题

在 [二、Spring Cloud Stream整合Kafka](https://www.jianshu.com/p/eed59e87e45a)
的基础上再创建一个module , kafka-consumer2 (创建过程可参考https://www.jianshu.com/p/d7771682688b)
子级 生产者(kafka-producer) application.yml 更新后如下:
server:
  port: 8181

spring:
  application:
    name: kafka_producer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092   #Kafka的消息中间件服务器
          zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
          auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          auto-add-partitions: true     # 当partition-count设置的值超过原来设置的值,true=自动创建分区
      bindings:
        stream-demo:                          #这里可以任意写,消费者应与之一致
          destination: custom-message-topic   #这里可以任意写,消费者应与之一致,消息发往的目的地
          content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
          producer:
            # 分区的数量(默认为1)
            partition-count: 2
主要目的是设置分区的值

auto-add-partitions: true

producer:
  # 分区的数量(默认为1)
   partition-count: 2
子级 消费者(kafka-consumer) application.yml 更新后如下:
server:
  port: 8081

kafka:
  group: kafka_g1
spring:
  application:
    name: kafka_consumer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092   #Kafka的消息中间件服务器
          zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
          auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
      bindings:
        stream-demo:                          #这里可以任意写,生产者应与之一致
          destination: custom-message-topic   #这里可以任意写,生产者应与之一致,消息发往的目的地
          content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
          group: ${kafka.group}

主要目的是分组

kafka:
  group: kafka_g1

group: ${kafka.group}
子级 消费者(kafka-consumer2) application.yml 同kafka-consumer基本一致
server:
  port: 8082

kafka:
  group: kafka_g1
spring:
  application:
    name: kafka_consumer2
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092   #Kafka的消息中间件服务器
          zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
          auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
      bindings:
        stream-demo:                          #这里可以任意写,生产者应与之一致
          destination: custom-message-topic   #这里可以任意写,生产者应与之一致,消息发往的目的地
          content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
          group: ${kafka.group}

测试  http://localhost:8181/produce
消费者kafka-consumer 打印如下:
2022-04-07 15:40:56.561  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息26 
2022-04-07 15:40:56.562  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息27 
2022-04-07 15:40:56.564  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息28 
2022-04-07 15:40:56.565  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息29 
2022-04-07 15:40:56.565  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息30 
2022-04-07 15:40:56.566  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息31 
2022-04-07 15:40:56.566  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息32 
2022-04-07 15:40:56.580  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息46 
2022-04-07 15:40:56.580  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息47 
2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息49 
2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息55 
2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息56 
2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息59 
2022-04-07 15:40:56.582  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息60 
2022-04-07 15:40:56.582  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息64 
2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息68 
2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息74 
2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息76 
2022-04-07 15:40:56.584  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息77 
2022-04-07 15:40:56.587  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息84 
2022-04-07 15:40:56.588  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息86 
2022-04-07 15:40:56.588  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息87 
2022-04-07 15:40:56.589  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息93 

消费者kafka-consumer2 打印如下:
2022-04-07 15:40:56.599  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息0 
2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息1 
2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息2 
2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息3 
2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息4 
2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息5 
2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息6 
2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息7 
2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息8 
2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息9 
2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息10 
2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息11 
2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息12 
2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息13 
2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息14 
2022-04-07 15:40:56.604  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息15 
2022-04-07 15:40:56.604  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息16 
2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息17 
2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息18 
2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息19 
2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息20 
2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息21 
2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息22 
2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息23 
2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息24 
2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息25 
2022-04-07 15:40:56.627  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息33 
2022-04-07 15:40:56.628  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息34 
2022-04-07 15:40:56.628  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息35 
2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息36 
2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息37 
2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息38 
2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息39 
2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息40 
2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息41 
2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息42 
2022-04-07 15:40:56.631  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息48 
2022-04-07 15:40:56.632  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息53 
2022-04-07 15:40:56.632  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息54 
2022-04-07 15:40:56.633  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息57 
2022-04-07 15:40:56.633  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息58 
2022-04-07 15:40:56.634  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息63 
2022-04-07 15:40:56.634  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息65 
2022-04-07 15:40:56.635  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息66 
2022-04-07 15:40:56.635  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息67 
2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息70 
2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息73 
2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息75 
2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息78 
2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息79 
2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息82 
2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息43 
2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息44 
2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息45 
2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息50 
2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息51 
2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息52 
2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息61 
2022-04-07 15:40:56.640  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息62 
2022-04-07 15:40:56.641  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息69 
2022-04-07 15:40:56.641  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息71 
2022-04-07 15:40:56.642  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息72 
2022-04-07 15:40:56.642  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息80 
2022-04-07 15:40:56.643  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息81 
2022-04-07 15:40:56.656  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息96 
2022-04-07 15:40:56.657  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息97 
2022-04-07 15:40:56.657  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息98 
2022-04-07 15:40:56.658  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息83 
2022-04-07 15:40:56.659  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息85 
2022-04-07 15:40:56.659  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息88 
2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息89 
2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息90 
2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息91 
2022-04-07 15:40:56.661  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息92 
2022-04-07 15:40:56.661  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息94 
2022-04-07 15:40:56.662  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息95 
2022-04-07 15:40:56.662  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息99 


遇到问题2:
Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: 
The number of expected partitions was: 2, but 1 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`

解决:
生产者yml文件中 spring.cloud.stream.kafka.binder 后面加入
auto-add-partitions: true     
# 当partition-count设置的值超过原来设置的值,true=自动创建分区
# partition-count: 2 这里的值超过原来设置的值,如果不是自动创建分区会抛上面的异常
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容