ActiveMQ学习笔记

一、ActiveMQ简介
1.什么是ActiveMQ

ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1 和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

2.什么是消息

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串,也可以更复杂,可以包含嵌入对象。

3.什么是队列

是一种有序的,先进先出的数据结构,例如:生活中的排队

4.什么是消息队列

“消息队列”是在消息的传输过程中保存消息的容器

5.常见消息服务应用
  • ActiveMQ
  • RabbitMQ
  • RocketMQ
二、消息服务的应用场景

消息队列的特点主要是异步处理,主要作用是减少消息请求和响应的时间以及解耦。所以主要用于比较耗时并且不需要即时(同步)返回结果的操作。

image.png

2.1 异步处理
2.1.1 用户注册
用户注册流程:
  • 注册处理及写入数据库
  • 发送注册成功的手机短信
  • 发送注册成功的邮件信息

如果使用消息中间件,则可以创建两个线程来做这些事情,直接发送消息给消息中间件,然后让邮件服务和短信服务去消息中间件中取消息,取到消息后自己再做对应的操作。

2.2 应用的解耦
2.2.1 订单处理
生成订单流程:
  • 在购物车中点击结算
  • 完成支付
  • 创建订单
  • 调用库存系统

订单完成后,订单系统不用直接取调用库存系统,而是发送消息到消息中间件,写入一个订单信息。库存系统自己去消息中间件中获取,然后做发货处理,并更新库存。

2.3流量的削峰
2.3.1 秒杀功能
秒杀流程
  • 用户点击秒杀
  • 发送请求到秒杀应用
  • 在请求秒杀应用之前将请求放入到消息队列
  • 秒杀应用从消息队列中获取请求并处理

系统举行秒杀活动,流量蜂拥而至100件商品,10万人挤进来怎么办?
将10万秒杀的操作,放入消息队列。秒杀应用将10万个请求中的前100个进行处理,其它的驳回通知失败。这样将流量控制在了消息队列处。秒杀应用不会被怼死。

三、JMS
1.什么是JMS

JMS(Java Message Service)是Java平台上面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且提供标准的产生、发送、接收消息的接口,简化企业应用的开发。

2.JMS模型
2.1 点对点模型(Point To Point)

生产者发送一条消息到queue,只有一个消费者能收到。


image.png
2.2 发布订阅模型(Publish/Subscribe)

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。


image.png
四、ActiveMQ安装
1.下载资源

ActiveMQ官网:http://activemq.apache.org

1.1 版本说明

ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。

2.上传至Linux服务器
3.解压安装文件
tar -zxf apache-activemq-5.9.0-bin.tar.gz
4.检查权限
ls -al apache-activemq-5.9.0/bin

如果权限不足,则无法执行,需要修改文件权限:

chmod 755 activemq
5.复制应用至本地目录
cp apache-activemq-5.9.0 /usr/local/activemq -r
6.启动ActiveMQ
/usr/local/activemq/bin/activemq start
7.测试ActiveMQ
7.1检查进程
ps aux|grep activemq
7.2管理界面

使用浏览器访问ActiveMQ管理应用,地址如下:
http://ip:8161/admin/
用户名:admin
密码:admin
AcitveMQ使用的是Jetty提供的HTTP服务。启动稍慢,建议短暂等待再访问测试。
见到如下界面代表服务启动成功

image.png

7.3 修改访问端口(管理应用监听的端口)

修改ActiveMQ配置文件:

/usr/local/activemq/conf/jetty.xml
image.png
7.4 修改用户名和密码

修改conf/users.properties配置文件,内容为:用户名=密码
保存并启动ActiveMQ服务即可。


image.png
8.重启ActiveMQ
/usr/local/activemq/bin/activemq restart
9.关闭ActiveMQ
/usr/local/activemq/bin/activemq stop
10.配置文件activemq.xml

配置文件中,配置的是ActiveMQ的核心配置信息,是提供服务时使用的配置,可以修改启动的访问端口,即Java编程中访问ActiveMQ的访问端口


image.png

默认端口:61616(编程时使用的端口)
使用协议:TCP协议
修改端口后,保存并重启ActiveMQ服务即可

11.ActiveMQ目录介绍

bin:可执行的脚本文件
conf:相关的配置文件
data:存放的是日志文件
docs:存放的是相关文档
examples:存放的是简单的实例
lib:相关的jar包
webapps:用于存放项目的目录

五、ActiveMQ术语
1.Destination

目的地,JMS Provider(消息中间件)负责维护,用于对Message进行管理的对象。MessageProducer需要指定Destination才能发送消息,MessageReceiver需要指定Destination才能接收消息

2.Producer

消息生成者,负责发送Message到目的地。

3.Consumer|Receiver

消息消费者,负责从目的地中消费(处理/监听/订阅)Message

4.Message

消息,用于封装一次通信的内容

六、ActiveMQ应用
1.ActiveMQ常用API简介

下述API都是接口类型,定义在javax.jms包中

1.1 ConnectionFactory

连接工厂:用于创建连接的工厂类型

1.2 Connection

连接:用于建立访问ActiveMQ连接的类型,由连接工厂创建

1.3 Session

会话:一次持久、有效、有状态的访问,由连接创建

1.4 Destination & Queue

目的地:用于描述本次访问ActiveMQ的消息访问目的地,即ActiveMQ服务中的具体队列,由会话创建
Interface Queue extends Destination

1.5 MessageProducer

消息生产者:在一次有效会话中,用于发送消息给ActiveMQ的服务工具,由会话创建

1.6 MessageConsumer

消息消费者:在一次有效会话中,用于从ActiveMQ中获取消息的工具,由会话创建

1.7 Message

消息:通过消息生产者向ActiveMQ服务发送消息时使用的数据载体对象或消息消费者从ActiveMQ服务中获取消息时使用的数据载体对象,是所有消息(文本消息、对象消息等)具体类型的顶级接口,可以通过会话创建或通过会话从ActiveMQ服务中获取

2.JMS-HelloWorld
2.1 处理文本消息
2.1.1 创建消息生产者
创建工程

mq-producer

添加坐标
    <!--activeMQ-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq-all.version}</version>
    </dependency>
编写消息生产者
public class HelloWorldProducer {

    public void sendHelloWorldActiveMQ(String msgText){
        //定义连接工厂
        ConnectionFactory connectionFactory = null;

        //定义连接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生产者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            //传入的用户名和密码可以通过jetty-realm.properties文件修改
            //brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接(此时才是真正创建连接)
            connection.start();

            /**
             * 创建会话
             * transacted:是否使用事务,可选值为true,false
             *              true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
             *              false:不使用事务,设置此变量 则acknowledgeMode必须设置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
            destination = session.createQueue("helloworld-destination");

            //创建消息生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createTextMessage(msgText);

            //发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.1.2 创建消息消费者
创建工程

mq-consumer

添加坐标
<!--activeMQ-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>
创建消息生产者
public class HelloWorldConsumer {

    public void receiveHelloWorldActiveMQ() {
        //定义连接工厂
        ConnectionFactory connectionFactory = null;
        //定义连接
        Connection connection = null;
        //定义会话
        Session session = null;
        //定义目的地
        Destination destination = null;
        //定义消息消费者
        MessageConsumer consumer = null;
        //定义消息
        Message message = null;

        try {
            //创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //创建目的地
            destination = session.createQueue("helloworld-destination");
            //创建消息消费者
            consumer = session.createConsumer(destination);
            //接收消息
            message = consumer.receive();

            //获取文本消息
            String msg = ((TextMessage) message).getText();
            System.out.println("从ActiveMQ中获取的文本信息:" + msg);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.1.3 测试
测试生产者
public class Test {
    public static void main(String[] args) {
        HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");
    }
}
测试消费者
public class Test {
    public static void main(String[] args) {
        HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();
    }
}
image.png
2.2 处理对象消息
2.2.1 创建对象
public class User implements Serializable {

    private Integer userId;
    private String userName;
    private Integer userAge;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public Integer getUserAge() {
        return userAge;
    }

    public void setUserAge(Integer userAge) {
        this.userAge = userAge;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", userName='" + userName + '\'' +
                ", userAge=" + userAge +
                '}';
    }
}
2.2.2 创建生产者
public class HelloWorldProducer2 {

    public void sendHelloWorldActiveMQ(User user){
        //定义连接工厂
        ConnectionFactory connectionFactory = null;

        //定义连接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生产者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            //传入的用户名和密码可以通过jetty-realm.properties文件修改
            //brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接(此时才是真正创建连接)
            connection.start();

            /**
             * 创建会话
             * transacted:是否使用事务,可选值为true,false
             *              true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
             *              false:不使用事务,设置此变量 则acknowledgeMode必须设置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-user");

            //创建消息生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createObjectMessage(user);

            //发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.2.3 创建消费者
public class HelloWorldConsumer2 {

    public void receiveHelloWorldActiveMQ() {
        //定义连接工厂
        ConnectionFactory connectionFactory = null;
        //定义连接
        Connection connection = null;
        //定义会话
        Session session = null;
        //定义目的地
        Destination destination = null;
        //定义消息消费者
        MessageConsumer consumer = null;
        //定义消息
        Message message = null;

        try {
            //创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //创建目的地
            destination = session.createQueue("my-user");
            //创建消息消费者
            consumer = session.createConsumer(destination);
            //接收消息
            message = consumer.receive();

            Serializable obj = ((ObjectMessage) message).getObject();

            User user = (User) obj;
            System.out.println("从ActiveMQ中获取的对象信息:" + user);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.2.4 测试
public class Test {
    public static void main(String[] args) {
        /*HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");*/

        HelloWorldProducer2 producer2 = new HelloWorldProducer2();
        producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));
    }
}
public class Test {
    public static void main(String[] args) {
        /*HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();*/

        HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
        consumer2.receiveHelloWorldActiveMQ();
    }
}
image.png
3.JMS-实现队列服务监听
队列监听使用了观察者模式
3.1 创建消息生产者
public class HelloWorldProducer3 {

    public void sendHelloWorldActiveMQ(User user){
        //定义连接工厂
        ConnectionFactory connectionFactory = null;

        //定义连接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生产者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            //传入的用户名和密码可以通过jetty-realm.properties文件修改
            //brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接(此时才是真正创建连接)
            connection.start();

            /**
             * 创建会话
             * transacted:是否使用事务,可选值为true,false
             *              true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
             *              false:不使用事务,设置此变量 则acknowledgeMode必须设置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
            destination = session.createQueue("my-destination");

            //创建消息生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createObjectMessage(user);

            //发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
3.2 创建消息消费者
public class HelloWorldConsumer3 {

    public void receiveHelloWorldActiveMQ() {
        //定义连接工厂
        ConnectionFactory connectionFactory = null;
        //定义连接
        Connection connection = null;
        //定义会话
        Session session = null;
        //定义目的地
        Destination destination = null;
        //定义消息消费者
        MessageConsumer consumer = null;
        //定义消息
        Message message = null;

        try {
            //创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //创建目的地
            destination = session.createQueue("my-destination");
            //创建消息消费者
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {

                //ActiveMQ的回调方法,通过该方法将消息传递到consumer中
                @Override
                public void onMessage(Message message) {
                    Serializable obj = null;
                    try {
                        obj = ((ObjectMessage) message).getObject();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }

                    User user = (User) obj;
                    System.out.println("从ActiveMQ中获取的对象信息:" + user);
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
4.Topic模型
4.1 Publish/Subscribe 处理模式(Topic)

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费
当生产者发布消息,不管是否有消费者,都不会保存消息
一定要先有消息的消费者,后有消息生产者

image.png

4.2 创建消息生产者
public class HelloWorldProducerTopic {

    public void sendHelloWorldActiveMQ(String msgText){
        //定义连接工厂
        ConnectionFactory connectionFactory = null;

        //定义连接对象
        Connection connection = null;

        //定义会话
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生产者
        MessageProducer producer = null;

        //定义消息
        Message message = null;

        try {
            //传入的用户名和密码可以通过jetty-realm.properties文件修改
            //brokerURL:访问activeMQ的连接地址,路径结构为:协议://主机地址:端口号
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //创建连接对象
            connection = connectionFactory.createConnection();

            //启动连接(此时才是真正创建连接)
            connection.start();

            /**
             * 创建会话
             * transacted:是否使用事务,可选值为true,false
             *              true:使用事务,设置此变量值,Session.SESSION.TRANSACTION
             *              false:不使用事务,设置此变量 则acknowledgeMode必须设置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自动确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认机制(需手动调用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认机制(前两种一旦收到消息确认就会进行删除,这个则不会)
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //创建目的地,即队列的名称,消息消费者需要通过此名称访问对应的队列
            destination = session.createTopic("test-topic");

            //创建消息生产者
            producer = session.createProducer(destination);

            //创建消息对象
            message = session.createTextMessage(msgText);

            //发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息发送者资源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
4.3 创建消息消费者

创建三份

public class HelloWorldConsumerTopic1 implements Runnable{

    public void receiveHelloWorldActiveMQ() {
        //定义连接工厂
        ConnectionFactory connectionFactory = null;
        //定义连接
        Connection connection = null;
        //定义会话
        Session session = null;
        //定义目的地
        Destination destination = null;
        //定义消息消费者
        MessageConsumer consumer = null;
        //定义消息
        Message message = null;

        try {
            //创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //创建连接对象
            connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //创建目的地
            destination = session.createTopic("test-topic");
            //创建消息消费者
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //获取文本消息
                    String msg = null;
                    try {
                        msg = ((TextMessage) message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    System.out.println("从ActiveMQ中获取的文本信息----topic1:" + msg);
                }
            });

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        receiveHelloWorldActiveMQ();
    }
}
4.4 测试
public class Test {
    public static void main(String[] args) {
        /*HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");*/

        /*HelloWorldProducer2 producer2 = new HelloWorldProducer2();
        producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));*/

        /*HelloWorldProducer3 producer3 = new HelloWorldProducer3();
        producer3.sendHelloWorldActiveMQ(new User(2,"alice",19));*/

        HelloWorldProducerTopic topic = new HelloWorldProducerTopic();
        topic.sendHelloWorldActiveMQ("Hello Topic");
    }
}
public class Test {
    public static void main(String[] args) {
        /*HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();*/

        /*HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
        consumer2.receiveHelloWorldActiveMQ();*/

        /*HelloWorldConsumer3 consumer3 = new HelloWorldConsumer3();
        consumer3.receiveHelloWorldActiveMQ();*/

        HelloWorldConsumerTopic1 topic1 = new HelloWorldConsumerTopic1();
        Thread thread1 = new Thread(topic1);
        thread1.start();

        HelloWorldConsumerTopic2 topic2 = new HelloWorldConsumerTopic2();
        Thread thread2 = new Thread(topic2);
        thread2.start();

        HelloWorldConsumerTopic3 topic3 = new HelloWorldConsumerTopic3();
        Thread thread3 = new Thread(topic3);
        thread3.start();
    }
}
image.png
七、Spring整合ActiveMQ
1.创建项目

创建spring-activemq-producer

1.1 添加坐标
    <dependencies>
        <!--activeMQ-->
        <!--ActiveMQ客户端完整jar包依赖-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jms_1.1_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--ActiveMQ和Spring整合配置文件标签处理jar包依赖-->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
        </dependency>
        <!--Spring JMS插件相关的jar包依赖-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <!--Active Pool-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-jms-pool</artifactId>
        </dependency>

        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- 日志处理 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <!-- spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
        </dependency>

        <!--javaee-->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jsp-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
        </dependency>
    </dependencies>
1.2 整合ActiveMQ
  • web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
          http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <servlet>
        <servlet-name>springmvc</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:spring-*.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>springmvc</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
    
    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
</web-app>
  • spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--扫包-->
    <context:component-scan base-package="com.hxx.web.controller"/>
    
    <!--添加注解驱动-->
    <mvc:annotation-driven/>

    <!--配置视图解析器-->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <property name="prefix" value="/WEB-INF/jsp/"/>
        <property name="suffix" value=".jsp"/>
    </bean>

    <!--放行静态资源-->
</beans>
  • spring-service
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--加载资源文件-->
    <context:property-placeholder location="classpath:resource.properties"/>

    <!--扫描bean对象-->
    <context:component-scan base-package="com.hxx.service.impl"/>
</beans>
  • spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!--创建一个连接工厂,连接ActiveMQ,ActiveMQConnectionFactory,需要依赖ActiveMQ提供的amq标签-->
    <!--amq:connectionFactory是bean的子标签,会在Spring容器中创建一个bean对象,
    可以为对象名,类似:<bean id="" class="ActiveMQConnectionFactory"/>-->
    <amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
                           userName="admin" password="admin" id="amqConnectionFactory"/>

    <!--spring管理JMS相关代码的时候,必须依赖jms标签库,Spring-jms提供标签库-->
    <!--
        定义Spring-jms中的连接工厂对象
        CachingConnectionFactory - spring框架提供的连接工厂对象,不能真正访问MOM容器,
        类似一个工厂的代理对象,需要提供一个真实工厂,实现MOM容器的连接访问
    -->
    <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <property name="maxConnections" value="10"/>
    </bean>

    <!--配置有缓存的ConnectionFactory,Session的缓存大小可定制-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="3"/>
    </bean>

    <!--jmsTemplate配置-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--给定连接工厂-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--默认目的地命名-->
        <property name="defaultDestinationName" value="test-spring"/>
    </bean>
</beans>
2.创建项目

spring-activemq-consumer

2.1 添加依赖
    <dependencies>
        <!--activeMQ-->
        <!--ActiveMQ客户端完整jar包依赖-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jms_1.1_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--ActiveMQ和Spring整合配置文件标签处理jar包依赖-->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
        </dependency>
        <!--Spring JMS插件相关的jar包依赖-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <!--Active Pool-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-jms-pool</artifactId>
        </dependency>

        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- 日志处理 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <!-- spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
        </dependency>
    </dependencies>
2.2 整合ActiveMQ
  • spring-service.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--扫描bean对象-->
    <context:component-scan base-package="com.hxx.service,com.hxx.listener"/>
</beans>
  • spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!--创建一个连接工厂,连接ActiveMQ,ActiveMQConnectionFactory,需要依赖ActiveMQ提供的amq标签-->
    <!--amq:connectionFactory是bean的子标签,会在Spring容器中创建一个bean对象,
    可以为对象名,类似:<bean id="" class="ActiveMQConnectionFactory"/>-->
    <amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
                           userName="admin" password="admin" id="amqConnectionFactory"/>

    <!--spring管理JMS相关代码的时候,必须依赖jms标签库,Spring-jms提供标签库-->
    <!--
        定义Spring-jms中的连接工厂对象
        CachingConnectionFactory - spring框架提供的连接工厂对象,不能真正访问MOM容器,
        类似一个工厂的代理对象,需要提供一个真实工厂,实现MOM容器的连接访问
    -->
    <!--配置有缓存的ConnectionFactory,Session的缓存大小可定制-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="3"/>
    </bean>

    <!--注册监听器-->
    <!--
        开始注册监听
        需要的参数有:
            acknowledge:消息确认机制
            container-type:simple|default
            simple:SimpleMessageListenerContainer最简单的消息监听器容器,只能处理固定数量的JMS会话
            default:DefaultMessageListenerContainer是一种用于异步消息监听的管理类,且支持事务
            destination-type:目的地类型,使用队列作为目的地,
            connection-factory:连接工厂,spring-jms使用的工厂,必须是spring自主创建的
                                不能使用第三方工具创建工程,如:ActiveMQConnectionFactory
    -->
    <jms:listener-container acknowledge="auto" container-type="default"
                            destination-type="queue" connection-factory="cachingConnectionFactory">
        <!--
            在监听器容器中注册某监听对象,
            destination - 设置目的地命名
            ref - 指定监听器对象
        -->
        <jms:listener destination="test-spring" ref="myListener"/>
    </jms:listener-container>
</beans>
  • 创建MyMessageListener
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {

    }
}
  • 测试
public class TestActiveMQ {

    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext(new String[]{"classpath:spring-jms.xml"
                ,"classpath:spring-service.xml"});
        ac.start();
        System.out.println("spring容器启动");

        System.in.read();
    }
}
image.png
3.测试整合
需求:

1.在Producer中创建User类
2.将User对象传递到ActiveMQ中
3.在Consumer中获取User对象并在控制台打印

3.1 Producer发送消息
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void addUser(final User user) {
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                //发送消息
                return session.createObjectMessage(user);
            }
        });
    }
}

发送成功


image.png
3.2 Consumer接收消息
  • userServiceImpl.java
@Service
public class UserServiceImpl implements UserService {
    @Override
    public void showUser(User user) {
        System.out.println(user);
    }
}
  • MyMessageListener.java
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {

    @Autowired
    private UserService userService;

    @Override
    public void onMessage(Message message) {
        Serializable obj = null;
        try {
            obj = ((ObjectMessage) message).getObject();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        User user = (User) obj;
        userService.showUser(user);
    }
}
image.png

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,015评论 6 522
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,373评论 3 404
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,253评论 0 368
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,069评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,079评论 6 400
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,550评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,911评论 3 429
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,892评论 0 279
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,435评论 1 324
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,465评论 3 346
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,586评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,178评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,920评论 3 340
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,356评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,493评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,151评论 3 381
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,663评论 2 366