RocketMQ——消息轨迹方案实现

前言

在分布式消息系统中,确保消息在生产、传递和消费过程中的可追溯性至关重要。消息跟踪机制使得开发者和运维人员能够监控和调试消息流转过程,快速定位和解决问题。RocketMQ 作为一款高性能、高可用的消息中间件,提供了完善的消息跟踪机制。

一、什么是消息跟踪

消息跟踪是指记录和追踪消息在整个生命周期中的各种状态和操作。通过消息跟踪,可以详细了解消息的生产、传递和消费情况,便于监控和调试。

1.1、消息跟踪的工作原理

RocketMQ 的消息跟踪机制主要包括以下步骤:

  • 1、消息生产跟踪:
    在生产者发送消息时,记录消息的相关信息(如消息ID、主题、标签、发送时间等)。
  • 2、消息传递跟踪:
    在消息传递过程中,记录消息在各个节点的流转情况(如消息队列的路2``由、存储节点等)。
  • 3、消息消费跟踪:
    在消费者接收到消息时,记录消息的消费情况(如消费时间、消费结果、消费延迟等)。
  • 4、消息状态存储:
    将消息的跟踪信息存储在专用的存储系统中,便于后续查询和分析。

1.2、消息跟踪的应用场景

消息跟踪广泛应用于以下场景:

  • 1、问题排查:
    在消息传递过程中出现问题时,通过消息跟踪可以快速定位问题所在,并进行相应的处理。
  • 2、性能监控:
    通过跟踪消息的生产、传递和消费时间,可以监控系统的性能指标,及时发现和解决性能瓶颈。
  • 3、审计和合规:
    在一些对消息传递有严格审计和合规要求的场景中,消息跟踪可以提供详细的操作记录,满足审计需求。
  • 4、业务分析:
    通过分析消息的流转情况,可以获得业务运行的详细数据,辅助业务决策和优化。

二、RocketMQTemplate开启消息轨迹

2.1、导入RocketMQ依赖

首先,你需要在你的项目中导入RocketMQ的依赖。这可以通过在你的构建文件中添加以下代码来实现,具体代码如下所示:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

2.2、配置RocketMQTemplate

接下来,你需要配置RocketMQTemplate以使用消息轨迹功能。在你的应用程序的配置文件中添加以下属性,具体代码如下所示:

rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=Topic_Trace_Data
rocketmq:
  producer:
    enable-msg-trace: true
    customized-trace-topic: Topic_Trace_Data

在这个配置中,enable-msg-trace设置为true启用消息跟踪,customized-trace-topic指定跟踪消息发送时使用的自定义跟踪主题。
在发送消息时,你可以通过sendMessage方法发送消息,并指定消息跟踪的开启。

2.3、发送消息

@Component
@Slf4j
public class RocketMqProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public <T> void sendMessageWithTrace(String topic, String tag, String messageKey, T msg) {
        if(!StringUtils.isEmpty(tag)){
            topic = topic + ":" + tag;
        }
        Message<T> message = MessageBuilder.withPayload(msg)
                .setHeader(MessageConst.PROPERTY_TRACE_SWITCH, 1)
                .setHeader(MessageConst.PROPERTY_KEYS, messageKey)
                .build();
        rocketMQTemplate.convertAndSend(topic + ":tag1", message);
    }
}

在这个示例中,MessageConst.PROPERTY_TRACE_SWITCH是一个常量,它代表消息跟踪的开关,1表示开启消息跟踪。
MessageConst.PROPERTY_KEYS表示在构建消息的时候,指定了这条消息的键
确保你的RocketMQ服务端配置正确,并且支持消息跟踪功能。以上配置和代码可以启用消息跟踪,并将消息跟踪数据发送到指定的主题中。

三、rocketmq-client开启消息轨迹

3.1、导入RocketMQ依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.0</version>
</dependency>

3.2、如何使用消息轨迹

  • 1、修改Broker服务端配置,设置 traceTopicEnable=true
    表示在Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队列个数为1。所有的msgTrace信息默认都存储在这个topic中。

  • 2、Producer中开启消息轨迹

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
  • boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
  • String类型的入参customizedTraceTopic,表示用于记录消息轨迹的topic,不设置默认为RMQ_SYS_TRACE_TOPIC。

  • 3、Consuemr中开启消息轨迹

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
  • boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。

如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。

3.3、使用案例

  • 1、broker的配置文件(broker.conf)中增加如下配置,然后重启Broker:
traceTopicEnable = true
  • 2、Producer
public class TraceProducer {
    public static void main(String[] args) throws Exception {
        // 第二个参数TRUE,表示开启MsgTrace
        DefaultMQProducer producer = new DefaultMQProducer("saint-test", true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setMaxMessageSize(1024 * 1024 * 10);
        producer.start();

        Message msg = new Message("test-topic-trace",null, "key-trace", "trace-2".getBytes(StandardCharsets.UTF_8));
        SendResult send = producer.send(msg);

        System.out.println("sendResult: " + send);

        // 关闭生产者
        producer.shutdown();
        System.out.println("已经停机");
    }
}
  • 3、Consumer
public class TraceConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer", true);
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.subscribe("test-topic-trace", "*");
        consumer.setConsumeTimeout(20L);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("Consumer start。。。。。。");

    }
}

3.4 消息轨迹内容

消息轨迹内容包括:消息ID(MessageID)、消息Tag、消息Key(MessageKey)、消息的存储时间、处理消息的客户端IP、存储服务器IP、发送/消费耗时、消息轨迹状态、跟踪类型。

在RocketMQ-Console中的消息轨迹内容如下:

image.png

3.5 RocketMQ-Console中查看消息轨迹

在MessageTrace大分类下有两种方式可以查看消息轨迹,一种是根据 原消息Topic + MessageKey、另一种是根据 原消息Topic + MessageID;


image.png

所以建议如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。

  • 1、根据Message Key查询
    image.png
  • 2、根据Message ID查询
    image.png

四、RocketMQ消息轨迹实现原理

参考:
https://blog.csdn.net/Saintmm/article/details/128936017
https://blog.csdn.net/FireFox1997/article/details/140871165

https://blog.51cto.com/u_16175520/9169486

https://blog.csdn.net/CoderChronicle/article/details/135286211

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容