ActiveMQ结合Spring

先看application_mq.xml
1.配置ConnectionFactory

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>

Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还是由ActiveMQ提供,所以完整的ConnectionFactory配置如下,我们以SingleConnectionFactory为例:

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
    <property name="brokerURL" value="tcp://localhost:61616"/>  
</bean> 

<!-- Spring用于管理的ConnectionFactory的ConnectionFactory -->  
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
</bean>  

ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将ConnectionSession和MessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们定义一个ConnectionFactory时如下:

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
    <property name="brokerURL" value="tcp://localhost:61616"/>  
</bean>  
  
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="targetConnectionFactory"/>
</bean>
  
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>  
</bean>  

2.配置Template和Destination

<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
    <constructor-arg ref="connectionFactory" />
    <property name="messageConverter" ref="messageConverter"></property>
    <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
    <property name="pubSubDomain" value="false" />
</bean>

<!--这个是队列目的地-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- 设置消息队列的名字 -->
    <constructor-arg index="0" value="HelloWorldSpringQueue" />
</bean>

<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="messageConverter" ref="messageConverter"></property>
    <!-- pub/sub模型(发布/订阅) -->
    <property name="pubSubDomain" value="true" />
</bean>

<!--这个是订阅目的地-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- 设置消息队列的名字 -->
    <constructor-arg index="0" value="HelloWorldSpringTopic" />
</bean>

3.配置生产者和消费者
我们在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener。

<!--send-->
<bean id="queueProducer" class="com.zzhblh.activemq.producer.QueueProducer">
    <property name="jmsQueueTemplate" ref="jmsQueueTemplate"></property>
    <property name="queueDestination" ref="queueDestination"></property>
</bean>

<bean id="topicProducer" class="com.zzhblh.activemq.producer.TopicProducer">
    <property name="jmsTopicTemplate" ref="jmsTopicTemplate"></property>
    <property name="topicDestination" ref="topicDestination"></property>
</bean>

<!--receive-->
<!-- 注入我们自己写的继承了javax.jms.MessageListener的消息监听器-->
<bean id="consumerMessageListener" class="com.zzhblh.activemq.consumer.ConsumerMessageListener"/>

<!-- 消息监听容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="consumerMessageListener" />
    <property name="sessionAcknowledgeMode" value="1"/>//1代表AUTO_ACKNOWLEDGE 
</bean>
<!-- 消息监听容器2 -->
<bean id="jmsContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicDestination" />
    <property name="messageListener" ref="consumerMessageListener" />
    <property name="sessionAcknowledgeMode" value="1"/>//1代表AUTO_ACKNOWLEDGE 
</bean>

4.发送消息

@Service
public class QueueProducer {
    @Autowired
    JmsTemplate jmsQueueTemplate;
    @Autowired
    ActiveMQQueue queueDestination;

    public void sendMessages(final String text, final int i) throws JMSException {
            jmsQueueTemplate.send(queueDestination,new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage message = session.createTextMessage(text);
                    message.setIntProperty("messageCount", i);
                    return message;
                }
            });
    }

    public JmsTemplate getJmsQueueTemplate() {
        return jmsQueueTemplate;
    }

    public void setJmsQueueTemplate(JmsTemplate jmsQueueTemplate) {
        this.jmsQueueTemplate = jmsQueueTemplate;
    }

    public ActiveMQQueue getQueueDestination() {
        return queueDestination;
    }

    public void setQueueDestination(ActiveMQQueue queueDestination) {
        this.queueDestination = queueDestination;
    }
}

5.接收消息

public class ConsumerMessageListener implements MessageListener {
    private NotifyMessageConverter messageConverter;

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
            TextMessage textMsg = (TextMessage) message;
            System.out.println("接收到一个纯文本消息。");
            try {
                System.out.println("消息内容是:" + textMsg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
        } else if (message instanceof ObjectMessage) {
            ObjectMessage objMessage = (ObjectMessage) message;
            try {
                /*Object obj = objMessage.getObject();
                Email email = (Email) obj;
                Email email = (Email) messageConverter.fromMessage(objMessage);
                System.out.println("接收到一个ObjectMessage,包含Email对象。");
                System.out.println(email);*/
                messageConverter.fromMessage(message);
                System.out.println("接收到一个ObjectMessage");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

6.消息转换器

public class NotifyMessageConverter implements MessageConverter {
    @Override
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        return session.createObjectMessage((Serializable) object);
    }

    @Override
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        ObjectMessage objMessage = (ObjectMessage) message;
        return objMessage.getObject();
    }
}

要点:
1.用activemq的connection构造spring用于管理的SingleConnectionFactory。用于发送/接收消息。
2.创建Destination,ActiveMQQueue或ActiveMQTopic。用于发送/接收消息。

3.用SingleConnectionFactory构造JmsTemplate。用于发送消息。
4.用JmsTemplate和Destination(ActiveMQQueue/ActiveMQTopic)构造QueueProducer/TopicProducer。用于发送消息。

5.创建MessageListener。用于接收消息。
6.构造ListenerContainer,用到ConnectionFactory,Destination,MessageListener。用于接收消息。


参考:
http://my.oschina.net/xiaoxishan/blog/381209
http://shengwangi.blogspot.com/2015/06/spring-jms-with-activemq-hello-world.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容