ActiveMQ

目录
    1 概述
    2 JMS模式
    3 JMS创建通用步骤
    4 JMS消息可靠性
    5 ActiveMQ作用
    6 协议
    7 可持久化
    8 ActiveMQ部署模式
    9 高级特性
    10 Springboot整合ActiveMQ

参考资料
    · 有道云笔记
    · SCDN

1 概述

1.1 背景

    在客户端与服务器进行通讯时.客户端调用后,必须等待服务对象完成处理返回结果才能继续执行。客户与服务器对象的生命周期紧密耦合,客户进程和服务对象进程都都必须正常运行;如果由于服务对象崩溃或者网络故障导致用户的请求不可达,客户受到异常。

1.2 实质

    ActiveMQ 是一个 MOM(消息中间件),具体来说是一个实现了 JMS 规范的系统间远程通信的消息代理。其他实现JMS规范的还有kafka、RabbitMQ、RockitMQ等。

    ActiveMQ解决了耦合调用、异步模型、抵御洪峰流量,保护了主业务,消峰。

1.3 MOM

    MOM 就是面向消息中间件(Message-oriented middleware),是用于以分布式应用或系统中的异步、松耦合、可靠、可扩展和安全通信的一类软件。MOM 的总体思想是它作为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合。

1.4 JMS

1.4.1 概念

    JMS 叫做 Java 消息服务(Java Message Service),是 Java 平台上有关面向 MOM 的技术规范,旨在通过提供标准的产生、发送、接收和处理消息的 API 简化企业应用的开发,类似于 JDBC 和关系型数据库通信方式的抽象。

图 1-1 JMS规范

1.4.2 部件

1.4.2.1 JMS provider

    实现JMS 的消息中间件,也就是MQ服务器。

1.4.2.2 JMS producer

    消息生产者,创建和发送消息的客户端

1.4.2.3 JMS consumer

    消息消费者,接收和处理消息的客户端

1.4.2.4 JMS message

    JMS 消息,分为消息头、消息属性、消息体

(1)消息头

    · JMSDestination 寻址对象

    · JMSDeliveryMode 是否持久

    · JMSExpiration 过期时间

    · JMSPriority 优先级,默认是4,有0~9 共10个等级,5-9 是紧急的,0-4 是普通的

    · JMSMessageId 唯一的消息ID

(2)消息体

       发送和接收的消息类型必须一致

    · TextMessage 普通字符串消息,包含一个String

    · Mapmessage Map 类型的消息,k-> String ,v -> Java 基本类型

    · BytesMessage 二进制数组消息,包含一个byte[]

    · StreamMessage Java 数据流消息,用标准流操作来顺序的填充读取

    · ObjectMessage  对象消息,包含一个可序列化的Java 对象

(3)消息属性:识别、去重、重点标注

1.4.3 其他概念

(1)Domains:消息传递方式,包括点对点(P2P)、发布/订阅(Pub/Sub)两种

(2)Connection factory:客户端使用连接工厂来创建与 JMS provider 的连接

(3)Destination:消息被寻址、发送以及接收的对象,包括queue(点对点模式)和topic(发布订阅模式)

图 1-2 JMS编码总体架构

(4)broker 就是实现了用代码形式启动 ActiveMQ 将 MQ 内嵌到 Java 代码中,可以随时启动,节省资源,提高了可靠性。即将 MQ 服务器作为了 Java 对象

2 JMS模式

图 2-1 JMS模式之间的对比

2.1 点对点模式(P2P)

(1)使用queue作为Destination

(2)消息接收方式

 · 依赖

<dependency>

    <groupId>org.apache.activemq</groupId>

    <artifactId>activemq-core</artifactId>

    <version>5.7.0</version>

</dependency>

· 生产者

public class Producter {

    public static void main(String[] args) throws JMSException {

        // ConnectionFactory:连接工厂,JMS 用它创建连接

        ConnectionFactory connectionFactory =         new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,

        ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");

        // JMS客户端到JMS Provider 的连接

        Connection connection = connectionFactory.createConnection();

        connection.start();

        // Session: 一个发送或接收消息的线程

        Session session = connection.createSession(Boolean.falst,         Session.AUTO_ACKNOWLEDGE);

        // Destination:消息的目的地;消息发送给谁.

        //获取session注意参数值my-queue是Query的名字

        Destination destination = session.createQueue("my-queue");

        // MessageProducer:消息生产者

        MessageProducer producer = session.createProducer(destination);

        //设置不持久化

        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        //发送一条消息

        for (int i = 1; i <= 5; i++) {

            sendMsg(session, producer, i);

        }

        session.commit();

        connection.close();

    }

/**

*在指定的会话上,通过指定的消息生产者发出一条消息

 *

 * @param session

*消息会话

 * @param producer

*消息生产者

 */

    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {

    //创建一条文本消息

        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);

        //通过消息生产者发出消息

        producer.send(message);

    }

}

· 同步:Consumer 可以使用 MessageConsumer.receive() 同步地接收消息

public class JmsReceiver {

    public static void main(String[] args) throws JMSException {

        // ConnectionFactory:连接工厂,JMS 用它创建连接

        ConnectionFactory connectionFactory =         new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,

        ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");

        // JMS客户端到JMS Provider 的连接

        Connection connection = connectionFactory.createConnection();

        connection.start();

        // Session: 一个发送或接收消息的线程

        Session session = connection.createSession(Boolean.TRUE,         Session.AUTO_ACKNOWLEDGE);

        // Destination:消息的目的地;消息发送给谁.

        //获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置

        Destination destination = session.createQueue("my-queue");

        //消费者,消息接收者

        MessageConsumer consumer = session.createConsumer(destination);

        while (true) {

            TextMessage message = (TextMessage) consumer.receive();

            if (null != message) {

                System.out.println("收到消息:" + message.getText());

                session.commit();

            } else

                break;

            }

        session.close();

        connection.close();

    }

}

· 异步:Consumer 也可以使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收

  // 通过监听的方式来消费消息

  // 通过异步非阻塞的方式消费消息

  // 通过messageConsumer 的setMessageListener 注册一个监听器,

  // 当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息

  messageConsumer.setMessageListener(new MessageListener() {  // 可以用监听器替换之前的同步receive 方法

      public void onMessage(Message message)  {

              if (null != message  && message instanceof TextMessage){

                  TextMessage textMessage = (TextMessage)message;

                  try {

              System.out.println("****消费者的消息:"+textMessage.getText());

              }catch (JMSException e) {

                      e.printStackTrace();

              }

          }

      }

  });

(3)工作模式

    · “负载均衡”,当先启动两个消费者时,在启动生产者发出消息,此时的消息平均的被两个消费者消费。 并且消费者不会消费已经被消费的消息(即为已经出队的消息)。

    · 多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。

图 2-2 P2P工作模式

(4)有无状态

        Queue数据默认会在mq服务器上以文件形式保存,也可以配置成DB存储。比如ActiveMQ 一般保存在$AMQ_HOME\data\kr-store\data下面。

(5)传递完整性

        如果没有消费者,保留消息直到被消费或者到达消息超时时间。

2.2 发布/订阅模式(Pub/Sub)

(1)使用topic作为Destination

(2)消息接收方式

        同步和异步两种方式,同P2P模式。

(3)工作模式

图 2-3 Pub/Sub工作模式 

    · 有订阅者,则所有订阅者都会受到消息。消息会按照订阅者的数量进行复制。故随着订阅者的增加,处理性能也会明显降低。

    · 无订阅者,消息将会被丢弃。先要有订阅者,生产者才有意义

· 生产者

public class TOPSendf {
     private static String BROKERURL = "tcp://127.0.0.1:61616";
     private static String TOPIC = "my-topic";
     public static void main(String[] args) throws JMSException {
         start();
     }
     static public void start() throws JMSException {
        System.out.println("生产者已经启动....");
         //创建ActiveMQConnectionFactory 会话工厂
         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
         Connection connection = activeMQConnectionFactory.createConnection();
         //启动JMS 连接
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(null);        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        send(producer, session);
        System.out.println("发送成功!");
        connection.close();
     }
     static public void send(MessageProducer producer, Session session) throws JMSException {
     for (int i = 1; i <= 5; i++) {
         System.out.println("我是消息" + i);
         TextMessage textMessage = session.createTextMessage("我是消息" + i);                          Destination destination = session.createTopic(TOPIC);
          producer.send(destination, textMessage);
         }
     }
}

· 消费者

public class TopReceiver {
     private static String BROKERURL = "tcp://127.0.0.1:61616";
     private static String TOPIC = "my-topic";
     public static void main(String[] args) throws JMSException {
         start();
     }
     static public void start() throws JMSException {
         System.out.println("消费点启动...");
         // 创建ActiveMQConnectionFactory 会话工厂
         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
         Connection connection = activeMQConnectionFactory.createConnection();
         // 启动JMS 连接
         connection.start();
         // 不开消息启事物,消息主要发送消费者,则表示消息已经签收
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         // 创建一个队列
         Topic topic = session.createTopic(TOPIC);
         MessageConsumer consumer = session.createConsumer(topic);
         // consumer.setMessageListener(new MsgListener());
         while (true) {
             TextMessage textMessage = (TextMessage) consumer.receive();
             if (textMessage != null) {
                 System.out.println("接受到消息:" + textMessage.getText());
                     // textMessage.acknowledge();// 手动签收
                     // session.commit();
             } else {
                 break;
             }
         }
         connection.close();
     }
 }

(4)有无状态

        无状态

(5)传递完整性

        · 默认情况下,如果没有订阅者,消息会被丢弃

        · 也可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。

3 JMS创建通用步骤

(1)获取连接工厂

(2)使用连接工厂创建连接

(3)启动连接

(4)从连接创建会话

(5)获取 Destination

(6)创建 Producer,或创建 message

(7)创建 Consumer,或发送或接收message发送或接收 message

(8)创建 Consumer

(9)注册消息监听器(可选)

(10)发送或接收 message

(11)关闭资源(connection, session, producer, consumer 等)

4 JMS消息可靠性

4.1 持久化

(1)配置

// 在队列为目的地的时候持久化消息

messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 队列为目的地的非持久化消息

messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

(2)概念

    持久化的消息,服务器宕机后消息依旧存在,只是没有入队,当服务器再次启动,消息任就会被消费。

    但是非持久化的消息,服务器宕机后消息永远丢失。

(3)默认情况

    · P2P模式:默认是持久化的

    · 发布订阅模式:默认是非持久化的

// 持久化生产者

        MessageProducer messageProducer = session.createProducer(topic);

        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中

        // 设置持久化topic 在启动

        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        connection.start();

        for (int i = 1; i < 4 ; i++) {

            // 7  创建字消息

            TextMessage textMessage = session.createTextMessage("topic_name--" + i);

            // 8  通过messageProducer发布消息

            messageProducer.send(textMessage);

            MapMessage mapMessage = session.createMapMessage();

            //    mapMessage.setString("k1","v1");

            //    messageProducer.send(mapMessage);

        }

        // 9 关闭资源

//  接收持久化消息的消费者 

Topic topic = session.createTopic(TOPIC_NAME);

TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");

 //  发布订阅

connection.start();

Message message = topicSubscriber.receive();// 一直等

while (null != message){

      TextMessage textMessage = (TextMessage)message;

       System.out.println(" 收到的持久化 topic :"+textMessage.getText());

       message = topicSubscriber.receive(3000L);    // 等1秒后meesage 为空,跳出循环,控制台关闭

}

4.2 事务

(1)开启事务

    createSession的第一个参数为true 为开启事务,开启事务之后必须在将消息提交,才可以在队列中看到消息

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

提交:session.commit();

(2)意义

    · 如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。

    · 对于消息消费者来说,开启事务的话,可以避免消息被多次消费,以及后台和服务器数据的不一致性。

    · 如果消息消费的  createSession  设置为 ture  ,但是没有 commit ,此时就会造成非常严重的后果,那就是在后台看来消息已经被消费,但是对于服务器来说并没有接收到消息被消费,此时就有可能被多次消费。

4.3 Acknowledge 签收(ack)

(1)非事务的session

    · AUTO_ACKNOWLEDGE  自动签收(默认情况)

    · CLIENT_ACKNOWLEDGE  手动签收

        手动签收需要acknowledge,即textMessage.acknowledge();
    · DUPS_OK_ACKNOWLEDGE 非必须签收

(2) 而对于开启事务时,设置手动签收和自动签收没有多大的意义,都默认自动签收,也就是说事务的优先级更高一些。但是开启事务没有commit 仍就会重复消费

4.4 保证消息幂等性的方法

    幂等性即不能重复消费消息。

(1)使用手动签收。自动签收在高并发下可能存在重复消费

(2)数据库主键唯一性。消费一条数据往数据库里插入一条,若重复消费,数据库插入报错

(3)如果收到的数据是写redis,天然的幂等性,每次都是set

(4)使用全局唯一的MessageId

(5)日志记录

5 ActiveMQ作用

(1)解决耦合调用

(2)削峰,抵御洪峰流量

(3)异步模型

6 协议

6.1 支持的协议

    支持的协议有 TCP 、 UDP、NIO、SSL、HTTP(S) 、VM,可以在配置文件中配置

6.2 默认配置

        默认使用配置文件中name为"openwire"的协议,即TCP协议。

        默认的Broker 配置,TCP 的Client 监听端口 61616 ,在网络上传输数据,必须序列化数据,消息是通过一个 write protocol 来序列化为字节流。默认情况 ActiveMQ 会把 wire protocol 叫做 Open Wire ,它的目的是促使网络上的效率和数据快速交互 。

6.3 NIO 协议为ActiveMQ 提供更好的性能

(1)适合NIO 使用的场景:

    · 当有大量的Client 连接到Broker 上 , 使用NIO 比使用 tcp 需要更少的线程数量,所以使用 NIO

    · 可能对于 Broker 有一个很迟钝的网络传输, NIO 的性能高于 TCP

(2)连接形式:

        nio://hostname:port?key=value

(3)NIO 增强

    · auto 就像是一个网络协议的适配器,可以自动检测协议的类型,并作出匹配

<transportConnector name="auto" uri="auto://localhost:5671?auto.protocols=default,stomp"/>

7 可持久化

7.1 概念

    将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。

7.2 ActiveMQ 支持的消息持久化机制

    带赋值功能的 LeavelDB 、 KahaDB 、 AMQ 、 JDBC。

7.2.1 KahaDB

(1)概念

        ActiveMQ 5.3 版本起的默认存储方式。KahaDB存储是一个基于文件的快速存储消息,设计目标是易于使用且尽可能快。它使用基于文件的消息数据库意味着没有第三方数据库的先决条件。

(2)配置

<broker brokerName="broker" persistent="true" useShutdownHook="false">     <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>     </persistenceAdapter>
</broker>

(3)它使用一个事务日志和索引(B-Tree索引)文件来存储所有的地址

7.2.2 AMQ 

(1)概念

    · 文件存储形式,不依赖于第三方数据库,能够快速启动和运行

    · 写入快、易恢复 默认 32M 在 ActiveMQ 5.3 之后不再适用

    · AMQ 消息存储库是可靠持久性和高性能索引(B-Tree索引)的事务日志组合,当消息吞吐量是应用程序的主要需求时,该存储是最佳选择

    · 但因为它为每个索引使用两个分开的文件,并且每个 Destination 都有一个索引,所以当你打算在代理中使用数千个队列的时候,不应该使用它

(2)配置

<persistenceAdapter>
     <amqPersistenceAdapter
         directory="${activemq.data}/kahadb"
         syncOnWrite="true"
         indexPageSize="16kb"
         indexMaxBinSize="100"
         maxFileLength="10mb" />
</persistenceAdapter>

7.2.3 LeavelDB

(1)基于文件存储,不适用B-Tree索引,而使用自带的LeavelDB 索引

(2)可以和zookeeper结合,作为Master-Slave数据复制首选方案

7.2.4 JDBC

(1)有一部分数据会真实的存储到数据库中,使用JDBC 的持久化。

(2)ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:

    · activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存

    · activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker

    · activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中

8 ActiveMQ部署模式

8.1 单例模式

      acitvemq单机存储方式,以本地的kahadb文件的方式存储,所以性能完全依赖于本地的磁盘IO,所以不能提供高可用。

8.2 无共享主从模式

        主从节点分别存储 Message。

图 8-1 无共享主从模式 

(1)主节点

    · 当主节点收到持久消息,会等待从节点完成消息的处理(通常是持久化到存储),然后再自己完成消息的处理(如持久化到存储)后,再返回对 Producer 的回执。

    · 主节点只能有一个从节点,并且从节点不允许再有其他从节点

    · 当从节点没有连接上主节点之前,任何主节点处理的 Message 或者消息确认都会在主节点失效后丢失

(2)从节点

    · 从节点需要配置为连接到主节点,并且需要特殊配置其状态。

    · 所有消息命令(消息,确认,订阅,事务等)都从主节点复制到从节点,这种复制发生在主节点对其接收的任何命令生效之前。并且,

    · 从节点不启动任何传输,也不能接受任何客户端或网络连接,除非主节点失效。当主节点失效后,从节点自动成为主节点,并且开启传输并接受连接。这时,使用 failover 传输的客户端就会连接到该新主节点

(3)配置

    Broker 连接配置如下:

failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false

    从节点配置

<services>
     <masterConnector remoteURI="tcp://remotehost:62001" userName="Rob" password="Davies"/>
</services>

8.3 共享主从模式

    允许多个代理共享存储,但任意时刻只有一个是活动的。这种情况下,当主节点失效时,无需人工干预来维护应用的完整性。
    另外一个好处就是没有从节点数的限制。

    有两种细分模式:基于数据库和基于文件系统

(1)基于数据库

    它会获取一个表上的排它锁,以确保没有其他 ActiveMQ 代理可以同时访问数据库。其他未获得锁的代理则处于轮询状态,就会被当做是从节点,不会开启传输也不会接受连接。

图 8-2 基于数据库

(2)基于文件系统

    需要获取分布式共享文件锁,linux 系统下推荐用 GFS2。

图8-3 基于文件系统

8.4 Zookeeper+ActiveMQ

    具体搭建方法见:ActiveMQ集群的搭建(高可用) - 简书

图8-4 Zookeeper+ActiveMQ

(1)Zookeeper集群下,每个节点搭载一个ActiveMQ

(2)启动后只有一个ActiveMQ可以被访问,即为Master节点,其余节点成为从节点,不提供服务

(3)当 Master 宕机后,zookeper 监测到没有心跳信号, 则认为 master 宕机了,然后选举机制会从剩下的 Slave 中选出一个作为 新的 Master

(4)必须保证需要半数以上的MQ存活才能保证集群正常运行,即ActiveMQ的高可用依赖于Zookeeper的高可用。

9 高级特性

9.1 Failover机制 

    在项目上线后,需求方要求Agent可以在故障时自动切换到另一个MQ集群以实现容错,且在原集群正常后可以自动切换回来

    需要在建立连接时把randomize设置为false,然后加个priorityBackup=true,failover中配置上两个MQ集群的F5就OK。

9.2 异步投递

    对于一个慢消费者,使用同步有可能造成堵塞,消息消费较慢时适合用异步发送消息 。

(1)activemq 支持同步异步 发送的消息,默认异步。当你设定同步发送的方式和 未使用事务的情况下发持久化消息,这时是同步的。

· 同步发送和异步发送的区别就在于 :

      同步发送send 不阻塞就代表消息发送成功

      异步发送需要接收回执并又客户端在判断一次是否发送

(2)开启异步投递

// 开启异步投递

activeMQConnectionFactory.setUseAsyncSend(true);   

(3)不足

    在高性能要求下,可以使用异步提高producer 的性能。但会消耗较多的client 端内存,也不能完全保证消息发送成功。在 useAsyncSend = true 情况下容忍消息丢失。

(4)正确的异步发送方法需要接收回调

activeMQConnectionFactory.setUseAsyncSend(true);

    …… 

for (int i = 1; i < 4 ; i++) {

        textMessage = session.createTextMessage("msg--" + i);

        textMessage.setJMSMessageID(UUID.randomUUID().toString()+"--  orderr");

        String msgid = textMessage.getJMSMessageID();

        messageProducer.send(textMessage, new AsyncCallback() {

                @Override

                public void onSuccess() {

                    // 发送成功怎么样

                    System.out.println(msgid+"has been successful send ");

                }

                @Override

                public void onException(JMSException e) {

                    // 发送失败怎么样

                    System.out.println(msgid+" has been failure send ");

                }

            });

}   

9.3 定时投递与延时投递

(1)在配置文件中设置定时器开关 为 true

图 9-1 开启配置

(2)Java 代码中封装的辅助消息类型ScheduleMessage,可以设置的常用参数。

图 9-2 ScheduleMessage 常用参数

(3)demo

long delay = 3 * 1000 ;

long perid = 4 * 1000 ;

int repeat = 7 ;

for (int i = 1; i < 4 ; i++) {

    TextMessage textMessage = session.createTextMessage("delay msg--" + i);

    // 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次

 textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);

 textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);

 textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);

messageProducer.send(textMessage);

}

9.4 ActiveMQ消息重试

(1)重发情况

    · 使用事务会话并调用rollback()

    · client使用事务并在commit()前关闭会话或者没有commit()

    · 手动签收模式下,在session中调用recover()

    · 客户端连接超时

(2)一个消息被重发超过默认的最大重发次数(默认6次)时,消费端会给MQ发送一个“poison ack”,告诉broker不要再发了,此时broker会把消息放到DLQ(死信队列)

(3)修改DLQ设置

RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

redeliveryPolicy.setMaximumRedeliveries(3);

activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

10 Springboot整合ActiveMQ

10.1 依赖

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

10.2 配置文件

spring:

  activemq:

    broker-url: tcp://127.0.0.1:61616

    user: admin

    password: admin

queue: kmx.pas.job.sgtest

@Configuration

public class QueueConfig {

    @Value("${queue}")

    private String queue;

    @Bean

    public Queue logQueue() {

        return new ActiveMQQueue(queue);

    }

}

10.3 生产者

@Component

@EnableScheduling

public class Producer {

    @Autowired

    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired

    private Queue queue;

    @Scheduled(fixedDelay = 5000)

    public void send() {

        jmsMessagingTemplate.convertAndSend(queue, "测试消息队列" +         System.currentTimeMillis());

    }

}

10.4 消费者

@Component

public class Consumer {

    @JmsListener(destination = "${queue}")

    public void receive(String msg) {

        System.out.println("监听器收到msg:" + msg);

    }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容