MessageConverter详解
org.springframework.amqp.support.converter.MessageConverter

Message toMessage(Object object, MessageProperties messageProperties);
将java对象和属性对象转换成Message对象。
Object fromMessage(Message message) throws MessageConversionException;
将消息对象转换成java对象。
Demo
定义Config类
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
//指定消息转换器
adapter.setMessageConverter(new TestMessageConverter());
//设置处理器的消费消息的默认方法
adapter.setDefaultListenerMethod("onMessage");
container.setMessageListener(adapter);
return container;
}
}
MessageListenerAdapter中定义的消息转换器,消费端接收的消息就从Message类型转换成了String类型
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TestMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("=======toMessage=========");
return new Message(object.toString().getBytes(),messageProperties);
}
//消息类型转换器中fromMessage方法返回的类型就是消费端处理器接收的类型
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("=======fromMessage=========");
return new String(message.getBody());
}
}
消费者处理消息的Handler
public class MessageHandler {
public void onMessage(String message){
System.out.println("---------onMessage-------------");
System.out.println(message);
}
}
启动类
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
/**
* MessageConverter可以把java对象转换成Message对象,也可以把Message对象转换成java对象
*
* MessageListenerAdapter内部通过MessageConverter把Message转换成java对象,然后找到相应的处理方法,参数为转换成的java对象
*/
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("===start up======");
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
启动应用类,发送消息到zhihao.miao.order队列,控制台打印:
===start up======
=======fromMessage=========
---------onMessage-------------
String类型的消息
从控制台打印我们知道了在消费者处理消息之前会进行消息类型转换,调用TestMessageConverter的fromMessage方法,然后执行消息处理器的onMessage方法,方法参数就是String类型。
扩展
自定义一个MyBody类型,将消息从Message转换成MyBody类型
public class MyBody {
private byte[] bodys;
public MyBody(byte[] bodys){
this.bodys = bodys;
}
@Override
public String toString() {
return new String(bodys);
}
}
然后修改TestMessageConverter的fromMessage方法,返回了MyBody类型,那么消息处理器的消费方法也是MyBody参数的消费方法
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TestMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("=======toMessage=========");
return new Message(object.toString().getBytes(),messageProperties);
}
//消息类型转换器中fromMessage方法返回的类型就是消费端处理器接收的类型
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("=======fromMessage=========");
return new MyBody(message.getBody());
}
}
此时的消息处理器,处理器中的方法的入参就是MyBody类型了,
public class MessageHandler {
public void onMessage(MyBody message){
System.out.println("---------onMessage---MyBody-------------");
System.out.println(message);
}
}
此时控制台打印:
===start up======
=======fromMessage=========
---------onMessage---MyBody-------------
Mybody类型的消息
小结
我们还测试如下如果不使用自定义的Converter,那么当消息的属性中含有属性content_type的值为text,那么默认的转换成的java类型就是String类型,如果不指定那么默认的转换类型就是byte[]
源码分析
我们跟进去MessageListenerAdapter的setMessageConverter方法,
/**
* Set the converter that will convert incoming Rabbit messages to listener method arguments, and objects returned
* from listener methods back to Rabbit messages.
* <p>
* The default converter is a {@link SimpleMessageConverter}, which is able to handle "text" content-types.
* @param messageConverter The message converter.
*/
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
private MessageConverter messageConverter = new SimpleMessageConverter();
我们发现默认的MessageConverter是SimpleMessageConverter,我们进入SimpleMessageConverter类中看其默认的转换逻辑
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
//contentType属性值是以text开头,那么就将Message类型转换成String类型
if (contentType != null && contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}
try {
content = new String(message.getBody(), encoding);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert text-based Message content", e);
}
}
//如果content_type的值是application/x-java-serialized-object则把消息序列化为java对象
else if (contentType != null &&
contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
try {
content = SerializationUtils.deserialize(
createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
}
catch (IOException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
catch (IllegalStateException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
}
}
if (content == null) {
//都没有符合,则转换成字节数组
content = message.getBody();
}
return content;
}
源码分析总结:
1.MessageConverter可以把java对象转换成Message对象,也可以把Message对象转换成java对象
2.MessageListenerAdapter内部通过MessageConverter把Message转换成java对象,然后找到相应的处理方法,参数为转换成的java对象。
3.SimpleMessageConverter处理逻辑:
如果content_type是以text开头,则把消息转换成String类型
如果content_type的值是application/x-java-serialized-object则把消息序列化为java对象,否则,把消息转换成字节数组。