RabbitMQ整合Spring AMQP(三)

消息监听适配器

  • MessageListenerAdapter,即消息监听适配器
  • 前面我们介绍了通过实现ChannelAwareMessageListener接口并通过onMessage方法来接收消息。
  • 除了这种方式我们也可以使用适配器对不同类型的消息进行适配处理。
1.定义消息处理类

委托类可以自己随意定义,但是这里的方法名和参数是固定的:handleMessage

public class MessageDelegate {

    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法, 消息内容:" + new String(messageBody));
    }
}

可以通过查看MessageListenerAdapter类的源码得知

2.设置消息监听器

这里将消息监听器设置为一个适配器对象,适配器需要一个委托对象。

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //省略其他配置....
        
        //设置消息监听器
        /*container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.err.println("----------消费者: " + msg);
            }
        });*/
        //适配器,传递的参数为委托者对象
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        container.setMessageListener(adapter);
        return container;
    }
3.运行说明

运行之前编写的测试方法进行消息发送

    @Test
    public void testSendMessage() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        //设置消息属性
        messageProperties.getHeaders().put("desc", "信息描述..");
        messageProperties.getHeaders().put("type", "自定义消息类型..");
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
        
        //发送消息
        //参数:exchange, routingKey, message, messagePostProcessor
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("------添加额外的设置---------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }

控制台打印了如下内容,说明通过适配器同样可以接收到消息

------添加额外的设置---------
默认方法, 消息内容:Hello RabbitMQ
消息监听适配器相关设置
1.修改默认监听方法

设置方法:adapter.setDefaultListenerMethod("consumeMessage");
MessageDelegate类中添加方法

    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }

再次运行测试方法,打印了如下内容,说明方法名修改是生效的。

------添加额外的设置---------
字节数组方法, 消息内容:Hello RabbitMQ
2.修改方法参数类型

修改为String类型:adapter.setMessageConverter(new TextMessageConverter());
自定义转换器,实现MessageConverter接口

public class TextMessageConverter implements MessageConverter {

    //将一个java对象转换为Message
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.err.println("toMessage");
        return new Message(object.toString().getBytes(), messageProperties);
    }

    //将Message转换为Java对象
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("fromMessage");
        //含有"text"的contentType,转换为字符串对象
        String contentType = message.getMessageProperties().getContentType();
        if(null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}

修改委托类的监听方法参数

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }

添加测试方法,指定消息的contentType属性

    @Test
    public void testSendMessage4Text() throws Exception {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("Test SpringAMQP Message".getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.abc", message);
        rabbitTemplate.send("topic002", "rabbit.abc", message);
    }

运行测试方法,打印了如下内容,说明方法参数类型修改是OK的。

fromMessage
字符串方法, 消息内容:Test SpringAMQP Message
fromMessage
字符串方法, 消息内容:Test SpringAMQP Message
3.队列名与方法名匹配

修改MessageListenerAdapter的配置,采用队列与方法名匹配方式,此时只有不匹配的队列消息才会走默认的监听方法。

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //省略其他配置....
        /**
         * 2.适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
         */
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setMessageConverter(new TextMessageConverter());
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put("queue001", "method1");
        queueOrTagToMethodName.put("queue002", "method2");
        //设置队列与方法名对应
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        container.setMessageListener(adapter); 
        
        return container;
    }

MessageDelegate类添加消息监听方法

    public void method1(String messageBody) {
        System.err.println("method1 收到消息内容:" + new String(messageBody));
    }
    
    public void method2(String messageBody) {
        System.err.println("method2 收到消息内容:" + new String(messageBody));
    }

运行测试方法:testSendMessage4Text,控制台打印了如下内容

fromMessage
method1 收到消息内容:Test SpringAMQP Message
fromMessage
method2 收到消息内容:Test SpringAMQP Message
MessageListenerAdapter总结
  • MessageListenerAdapter:即消息监听适配器
  • 通过messageListenerAdapter的代码我们可以看出如下核心属性:
    defaultListenerMethod: 默认监听方法名称:用于设置监听方法名称
    Delegate: 实际真实的委托对象,用于处理消息
    queueOrTagToMethodName: 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理

消息转换器 - MessageConverter

  • 我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter
  • 自定义转换器通常是实现MessageConverter接口,重写下面两个方法:
    toMessage: java对象转换为Message
    fromMessage: Message对象转换为java对象
  • 3.常用转换器
    Jackson2JsonMessageConverter: Json转换器,可以进行java对象的转换功能
    DefaultJackson2JavaTypeMapper: 映射器,可以进行java对象的映射关系
    自定义二进制转换器: 比如图片类型、PDF、 PPT、 流媒体
JSON转换器的使用
创建实体类
@Data
public class Order {
    private String id;
    private String name;
    private String content;
}
@Data
public class Packaged {
    private String packageId;
    private String packageName;
    private String description;
}
配置转换器

在RabbitMQConfig配置类中配置MessageListenerAdapter对应的转换器

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //省略其他配置...
        /**
         * 1.1 支持json格式的转换器
         */
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        //使用json转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        return container;
    }
委托类中添加监听方法

在MessageDelegate中添加JSON格式的消息监听方法,对应的参数类型是Map

public class MessageDelegate {
    public void consumeMessage(Map messageBody) {
        System.err.println("map方法, 消息内容:" + messageBody);
    }
}
测试json转换器

编写测试方法,注意设置消息的contentType属性

    @Test
    public void testSendJsonMessage() throws Exception {
        Order order = new Order();
        order.setId("001");
        order.setName("消息订单");
        order.setContent("描述信息");
        //使用jackson进行json序列化
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.err.println("order 4 json: " + json);
        //发送消息
        MessageProperties messageProperties = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        messageProperties.setContentType("application/json");
        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }

运行测试方法,控制台打印内容如下

order 4 json: {"id":"001","name":"消息订单","content":"描述信息"}
map方法, 消息内容:{id=001, name=消息订单, content=描述信息}
对象映射器的使用
配置转换器

在RabbitMQConfig配置类中配置MessageListenerAdapter对应的转换器,其实依然是使用JSON转换器,只不过需要另外对JSON转换器设置一个对象映射器,这样接收消息时就能使用对象接收了。
方法:jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    //设置json与java对象的映射器
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
委托类中添加监听方法

在MessageDelegate中添加消息监听方法,对应的参数类型是Java对象

    public void consumeMessage(Order order) {
        System.err.println("order对象, 消息内容, id: " + order.getId() + 
                ", name: " + order.getName() + 
                ", content: "+ order.getContent());
    }
测试映射器

编写测试方法,注意设置消息的java对象类型: messageProperties.getHeaders().put("__TypeId__", "xxx");

    @Test
    public void testSendJavaMessage() throws Exception {
        
        Order order = new Order("002","订单消息","订单描述信息");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.err.println("order 4 json: " + json);
        
        MessageProperties messageProperties = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        messageProperties.setContentType("application/json");
        //设置java类型:key是固定的
        messageProperties.getHeaders().put("__TypeId__", "com.rxy.spring.entity.Order");
        Message message = new Message(json.getBytes(), messageProperties);
        
        rabbitTemplate.send("topic001", "spring.order", message);
    }

运行测试方法,控制台打印内容如下

order 4 json: {"id":"002","name":"订单消息","content":"订单描述信息"}
order对象, 消息内容, id: 002, name: 订单消息, content: 订单描述信息
Java对象多映射转换
配置转换器

方法: javaTypeMapper.setIdClassMapping(idClassMapping);
可以配置多个Java对象的映射关系,从而根据对象标识将json数据转换为不同的Java对象

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    //配置类标识与java类的映射关系
    Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
    idClassMapping.put("order", com.rxy.spring.entity.Order.class);
    idClassMapping.put("packaged", com.rxy.spring.entity.Packaged.class);
    javaTypeMapper.setIdClassMapping(idClassMapping);
    
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
委托类中添加监听方法

在MessageDelegate中添加消息监听方法,对应的参数类型是Java对象

    public void consumeMessage(Order order) {
        System.err.println("order对象, 消息内容, id: " + order.getId() + 
                ", name: " + order.getName() + 
                ", content: "+ order.getContent());
    }
    
    public void consumeMessage(Packaged pack) {
        System.err.println("package对象, 消息内容, id: " + pack.getPackageId() + 
                ", name: " + pack.getPackageName() + 
                ", content: "+ pack.getDescription());
    }
测试映射器

编写测试方法,注意设置消息的java对象类型,这里只需要设置java对象的标识即可

    @Test
    public void testSendMappingMessage() throws Exception {
        
        ObjectMapper mapper = new ObjectMapper();
        
        Order order = new Order("003","订单消息","订单描述信息");
        String json1 = mapper.writeValueAsString(order);
        System.err.println("order 4 json: " + json1);
        
        MessageProperties messageProperties1 = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        messageProperties1.setContentType("application/json");
        //只需要设置java对象的标识
        messageProperties1.getHeaders().put("__TypeId__", "order");
        Message message1 = new Message(json1.getBytes(), messageProperties1);
        rabbitTemplate.send("topic001", "spring.order", message1);
        
        Packaged pack = new Packaged("003","包裹消息","包裹描述信息");
        String json2 = mapper.writeValueAsString(pack);
        System.err.println("pack 4 json: " + json2);

        MessageProperties messageProperties2 = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        messageProperties2.setContentType("application/json");
        messageProperties2.getHeaders().put("__TypeId__", "packaged");
        Message message2 = new Message(json2.getBytes(), messageProperties2);
        
        rabbitTemplate.send("topic001", "spring.pack", message2);
    }

运行测试方法,控制台打印内容如下,事实上只打印了前三行,因为SpringBoot运行完Junit测试后就会自动停止了,而不会等消息处理完成之后再关闭容器,此时可以再运行Application类就打印了第四行信息,也就是完成了第二条消息的处理。

order 4 json: {"id":"003","name":"订单消息","content":"订单描述信息"}
pack 4 json: {"packageId":"003","packageName":"包裹消息","description":"包裹描述信息"}
order对象, 消息内容, id: 003, name: 订单消息, content: 订单描述信息
package对象, 消息内容, id: 003, name: 包裹消息, content: 包裹描述信息
全局转换器

上述介绍的转换器功能都相对单一,如果要处理更多场景下的不同类型消息,可以使用全局转换器。

配置全局转换器
    //全局的转换器,可以添加各种具体的转换器
    ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
    //根据不同的contentType添加相应的消息转换器
    //addDelegate(String contentType, MessageConverter messageConverter)
    TextMessageConverter textConvert = new TextMessageConverter();
    convert.addDelegate("text", textConvert);
    convert.addDelegate("html/text", textConvert);
    convert.addDelegate("xml/text", textConvert);
    convert.addDelegate("text/plain", textConvert);
    
    Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
    convert.addDelegate("json", jsonConvert);
    convert.addDelegate("application/json", jsonConvert);
    
    ImageMessageConverter imageConverter = new ImageMessageConverter();
    convert.addDelegate("image/jpg", imageConverter);
    convert.addDelegate("image", imageConverter);
    
    PDFMessageConverter pdfConverter = new PDFMessageConverter();
    convert.addDelegate("application/pdf", pdfConverter);
    
    adapter.setMessageConverter(convert);
    container.setMessageListener(adapter);
定义相应的转换器

这里以图片转换器为例,最终转换的类型是File

public class ImageMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------Image MessageConverter----------");
        
        Object _extName = message.getMessageProperties().getHeaders().get("extName");
        String extName = _extName == null ? "jpg" : _extName.toString();
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "d:/photo/" + fileName + "." + extName;
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }
}
委托类中添加监听方法
    public void consumeMessage(File file) {
        System.err.println("文件对象方法, 消息内容:" + file.getName());
    }
测试

编写测试方法,注意设置消息的contentType

    @Test
    public void testSendExtConverterMessage() throws Exception {
            byte[] body = Files.readAllBytes(Paths.get("C:/Users/ruoxiyuan/Desktop", "picture.jpg"));
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("image/jpg");
            messageProperties.getHeaders().put("extName", "jpg");
            Message message = new Message(body, messageProperties);
            rabbitTemplate.send("", "image_queue", message);
    }

运行测试方法,控制台打印内容如下,其他几种类型转换器也可以自行验证

-----------Image MessageConverter----------
文件对象方法, 消息内容:35f00409-83ce-4bf0-881a-a3bc5d13f85e.jpg
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,908评论 2 11
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,097评论 1 32
  • 1. 简介 1.1 什么是 MyBatis ? MyBatis 是支持定制化 SQL、存储过程以及高级映射的优秀的...
    笨鸟慢飞阅读 5,516评论 0 4
  • 一、基础知识:1、JVM、JRE和JDK的区别:JVM(Java Virtual Machine):java虚拟机...
    杀小贼阅读 2,378评论 0 4
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 5,308评论 0 9