Spring-Cloud RabbitMQ 用法 - 监听json对象

本篇主要介绍基于spring的@RabbitListener监听json对象的用法


1. 环境

spring-boot 2.1.1.RELEASE
spring-cloud Finchley.RELEASE
rabbitmq 3.7.10
spring-boot-starter-amqp

2. Json在RabbitMQ中的格式

在上一节中,我们使用两种方式来发送json对象,它们在RabbitMQ中的格式为:

  • 发送Json方式1:自定义Message,消息格式为:


    格式1
  • 发送Json方式2:设置MessageConverter,消息格式为:


    格式2
  • 可以看出格式2格式1多了一个header属性:__TypeId__,其值为json对象的全限定类名。
  • 我们通过方式1发送时,由于消息是我们自定义的,所以RabbitTemplate就没有给我们增加此属性(当然我们也可以通过Message.setHeader()方式来手动设置);而通过方式2发送时,Jackson2JsonMessageConverter为我们设置了消息的__TypeId__属性
  • 这两种格式的消息监听还是有差别的,下面我们进行详细说明

2. 消息监听容器 MessageListenerContainer

要使用监听器,必须要先了解MessageListenerContainer,它通过MessageConverter把RabbitMQ中的byte[]转化为其他对象。

  • 默认情况:消息监听容器是SimpleRabbitListenerContainerFactory,其内部的默认消息转换器是SimpleMessageConverter,只支持content_typeapplication/octet-streamtext/plaincontent_type的消息

  • 如果想要处理content_type=application/json的消息,必须要使用Jackson2JsonMessageConverter消息转换器替换默认的消息转换器,代码如下:

    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        //设置json序列化 消息转换器
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
  • ListenerContainerFactory只支持一个消息转换器,如果我们替换为Jackson2JsonMessageConverter,就无法支持其他类型的消息(如自定义的消息类型)。如果想要同时支持多种类型的消息,需要使用ContentTypeDelegatingMessageConverter,它可以代理多个消息转换器,通过content_type来决定使用哪个消息转换器。代码如下:
    /**
     * 消费者端配置: 重写默认的监听器工厂
     * 创建一个支持自定义json序列化监听容器工厂
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        /**
         * 系统使用的默认消息转换器为SimpleMessageConverter,可以处理文本、java序列化等
         * 我们要在默认的基础上,再添加一个处理json的消息转化器。
         * 所以这里我们使用了ContentTypeDelegatingMessageConverter,它可以代理多个消息处理器,每个消息处理器由contentType决定。
         * new ContentTypeDelegatingMessageConverter()就是指定了SimpleMessageConverter作为默认的消息处理器
         */
        ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter() ;
        messageConverter.addDelegate(MessageProperties.CONTENT_TYPE_JSON,new Jackson2JsonMessageConverter());

        //设置消息转换器
        factory.setMessageConverter(messageConverter);
        return factory;
    }

按以上配置以后,就可以使用监听器来监听json类型的数据

3. 监听器

spring中使用@RabbitListener来实现消费者监听器,它用法有两种:

  • @RabbitListener标注方法
  • @RabbitListener标注类,@RabbitHandler标注方法

@RabbitListener 标注方法

使用方式:

@Component
public class QueueJsonListenerByRabbitListener {
    Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
     * @param headers
     */
    @RabbitListener(queues = RabbitmqJsonConfig.QUEUE_SIMPLE_JSON)
    public void process(@Payload Department department, @Headers Map<String,Object> headers) {
        logger.info("<--- json 我收到的消息:"+department+", header="+headers);
    }
}
  • queues为要监听的队列名称
  • @Payload为消息对象本身
  • @Headers为消息对象的头部,也可以通过@Header获取单个头部属性:@Header String token
  • 这种方式不仅能监听上面格式1的json消息,还能监听格式2的json消息。
  • 这种方式之所有能监听格式1的json消息,是因为@RabbitListener注解到方法上,可以获取方法参数对象Department的类型。
  • 对于这两种格式的消息,在发送端,发送的都是Department对象;而在消费者端,我们可以把消息转化为任何对象(如Department2),只需要保证对象的属性与Department的属性一致即可。(这种方式对于发送端和消费端不在同一个系统的应用来说特别有用)

@RabbitListener 标注类,@RabbitHandler 标注方法

使用方式:

@Component
@RabbitListener(queues = RabbitmqJsonConfig.QUEUE_SIMPLE_JSON)
public class QueueJsonListenerByRabbitHandler {
    Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
     * @param headers
     */
    @RabbitHandler
    public void process(@Payload Department department, @Headers Map<String,Object> headers) {
        logger.info("<--- json 我收到的消息:"+department+", header="+headers);
    }
}
  • 这种方式只能监听含有__TypeId__头属性的json消息,即格式2这种类型的json消息。
  • 这种方式@RabbitListener无法获取到json对象Department的类型,所以如果没有__TypeId__明确指定json对象类型的话,系统就无法转化格式1这种类型的json消息。

参考

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容