RabbitMQ进阶

1. RabbitMQ高级特性

1.1 消息的可靠性投递

RabbitMQ提供了两种方式用来控制消息的投递可靠性模式。

  • confirm

判断消息是否到达交换机,消息从 producer 到 exchange 则会返回一个 confirmCallback 。

代码实现,需在配置文件中将connection-factory的属性 publisher-confirms设置为true

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
​
 <context:property-placeholder location="classpath:rabbitmq.properties"/>
 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
 port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
 virtual-host="${rabbitmq.virtual}"
 publisher-confirms="true" publisher-returns="true"/>
 <!--定义管理交换机、队列-->
 <rabbit:admin connection-factory="connectionFactory"/>
 <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
 <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
​
 <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"/>
​
 <rabbit:direct-exchange name="test_exchange_confirm">
 <rabbit:bindings>
 <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
 </rabbit:bindings>
 </rabbit:direct-exchange>
</beans>

设置回调方法

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @Test
 public void testComfirm() {
 //设置回调
 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
 public void confirm(CorrelationData correlationData, boolean b, String s) {
 System.out.println("confirm方法被执行了");
 if(b){
 //接受成功
 System.out.println("方法接收成功:"+s);
 }else {
 //接收失败
 System.out.println("方法接收失败:"+s);
 }
 }
 });
 rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","msg confirm!");
 }
}

通过confirm中的b判断交换机是否接收成功消息,接收成功即执行相应的方法

  • return模式

判断消息是否成功被路由到某一个队列, 消息从 exchange-->queue 投递失败则会返回一个 returnCallback

代码实现,需在配置文件中将connection-factory的属性 publisher-returns设置为true

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
​
 <context:property-placeholder location="classpath:rabbitmq.properties"/>
 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
 port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
 virtual-host="${rabbitmq.virtual}"
 publisher-confirms="true" publisher-returns="true"/>
 <!--定义管理交换机、队列-->
 <rabbit:admin connection-factory="connectionFactory"/>
 <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
 <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
​
 <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"/>
​
 <rabbit:direct-exchange name="test_exchange_confirm">
 <rabbit:bindings>
 <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
 </rabbit:bindings>
 </rabbit:direct-exchange>
</beans>

设置回调方法,需将托管开启rabbitTemplate.setMandatory(true),当路由失败时执行回调方法里头的操作。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @Test
 public void testReturn() {
​
 rabbitTemplate.setMandatory(true);
​
 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
 /**
 * @param message
 * @param i replyCode
 * @param s replyTest
 * @param s1 exchange
 * @param s2 routingKey
 */
 public void returnedMessage(Message message, int i, String s, String s1, String s2) {
 System.out.println("return 执行了...");
 System.out.println(message);
 System.out.println(i);
 System.out.println(s);
 System.out.println(s1);
 System.out.println(s2);
​
 }
 });
​
 rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","msg confirm!");
 }
}

1.2 Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。 有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

spring中xml文件配置, 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
​
 <!--加载配置文件-->
 <context:property-placeholder location="classpath:rabbitmq.properties"/>
​
 <!--创建connectionFactory-->
 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
 port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual}"/>
 <context:component-scan base-package="com.itheima.listener"/>
 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="5">
 <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>-
 </rabbit:listener-container>
</beans>

代码实现, 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息,如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

@Component
public class AckListener implements ChannelAwareMessageListener {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 Thread.sleep(1000);
 long deliveryTag = message.getMessageProperties().getDeliveryTag();
 try {
 System.out.println(new String(message.getBody()));
 System.out.println("处理业务逻辑");
 int i=3/0;
 channel.basicAck(deliveryTag,true);
 } catch (Exception e) {
 //e.printStackTrace();
 channel.basicNack(deliveryTag,true,true);
 }
 }
 @Override
 public void onMessage(Message message) {
 }
}

1.3 消费端限流

消费端一次获取一批消息确认后在获取下一批

在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息,消费端的确认模式一定为手动确认。acknowledge="manual"

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 <!--加载配置文件-->
 <context:property-placeholder location="classpath:rabbitmq.properties"/>
 <!--创建connectionFactory-->
 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
 port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
 virtual-host="${rabbitmq.virtual}"/>
​
 <context:component-scan base-package="com.itheima.listener"/>
​
 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
 <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"/>
 </rabbit:listener-container>
</beans>
@Component
public class QosListener implements ChannelAwareMessageListener {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 Thread.sleep(1000);
 System.out.println(new String(message.getBody()));
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
 }
 @Override
 public void onMessage(Message message) {
 }
}

1.4 TTL

  • TTL 全称 Time To Live(存活时间/过期时间)。
  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual}"
    publisher-confirms="true" publisher-returns="true"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    <!--设置消息过期时间-->
    <rabbit:queue id="test_queue_ttl" name="test_queue_ttl">
        <rabbit:queue-arguments>
            <!--队列的过期时间-->
            <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

单独设置过期队列,需添加后置处理器messagePostProcessor(消息后置处理器)

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test4() {
        for (int i = 0; i <10; i++) {
            rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.test","msg ttl!");
        }
    }

    @Test
    public void test5() {

        //消息后处理对象
        MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息的参数
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        };
        for (int i = 0; i <10 ; i++) {
            if(i==5){
                //消息单独过期
                rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.test","msg ttl!",messagePostProcessor);
            }else {
                rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.test","msg ttl!");
            }
        }
    }
  • 如果同时设置了队列和发送消息的过期时间以短的过期时间为准
  • 消息过期之后如果没有在队列顶端不会立即被移除

1.5 死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况

  1. 队列消息长度到达限制;
  1. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  1. 原队列存在消息过期设置,消息到达超时时间未被消费;

在spring配置文件中给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual}"
    publisher-confirms="true" publisher-returns="true"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    <!--死信队列的设置-->
    <!--1.设置正常队列-->
    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--3.绑定-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="exchange_dlx"/>
            <entry key="x-dead-letter-routing-key" value="dlx.hello"/>
            <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/>
            <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--2.设置死信队列-->
    <rabbit:queue name="queue_dlx" id="queue_dlx"/>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

死信队列中过期时间设置和长度限制设置

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     *死信队列,过期时间设置
     */
    @Test
    public void test6() {
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","hello dlx");
    }

    /**
     *死信队列,长度限制设置
     */
    @Test
    public void test7() {
        for (int i = 0; i <20; i++) {
            rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","hello dlx");
        }

    }
}

死信队列中访问拒绝设置,需在消费端channel.basicNack(deliveryTag,true,false)即requeue=false,此处省略在配置文件中配置监听器

@Component
public class DlxListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑");
            int i=3/0;
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现问题,拒绝接收");
            channel.basicNack(deliveryTag,true,false);
        }
    }
    @Override
    public void onMessage(Message message) {

    }
}

小结

  1. 死信交换机和死信队列和普通的没有区别

  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队 列


1.6 延迟队列

在RabbitMQ中,TTL+死信队列 组合实现延迟队列的效果

  • 生产端
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual}"
    publisher-confirms="true" publisher-returns="true"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    <!--延迟队列  死信队列+TTL-->
   <!-- 1.定义正常交换机-->
    <rabbit:queue id="order_queue" name="order_queue">
        <!--3.绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--2.定义死信交换机和队列-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"/>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     *延迟队列
     */
    @Test
    public void test8() throws InterruptedException {
        rabbitTemplate.convertAndSend("order_exchange","order.msg","msg:hello,order");

        for (int i = 10; i >0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }
}
  • 消费端
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!--创建connectionFactory-->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual}"/>

    <context:component-scan base-package="com.itheima.listener"/>

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
        <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/>
    </rabbit:listener-container>

</beans>
@Component
public class OrderListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑");
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            channel.basicNack(deliveryTag,true,false);
        }
    }

    @Override
    public void onMessage(Message message) {

    }
}

1.7 日志与监控

  • RabbitMQ默认日志存放路径:/var/log/rabbitmq/rabbit@xxx.log
  • 日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、 RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等

查看队列

rabbitmqctl list_queues

查看exchanges

rabbitmqctl list_exchanges

查看用户

rabbitmqctl list_users

查看连接

rabbitmqctl list_connections

查看消费者信息

rabbitmqctl list_consumers

查看环境变量

rabbitmqctl environment

查看未被确认的队列

rabbitmqctl list_queues name messages_unacknowledged

查看单个队列的内存使用

rabbitmqctl list_queues name memory

查看准备就绪的队列

rabbitmqctl list_queues name messages_ready

1.8消息追踪-Firehose

  • rabbitmqctl trace_on:开启Firehose命令
  • rabbitmqctl trace_off:关闭Firehose命令
  • 打开 trace 会影响消息写入功能,适当打开后请关闭。

rabbitmq_tracing和Firehose在实现上如出一辙,记录所有消息的入队(生产消息)以及出队(消费)的详细信息,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。


2. 应用问题

2.1 消息补偿

image-20200504183149908.png
  1. 三个队列:

    • 生产者和消费者

    • 消费者和状态检查服务

    • 生产者和检查服务的

  2. 利用dubbo实现检查服务远程过程调用消息生产者(如果说延迟消息检测发现没有正常处理)

2.2. 幂等性问题(不会重复消费)

使用乐观锁的思想

{ uid: 123546, money: 100 version: 1 } { uid: 123546, money: 100 version: 1 } { uid: 123546, money: 100 version: 1 }

数据库: 123546, 50000, 1 第一次消费后: 123546, 49000, 2

update set money = money - ${money}, version = version + 1 where version = ${version}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。