mq事务实战/mq延迟队列探索

mq的应用场景
1。解耦
2。削峰
3。【延迟消息】
4。【分布式事务】
5.日志收集
6.消息分发

采用分布式事务方式确保消息默认发送到rocketmq上,
默认开启了数据持久化位置:${user.home}/store
消费的时候报错会自动重试

【默认重试间隔是多少】
无序消息
对于无序消息(通过实现MessageListenerConcurrently接口来消费),第一次重试间隔时间是 10 秒。第二次重试间隔时间是 30 秒,第三次重试间隔时间是 1 分钟,第四次重试间隔时间是 2 分钟,第五次重试间隔时间是 3 分钟,第六次重试间隔时间是 4 分钟,第七次重试间隔时间是 5 分钟,第八次重试间隔时间是 6 分钟,第九次重试间隔时间是 7 分钟,第十次重试间隔时间是 8 分钟,第十一次重试间隔时间是 9 分钟,第十二次重试间隔时间是 10 分钟,第十三次重试间隔时间是 20 分钟,第十四次重试间隔时间是 30 分钟,第十五次重试间隔时间是 1 小时,第十六次重试间隔时间是 2 小时。
顺序消息
对于顺序消息(通过实现MessageListenerOrderly接口来消费),默认的重试间隔时间是 1000 毫秒(1 秒)。可以通过MessageListenerOrderly接口的SUSPEND_CURRENT_QUEUE_TIME_MILLIS属性来设置具体的重试间隔时间。

【重试次数限制及死信队列】
RocketMQ 会限制消息的重试次数。当消息重试达到一定次数(默认 16 次)后,如果仍然无法成功消费,消息会被发送到死信队列(DLQ,Dead - Letter - Queue)。
死信队列中的消息可以通过专门的工具或者自定义的程序进行后续处理。例如,开发人员可以编写一个工具来定期扫描死信队列中的消息,分析消费失败的原因,可能是消息格式错误、业务逻辑处理有严重问题(如数据库连接异常无法恢复等),然后根据具体情况对消息进行手动修复或者调整业务逻辑后重新消费。

mq事务


import com.alibaba.fastjson.JSON;
import com.csw.RocketMQ.dao.TxLogDao;
import com.csw.domain.Order;
import com.csw.domain.TxLog;
import com.csw.orderAndMsg.dao.OrderDao;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.UUID;

@Service
public class OrderServiceImpl4 {

    @Autowired
    private OrderDao orderDao;

    @Autowired
    private TxLogDao txLogDao;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //事务mq【第1步】
    public void createOrderBefore(Order order) {

        String txId = UUID.randomUUID().toString();
        String orderJson = JSON.toJSONString(order);

        //发送半事务消息
        rocketMQTemplate.sendMessageInTransaction(
                "tx_producer_group",
                "tx_topic",
                MessageBuilder.withPayload(orderJson).setHeader("txId", txId).build(),
                order
        );
    }

    //事务mq【第3步】
    @Transactional
    public void createOrder(String txId, Order order) {
        //保存订单
        orderDao.save(order);

        TxLog txLog = new TxLog();
        txLog.setTxId(txId);
        txLog.setDate(new Date());

        //记录事物日志
        txLogDao.save(txLog);
    }

 



import com.csw.RocketMQ.dao.TxLogDao;
import com.csw.domain.Order;
import com.csw.domain.TxLog;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * 事务消息接受
 * 普通消息接收在user里
 */
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderServiceImpl4 orderServiceImpl4;


    @Autowired
    private TxLogDao txLogDao;

    //执行本地事物
    //事务mq【第2步】
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        String txId = (String) msg.getHeaders().get("txId");

        try {
            //本地事物
            Order order = (Order) arg;
            orderServiceImpl4.createOrder(txId,order);

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //消息回查【如果一段时间没有收到消息比如,报错还没有来得及提交就宕机了,网络延迟了mq没收到等】
    //事务mq【第4步】
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String txId = (String) msg.getHeaders().get("txId");

 TxLog txLog = null;
    int retryCount = 0;
    while (retryCount < 3) { // 最多重试3次
     txLog = txLogDao.findById(txId).get();
        if (txLog!= null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            try {
                Thread.sleep(1000); // 等待1秒后重试
            } catch (InterruptedException e) {
                // 处理异常
            }
            retryCount++;
        }
    }

    return RocketMQLocalTransactionState.ROLLBACK;

    }
}



import com.csw.domain.Order;
import com.csw.domain.User;
import com.csw.sms.utils.SmsUtil;
import com.csw.user.dao.UserDao;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Random;

/**
 * 普通消息接受
 * 事务消息接收在order里
 */
@Slf4j
@Service("shopSmsService")
//consumerGroup-消费者组名  topic-要消费的主题
@RocketMQMessageListener(
        consumerGroup = "tx_producer_group", //消费者组名
        topic = "tx_topic",//消费主题
        consumeMode = ConsumeMode.CONCURRENTLY,//消费模式,指定是否顺序消费 CONCURRENTLY(同步,默认) ORDERLY(顺序)
        messageModel = MessageModel.CLUSTERING//消息模式 BROADCASTING(广播)  CLUSTERING(集群,默认)在广播模式下,消费失败的消息会被丢弃,而在集群模式下,【消费失败的消息会被重新入队等待稍后消费】
)
public class RocketSms implements RocketMQListener<Order> {

    @Autowired
    private UserDao userDao;

    //消费逻辑
    @Override
    public void onMessage(Order order) {
        log.error("接收到了一个订单信息{},接下来就可以发送短信通知了", order);
// 获取消息体中的JSON字符串


        //根据uid 获取手机号
        User user = userDao.findById(order.getUid()).get();

        //消费失败测试
        //int mm = 1 / 0;

        //生成验证码 1-9 6
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            builder.append(new Random().nextInt(9) + 1);
        }
        String smsCode = builder.toString();

        Param param = new Param(smsCode);
        try {
            //发送短信 {"code":"123456"}
            SmsUtil smsUtil = new SmsUtil();
            //节省资源
            //smsUtil.send(user.getTelephone(),JSON.toJSONString(param),"yzm");
            log.error("短信发送成功");

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Param {
        private String code;
    }
}

延迟队列

  /**
     * 发送延迟消息
     *
     * RocketMQ 默认提供了 18 个延迟级别,每个延迟级别对应的延迟时间如下:
     * 1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h。
     */
    public void sendDelayMessage() {
        // 构建消息,这里以简单的字符串消息为例
        Message<String> message = MessageBuilder.withPayload("This is a delay message").build();
        //超时10秒钟,延迟级别是5等于1分钟
        rocketMQTemplate.syncSend("topic-dely-test", message, 10 * 1000, 5);
    }


}
package com.csw.rocketMQ;

import com.csw.user.dao.UserDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 延迟队列的组名可以随便指定,但是不要指定已有的组名
*在
 */
@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = "topic-dely-test", topic = "topic-dely-test")
public class RocketDelaySms implements RocketMQListener<String> {

    @Autowired
    private UserDao userDao;

    //消费逻辑
    @Override
    public void onMessage(String message) {
        log.error("接收到了一个订单信息{},接下来就可以发送短信通知了", message);

    }


}

配置

【配置文件】
rocketmq:
  name-server: 192.168.147.131:9876   #rocketMQ服务的地址

【依赖】
<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
  <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
下载RocketMQ并解压
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

 nohup ./bin/mqnamesrv &
 nohup bin/mqbroker -n localhost:9876 &



下载控制台
# 在git上下载下面的工程 rocketmq-console-1.0.0
https://github.com/apache/rocketmq-externals/releases

# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=7777 #项目启动后的端口号
rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址,注意防火墙要开启
9876端口

# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true

# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar

浏览器访问
http://localhost:7777/#/consumer
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352