Springboot + ActiveMQ 消息队列小结

版本

Stringboot: 1.5.8.RELEASE
ActiveMQ: 5.15.4
JDK: 1.8
Maven: 3.3.9

概要(先来点干货)

本文主要关于消息队列发送与监听的一些学习使用小结,基于Springboot。

  1. Queue - Point-to-Point (点对点)
    一条消息只能被一个消费者消费, 且是持久化消息 - 当没有可用的消费者时,该消息保存直到被消费为止;当消息被消费者收到但不响应时(具体等待响应时间是多久,如何设置,暂时还没去了解),该消息会一直保留或会转到另一个消费者当有多个消费者的情况下。当一个Queue有多可用消费者时,可以在这些消费者中起到负载均衡的作用。
  2. Topic - Publisher/Subscriber Model (发布/订阅者)
    一条消息发布时,所有的订阅者都会收到,topic有2种模式,Nondurable subscription(非持久订阅)和durable subscription (持久化订阅 - 每个持久订阅者,都相当于一个持久化的queue的客户端), 默认是非持久订阅。
  • 持久化:消息产生后,会保存到文件/DB中,直到消息被消费, 如上述Queue的持久化消息。默认保存在ActiveMQ中:%ActiveMQ_Home%/data/kahadb
  • 非持久化:消息不会保存,若当下没有可用的消费者时,消息丢失。

下图引用自:
https://blog.csdn.net/yan69594281/article/details/72598313

image.png

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

配置

Springboot 自动配置方式:

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin

若用Springboot这种方式去配置ActiveMQ,代码部署到JBOSS后,连不上ActiveMQ,不知道为什么,也没有去深究。若哪位大神知道,可否告知,非常感谢!
所以我就改用下面的方式来配置ActiveMQ:

activemq.url=tcp://localhost:61616
#cluster configuration
#activemq.url=failover:(tcp://host1:port1,tcp://host2:port2)?randomize=false
activemq.username=admin
activemq.password=admin

相应的Java代码如下,这种方式可以方便地配置多个ActiveMQ,下面的Demo只配置了一个:

@Configuration
@EnableAsync // enable asynchronous task
@EnableJms
public class JmsConfiguration {
    private Logger logger = LoggerFactory.getLogger(JmsConfiguration.class);
    
    @Bean(name = "firstConnectionFactory")
    public ActiveMQConnectionFactory getFirstConnectionFactory(@Value("${activemq.url}") String brokerUrl,
            @Value("${activemq.username}") String userName, @Value("${activemq.password}") String password)
    {
        logger.debug(brokerUrl + " - " + userName);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerUrl);
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean(name = "firstJmsTemplate")
    public JmsMessagingTemplate getFirstJmsTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        JmsMessagingTemplate template = new JmsMessagingTemplate(connectionFactory);
        return template;
    }
    
    @Bean(name = "firstTopicListener")
    public DefaultJmsListenerContainerFactory getFirstTopicListener(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true); // if topic, set true
                // factory.setSessionAcknowledgeMode(4); // change acknowledge mode
        return factory;
    }

    @Bean(name = "firstQueueListener")
    public DefaultJmsListenerContainerFactory getFirstQueueListener(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
               // factory.setSessionAcknowledgeMode(4); // change acknowledge mode
        return factory;
    }
}

消息发送

  1. 配置
activemq.topic=T_Topic1
activemq.queue=Q_Queue1
activemq.virtual.topic=VirtualTopic.Topic1
activemq.virtual.topic.A=Consumer.A.VirtualTopic.Topic1
activemq.virtual.topic.B=Consumer.B.VirtualTopic.Topic1
  1. 实现
@Component
public class JmsProducer {

    @Autowired
    @Qualifier("firstJmsTemplate")
    private JmsMessagingTemplate jmsTemplate;
    
    @Value("${activemq.topic}")
    private String topic;
    
    @Value("${activemq.queue}")
    private String queue;

    @Value("${activemq.virtual.topic}")
    private String vTopic;
    
    public void sendMsg(Destination destination, Message msg) {
        jmsTemplate.convertAndSend(destination, msg);
    }

    /**
     * send msg to queue.
     * @param data
     */
    public void sendToQueue(Map<String, String> data) {
        ActiveMQQueue mqQueue = new ActiveMQQueue(queue);
        ActiveMQMessage msg = new ActiveMQMessage();
        try {
            msg.setStringProperty("value", data.get("value"));
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        sendMsg(mqQueue, msg);
    }

    /**
     * send msg to topic.
     * @param data
     */
    public void sendToTopic(Map<String, String> data) {
        ActiveMQTopic mqTopic = new ActiveMQTopic(topic);
        ActiveMQMessage msg = new ActiveMQMessage();
        try {
            msg.setStringProperty("value", data.get("value"));
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        sendMsg(mqTopic, msg);
    }

    /**
     * send msg to virtual topic.
     * @param data
     */
    public void sendToVTopic(Map<String, String> data) {
        ActiveMQTopic mqVTopic = new ActiveMQTopic(vTopic);
        ActiveMQMessage msg = new ActiveMQMessage();
        try {
            msg.setStringProperty("value", data.get("value"));
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        sendMsg(mqVTopic, msg);
    }
}

消息监听

  1. 实现
@Component
public class JmsConsumer {
    private Logger logger = LoggerFactory.getLogger(JmsConsumer.class);

    @JmsListener(destination = "${activemq.topic}", containerFactory = "firstTopicListener")
    @Async // receive msg asynchronously
    //@Async("taskExecutePool") 
    public void receiveTopic(Message msg) throws JMSException {
        logger.debug(Thread.currentThread().getName() + ": topic===========" + msg.getStringProperty("value"));
        try {
            Thread.sleep(1000L);
            // msg.acknowledge(); //消息确认
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        logger.debug(Thread.currentThread().getName() + ": topic===========" + msg.getStringProperty("value"));
    }
    
    @JmsListener(destination = "${activemq.queue}", containerFactory = "firstQueueListener")
    @Async
    public void receiveQueue(Message msg) throws JMSException {
        logger.debug(Thread.currentThread().getName() + ": Queue===========" + msg.getStringProperty("value"));
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        logger.debug(Thread.currentThread().getName() + ": Queue===========" + msg.getStringProperty("value"));
    }
    
    @JmsListener(destination = "${activemq.virtual.topic.A}", containerFactory = "firstQueueListener")
    @Async
    public void receiveVTopicA1(Message msg) throws JMSException {
        logger.debug(Thread.currentThread().getName() + ": vtopic A1===========" + msg.getStringProperty("value"));
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        logger.debug(Thread.currentThread().getName() + ": vtopic A1===========" + msg.getStringProperty("value"));
    }
    
    @JmsListener(destination = "${activemq.virtual.topic.A}", containerFactory = "firstQueueListener")
    @Async
    public void receiveVTopicA2(Message msg) throws JMSException {
        logger.debug(Thread.currentThread().getName() + ": vtopic A2===========" + msg.getStringProperty("value"));
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        logger.debug(Thread.currentThread().getName() + ": vtopic A2===========" + msg.getStringProperty("value"));
    }
    
    @JmsListener(destination = "${activemq.virtual.topic.B}", containerFactory = "firstQueueListener")
    @Async
    public void receiveVTopicB(Message msg) throws JMSException {
        logger.debug(Thread.currentThread().getName() + ": vtopic B===========" + msg.getStringProperty("value"));
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        logger.debug(Thread.currentThread().getName() + ": vtopic B===========" + msg.getStringProperty("value"));
    }
}

从上面代码可以看到,消费者并没有发送响应(消息确认)给ActiveMQ在监听到消息之后。这是因为SpringBoot在默认情况下,会自动发送消息确认给ActiveMQ.
SpringBoot可用下面这种方式进行改AcknowledgeMode:

spring.jms.listener.acknowledge-mode= # Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment.

由于我上面不是用Springboot的配置自动注入ActiveMQ, 所以可以使用下面方法改AcknowledgeMode:

factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
JMS规范的ack消息确认机制有一下四种,定于在session对象中:
AUTO_ACKNOWLEDGE = 1 :自动确认
CLIENT_ACKNOWLEDGE = 2:客户端手动确认 
DUPS_OK_ACKNOWLEDGE = 3: 自动批量确认
SESSION_TRANSACTED = 0:事务提交并确认
但是在activemq补充了一个自定义的ACK模式:
INDIVIDUAL_ACKNOWLEDGE = 4:单条消息确认

但关于Spring下ActiveMQ的消息确认机制有一个问题,发现AcknowledgeMode设置成Session.CLIENT_ACKNOWLEDGE并没有什么用,还是会自动确认,以下是相关源码:

// org.springframework.jms.listener.AbstractMessageListenerContainer
protected void commitIfNecessary(Session session, Message message) throws JMSException {
        // Commit session or acknowledge message.
        if (session.getTransacted()) {
            // Commit necessary - but avoid commit call within a JTA transaction.
            if (isSessionLocallyTransacted(session)) {
                // Transacted session created by this container -> commit.
                JmsUtils.commitIfNecessary(session);
            }
        }
        else if (message != null && isClientAcknowledge(session)) {
            message.acknowledge();
        }
    }
protected boolean isClientAcknowledge(Session session) throws JMSException {
        return (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
}

由源码可知,设置sessionAcknowledgeMode为2时,虽然是客户端手动确认,但是却被spring自动确认了,造成设置无效。这时只需要把sessionAcknowledgeMode的值设置成activemq自定义的类型INDIVIDUAL_ACKNOWLEDGE = 4即可

关于消息自动确认,感谢这位大哥的帖子:
https://segmentfault.com/a/1190000008707181

虚拟Topic

个人觉得,虚拟Topic在分布式部署/集群中用处很大,最起码解决了我在现实工作中遇到的问题。

  1. 使用场景
    有一个application, 订阅一个Topic消息, 这个application部署到2台server上时(当然可以更多,这里以2台为例),2台server会同时监听这个Topic, 这样会导致一个消息重复消费2次,这不符合一些特定的应用场景。
    如果设置成Queue,能确保一个消息只被消费一次,但就不能有其他的application同时消费这个消息,也不符合一些特定的应该场景。
    为了达到在分布式部署的情况下,一个消息, 被同一个application只消费一次,同时也能被其他的application消费,就引入虚拟Topic. 这种方式可以说:在Topic消息层面上,实现负载均衡。
    先看代码:
activemq.virtual.topic=VirtualTopic.Topic1
activemq.virtual.topic.A=Consumer.A.VirtualTopic.Topic1
activemq.virtual.topic.B=Consumer.B.VirtualTopic.Topic1
@Value("${activemq.virtual.topic}")
private String vTopic;
// 消息发送
/**
 * send msg to virtual topic.
 * @param data
 */
public void sendToVTopic(Map<String, String> data) {
    ActiveMQTopic mqVTopic = new ActiveMQTopic(vTopic);
    ActiveMQMessage msg = new ActiveMQMessage();
    try {
        msg.setStringProperty("value", data.get("value"));
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    sendMsg(mqVTopic, msg);
}

// 消息监听
@JmsListener(destination = "${activemq.virtual.topic.A}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicA1(Message msg) throws JMSException {
    logger.debug(Thread.currentThread().getName() + ": vtopic A1===========" + msg.getStringProperty("value"));
    try {
        Thread.sleep(500L);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    // logger.debug(Thread.currentThread().getName() + ": vtopic A1===========" + msg.getStringProperty("value"));
}

@JmsListener(destination = "${activemq.virtual.topic.A}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicA2(Message msg) throws JMSException {
    logger.debug(Thread.currentThread().getName() + ": vtopic A2===========" + msg.getStringProperty("value"));
    try {
        Thread.sleep(500L);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    // logger.debug(Thread.currentThread().getName() + ": vtopic A2===========" + msg.getStringProperty("value"));
}

@JmsListener(destination = "${activemq.virtual.topic.B}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicB(Message msg) throws JMSException {
    logger.debug(Thread.currentThread().getName() + ": vtopic B===========" + msg.getStringProperty("value"));
    try {
        Thread.sleep(500L);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    // logger.debug(Thread.currentThread().getName() + ": vtopic B===========" + msg.getStringProperty("value"));
}
  1. 分析
    从上面代码可以看出,有3个JmsListener监听:
    2个 Consumer.A.VirtualTopic.Topic1监听
    1个Consumer.B.VirtualTopic.Topic1监听
    看一下运行结果(共发5个消息):
    image.png

    A的2个监听者,不会重复监听同一条消息,A监听的同时,不会影响B的监听。
    从消息类型上来说:对于虚拟Topic, 对消息生产者来说,是一个普通的topic;对于消息消费者来说,是一个queue。
    从使用意义上来说,对于虚拟Topic,它会对消费者进行分组,同一组的,同一个消息只会被消费一次。比如上面例子的Consumer.A。
    虚拟Topic命名规范
    Topic命名: VirtualTopic.xxx
    消费者命名: Consumer.yyy.VirtualTopic.xxx

关于虚拟Topic, 感谢这2位大哥:
http://blog.sina.com.cn/s/blog_7d1968e20102wyq0.html
https://www.cnblogs.com/jiangxiaoyaoblog/p/5659734.html

Junit Test

顺便提一下,在使用Junit同时测试消息发送与接收的时候,有可能会接收不到一些消息。如使用for循环发送10个消息,可能只会接收到5个(特别是同步的情况下,异步的没有充分测试所以不清楚异步时会不会有同样的情况)。
原因:当Junit发送完10个消息后,Spring服务就停掉了,导致有些还没来得及接收的消息就接收不到了。也就是说,Spring服务停了,导致Spring中的ActiveMQ客户端也停掉了,导致没法继续监听消息。

@SpringBootTest
@RunWith(SpringRunner.class)
@EnableAsync
public class JmsTest {
    @Autowired
    private JmsProducer producer;
    private Logger logger = LoggerFactory.getLogger(JmsTest.class);

    @Test
    public void sendMsg() {
        for (int i = 0; i < 10; i++) {
            Map<String, String> map = new HashMap<String, String>();
            map.put("value", "value = " + i);
            producer.sendToTopic(map);
//          producer.sendToQueue(map);
//          producer.sendToVTopic(map);
        }
    }

若有错误和补充,请多多指教!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容

  • 简介 ActiveMQ 特点 ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高...
    预流阅读 5,911评论 4 21
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,638评论 18 139
  • ActiveMQ 简介:ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ ...
    CoderZS阅读 2,666评论 0 23
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,778评论 6 342
  • 此《长恨歌》非彼《长恨歌》,不是李白的“在天愿作比翼鸟,在地愿为连理枝”,也不是冒险小说家驰星周的《长恨歌》。 这...
    白洛嘉阅读 1,751评论 0 7