一、简介
1.1 什么是 JMS
JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数MOM提供商都对JMS提供支持。
1.2 什么是 ActiveMQ
ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。
它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了 JMS 1.1 并提供了很多附加的特性,比如 JMX 管理、主从管理、消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等
1.3 ActiveMQ 特点
- 支持包括 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多种语言的客户端和协议。协议包含 OpenWire、Stomp、AMQP、MQTT 。
- 提供了像消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化之类的高级特性
- 完全支持 JMS 1.1 和 J2EE 1.4规范(包括持久化、分布式事务消息、事务)
- 对 Spring 框架的支持,ActiveMQ 可以通过 Spring 的配置文件方式很容易嵌入到 Spring 应用中
- 通过了常见的 J2EE 服务器测试,比如 TomEE、Geronimo、JBoss、GlassFish、WebLogic
- 连接方式的多样化,ActiveMQ 提供了多种连接模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA
- 支持通过使用 JDBC 和 journal 实现消息的快速持久化
- 为高性能集群、客户端-服务器、点对点通信等场景而设计
- 提供了技术和语言中立的 REST API 接口
- 支持 Ajax 方式调用 ActiveMQ
- ActiveMQ 可以轻松地与 CXF、Axis 等 Web Service 技术整合,以提供可靠的消息传递
- 可用作为内存中的 JMS 提供者,非常适合 JMS 单元测试
1.4 基本组件
ActiveMQ 使用时包含的基本组件如下(与 JMS 相同):
- Broker:消息代理,表示消息队列服务器实体,接受客户端连接,提供消息通信的核心服务;
- Producer:消息生产者,业务的发起方,负责生产消息并传输给 Broker;
- Consumer:消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理;
- Topic:主题,发布订阅模式下的消息统一汇集地,不同生产者向 Topic 发送消息,由 Broker 分发到不同的订阅者,实现消息的广播;
- Queue:队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理;
- Message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。
1.5 消息传送模型
ActiveMQ 支持两种截然不同的消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布 /订阅模型),分别称作:PTP Domain 和Pub/Sub Domain。
PTP(即点对点模型)
PTP 消息域使用 queue(队列) 作为 Destination(消息被寻址、发送以及接收的对象),消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。Consumer 可以使用 MessageConsumer.receive()
同步地接收消息,也可以通过使用MessageConsumer.setMessageListener()
注册一个 MessageListener 实现异步接收。多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。
Pub/Sub 发布订阅模型
Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic (主题) 作为 Destination(消息被寻址、发送以及接收的对象),发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。除非显式指定,否则 topic 不会为订阅者保留消息。当然,这可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。
1.6 消息存储
JMS 规范中消息的分发方式有两种:非持久化和持久化。对于非持久化消息 JMS 实现者须保证尽最大努力分发消息,但消息不会持久化存储;而持久化方式分发的消息则必须进行持久化存储。非持久化消息常用于发送通知或实时数据,当你比较看重系统性能并且即使丢失一些消息并不影响业务正常运作时可选择非持久化消息。持久化消息被发送到消息服务器后如果当前消息的消费者并没有运行则该消息继续存在,只有等到消息被处理并被消息消费者确认之后,消息才会从消息服务器中删除。具体的消息存储方式如下:
- AMQ,是 ActiveMQ 5.0及以前版本默认的消息存储方式,它是一个基于文件的、支持事务的消息存储解决方案。在此方案下消息本身以日志的形式实现持久化,存放在 Data Log 里。并且还对日志里的消息做了引用索引,方便快速取回消息
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/amq" syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100" maxFileLength="10mb" /> </persistenceAdapter> </broker>
- KahaDB,也是一种基于文件并具有支持事务的消息存储方式,从5.3开始推荐使用 KahaDB 存储消息,它提供了比 AMQ 消息存储更好的可扩展性和可恢复性
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/> </persistenceAdapter> </broker>
- JDBC,基于 JDBC 方式将消息存储在数据库中,将消息存到数据库相对来说比较慢,所以 ActiveMQ 建议结合 journal 来存储,它使用了快速的缓存写入技术,大大提高了性能;
<beans> <broker brokerName="broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> </broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
- 内存存储,是指将所有要持久化的消息放到内存中,因为这里没有动态的缓存,所以需要注意设置消息服务器的 JVM 和内存大小
<broker brokerName="broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors> </broker>
- LevelDB,5.6版本之后推出了 LevelDB 的持久化引擎,它使用了自定义的索引代替常用的 BTree 索引,其持久化性能高于 KahaDB,虽然默认的持久化方式还是 KahaDB,但是 LevelDB 将是趋势。在5.9版本还提供了基于 LevelDB 和 Zookeeper 的数据复制方式,作为 Master-Slave 方式的首选数据复制方案
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker" dataDirectory="${activemq.data}"> <persistenceAdapter> <levelDB directory="${activemq.data}/levelDB"/> </persistenceAdapter> </broker>
1.7 连接器
ActiveMQ Broker (消息代理) 的主要作用是为客户端应用提供一种通信机制,为此 ActiveMQ 提供了一种连接机制,并用连接器(connector)来描述这种连接机制。ActiveMQ 中的连接器有两种,一种是用于客户端与消息代理服务器(client-to-broker)之间通信的传输连接器(transport connector),一种是用于消息代理服务器之间(broker-to-broker)通信的网络连接器(network connector)。connector 使用 URI(统一资源定位符)来表示,URI 格式为: <schema name>:<hierarchical part>[?<query>][#<fragment>]
, 例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose
- schema name: fool
- hierarchical part: username:password@example.com:8042/over/there/index.dtb
- query: type=animal&name=narwhal
- fragment: nose
1.7.1 传输连接器(transport connector)
为了交换消息,消息生产者和消息消费者(统称为客户端)都需要连接到消息代理服务器,这种客户端和消息代理服务器之间的通信就是通过传输连接器(Transport connectors)完成的。很多情况下用户连接消息代理时的需求侧重点不同,有的更关注性能,有的更注重安全性,因此 ActiveMQ 提供了一系列l连接协议供选择,来覆盖这些使用场景。从消息代理的角度看,传输连接器就是用来处理和监听客户端连接的,查看 ActiveMQ demo 的配置文件(/examples/conf/activemq-demo.xml),传输连接的相关配置如下:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>
传输连接器定义在<transportConnectors>
元素中,一个<transportConnector>
元素定义一个特定的连接器,一个连接器必须有自己唯一的名字和URI
属性,但discoveryUri
属性是可选的。目前在 ActiveMQ 最新的5.15版本中常用的传输连接器连接协议有:vm、tcp、udp、multicast、nio、ssl、http、https、websocket、amqp、mqtt、stomp 等等
- VM:允许客户端和消息服务器直接在 VM 内部通信,采用的连接不是 Socket 连接,而是直接的虚拟机本地方法调用,从而避免网络传输的开销。应用场景仅限于服务器和客户端在同一 JVM 中。如使用代码启动嵌入式的ActiveMQ Broker实例,通常用于单元测试。因为是嵌入式,所以不需要配置ActiveMQ的配置文件,只要在连接Broker的URI中直接使用即可
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker"> <property name="brokerURL" value="vm://localhost"/> </bean>
- TCP:ActiveMQ默认的传输连接,也是最常用的使用方式。长连接,每个客户端实例都会与服务器维持一个连接。每个连接一个线程。TCP的优点是:
- 性能高:ActiveMQ使用默认协议OpenWire序列化和反序列化消息。OpenWire是一个性能很高的序列化协议
- 可用性高:TCP是使用最广泛的技术,几乎所有的开发语言都支持TCP协议
- 可靠性高:TCP协议确保消息不会在网络传说的过程中丢失
<transportConnector name="tcp" uri="tcp://localhost:61616"/>
- UDP:与面向连接,可靠的字节流服务的TCP不同,UDP是一个面向数据的简单传输连接,没有TCP的三次握手,所以性能大大强于TCP,但是是以牺牲可靠性为前提。适用于丢失也无所谓的消息,如统计uv,pv。(当然如果真是统计uv什么的,有Kafka这样专门的消息中间件)
<transportConnector name="udp" uri="udp://localhost:8123"/>
- NIO:使用Java的NIO方式对连接进行改进,因为NIO使用线程池,可以复用线程,所以可以用更少的线程维持更多的连接。如果有大量的客户端,或者性能瓶颈在网络传输上,可以考虑使用NIO的连接方式。也可以根据不同的场景选择不用的传输连接,比如:Producer有很多,但是Consumer很少,可以Producer用NIO协议,Consumer用TCP协议。从ActiveMQ 5.6版本开始,NIO可以支持和SSL搭配使用的传输连接。
- NIO配置:
<transportConnector name="nio" uri="nio://localhost:61616"/>
- NIO+SSL配置:
<transportConnector name="nio+ssl" uri="nio+ssl://localhost:61616"/>
- SSL:需要一个安全连接的时候可以考虑使用SSL,适用于client和broker在公网的情况,如使用aws云平台等
<transportConnector name="ssl" uri="ssl://localhost:8123"/>
- HTTP(S):需要穿越防火墙,可以考虑使用HTTP(S),但由于HTTP(S)是短连接,每次创建连接的成本较高,所以性能最差。允许客户端使用 REST 或 Ajax 的方式进行连接,这意味着可以直接使用 Javascript 向 ActiveMQ 发送消息
- HTTP配置
<transportConnector name="http" uri="http://localhost:8080"/>
- HTTPS配置
<transportConnector name="https" uri="http://localhost:8080"/>
- AMQP:ActiveMQ 5.8新增加的传输连接。用于支持AMQP(高级消息队列协议)。因为AMQP是消息队列的标准协议,而且已经越来越被广泛使用,所以ActiveMQ也支持了此协议。AMQP协议可以搭配NIO或SSL协议使用,amqp+nio用于提升系统的延展性和性能。amqp+ssl可以创建安全连接。
- amqp配置:
<transportConnector name="amqp" uri="amqp://localhost:5672"/>
- amqp+nio配置:
<transportConnector name="amqp+nio" uri="amqp://localhost:5672"/>
- amqp+ssl配置:
<transportConnector name="amqp+ssl" uri="amqp://localhost:5672"/>
- MQTT:ActiveMQ 5.8新增加的传输连接。是一个轻量级的消息订阅/发布协议。和AMQP一样,同样支持搭配NIO或SSL使用
<transportConnector name="mqtt" uri="mqtt://localhost:1883"/>
1.7.2 网络连接器(network connector)
很多情况下,我们要处理的数据可能是海量的,这种场景单台服务器很难支撑,这就要用到集群功能,为此 ActiveMQ 提供了网络连接的模式,简单说就是通过把多个消息服务器实例连接在一起作为一个整体对外提供服务,从而提高整体对外的消息服务能力。通过这种方式连接在一起的服务器实例之间可共享队列和消费者列表,从而达到分布式队列的目的,网络连接器就是用来配置服务器之间的通信。
如图所示,服务器S1 和 S2 通过 NewworkConnector 相连,生产者 P1 发送的消息,消费者 C3 和 C4 都可以接收到,而生产者 P3 发送的消息,消费者 C1 和 C2 也可以接收到。要使用网络连接器的功能需要在服务器 S1 的 activemq.xml 中的 broker 节点下添加如下配置(假设192.168.11.23:61617 为 S2 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>
如果只是这样,S1 可以将消息发送到 S2,但这只是单方向的通信,发送到 S2 上的的消息还不能发送到 S1 上。如果想 S1 也收到从 S2 发来的消息需要在 S2 的 activemq.xml 中的 broker 节点下也添加如下配置(假设192.168.11.45:61617为 S1 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>
这样,S1和S2就可以双向通信了。目前在 ActiveMQ 最新的5.15版本中常用的网络连接器协议有 static
和 multicast
两种:
- static,静态协议,用于为一个网络中多个代理创建静态配置,这种配置协议支持复合的 URI (即包含其他 URI 的 URI)。例如:
static://(tcp://ip:61616,tcp://ip2:61616)
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerA" dataDirectory="${activemq.base}/data"> <networkConnectors> <networkConnector uri="static:(tcp://localhost:61617)" /> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" /> </transportConnectors> </broker>
- multicast,多点传送协议,消息服务器会广播自己的服务,也会定位其他代理。这种方式用于服务器之间实现动态识别,而不是配置静态的 IP 组。默认配置:
multicast://default
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="multicast" dataDirectory="${activemq.base}/data"> <networkConnectors> <networkConnector name="default-nc" uri="multicast://default"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> </transportConnectors> </broker>
2.1 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>priv.simon.boot</groupId>
<artifactId>activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activemq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.2 点对点配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
2.3 发布/订阅配置
Spring Boot 默认开启点对点模式,发布订阅模式需要手动开启
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.jms.pub-sub-domain=true
2.4 消息提供者
@Service
public class ProviderService {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
/**
* 点对点消息发送
*/
public void sendPTPMessage(String message){
ActiveMQQueue queue = new ActiveMQQueue("queue");
jmsMessagingTemplate.convertAndSend(queue,message);
}
/**
* 发布订阅消息发送
*/
public void sendPubSubMesage(String message){
ActiveMQTopic topic = new ActiveMQTopic("topic");
jmsMessagingTemplate.convertAndSend(topic,message);
}
}
2.5 消息消费者
@Service
public class ConsumerService {
/**
* 监听点对点消息
*/
@JmsListener(destination = "queue")
public void receiveQueue(String message){
System.err.println("queue收到的消息:"+message);
}
/**
* 监听发布订阅消息
*/
@JmsListener(destination = "topic")
public void receiveTopic(String message){
System.err.println("topic收到的消息:"+message);
}
}
2.6 测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {
@Autowired
private ProviderService providerService;
@Test
public void ptpTest() {
providerService.sendPTPMessage("ptp hello");
}
@Test
public void pubSubTest() {
providerService.sendPubSubMesage("pub/sub hello");
}
}