本篇主要介绍RabbitTemplate
发送json对象的用法
1. 环境
spring-boot
2.1.1.RELEASE
spring-cloudFinchley.RELEASE
rabbitmq3.7.10
spring-boot-starter-amqp
2. RabbitMQ的content_type
针对RabbitMQ的消息,最终存储的都是字节数组,而我们在消费端,消息可能是字符串、对象等其它类型。我们发送时需要指定每个消息的content_type
属性,从而让消费端知道如何把字节数组转化成消息的原始类型。
例如,SimpleMessageConverter
是默认的消息转换器,以其作为例子:
-
content_type=text/plain
的消息,直接new String,关键代码为:
if (contentType != null && contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}
try {
content = new String(message.getBody(), encoding);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert text-based Message content", e);
}
}
-
content_type=application/x-java-serialized-object
,直接进行java反序列化,关键代码为:
else if (contentType != null &&
contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
try {
content = SerializationUtils.deserialize(
createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
}
catch (IOException | IllegalArgumentException | IllegalStateException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
}
我们从RabbitMQ的web页面可以看到每个消息的content_type
:
正因为content_type
的存在,RabbitMQ才能支持消息的多样性。
在发送数据,有两种方式可以指定content_type
:
- 自己构建消息对象
Message
,通过MessageProperties
设置content_type
- 通过MessageConverter来设置
content_type
我们下面的例子都以Department
作为json对象,请注意json对象必须包含无参构造器。
public class Department {
private Integer departmentId ;
private String departmentName ;
private String departmentDesc ;
/**
* 对于json对象,必须要有无参的构造器
*/
public Department() {
}
public Department(Integer departmentId, String departmentName, String departmentDesc) {
this.departmentId = departmentId;
this.departmentName = departmentName;
this.departmentDesc = departmentDesc;
}
@Override
public String toString() {
return "Department{" +
"departmentId=" + departmentId +
", departmentName='" + departmentName + '\'' +
", departmentDesc='" + departmentDesc + '\'' +
'}';
}
//省略getter and setter
}
3. 发送Json方式1:自定义Message
这种方式需要自己把对象转化为json字符串,创建消息的MessageProperties,设置content_type,最后构建Message对象,并发送。
核心代码如下:
@Component
public class JsonSender {
Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送对象消息,使用的json序列化
* Department不需要实现序列号接口,也不需要生产者、消费者是同一个包下的同一个类。只需要属性相同即可。
*/
public void sendTest1(Department department) {
logger.info("---> 我准备发送json对象:"+department);
/**
* 发送json对象,必须为消息指定contentType,否则发送时只会使用默认的消息转换器SimpleMessageConverter
* 而SimpleMessageConverter只支持字符串、byte数字和java Serializable对象
* 不指定就会在发送时报错:IllegalArgumentException:SimpleMessageConverter only supports String, byte[] and Serializable payloads
*/
ObjectMapper objectMapper = new ObjectMapper();
try {
//对象转化为json字符串
String jsonStr = objectMapper.writeValueAsString(department) ;
//配置消息的属性contentType
MessageProperties properties = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
//构建消息对象
Message message = MessageBuilder.withBody(jsonStr.getBytes())
.andProperties(properties)
.build();
//发送
this.rabbitTemplate.convertAndSend(RabbitmqBaseConfig.EX_SIMPLE_DIRECT, RabbitmqJsonConfig.KEY_SIMPLE_JSON, message); ;
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
通过RabbitMQ可以查看消息,其content_type=application/json
:
这种方式的优点是简单明了,可以与发送字符串、java等共用同一个
RabbitTemplate
;缺点则是需要自己把对象转化成字符串、构建消息属性和消息对象,代码比较多。
4. 发送Json方式2:设置MessageConverter
RabbitTemplate
在发送消息对象时,会通过MessageConverter
把对象转化为byte[]
,我们以其中一个convertAndSend
方法作为入口,其源码为:
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
继续跟进
@Override
public void convertAndSend(String exchange, String routingKey, final Object object,
@Nullable CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
继续跟进convertMessageIfNecessary
方法:
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
而getRequiredMessageConverter
方法,则是获取RabbitTemplate
内部的MessageConverter:
private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
MessageConverter converter = getMessageConverter();
if (converter == null) {
throw new AmqpIllegalStateException(
"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
}
return converter;
}
//省略其它代码....
/**
* Return the message converter for this template. Useful for clients that want to take advantage of the converter
* in {@link ChannelCallback} implementations.
*
* @return The message converter.
*/
public MessageConverter getMessageConverter() {
return this.messageConverter;
}
RabbitTemplate
默认的MessageConverter为SimpleMessageConverter
,它不支持json,所以我们需要替换为支持json序列化的消息转换器,而Spring也给我们提供了一个Jackson2JsonMessageConverter
-
Jackson2JsonMessageConverter
引来与jackson的jar包
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.8</version>
</dependency>
- 配置用于发送json对象的
RabbitTemplate
/**
* 定义支持json的RabbitTemplate
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
//重写发送端的消息转换器,来替换默认的SimpleMessageConverter
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
- 发送json对象示例代码如下:
/**
* 发送对象消息,使用的json序列化
*/
public void sendTest2(Department department) {
logger.info("---> 我准备发送json对象:"+department);
/**
* 发送json对象,必须为消息指定contentType,否则发送时只会使用默认的消息转换器SimpleMessageConverter
* 而SimpleMessageConverter只支持字符串、byte数字和java Serializable对象
* 不指定就会在发送时报错:IllegalArgumentException:SimpleMessageConverter only supports String, byte[] and Serializable payloads
*/
//发送
this.jsonRabbitTemplate.convertAndSend(RabbitmqBaseConfig.EX_SIMPLE_DIRECT, RabbitmqJsonConfig.KEY_SIMPLE_JSON, department); ;
}
通过RabbitMQ可以查看消息:
写在最后:
细心的同学可能已经发现,这两种方式发送json的方式,在RabbitMQ内还是有些不同(注意对比两张图片),通过
Jackson2JsonMessageConverter
发送的消息多了一个__TypeId__
header属性,其值为json对象的全限定类名。__TypeId__
的作用主要为消费者进行消息转化时提供依据,下一节我们会继续讲它的作用。