RocketMQ消费者在生产环境上出现无法消费到数据的问题

1、问题现象

开发组在项目中加了一个RocketMQ的消费者逻辑,代码如下:
引入依赖

       <!-- rocketmq-->
       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.3.0</version>
        </dependency>

RocketMQ配置信息

# RocketMQ 核心配置
rocketmq:
  name-server: 127.0.0.1:9876;127.0.0.2:9876 # NameServer地址,多个用分号分隔
  consumer:
    topic: demo-topic
    group: demo-consumer-group # 消费者组名(必须唯一)

通过注解监听Topic的数据

@Slf4j
@Component
// 核心注解:配置消费者组、主题、标签等
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer.group}", // 消费者组(推荐读取配置)
        topic = "${rocketmq.consumer.topic}"                          // 订阅的主题
)
public class DemoConsumer implements RocketMQListener<String> {

    /**
     * 消息消费核心方法
     * @param message 消息体(支持String、字节数组、自定义对象等)
     */
    @Override
    public void onMessage(String message) {
        try {
            // 1. 业务处理逻辑(示例:打印消息+业务处理)
            log.info("接收到RocketMQ消息:{}", message);
            // TODO: 你的业务逻辑(如入库、调用接口、消息转发等)
            // 2. 消费成功:无需返回值,无异常即表示消费成功
        } catch (Exception e) {
            log.error("消息消费失败,内容:{}", message, e);
            throw new RuntimeException("消费失败,触发重试", e);
        }
    }
}

问题现象是:代码启动正常,监听器也正常开启,测试环境也可以正常消费,但是生产环境就是没有消费(接收)到数据。
启动日志如下:


代码启动日志

RocketMQ的消息模型


RocketMQ的消息模型

2、问题定位过程

2.1、 检查消费者和RocketMQ的网络联通,排除防火墙导致的无法消费,包括NameServer和Broker

telnet测试网络联通

2.2、登陆RocketMQ的控制台,查看Topic和消费组的创建情况

可以看出没有存活的订阅组(消费者)


主题和消费组的查询

2.3、通过RocketMQ的控制台在对应Topic上发送消息

发送消息

从下图中的报错消息也能看出来,消息没有被消费组消费,正在重试


发送消息报错信息

通过主题状态查询消息发送情况,图中可以看出有消息发送到该主题上(可用于排除生产者是否正确推数的问题)


主题状态

2.4、查看消费组创建情况

在消费者列表中查询根据消费组名称查询消费者,发现没有创建消费组。到此问题就清晰了。

消费者列表

3、问题原因和解决措施

问题原因:
经上述的信息分析和最后的排查,发现是生产环境为了安全和规范,防止随意创建垃圾组或权限失控,将自动创建消费组关闭了,需要手工创建。但是,生产环境上没有创建消费组,导致消费者无法消费消息。而测试环境自动创建消费组是开启的,因此没有遇到这个问题。
检查Broker 的配置文件 (broker.conf) 中的autoCreateSubscriptionGroup发现,生产上的配置如下:

autoCreateSubscriptionGroup=false

解决措施:
通过命令行创建消费组

# 语法
sh mqadmin updateSubGroup -n <NameServerIP:Port> -c <集群名> -g <消费组名>

# 示例:在 DefaultCluster 集群创建一个名为 OrderConsumerGroup 的组
sh mqadmin updateSubGroup -n 192.168.1.100:9876 -c DefaultCluster -g OrderConsumerGroup

验证组是否已存在并查看初始状态

sh mqadmin consumerProgress -n 192.168.1.100:9876 -g OrderConsumerGroup

RocketMQ的产品文档:
https://rocketmq.apache.org/zh/docs/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容