RabbitMQ笔记十:MessageConverter详解

MessageConverter详解

org.springframework.amqp.support.converter.MessageConverter

MessageConverter

Message toMessage(Object object, MessageProperties messageProperties);
将java对象和属性对象转换成Message对象。

Object fromMessage(Message message) throws MessageConversionException;
将消息对象转换成java对象。

Demo

定义Config类

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        //指定消息转换器
        adapter.setMessageConverter(new TestMessageConverter());
        //设置处理器的消费消息的默认方法
        adapter.setDefaultListenerMethod("onMessage");
        container.setMessageListener(adapter);

        return container;
    }
}

MessageListenerAdapter中定义的消息转换器,消费端接收的消息就从Message类型转换成了String类型

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class TestMessageConverter implements MessageConverter {


    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("=======toMessage=========");
        return new Message(object.toString().getBytes(),messageProperties);
    }

    //消息类型转换器中fromMessage方法返回的类型就是消费端处理器接收的类型
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("=======fromMessage=========");
        return new String(message.getBody());
    }
}

消费者处理消息的Handler

public class MessageHandler {


    public void onMessage(String message){
        System.out.println("---------onMessage-------------");
        System.out.println(message);
    }
}

启动类

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

/**
 * MessageConverter可以把java对象转换成Message对象,也可以把Message对象转换成java对象
 *
 * MessageListenerAdapter内部通过MessageConverter把Message转换成java对象,然后找到相应的处理方法,参数为转换成的java对象
 */
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("===start up======");
        TimeUnit.SECONDS.sleep(30);
        context.close();
    }
}

启动应用类,发送消息到zhihao.miao.order队列,控制台打印:

===start up======
=======fromMessage=========
---------onMessage-------------
String类型的消息

从控制台打印我们知道了在消费者处理消息之前会进行消息类型转换,调用TestMessageConverterfromMessage方法,然后执行消息处理器的onMessage方法,方法参数就是String类型。

扩展

自定义一个MyBody类型,将消息从Message转换成MyBody类型

public class MyBody {

    private byte[] bodys;

    public MyBody(byte[] bodys){
        this.bodys = bodys;
    }

    @Override
    public String toString() {
        return new String(bodys);
    }
}

然后修改TestMessageConverterfromMessage方法,返回了MyBody类型,那么消息处理器的消费方法也是MyBody参数的消费方法

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class TestMessageConverter implements MessageConverter {


    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("=======toMessage=========");
        return new Message(object.toString().getBytes(),messageProperties);
    }

    //消息类型转换器中fromMessage方法返回的类型就是消费端处理器接收的类型
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("=======fromMessage=========");
        return new MyBody(message.getBody());
    }
}

此时的消息处理器,处理器中的方法的入参就是MyBody类型了,

public class MessageHandler {

    public void onMessage(MyBody message){
        System.out.println("---------onMessage---MyBody-------------");
        System.out.println(message);
    }
}

此时控制台打印:

===start up======
=======fromMessage=========
---------onMessage---MyBody-------------
Mybody类型的消息

小结

我们还测试如下如果不使用自定义的Converter,那么当消息的属性中含有属性content_type的值为text,那么默认的转换成的java类型就是String类型,如果不指定那么默认的转换类型就是byte[]

源码分析

我们跟进去MessageListenerAdapter的setMessageConverter方法,

/**
 * Set the converter that will convert incoming Rabbit messages to listener method arguments, and objects returned
 * from listener methods back to Rabbit messages.
 * <p>
 * The default converter is a {@link SimpleMessageConverter}, which is able to handle "text" content-types.
 * @param messageConverter The message converter.
 */
public void setMessageConverter(MessageConverter messageConverter) {
    this.messageConverter = messageConverter;
}
private MessageConverter messageConverter = new SimpleMessageConverter();

我们发现默认的MessageConverterSimpleMessageConverter,我们进入SimpleMessageConverter类中看其默认的转换逻辑

@Override
public Object fromMessage(Message message) throws MessageConversionException {
    Object content = null;
    MessageProperties properties = message.getMessageProperties();
    if (properties != null) {
        String contentType = properties.getContentType();
        //contentType属性值是以text开头,那么就将Message类型转换成String类型
        if (contentType != null && contentType.startsWith("text")) {
            String encoding = properties.getContentEncoding();
            if (encoding == null) {
                encoding = this.defaultCharset;
            }
            try {
                content = new String(message.getBody(), encoding);
            }
            catch (UnsupportedEncodingException e) {
                throw new MessageConversionException(
                        "failed to convert text-based Message content", e);
            }
        }
        //如果content_type的值是application/x-java-serialized-object则把消息序列化为java对象
        else if (contentType != null &&
                contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
            try {
                content = SerializationUtils.deserialize(
                        createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
            }
            catch (IOException e) {
                throw new MessageConversionException(
                        "failed to convert serialized Message content", e);
            }
            catch (IllegalArgumentException e) {
                throw new MessageConversionException(
                        "failed to convert serialized Message content", e);
            }
            catch (IllegalStateException e) {
                throw new MessageConversionException(
                        "failed to convert serialized Message content", e);
            }
        }
    }
    if (content == null) {
       //都没有符合,则转换成字节数组
        content = message.getBody();
    }
    return content;
}

源码分析总结:
1.MessageConverter可以把java对象转换成Message对象,也可以把Message对象转换成java对象
2.MessageListenerAdapter内部通过MessageConverterMessage转换成java对象,然后找到相应的处理方法,参数为转换成的java对象。
3.SimpleMessageConverter处理逻辑:
如果content_type是以text开头,则把消息转换成String类型
如果content_type的值是application/x-java-serialized-object则把消息序列化为java对象,否则,把消息转换成字节数组。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,399评论 19 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,930评论 18 399
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 13,864评论 6 13
  • 文/洛小简 题记1——当等待变为无奈,爱情慢慢不再,沦为友情或亲情,感觉没了,放手会变得更洒脱。题记2——别让爱你...
    洛小简阅读 1,722评论 0 1
  • 对于我这种懒人,看本书都觉得奢侈,可有一本书让我有兴趣从娱乐中,抽出时间来看。虽说这本书是与经济学有关系,但也...
    一刹那烟花阅读 2,976评论 0 0