前言
在分布式消息系统中,确保消息在生产、传递和消费过程中的可追溯性至关重要。消息跟踪机制使得开发者和运维人员能够监控和调试消息流转过程,快速定位和解决问题。RocketMQ 作为一款高性能、高可用的消息中间件,提供了完善的消息跟踪机制。
一、什么是消息跟踪
消息跟踪是指记录和追踪消息在整个生命周期中的各种状态和操作。通过消息跟踪,可以详细了解消息的生产、传递和消费情况,便于监控和调试。
1.1、消息跟踪的工作原理
RocketMQ 的消息跟踪机制主要包括以下步骤:
-
1、消息生产跟踪:
在生产者发送消息时,记录消息的相关信息(如消息ID、主题、标签、发送时间等)。 -
2、消息传递跟踪:
在消息传递过程中,记录消息在各个节点的流转情况(如消息队列的路2``由、存储节点等)。 -
3、消息消费跟踪:
在消费者接收到消息时,记录消息的消费情况(如消费时间、消费结果、消费延迟等)。 -
4、消息状态存储:
将消息的跟踪信息存储在专用的存储系统中,便于后续查询和分析。
1.2、消息跟踪的应用场景
消息跟踪广泛应用于以下场景:
-
1、问题排查:
在消息传递过程中出现问题时,通过消息跟踪可以快速定位问题所在,并进行相应的处理。 -
2、性能监控:
通过跟踪消息的生产、传递和消费时间,可以监控系统的性能指标,及时发现和解决性能瓶颈。 -
3、审计和合规:
在一些对消息传递有严格审计和合规要求的场景中,消息跟踪可以提供详细的操作记录,满足审计需求。 -
4、业务分析:
通过分析消息的流转情况,可以获得业务运行的详细数据,辅助业务决策和优化。
二、RocketMQTemplate开启消息轨迹
2.1、导入RocketMQ依赖
首先,你需要在你的项目中导入RocketMQ的依赖。这可以通过在你的构建文件中添加以下代码来实现,具体代码如下所示:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
2.2、配置RocketMQTemplate
接下来,你需要配置RocketMQTemplate以使用消息轨迹功能。在你的应用程序的配置文件中添加以下属性,具体代码如下所示:
rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=Topic_Trace_Data
rocketmq:
producer:
enable-msg-trace: true
customized-trace-topic: Topic_Trace_Data
在这个配置中,enable-msg-trace设置为true启用消息跟踪,customized-trace-topic指定跟踪消息发送时使用的自定义跟踪主题。
在发送消息时,你可以通过sendMessage方法发送消息,并指定消息跟踪的开启。
2.3、发送消息
@Component
@Slf4j
public class RocketMqProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public <T> void sendMessageWithTrace(String topic, String tag, String messageKey, T msg) {
if(!StringUtils.isEmpty(tag)){
topic = topic + ":" + tag;
}
Message<T> message = MessageBuilder.withPayload(msg)
.setHeader(MessageConst.PROPERTY_TRACE_SWITCH, 1)
.setHeader(MessageConst.PROPERTY_KEYS, messageKey)
.build();
rocketMQTemplate.convertAndSend(topic + ":tag1", message);
}
}
在这个示例中,MessageConst.PROPERTY_TRACE_SWITCH是一个常量,它代表消息跟踪的开关,1表示开启消息跟踪。
MessageConst.PROPERTY_KEYS表示在构建消息的时候,指定了这条消息的键
确保你的RocketMQ服务端配置正确,并且支持消息跟踪功能。以上配置和代码可以启用消息跟踪,并将消息跟踪数据发送到指定的主题中。
三、rocketmq-client开启消息轨迹
3.1、导入RocketMQ依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
3.2、如何使用消息轨迹
1、修改Broker服务端配置,设置 traceTopicEnable=true
表示在Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队列个数为1。所有的msgTrace信息默认都存储在这个topic中。2、Producer中开启消息轨迹
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
- boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
String类型的入参customizedTraceTopic,表示用于记录消息轨迹的topic,不设置默认为RMQ_SYS_TRACE_TOPIC。
3、Consuemr中开启消息轨迹
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
- boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。
如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。
3.3、使用案例
- 1、broker的配置文件(broker.conf)中增加如下配置,然后重启Broker:
traceTopicEnable = true
- 2、Producer
public class TraceProducer {
public static void main(String[] args) throws Exception {
// 第二个参数TRUE,表示开启MsgTrace
DefaultMQProducer producer = new DefaultMQProducer("saint-test", true);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setMaxMessageSize(1024 * 1024 * 10);
producer.start();
Message msg = new Message("test-topic-trace",null, "key-trace", "trace-2".getBytes(StandardCharsets.UTF_8));
SendResult send = producer.send(msg);
System.out.println("sendResult: " + send);
// 关闭生产者
producer.shutdown();
System.out.println("已经停机");
}
}
- 3、Consumer
public class TraceConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer", true);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("test-topic-trace", "*");
consumer.setConsumeTimeout(20L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer start。。。。。。");
}
}
3.4 消息轨迹内容
消息轨迹内容包括:消息ID(MessageID)、消息Tag、消息Key(MessageKey)、消息的存储时间、处理消息的客户端IP、存储服务器IP、发送/消费耗时、消息轨迹状态、跟踪类型。
在RocketMQ-Console中的消息轨迹内容如下:
3.5 RocketMQ-Console中查看消息轨迹
在MessageTrace大分类下有两种方式可以查看消息轨迹,一种是根据 原消息Topic + MessageKey、另一种是根据 原消息Topic + MessageID;
所以建议如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。
-
1、根据Message Key查询
-
2、根据Message ID查询
四、RocketMQ消息轨迹实现原理
参考:
https://blog.csdn.net/Saintmm/article/details/128936017
https://blog.csdn.net/FireFox1997/article/details/140871165
https://blog.51cto.com/u_16175520/9169486
https://blog.csdn.net/CoderChronicle/article/details/135286211