mq事务实战/mq延迟队列探索

mq的应用场景
1。解耦
2。削峰
3。【延迟消息】
4。【分布式事务】
5.日志收集
6.消息分发

采用分布式事务方式确保消息默认发送到rocketmq上,
默认开启了数据持久化位置:${user.home}/store
消费的时候报错会自动重试

【默认重试间隔是多少】
无序消息
对于无序消息(通过实现MessageListenerConcurrently接口来消费),第一次重试间隔时间是 10 秒。第二次重试间隔时间是 30 秒,第三次重试间隔时间是 1 分钟,第四次重试间隔时间是 2 分钟,第五次重试间隔时间是 3 分钟,第六次重试间隔时间是 4 分钟,第七次重试间隔时间是 5 分钟,第八次重试间隔时间是 6 分钟,第九次重试间隔时间是 7 分钟,第十次重试间隔时间是 8 分钟,第十一次重试间隔时间是 9 分钟,第十二次重试间隔时间是 10 分钟,第十三次重试间隔时间是 20 分钟,第十四次重试间隔时间是 30 分钟,第十五次重试间隔时间是 1 小时,第十六次重试间隔时间是 2 小时。
顺序消息
对于顺序消息(通过实现MessageListenerOrderly接口来消费),默认的重试间隔时间是 1000 毫秒(1 秒)。可以通过MessageListenerOrderly接口的SUSPEND_CURRENT_QUEUE_TIME_MILLIS属性来设置具体的重试间隔时间。

【重试次数限制及死信队列】
RocketMQ 会限制消息的重试次数。当消息重试达到一定次数(默认 16 次)后,如果仍然无法成功消费,消息会被发送到死信队列(DLQ,Dead - Letter - Queue)。
死信队列中的消息可以通过专门的工具或者自定义的程序进行后续处理。例如,开发人员可以编写一个工具来定期扫描死信队列中的消息,分析消费失败的原因,可能是消息格式错误、业务逻辑处理有严重问题(如数据库连接异常无法恢复等),然后根据具体情况对消息进行手动修复或者调整业务逻辑后重新消费。

mq事务


import com.alibaba.fastjson.JSON;
import com.csw.RocketMQ.dao.TxLogDao;
import com.csw.domain.Order;
import com.csw.domain.TxLog;
import com.csw.orderAndMsg.dao.OrderDao;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.UUID;

@Service
public class OrderServiceImpl4 {

    @Autowired
    private OrderDao orderDao;

    @Autowired
    private TxLogDao txLogDao;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //事务mq【第1步】
    public void createOrderBefore(Order order) {

        String txId = UUID.randomUUID().toString();
        String orderJson = JSON.toJSONString(order);

        //发送半事务消息
        rocketMQTemplate.sendMessageInTransaction(
                "tx_producer_group",
                "tx_topic",
                MessageBuilder.withPayload(orderJson).setHeader("txId", txId).build(),
                order
        );
    }

    //事务mq【第3步】
    @Transactional
    public void createOrder(String txId, Order order) {
        //保存订单
        orderDao.save(order);

        TxLog txLog = new TxLog();
        txLog.setTxId(txId);
        txLog.setDate(new Date());

        //记录事物日志
        txLogDao.save(txLog);
    }

 



import com.csw.RocketMQ.dao.TxLogDao;
import com.csw.domain.Order;
import com.csw.domain.TxLog;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * 事务消息接受
 * 普通消息接收在user里
 */
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderServiceImpl4 orderServiceImpl4;


    @Autowired
    private TxLogDao txLogDao;

    //执行本地事物
    //事务mq【第2步】
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        String txId = (String) msg.getHeaders().get("txId");

        try {
            //本地事物
            Order order = (Order) arg;
            orderServiceImpl4.createOrder(txId,order);

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //消息回查【如果一段时间没有收到消息比如,报错还没有来得及提交就宕机了,网络延迟了mq没收到等】
    //事务mq【第4步】
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String txId = (String) msg.getHeaders().get("txId");

 TxLog txLog = null;
    int retryCount = 0;
    while (retryCount < 3) { // 最多重试3次
     txLog = txLogDao.findById(txId).get();
        if (txLog!= null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            try {
                Thread.sleep(1000); // 等待1秒后重试
            } catch (InterruptedException e) {
                // 处理异常
            }
            retryCount++;
        }
    }

    return RocketMQLocalTransactionState.ROLLBACK;

    }
}



import com.csw.domain.Order;
import com.csw.domain.User;
import com.csw.sms.utils.SmsUtil;
import com.csw.user.dao.UserDao;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Random;

/**
 * 普通消息接受
 * 事务消息接收在order里
 */
@Slf4j
@Service("shopSmsService")
//consumerGroup-消费者组名  topic-要消费的主题
@RocketMQMessageListener(
        consumerGroup = "tx_producer_group", //消费者组名
        topic = "tx_topic",//消费主题
        consumeMode = ConsumeMode.CONCURRENTLY,//消费模式,指定是否顺序消费 CONCURRENTLY(同步,默认) ORDERLY(顺序)
        messageModel = MessageModel.CLUSTERING//消息模式 BROADCASTING(广播)  CLUSTERING(集群,默认)在广播模式下,消费失败的消息会被丢弃,而在集群模式下,【消费失败的消息会被重新入队等待稍后消费】
)
public class RocketSms implements RocketMQListener<Order> {

    @Autowired
    private UserDao userDao;

    //消费逻辑
    @Override
    public void onMessage(Order order) {
        log.error("接收到了一个订单信息{},接下来就可以发送短信通知了", order);
// 获取消息体中的JSON字符串


        //根据uid 获取手机号
        User user = userDao.findById(order.getUid()).get();

        //消费失败测试
        //int mm = 1 / 0;

        //生成验证码 1-9 6
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            builder.append(new Random().nextInt(9) + 1);
        }
        String smsCode = builder.toString();

        Param param = new Param(smsCode);
        try {
            //发送短信 {"code":"123456"}
            SmsUtil smsUtil = new SmsUtil();
            //节省资源
            //smsUtil.send(user.getTelephone(),JSON.toJSONString(param),"yzm");
            log.error("短信发送成功");

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Param {
        private String code;
    }
}

延迟队列

  /**
     * 发送延迟消息
     *
     * RocketMQ 默认提供了 18 个延迟级别,每个延迟级别对应的延迟时间如下:
     * 1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h。
     */
    public void sendDelayMessage() {
        // 构建消息,这里以简单的字符串消息为例
        Message<String> message = MessageBuilder.withPayload("This is a delay message").build();
        //超时10秒钟,延迟级别是5等于1分钟
        rocketMQTemplate.syncSend("topic-dely-test", message, 10 * 1000, 5);
    }


}
package com.csw.rocketMQ;

import com.csw.user.dao.UserDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 延迟队列的组名可以随便指定,但是不要指定已有的组名
*在
 */
@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = "topic-dely-test", topic = "topic-dely-test")
public class RocketDelaySms implements RocketMQListener<String> {

    @Autowired
    private UserDao userDao;

    //消费逻辑
    @Override
    public void onMessage(String message) {
        log.error("接收到了一个订单信息{},接下来就可以发送短信通知了", message);

    }


}

配置

【配置文件】
rocketmq:
  name-server: 192.168.147.131:9876   #rocketMQ服务的地址

【依赖】
<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
  <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
下载RocketMQ并解压
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

 nohup ./bin/mqnamesrv &
 nohup bin/mqbroker -n localhost:9876 &



下载控制台
# 在git上下载下面的工程 rocketmq-console-1.0.0
https://github.com/apache/rocketmq-externals/releases

# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=7777 #项目启动后的端口号
rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址,注意防火墙要开启
9876端口

# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true

# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar

浏览器访问
http://localhost:7777/#/consumer
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。