本文基于《Spring实战(第4版)》所写。
异步消息是一个应用程序向另一个应用程序间接发送消息的一种方式,这种方式无需等待对方的响应。借助Spring,我们有多个实现异步消息的可选方案:在Spring中使用Java消息服务(Java Message Service, JMS)和高级消息队列协议(Advanced Message Queuing Protocol, AMQP)发送和接收消息。除了基本的消息发送和接收之外,我们还会看到Spring对消息驱动POJO的支持,它是一种与EJB的消息驱动Bean(message-driven bean, MDB)类似的消息接收方式。
在异步消息中有两个主要的概念:消息代理(message broker)和目的地(destination)。当一个应用发送消息时,会将消息交给一个消息代理。消息代理可以确保消息被投递到指定的目的地,同时解放发送者,使其能够继续进行其他的业务。
尽管不同的消息系统会提供不同的消息路由模式,但是有两种通用的目的地:队列(queue)和主题(topic)。每种类型都与特定的消息模型相关联,分别是点对点模型(队列)和发布/订阅模型(主题)。
点对点消息模型
在点对点模型中,每一条消息都有一个发送者和一个接收者,如下图。当消息代理得到消息时,它将消息放入一个队列中。当接收者请求队列中的下一条消息时,消息会从队列中取出,并投递给接收者。因为消息投递后会从队列中删除,这样就可以保证消息只能投递给一个接收者。
通常可以使用几个接收者来处理队列中的消息。不过,每个接收者都会处理自己所接收到的消息。如果想要提高应用的消息处理能力,我们只需简单地为队列添加新的监视器就可以了。
发布—订阅消息模型
在发布—订阅消息模型中,消息会发送给一个主题。与队列类似,多个接收者都可以监听一个主题。但是,与队列不同的是,消息不再是只投递给一个接收者,而是主题的所有订阅者都会接收到此消息的副本,如下图
异步消息的优点:
- 无需等待:客户端不需要等待,可以继续执行其他任务。
- 面向消息和解耦:发送异步消息是以数据为中心的,客户端不必了解远程服务的任何规范。
- 位置独立:只要服务能够从队列或主题中获取消息即可,消息客户端根本不需要关注服务来自哪里。
- 确保投递:当发送异步消息时,客户端可以相信消息会被投递。即使在消息发送时,服务无法使用,消息也会被储存起来,直到服务重新可以使用为止。
使用JMS发送消息
Java消息服务(Java Message Service , JMS)是一个Java标准,定义了使用消息代理的通用API。
Spring通过基于模版的抽象为JMS功能提供了支持,这个模版也就是JmsTemplate。Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上的到达的消息。
在Spring中搭建消息代理
ActiveMQ是一个伟大的开源消息代理产品,也是使用JMS进行异步消息传递的最佳选择。ActiveMQ可在http://activemq.apache.org下载二进制发行包。下载后可以在bin目录下,找到对应的子目录,子目录中有相应的启动ActiveMQ的脚本。例如,OS X系统可在“bin/macosx”目录下运行“activemq start”。运行脚本后,ActiveMQ就准备好了,可以在浏览器输入“http://localhost:8161/admin/”进入ActiveMQ管理页,用户名密码应该都是admin
创建连接工厂
我们必须配置JMS连接工厂,让它知道如何连接到ActiveMQ。配置如下:
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"
p:brokerURL="tcp:localhost:61616" />
也可以使用ActiveMQ的命名空间配置
<amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" />
amq是ActiveMQ的命名空间,使用前我们必须确保在Spring的配置文件中声明了amq命名空间:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
.....
</beans>
声明目的地
例如,下面的<bean>声明定义了一个ActiveMQ队列:
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"
c:_="spitter.queue" />
命名空间配置如下:
<amq:queue id="spittleQueue" physicalName="spitter.queue" />
同样,下面<bean>声明定义了一个ActiveMQ主题:
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"
c:_="spitter.queue" />
命名空间配置如下:
<amq:topic id="spittleTopic" physicalName="spitter.topic" />
使用Spring的JMS模版
针对如何消除冗长和重复的JMS代码,Spring给出的解决方案是JmsTemplate。JmsTemplate可以创建连接、获得会话以及发送和接收消息。
另外,JmsTemplate可以处理抛出的笨拙的JMSException异常。如果在使用JmsTemplate时抛出JMSException异常,JmsTemplate将捕获该异常,然后抛出一个非检查型异常,该异常是Spring自带的JmsException异常的子类。
为了使用JmsTemplate,我们需要在Spring的配置文件中将它声明为一个bean,除了配置了工厂,还配置了默认队列。如下的XML可以完成这项工作:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
c:_0-ref="connectionFactory" p:defaultDestination-ref="queue"/>
发送消息
为了在Spittle创建的时候异步发送spittle提醒,让我们为Spittr应用引入AlertService:
package spittr.web;
import spittr.model.Spittle;
public interface AlertService {
void sendSpittleAlert(Spittle spittle);
}
然后实现该接口,方法中使用JmsOperation将Spittle对象发送给消息队列。
package spittr.web;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import spittr.model.Spittle;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
@Service
public class AlertServiceImpl implements AlertService {
private JmsOperations jmsOperations;
//注入JMS模版
@Autowired
public AlertServiceImpl(JmsOperations jmsOperations) {
this.jmsOperations = jmsOperations;
}
@Override
public void sendSpittleAlert(final Spittle spittle) {
//创建并发送消息,如在配置jmsTemplate时,配置了默认目的地,可在此步骤省略代码中目的地配置。
jmsOperations.send("spitter.queue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(spittle);
}
});
}
}
在发消息时,对消息进行转换
除了send()方法,JmsTemplate还提供了convertAndSend()方法。与send()方法不同,convertAndSend()方法并不需要 MessageCreator作为参数。这是因为convertAndSend()会使用内置的消息转换器(message converter)为我们创建信息。
当我们使用convertAndSend()时,代码可以减少到方法体只包含一行代码:
@Override
public void sendSpittleAlert(final Spittle spittle) {
// 使用消息转换器创建消息,默认情况下会使用SimpleMessageConverter
jmsOperations.convertAndSend(spittle);
}
这个接口之所以简单,是因为Spring已经提供了多个实现,我们没有必要创建自定义的实现。如下表所示
消息转换器 | 功能 |
---|---|
MappingJacksonMessageConverter | 使用Jackson JSON库实现消息与JSON格式之间的相互转换 |
MappingJackson2MessageConverter | 使用Jackson2 JSON库实现消息与JSON格式之间的相互转换 |
MarshallingMessageConverter | 使用JAXB库实现消息与XML格式之间的相互转换 |
SimpleMessageConverter | 实现String与TextMessage之间的相互转换,字节数组与ByteMessage之间的相互转换,Map与MapMessage之间的相互转换以及Serializable对象与ObjectMessage之间的相互转换 |
默认情况下,JmsTemplate在convertAndSend()方法中会使用SimpleMessageConverter。如果想使用其他转换器,可以重写这种行为。例如,想使用JSON消息的话
<bean id="messageConverter" class="org.springframework.jms.support.converter.MappingJacksonMessageConverter" />
然后,我们可以将其注入到JmsTemplate中,如下所示:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
c:_-ref="connectionFactory"
p:defaultDestinationName="spittle.queue"
p:messageConverter-ref="messageConverter" />
同步接收消息
使用JmsTemplate接收消息
方法一,使用ObjectMessage,如下
public Spittle receiveSpittleAlert(){
try{
// 接收消息
ObjectMessage receivedMessage = (ObjectMessage) jmsOperations.receive();
// 获得对象
return (Spittle) receivedMessage.getObject();
}
catch (JMSException jmsException) {
// 抛出转换后的异常
throw JmsUtils.convertJmsAccessException(jmsException);
}
}
方法二,使用消息转换器,如下
public Spittle receiveSpittleAlert(){
return (Spittle) jmsOperations.receiveAndConvert();
}
创建消息驱动的POJO(消息监听器)
Spring提供了以POJO的方式处理消息的能力,这些消息来自于JMS的队列或主题中。例如,基于POJO是想SpittleAlertHandler就足以做到这一点。
package spittr.web;
import spittr.model.Spittle;
public class SpittleAlertHandler {
public void handleSpittleAlert(Spittle spittle) {
// ... implementation goes here ... 处理方法
}
}
为POJO赋予消息接收能力的诀窍是在Spring中把它配置为消息监听器。Spring的jms命名空间为我们提供了所需要的一切。首先,让我们先把处理器声明为bean:
<bean id="spittleHandler" class="spittr.web.SpittleAlertHandler" />
然后,为了把SpittleAlertHandler转变为消息驱动的POJO,我们需要把这个bean声明为消息监听器:
<jms:listener-container >
<jms:listener destination="spitter.queue" ref="spittleHandler" method="handleSpittleAlert"/>
</jms:listener-container>
在这里,我们在消息监听器中包含了一个消息监听器。消息监听器是一个特殊的bean,它可以监控JMS目的地并等待消息到达。一旦有消息到达,它取出信息,然后把消息传给任意一个对此消息感兴趣的消息监听器,如下图展示了这个交互过程。
使用AMQP实现消息功能
AMQP具有多项JMS所不具备的优势。首先,AMQP为消息定义了线路层的协议,而JMS所定义的是API规范。JMS的API协议能够确保所有的实现都能通过的API来使用,但是并不能保证某个JMS实现所发送的消息能够被另外不同的JMS实现所使用。而AMQP的线路层协议规范了消息的格式,消息在生产者和消费者间传送的时候会遵循这个格式。这个AMQP在互相协作方面就要优于JMS—它不仅能跨不同的AMQP实现,还能跨语言和平台。
相比JMS,AMQP另外一个明显的优势在于它具有更加灵活和透明的消息模型。使用JMS的话,只有两种消息模型可供选择:点对点和发布-订阅。这两种模型在AMQP当然都是可以实现的,但AMQP还能够让我们以其他的多种方式来发送消息,这是通过将消息的生产者与存放消息的队列解耦实现的。
AMQP的生产者并不会直接将消息发布到队列中。AMQP在消息的生产者以及传递信息的队列之间引入了一种间接的机制:Exchange。这种关系如下图
可以看到,消息的生产者将信息发布到一个Exchange。Exchange会绑定到一个或多个队列上它负责将信息路由到队列上。信息的消费者会从队列中提取数据并进行处理。
AMQP定义了四种不同类型的Exchange,每一种都有不同的路由算法,这些算法决定了是否要将信息放到队列中。根据Exchange的算法不同,它可能会使用消息的routing key和/或参数,并将其与Exchange和队列之间binding的routing key和参数进行对比。如果对比结果满足相应的算法,那么消息将会路由到队列上。否则的话,将不会路由到队列上。
四种标准的AMQP Exchange如下所示:
- Direct: 如果消息的routing key与binding的routing key直接匹配的话,消息将会路由到该队列上;
- Topic: 如果消息的routing key与binding的routing key附和通配符匹配的话,消息将会路由到该队列上;
- Headers: 如果消息参数表中的头信息和值都与binding参数表中相匹配,消息将会路由到该队列上;
- Fanout: 不管消息的routing key和参数表和头信息/值是什么,消息将会路由到所有队列上。
生产者将信息发送给Exchange并带有一个routing key,消费者从队列中获取消息。
配置Spring支持AMQP消息
使用Spring AMQP前也要配置一个连接工厂,需要配置RabbitMQ连接工厂。RabbitMQ是一个流行的开源消息代理,它实现了AMQP。
使用之前,需要安装RabbitMQ,安装地址:http://www.rabbitmq.com/download.html。由于RabbitMQ用erlang语言开发,所有安装RabbitMQ前需要先安装Erlang,安装地址:http://www.erlang.org/downloads
安装成功后,进入/usr/local/sbin,执行./rabbitmq-server,来启动rabbitmq。用户名和密码默认都是guest,可浏览器直接登录http://localhost:15672/查看rabbitmq的管理平台。
配置RabbitMQ连接工厂最简单的方式就是使用Spring AMQP所提供的rabbit配置命名空间。为了使用这项功能,需要确保在Spring配置文件中已经声明了该模式:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
....
</beans>
接下来,配置工厂信息
<!--配置连接工厂, 默认地址localhost,默认端口5672,并且用户名和密码均为guest-->
<rabbit:connection-factory id="connectionFactory" host="localhost" port="5672" username="guest" password="guest"/>
除了连接工厂以外,我们还要考虑使用其他的几个配置元素。接下来,看一下如何创建队列、Exchange已经binding。
声明队列、Exchange已经binding
rabbit命名空间包含了多个元素,帮助我们声明队列、Exchange以及将他们结合在一起的binding。下表中列出了这些元素。
元素 | 作用 |
---|---|
<queue> | 创建一个队列 |
<fanout-exchange> | 创建一个fanout类型的Exchange |
<header-exchange> | 创建一个header类型的Exchange |
<topic-exchange> | 创建一个topic类型的Exchange |
<direct-exchange> | 创建一个direct类型的Exchange |
<bindings> <binding /> </bindings> | 元素定义一个或多个元素的集合。元素创建Exchange和队列之间的binding |
这些配置元素要与<admin> 元素一起使用。<admin>元素会创建一个RabbitMQ管理组件,它会自动创建上述这些元素所声明的队列、Exchange以及binding。
例如,如果你希望声明为spittle.alerts的队列,只需要在Spring配置中添加如下代码:
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue id="spittleAlertQueue" name="spittle.alerts" />
<rabbit:fanout-exchange name="spittle.alert.exchange" >
<rabbit:bindings>
<rabbit:binding queue="spittleAlertQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
使用RabbitTemplate发送消息
配置RabbitTemplate的最简单方式是使用rabbit命名空间的<template>元素,如下所示:
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"/>
现在,要发送消息的话,我们只需要将模版bean注入到AlertServiceImpl中,并使用它来发送Spittle。如下的程序清单展示了一个新版本的AlertServiceImpl,它使用RabbitTemplate代替JmsTemplate来发送Spittle提醒。
package spittr.web;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import spittr.model.Spittle;
@Service
public class AlertServiceImpl implements AlertService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendSpittleAlert(Spittle spittle) {
rabbitTemplate.convertAndSend("spittle.alert.exchange",
"spittle.alerts",
spittle);
}
}
RabbitTemplate有多个重载版本的convertAndSend()方法。可以同时省略Exchange名称和routing key:
rabbitTemplate.convertAndSend(spittle);
那么可以在<template>元素上借助exchange和routing-key属性配置不同的默认值:
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spittle.alert.exchange"
routing-key="spittle.alerts" />
convertAndSend()也是需要一个消息转换器的帮助来完成任务,默认的消息转换器是SimpleMessageConverter。Spring AMQP还提供了其他几个有用的消息转换器,其中包括使用JSON和XML数据的消息转换器。
接收AMQP消息
Spring AMQP也是提供了两种方式来获取消息。
使用RabbitTemplate来接收消息
<template>的配置与发送消息一致。接收方法如下:
Spittle spittle = (Spittle) rabbit.receiveAndConvert("spittle.alerts");
如果<template>的配置已经队列名称,可以省略代码中的队列名称。
定义消息驱动的AMQP POJO
如果想在消息驱动POJO中异步地消费使用Spittle对象,首先要解决的问题就是这个POJO本身。如下的SpittleAlertHandler扮演了这个角色:
package spittr.web;
import spittr.model.Spittle;
public class SpittleAlertHandler {
public void handleSpittleAlert(Spittle spittle) {
// ... implementation goes here ...
}
}
我们还需要在Spring应用上下文中将SpittleAlertHandler声明为一个bean:
<bean id="spittleHandler" class="spittr.web.SpittleAlertHandler" />
最后,我们需要声明一个监听器容器和监听器,当消息到达的时候,能够调用SpittleAlertHandler。
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="spittleHandler" method="handleSpittleAlert" queue-names="spittle.alerts" />
</rabbit:listener-container>