spring整合activemq

1.linux安装activemq

本例使用docker pull的activemq的镜像,并没有安装,
安装完成之后通过8161端口访问,输入用户名密码(admin),即可访问activemq的管理界面


image.png

2.新建一个maven项目

这是一个ssm项目。pom如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>jk.zmn</groupId>
  <artifactId>spring-activemq</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>spring-activemq Maven Webapp</name>
  <url>http://maven.apache.org</url>
  
    <properties>
    <spring.version>4.0.5.RELEASE</spring.version>
    <mybatis.version>3.2.1</mybatis.version>
    <slf4j.version>1.6.6</slf4j.version>
    <log4j.version>1.2.12</log4j.version>
    <mysql.version>5.1.35</mysql.version>
    <jackjson.version>2.8.8</jackjson.version>
    <activemq.version>5.11.2</activemq.version>
  </properties>
  <dependencies>
  <!-- 添加Spring依赖 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!--spring单元测试依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
 
  <!-- spring webmvc相关jar -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
  
  <!-- mysql驱动包 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    
    <!-- alibaba data source 相关jar包-->
     <dependency>
         <groupId>com.alibaba</groupId>
         <artifactId>druid</artifactId>
         <version>0.2.23</version>
     </dependency>
     
    
     <!-- logback start -->
  <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.logback-extensions</groupId>
        <artifactId>logback-ext-spring</artifactId>
        <version>0.1.1</version>
    </dependency>

    
     
    <!--mybatis依赖 -->
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis</artifactId>
        <version>${mybatis.version}</version>
    </dependency>

    <!-- mybatis/spring包 -->
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis-spring</artifactId>
        <version>1.2.0</version>
    </dependency>
  <!-- 添加servlet3.0核心包 -->
          <dependency>
              <groupId>javax.servlet</groupId>
              <artifactId>javax.servlet-api</artifactId>
              <version>3.0.1</version>
          </dependency>
          <dependency>
              <groupId>javax.servlet.jsp</groupId>
              <artifactId>javax.servlet.jsp-api</artifactId>
              <version>2.3.2-b01</version>
          </dependency>
          <!-- jstl -->
          <dependency>
              <groupId>javax.servlet</groupId>
              <artifactId>jstl</artifactId>
              <version>1.2</version>
          </dependency>
    <!--单元测试依赖 -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
    </dependency>
    
    <dependency>
         <groupId>com.github.pagehelper</groupId>
         <artifactId>pagehelper</artifactId>
         <version>4.1.4</version>
     </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
       <version>${jackjson.version}</version>
    </dependency>
    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
       <version>${jackjson.version}</version>
    </dependency>
    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
       <version>${jackjson.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
    </dependency>
  </dependencies>
  
  
  
  <build>
    <finalName>spring-activemq</finalName>
  </build>
</project>

1.非整合spring的单机版

1.queue形式

内容提供者

    @Test
    public void testQueueMqProducter() throws Exception{
        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        //2.创建链接
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建一个session对象
        /**
         * 第一个参数,是否开启事务,如果开启 后面一个参数无意义
         * 第二个参数,应答模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目的地
        Queue testQueue = session.createQueue("testQueue");
        //6.创建一个内容提供者
        MessageProducer producter = session.createProducer(testQueue);
        //7.发送消息
        TextMessage QMessage = new ActiveMQTextMessage();
        QMessage.setText("我是队列信息,要减库存了");
        producter.send(QMessage);
        //8.关闭资源
        session.close();
        connection.close();
        
        
    }

消费者


    @Test
    public void testQueueMqConsumer() throws Exception{
        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        //2.创建链接
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建一个session对象
        /**
         * 第一个参数,是否开启事务,如果开启 后面一个参数无意义
         * 第二个参数,应答模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目的地
        Queue testQueue = session.createQueue("testQueue");
        //6.创建一个消费者
         MessageConsumer consumer = session.createConsumer(testQueue);
        //7.发送消息
         TextMessage receive = (TextMessage) consumer.receive();
         System.out.println(receive.getText());
        //8.关闭资源
        session.close();
        connection.close();
        
        
    }

效果如下

image.png

2. 发布订阅模式

发布者

@Test
    public void testTopicMqProducter() throws Exception{
        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        //2.创建链接
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建一个session对象
        /**
         * 第一个参数,是否开启事务,如果开启 后面一个参数无意义
         * 第二个参数,应答模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目的地
        Topic createTopic = session.createTopic("testTopic");
        //6.创建一个内容提供者
        MessageProducer producter = session.createProducer(createTopic);
        //7.发送消息
        TextMessage QMessage = new ActiveMQTextMessage();
        QMessage.setText("我是发布信息,张三抢到了手机11,rsad");
        producter.send(QMessage);
        //8.关闭资源
        session.close();
        connection.close();
        
        
    }

订阅者

@Test
    public void testTopicMqConsumer() throws Exception{
        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        //2.创建链接
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建一个session对象
        /**
         * 第一个参数,是否开启事务,如果开启 后面一个参数无意义
         * 第二个参数,应答模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目的地
        Topic createTopic = session.createTopic("testTopic");
        //6.创建一个内容提供者
        MessageConsumer consumer = session.createConsumer(createTopic);
        
        //获取数据
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                TextMessage message2 = (TextMessage) message;
                try {
                    System.out.println(message2.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        System.out.println("消费者1启动");
        System.in.read();
        //8.关闭资源
        session.close();
        connection.close();
        
        
    }
    
    @Test
    public void testTopicMqConsumer2() throws Exception{
        //1.创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        //2.创建链接
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建一个session对象
        /**
         * 第一个参数,是否开启事务,如果开启 后面一个参数无意义
         * 第二个参数,应答模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建一个目的地
        Topic createTopic = session.createTopic("testTopic");
        //6.创建一个内容提供者
        MessageConsumer consumer = session.createConsumer(createTopic);
        
        //获取数据
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                TextMessage message2 = (TextMessage) message;
                try {
                    System.out.println(message2.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        System.out.println("消费者2启动");
        System.in.read();
        //8.关闭资源
        session.close();
        connection.close();
        
        
    }
    

效果如下


image.png

2 整合spring

spring-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://www.itzmn.com:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    
    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    
    
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="springTopic" />
    </bean>
    <!-- 消息监听 -->
    <bean id="myMessageListener" class="jk.zmn.activemq.listener.MyMessageListener"/>
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
    
    <!-- 消息监听 -->
    <bean id="myTopicMessageListener" class="jk.zmn.activemq.listener.MyTopicMessageListener"/>
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="myTopicMessageListener" />
    </bean>
    
    <!-- 消息监听 -->
    <bean id="myTopicMessageListener2" class="jk.zmn.activemq.listener.MyTopicMessageListener2"/>
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="myTopicMessageListener2" />
    </bean>
    
    
    
</beans>

queue模式

内容提供者

@Test
    public void testSpringActiveMqProducter() {
        ApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
        JmsTemplate jmsTemplate = classPathXmlApplicationContext.getBean(JmsTemplate.class);
        Destination destination = (Destination) classPathXmlApplicationContext.getBean("queueDestination");
        jmsTemplate.send(destination,new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                
                return session.createTextMessage("生意来了,张三购买商品");
            }
        });
    }

这个消费者,要实现messagelistener的接口

public class MyMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        TextMessage message2 = (TextMessage) message;
        try {
            System.out.println(message2.getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

发布订阅模式

发布者

@Test
    public void testSpringActiveMqTopicProducter() {
        ApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
        JmsTemplate jmsTemplate = classPathXmlApplicationContext.getBean(JmsTemplate.class);
        Destination destination = (Destination) classPathXmlApplicationContext.getBean("topicDestination");
        jmsTemplate.send(destination, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                
                return session.createTextMessage("又来生意啦,李四要购买手机");
            }
        });
    }

订阅者

public class MyTopicMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage message2 = (TextMessage) message;
            System.out.println(message2.getText());
            System.out.println("我去减库存");
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
public class MyTopicMessageListener2 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage message2 = (TextMessage) message;
            System.out.println(message2.getText());
            System.out.println("我去生成订单");
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }

}

代码太乱,实验的话,请到码云下载,

群号:552113611

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,656评论 18 139
  • 关于jms概念以及activemq这里不做具体赘述,activemq是apache的一款项目,介绍里号称是最方便最...
    荒唐的程序猿阅读 863评论 0 2
  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,488评论 0 11
  • 我刚坐下,顿时感觉隔壁桌气氛不大对头。 看样子是一对情侣,但此时男孩女孩没有了卿卿我我,女孩一直低着头用吸管戳着可...
    不是猩猩阅读 1,226评论 8 20
  • 爱无所谓好坏,真实就好 《我的前半生》终于演完,结局还算完美。老卓带洛洛兜了次风,然后去还情债;凌玲向董事会坦白了...
    成都老王阅读 360评论 0 1