RabbitMQ笔记十三:使用@RabbitListener注解消费消息

之前的博客中我们可以在spring容器中构建SimpleMessageListenerContainer来消费消息,我们也可以使用@RabbitListener来消费消息。

@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。
可以在配置文件中设置RabbitListenerAnnotationBeanPostProcessor并通过<rabbit:annotation-driven/>来设置@RabbitListener的执行,当然也可以通过@EnableRabbit注解来启用@RabbitListener

示列

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

}

定义消息处理器,@RabbitListener注解标记的方法

@Component
public class MessageHandler {

    @RabbitListener(queues = "zhihao.miao.order")
    public void handleMessage(byte[] message){
        System.out.println("消费消息");
        System.out.println(new String(message));
    }
}

应用启动类,@EnableRabbit启用@RabbitListener

@EnableRabbit
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("rabbit service startup");
        TimeUnit.SECONDS.sleep(60);
        context.close();
    }
}

测试:


发送消息

控制台打印:

消费消息
你的订单已经生成。

如果发送的消息content_type的属性是text,那么接收的消息处理方法的参数就必须是String类型,如果是byte[]类型就会报错。

指定content_type类型为text

控制台报错

image.png
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageHandler {

    //此时如果去掉content_type为text,那么会将消息转换成其每个字符的int类型
    //@RabbitListener(queues = "zhihao.miao.order")
    public void handleMessage(String message){
        System.out.println("消费消息");
        System.out.println(new String(message));
    }

    //此时不管属性中有没有content_type属性都能接收到数据
    @RabbitListener(queues = "zhihao.miao.order")
    public void handleMessage(Message message){
        System.out.println("====消费消息===handleMessage(message)");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    }
}

总结
如果消息属性中没有指定content_type,则接收消息的处理方法接收类型是byte[],如果消息属性中指定content_type为text,则接收消息的处理方法的参数类型是String类型。不管有没有指定content_type,处理消息方法的参数类型是Message都不会报错。

步骤

  • 在启动入口增加@EnableRabbit注解
  • 在spring容器中托管一个RabbitListenerContainerFactory的bean(默认的实现是org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory)
  • 写一个消息处理类托管到spring容器中,然后在具体的消息处理方法上增加@RabbitListener注解

具体的消息处理方法的参数是跟MessageConverter转换后的java对象有关。
如果想要设置MessageConverter,则需要在RabbitListenerContainerFactory的实例中去设置,(setMessageConverter方法)

使用@Payload和@Headers注解

@Component
public class MessageHandler {

    //获取消息的头属性和body属性
    @RabbitListener(queues = "zhihao.miao.order")
    public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){
        System.out.println("====消费消息===handleMessage");
        System.out.println(headers);
        System.out.println(body);
    }
}

获取单一个Header的属性,Header还有其他的一些属性,比如requireddefaultvalue等属性,顾名思义:

@Component
public class MessageHandler {

    //获取特定的消息
    @RabbitListener(queues = "zhihao.miao.order")
    public void handleMessage(@Payload String body,@Header String token){
        System.out.println("====消费消息===handleMessage");
        System.out.println(token);
        System.out.println(body);
    }
}
测试

指定向多个队列中发送消息

@Component
public class MessageHandler {
    @RabbitListener(queues ={"zhihao.miao.order","zhihao.info"})
    public void handleMessage(Message message){
        System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    }
}

通过配置文件发送消息

@Component
public class MessageHandler {

    //通过配置文件发送消息
    @RabbitListener(queues ={"${zhihao.queue1}","${zhihao.queue2}"})
    public void handleMessage(Message message){
        System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    }

}

配置文件:

zhihao.queue1=zhihao.miao.order
zhihao.queue2=zhihao.info

启动类:

@EnableRabbit
@ComponentScan
@PropertySource(value = "classpath:mq.properties")
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("rabbit service startup");
        TimeUnit.SECONDS.sleep(200);
        context.close();
    }
}

@RabbitListener注解进行声明binding

定义mq中不存在的Queueexchangeroute key

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageHandler {

    //支持自动声明绑定,声明之后自动监听队列的队列,此时@RabbitListener注解的queue和bindings不能同时指定,否则报错
    @RabbitListener(bindings ={@QueueBinding(value = @Queue(value = "q5",durable = "true"),
            exchange =@Exchange(value = "zhihao.miao.exchange",durable = "true"),key = "welcome")})
    public void handleMessage(Message message){
        System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
        System.out.println(message.getMessageProperties());
        System.out.println(new String(message.getBody()));
    }


}

从上面的我们知道声明必须容器中要有RabbitAdminRabbitTemplate实例

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

}

应用启动类

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@EnableRabbit
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("rabbit service startup");
        TimeUnit.SECONDS.sleep(200);
        context.close();
    }
}

测试验证

自动声明成功
消息发送

控制台打印:

rabbit service startup
====消费消息q5===handleMessage
MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=zhihao.miao.exchange, receivedRoutingKey=welcome, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-RBqtzCiTxMg6knwQJX7B3A, consumerQueue=q5]
hello,自动注册成了吗

说明自动声明的绑定中的队列被自动默认监听。@RabbitListener注解中的bindingsqueues参数不能同时指定,否则会报错。

@RabbitListener和@RabbitHandler搭配使用

@RabbitListener可以标注在类上面,当使用在类上面的时候,需要配合@RabbitHandler注解一起使用,@RabbitListener标注在类上面表示当有收到消息的时候,就交给带有@RabbitHandler的方法处理,具体找哪个方法处理,需要跟进MessageConverter转换后的java对象。
配置:

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }


}

处理器方法

@Component
@RabbitListener(queues ="zhihao.miao.order")
public class MessageHandler {


    @RabbitHandler
    public void handleMessage(byte[] message){
        System.out.println("====消费消息handleMessage");
        System.out.println(new String(message));
    }

    @RabbitHandler
    public void handleMessage2(String message){
        System.out.println("====消费消息===handleMessage2");
        System.out.println(message);
    }
}

应用启动类:

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@EnableRabbit
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("rabbit service startup");
        TimeUnit.SECONDS.sleep(3000);
        context.close();
    }
}

发送不包含content_type属性的消息和content_type属性为text的消息,控制台打印:

rabbit service startup
====消费消息handleMessage
订单已经生成,请到订单-详情页面确认。
====消费消息===handleMessage2
订单已经生成,请到订单-详情页面确认。

@RabbitListener注解的containerFactory属性

@RabbitListener注解的containerFactory属性可以指定一个RabbitListenerContainerFactory的bean,默认是找名字为rabbitListenerContainerFactory的实例。

当我们将ConsumerConfig类中的RabbitListenerContainerFactory实例的对象名改掉的时候,发现就会报错。

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean("rabbitListenerContainerFactory2")
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}

此时控制台上报错,

报错信息

此时如果配置一下@RabbitListener注解的containerFactory属性便不会报错。

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues ="zhihao.miao.order",containerFactory = "rabbitListenerContainerFactory2")
public class MessageHandler {

    @RabbitHandler
    public void handleMessage(byte[] message){
        System.out.println("====消费消息handleMessage");
        System.out.println(new String(message));
    }

    @RabbitHandler
    public void handleMessage2(String message){
        System.out.println("====消费消息===handleMessage2");
        System.out.println(message);
    }
}

我们再去改造一下在RabbitListenerContainerFactory实例中定义消息类型转换器

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean("rabbitListenerContainerFactory2")
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new MessageConverter() {
            @Override
            public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
                return null;
            }

            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                return new User(1,new String(message.getBody()));
            }
        });
        return factory;
    }
}

User对象:

public class User {

    private int age;

    private String name;
    
   ...get set

    public User(int age,String name){
        this.age = age;
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "age=" + age +
                ", name='" + name + '\'' +
                '}';
    }
}

在处理器中增加参数是User的方法:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues ="zhihao.miao.order",containerFactory = "rabbitListenerContainerFactory2")
public class MessageHandler {

    @RabbitHandler
    public void handleMessage3(User user){
        System.out.println("====消费消息===handleMessage3");
        System.out.println(user);
    }


}
发送消息测试
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容