上一节我们学习rabbitmq的推拉模式的理论,那这一节我们学习一下消费者获取消息的代码以及整合SpringBoot的配置吧。
消费者有两种接收消息的方法:
- poll consumer,即拉模式,消费者主动去消息队列拉取消息。
- push consumer,即推模式,消息队列主动往消费者推送消息。
一. 消费者通过推(PUSH)方式获取消息
实现push模式最简单的方式就是使用
@EnableRabbit+@RabbitListener
注解来指定某方法作为消息消费的方法。例如监听某个Queue
的方法。
1. 配置RabbitListenerContainerFactory
这个bean只会在consumer
端通过@RabbitListener
注解的方式接收消息的时候使用。每个@RabbitListener
注解方法都会由RabbitListenerContainerFactory
创建一个MessageListenerContainer
,负责接收消息。
@Bean( name = "singleListenerContainer" )
public SimpleRabbitListenerContainerFactory listenerContainerFactory(CachingConnectionFactory connectionFactory)
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
/* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */
factory.setConnectionFactory( connectionFactory);
/* 消息序列化类型 */
factory.setMessageConverter( new Jackson2JsonMessageConverter() );
/* setConcurrentConsumers:设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。 */
factory.setConcurrentConsumers( 1 );
factory.setMaxConcurrentConsumers( 1 );
/* setPrefetchCount:设置每次请求发送给每个Consumer的消息数量。 */
factory.setPrefetchCount( 1 );
/* 是否设置Channel的事务。 */
factory.setChannelTransacted( false );
/* setTxSize:设置事务当中可以处理的消息数量。 */
factory.setTxSize( 1 );
/* 设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。 */
factory.setDefaultRequeueRejected( true );
/*
* setErrorHandler:实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。
* factory.setErrorHandler();
*/
factory.setAcknowledgeMode( AcknowledgeMode.AUTO );
return(factory);
}
-
factory.setConnectionFactory(connectionFactory());
设置spring-amqp
的connectionFactory
。 -
factory.setMessageConverter(new Jackson2JsonMessageConverter());
对于消费者,序列化方式也可以在这里配置。 -
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
设置consumer
端的应答模式。
public enum AcknowledgeMode {
NONE, //无应答。
MANUAL,
AUTO;
}
- NONE:无应答,rabbitmq默认consumer正确处理所有请求。
-
AUTO:consumer自动应答,处理成功(注意:此处的成功确认是没有发生异常)发出ack,处理失败发出nack。rabbitmq发出消息后会等待consumer端应答,只有收到
ack
确定信息后才会将消息在rabbitmq清除掉。收到nack
异常信息的处理方法由setDefaultRequeueReject()
方法设置,这种模式下,发送错误的消息可以恢复。 - MANUAL:基本等同于AUTO模式,区别是需要人为调用方法确认。
注意:没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
-
factory.setConcurrentConsumers(1);
每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。 -
factory.setMaxConcurrentConsumers(1);
设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量。 -
factory.setPrefetchCount(1);
设置每次请求发送给每个Consumer的消息数量。 -
factory.setChannelTransacted(false);
设置Channel的事务。 -
factory.setTxSize(1);
设置事务当中可以处理的消息数量。 -
factory.setDefaultRequeueRejected(true);
设置rabbitmq收到nack/reject
消息时的处理方式,true
,重新放回到queue头部,设置为false
丢弃。 -
factory.setErrorHandler();
实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。
2. 使用@RabbitListener注解
2.1 配置@EnableRabbit
@EnableRabbit //开启rabbitmq的listener监听
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
2.2 配置@RabbitListener
上面配置了singleListenerContainer
信息,将其加入到containerFactory
中。
@RabbitListener(queues = "queue_direct",
containerFactory = "singleListenerContainer")
public void receive02(Message message, long deliveryTag, Channel channel) {
//获取头信息
int i = 1 / 0;
System.out.println(message.getMessageProperties().getHeaders());
}
2.3 启动项目,查看结果
我们可以看到,因为我们配置的是发生错误后,重回队列,并且是“自动确认”模式,那么程序已经为死循环了。
3. consumer配置进阶学习
3.1 @RabbitListener注解的属性
3.1.1 queues = "queue_direct"
声明绑定的队列,但队列必须存在。
3.1.2. containerFactory = "singleListenerContainer"
声明要使用的容器工厂,一般我们在configuration
类中配置。
3.1.3. bindings属性
@RabbitListener注解中指明binding
信息,就能自动创建queue
、exchange
并建立binding
关系。
(1)在2.0版本之后,可以指定多个routingkey
即key={"ord","con"}
。
(2)exchange属性中,可以使用type = ExchangeTypes.DIRECT
指定不同类型的交换机。
(3)arguments属性,可以用于指定headers类型的exchange。arguments = @Argument(name = "x-message-ttl", value = "10000", type= "java.lang.Integer")),
(4)queue属性中exclusive
,排他队列,只对创建这个queue
的Connection
可见,Connection
关闭,那么这个queue
删除。
(5)queue属性中的autoDelete
,若是这个consumer
下线,那么这个queue
队列将会删除。
bindings注意事项:
1. 对于(4)(5)这两种情况,durable=true
队列持久化是不起作用的。
2. 注意不能和queues
属性同时使用。
3. 特别注意:如果注解声明的queue
和xchange
以及binging
关系都存在的情况下,但是我们在bindings
属性中又进行配置,那么bindings
新增或者修改的参数都不会生效。但是queue
存在,exchange
存在但是没有binding
,那么应用程序启动后,会自动创建binding
关系。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_direct", durable = "true"),
arguments = {}, //可用于headers类型的exchange
exchange = @Exchange(value = "exchange_direct" ,type = ExchangeTypes.DIRECT), //声明交换机的类型
key = "ord" //声明路由主键
), containerFactory = "singleListenerContainer")
public void rec(Message message, long deliveryTag, Channel channel) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
System.out.println("消息体--------->" + message.getBody());
// //foreach遍历循环
for (Map.Entry<String, Object> entry : headers.entrySet()) {
System.out.println("消息头:" + entry.getKey() + "---" + entry.getValue());
}
}
3.2 @Payload以及@Headers
这两个注解可以获取信息体和信息头。
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer")
public void handleMessage(@Payload Book body, @Headers Map<String, Object> headers) {
System.out.println("-->信息域的值"+body);
for (Map.Entry<String, Object> entry : headers.entrySet()) {
System.out.println("消息头:" + entry.getKey() + "---" + entry.getValue());
}
}
3.3 @RabbitListener以及 @RabbitHandler
@RabbitListener可以标注在类上,需要配合@RabbitHandler注解一起使用。当标注在类方法上时表示收到消息后,就转交给@RabbitHander的方法处理。但是具体那个方法,要看MessageConverter
转换后的参数。
@Service
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer")
public class BookService {
// @RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer")
@RabbitHandler
public void handleBook(@Payload Book body) {
System.out.println("-->信息域的值" + body);
}
@RabbitHandler
public void handleStr(@Payload HashMap<String, Object> body) {
System.out.println("-->信息域2的值" + body);
}
}
3.4. 序列化方式MessageConverter
只要在RabbitTemplate
中配置了MessageConverter
在发送和接收消息时候就能自动完成Message和自定义java对象类的自动转换。
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,MessageConverter messageConverter) {
//客户端开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(messageConverter);
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
});
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return rabbitTemplate;
}
我们可以在RabbitTemplate源码中看到:
private volatile MessageConverter messageConverter = new SimpleMessageConverter();
默认采用的是SimpleMessageConverter
他就直接将java对象序列化。但是并不推荐直接使用,因为会只限于java平台。
推荐使用JsonMessageConverter
、Jackson2JsonMessageConverter
,这两个是都将java对象转化为json再转为byte[]来构造Message对象,前一个用的是jackson json lib,后一个用的是jackson 2 json lib。
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
3.5. consumer端的异常处理
有两个error handler类可以对@RabbitListener注解方法中抛出的异常进行异常处理。
3.5.1 RabbitListenerErrorHandler接口
配置类代码:
@Bean
public RabbitListenerErrorHandler rabbitListenerErrorHandler(){
return new RabbitListenerErrorHandler() {
@Override
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {
System.out.println("-------------------------------------"+message);
throw exception;
}
};
}
监听类代码:
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer",errorHandler = "rabbitListenerErrorHandler")
public void handleBook(@Payload Book body) throws Exception {
int i = 1 / 0;
System.out.println("-->信息域的值" + body);
}
效果图:
3.5.2 ErrorHandler接口
这一个值是设置在RabbitListenerContainerFactory
连接工厂的。
生产者生产了Book类型的消息:
@Test
public void contextLoads() {
Book book = new Book("西游记", "120.00");
rabbitTemplate.convertAndSend("exchange_direct", "ord", book);
}
配置类配置了ErrorHandler处理:
注意消息到达ErrorHandler则意味着处理失败,不需要在抛出异常。并且这个含有ConditionalRejectingErrorHandler
默认配置,可以识别特定的不可挽回的异常拒绝requeue
队列,防止消息处理的死循环。
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable throwable) {
System.out.println("------------------------->丢弃消息啦"+throwable);
//
}
消费者只接受String类型的消息:
@RabbitListener(queues = "queue_direct",
containerFactory = "singleListenerContainer",
errorHandler = "rabbitListenerErrorHandler")
public void handleBook(String body) throws Exception {
System.out.println("----------------->信息域的值" + body);
}
执行结果:
在发送异常后并未重新放入队列,而是直接丢弃消息。
注意事项:
-
@RabbitListener
和@RabbitHandler
组合使用时,RabbitListenerErrorHandler
配置无效。 -
@RabbitListenerErrorHandler
作用域只是配置@RabbitListener
注解上的,这个注解只对当前方法发生异常时有效。而ErrorHandler
对所有@RabbitListener
注解方法有效。 -
@RabbitListener
注解的方法中抛出的异常,首先会进入RabbitListenerErrorHandler
,这里如果没有能力处理这个异常,需要将其重新抛出(否则不会进入rrorHandler
),然后异常将会进入ErrorHandler
,一旦异常进入ErrorHandler
就意味着消息消费失败了(所以不需要重新抛出异常)。 -
RabbitListenerErrorHandler
没有默认配置,而ErrorHandler
有一个默认的ConditionalRejectingErrorHandler
类,他的作用打印日志,辨别特定的异常。将其包装成AmqpRejectAndDontRequeueException
抛出,这个异常的作用是,忽略defaultRequeueRejected
(前文已经讲过)的设置,强制让rabbitmq
丢弃此条处理失败消息,不放回queue
。
需要丢弃的异常:
o.s.amqp...MessageConversionException
o.s.messaging...MessageConversionException
o.s.messaging...MethodArgumentNotValidException
o.s.messaging...MethodArgumentTypeMismatchException
java.lang.NoSuchMethodException
java.lang.ClassCastException
3.5.3 设置死信队列
为了避免消息异常造成的死循环,也可以将requeue
(上文配置参数)设置为false
。消息被拒绝(basic.reject/ basic.nack)并且requeue=false时,消息会进入死信队列。于是我们可以监听死信队列来处理异常消息。
消息进入死信队列的途径:
- 消息被拒绝(basic.reject/ basic.nack)并且requeue=false。
- 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))。
- 队列达到最大长度。
小结:
-
自动确认模式下,可以使用
3.5.1
和3.5.2
方式的异常处理机制即可。 -
手动确认模式下,推荐是使用死信队列的方式,即
3.5.3
处理。 - 需要注意
3.5.1
和3.5.2
在手动确认模式下,若是异常未被捕获,也是可以生效的。
手动确定模式:
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer", errorHandler = "rabbitListenerErrorHandler")
public void handleBook(Message message, Book book, Channel channel) throws Exception {
try {
int i = 1 / 0;
} catch (Exception e) {
//告诉MQ删除这一条消息,若是true,则是删除所有小于tags的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("book------>" + book);
}
System.out.println("----------------->信息域的值" + book);
}
3.6 消费者端去重
我们在生产者保证消息不丢失 中,可以知道,为了保证消息不丢失,生产者会将ack=false
的消息重新发送,那么可能会导致消费端的消息重复,那怎么去重?
可以为每一条消息设置一个messageId
,用于消费者端的去重。
生产者代码:
@Autowired
private MessageConverter messageConverter;
@Test
public void contextLoads() {
Map<String, Object> map = new HashMap<>();
Book book = new Book("西游记", "120.00");
//使用继承扩展的CorrelationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //消息流水号
correlationData.setMessage(book);
correlationData.setExchange("exchange_direct_no");
correlationData.setRoutingKey("ord");
///////////////////关键代码
MessageProperties messageProperties = new MessageProperties();
//设置messageId
messageProperties.setMessageId("123456");
Message message = messageConverter.toMessage(book, messageProperties);
////////////////////
try {
rabbitTemplate.convertAndSend("exchange_direct", "ord", message, correlationData);
} catch (AmqpConnectException e) {
System.out.println("保存信息编号:" + correlationData);
}
}
消费者代码:
@RabbitListener(queues = "queue_direct", containerFactory = "singleListenerContainer", errorHandler = "rabbitListenerErrorHandler")
public void handleBook(Message message, Book book, Channel channel) throws Exception {
System.out.println("book------>" + book);
System.out.println("--------------------->"+message.getMessageProperties().getMessageId());
//告诉MQ删除这一条消息,若是true,则是删除所有小于tags的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("----------------->信息域的值" + book);
}
效果图:
3.6 多个消费者消费一个queue
若是多个@RabbitListener
消费一个queue
,那么一个消息只会被一个方法调用。如果RabbitListenerContainerFactory中设置concurrentConsumer
为3,意味着每个方法产生3个consumer
。也可以分布在不同的应用程序中,那么就会在不同的Connection中。
二. 消费者通过拉(PULL)方式获取消息
可以通过AmqpTemplate
或者RabbitMqTemplate
拉取消息,当queue没有消息时,会立刻返回null,传入timeoutMillis参数可阻塞等待一段时间。
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
若是想直接在queue获取到java对象,可以调用receiveAndConvert
方法。
测试代码:
@Test
public void receive() {
Object o = rabbitTemplate.receiveAndConvert("queue_direct");
System.out.println(o.hashCode());
System.out.println(o);
}
效果图:
- 我们可以看到,调用这个方法,实际上只是取出一条消息:
- MQ中消息调用
receiveAndConvert
效果图:
相关推荐:
https://www.jianshu.com/p/2c2a7cfdd38a