最近在学习一个Java的开源项目,git地址:https://gitee.com/shuzheng/zheng
该开源项目开发环境需具备ActiveMQ,所以特地学习一下ActiveMQ。
一、ActiveMQ是什么?
1)JMS概述
JMS即Java消息服务(Java Message Service的简称),是Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者返回后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。JMS只是Java EE中定义的一组标准API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。
2)ActiveMQ概述
ActiveMQ就是JMS规范的具体实现;它是Apache下的一个项目,采用Java语言开发,是一款非常流行的开源消息服务器(消息队列中间件)
3)ActiveMQ与JMS关系
JMS只是定义了一组有关消息传送的规范和标准,并没有真正实现,也就说JMS只是定义了一组接口;其具体实现由不同的消息中间件厂商提供,比如Apache ActiveMQ就是JMS规范的具体实现,Apache ActiveMQ才是一个消息服务系统,而JMS不是。
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
二、应用场景——异步处理,应用解耦,流量削锋和消息通讯
2.1异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。
传统的做法:1.串行的方式; 2.并行方式。
1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。
与串行的差别是,并行的方式可以提高处理的时间。引入消息队列,就可以将不是必须的业务逻辑改为异步处理。
2.2应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。这种模式存在如下缺点:
1) 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败;
2) 订单系统与库存系统耦合;
引入应用消息队列后的方案:
1)订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
2)库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
2.3流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
可以控制活动的人数;
可以缓解短时间内高流量压垮应用;
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;
秒杀业务根据消息队列中的请求信息,再做后续处理。
2.4日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列;
Kafka消息队列,负责日志数据的接收,存储和转发;
日志处理应用:订阅并消费kafka队列中的日志数据;
二.ActiveMQ的使用
1、JMS两种消息传送模式
1)点对点( Point-to-Point):专门用于使用队列Queue传送消息;基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只有其中一个消费者会接收到该消息,而不是所有消费者都能接收到该消息。
2)发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息。基于主题的发布与订阅消息能被多个消费者消费,生产者发送的消息,所有订阅了该topic的消费者都能接收到。
2、 JMS API可以分为3个主要部分:
1)公共API:可用于向一个队列或主题发送消息或从其中接收消息;
2)点对点API:专门用于使用队列Queue传送消息;
3)发布/订阅API:专门用于使用主题Topic传送消息。
JMS公共API:
在JMS公共API内部,和发送与接收消息有关的JMS API接口主要是:ConnectionFactory/Connection/Session/Message/Destination/MessageProducer/MessageConsumer 。它们的关系是:一旦有了ConnectionFactory,就可以创建Connection,一旦有了Connection,就可以创建Session,而一旦有了Session,就可以创建Message、MessageProducer和MessageConsumer。
JMS点对点API:
点对点(p2p)消息传送模型API是指JMS API之内基于队列(Queue)的接口:QueueConnectionFactory/QueueConnection/QueueSession/Message/Queue/QueueSender/QueueReceiver. 从接口的命名可以看出,大多数接口名称仅仅是在公共API接口名称之前添加Queue一词。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不使用公共API 。
JMS发布/订阅API:
发布/订阅消息传送模型API是指JMS API之内基于主题(Topic)的接口:TopicConnectionFactory/TopicConnection/TopicSession/Message/Topic/TopicPublisher/TopicSubscriber. 由于基于主题(Topic)的JMS API类似于基于队列(Queue)的API,因此在大多数情况下,Queue这个词会由Topic取代。
3、ActiveMQ的下载与安装
1)直接去官网(http://activemq.apache.org/)下载最新版本即可,由于这是免安装的,只需要解压就行了。安装完之后进入bin目录,双击 activemq.bat文件(linux下在bin目录下执行 activemq start)
2)访问控制台(检验是否成功)
在浏览器输入:http://ip:8161/admin/ 会弹出登陆框,账号和密码都是默认admin。
3)修改端口号
61616为对外服务端口号
8161为控制器端口号
当端口号冲突时,可以修改这两个端口号。cd conf ,修改activemq.xml 修改里面的61616端口。修改jetty.xml,修改里面的8161端口。
4)删除不活动队列
一般情况下,ActiveMQ的queue或者topic在不使用之后,可以通过web控制台来删除掉。当然,也可以通过配置,使得broker可以自动探测到无用的队列(一定时间内为空的队列)并删除掉,回收响应资源。
activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true" schedulePeriodForDestinationPurge="10000">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="100000" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="100000" memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
schedulePeriodForDestinationPurge:10000 每十秒检查一次,默认为0,此功能关闭
gcInactiveDestinations: true 删除掉不活动队列,默认为false
inactiveTimoutBeforeGC:30000 不活动30秒后删除,默认为60秒
PS:对于topic的不活动队列只是,10秒中之类没有消费者进行注册监听,如果一个用户事先注册了这个监听,但是他一直没有登录,那么这算活动队列。而queue只要有消息没有出队列就表示活动队列。
附:消息中间件的用途和优点
1)将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
2)负责建立网络通信的通道,进行数据的可靠传送;
3)保证数据不重发,不丢失;
4)能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务
三、模拟HelloWord体验ActiveMQ
下载MQ以后,要将---bin.zip解压缩后里面的activemq-all-5.11.1.jar包加入到classpath下面,这个包包含了所有jms接口api的实现,项目结构图:
启动MQ,启动成功,打开该链接:http://localhost:8161/admin/
账号和密码都是admin,启动成功后,图如下:
生产者和消费者代码如下:
package com.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 生产者(消息发送者)
* @author admin
*
*/
public class JMSProducer {
// 账号(默认连接用户名)
private static final String user = ActiveMQConnection.DEFAULT_USER;
// 密码
private static final String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
// 地址
private static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final int sendnum = 10;
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory;
// 连接
Connection connection = null;
// 会话
Session session;
// 消息目的
Destination destination;
// 消息生产者
MessageProducer messageProducer;
// 实例化工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.user, JMSProducer.pwd, JMSProducer.url);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("HelloWord");
messageProducer = session.createProducer(destination);
sendMessage(session, messageProducer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发送消息
* @param session
* @param messageProducer 消息生产者
* @throws Exception
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("ActiveMQ" + i);
System.out.println("发送消息" + i);
messageProducer.send(message);
}
}
}
package com.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消费者(消息接收者)
* @author admin
*
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 会话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageConsumer messageConsumer;// 消息的消费者
// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
JMSConsumer.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个连接HelloWorld的消息队列
destination = session.createQueue("HelloWord");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
启动生产者,输出如下:
然后看一下ActiveMQ服务器,Queues内容如下:
我们可以看到创建了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,我们也可以通过Browse查看是哪些消息:
这些队列中的消息,被删除,消费者则无法消费,接下来,运行一下消费者:
现在我们再看一下ActiveMQ服务器,Queues内容如下:
我们可以看到HelloWorld的消息队列发生变化,多一个消息者,队列中的10条消息被消费了,点击Browse查看,已经为空了。
点击Active Consumers,我们可以看到这个消费者的详细信息。