//消息转换器 设置过后控制台,以及消息的收发都会以json的格式显示
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
rabbitmq:
addresses: localhost
port: 5672
username: admin
password: admin
listener:
simple:
prefetch: 1#处理完后再接着处理其他的,这样就不会平均分配了
direct:
prefetch: 1
通过注解的方式创建路由器和队列
通过注解的方式创建后,就不需要通过@Bean创建了
@Bean的方式比较烦人
下面是绑定对了路由key的案列
@RabbitListener(bindings = @QueueBinding(value =@Queue(name = "queue-test-5",durable = "true")
,exchange =@Exchange(name = "direct-5",type = "direct"),key ={"red","blue"}))
public void test5(String test){
log.info("test5==>{}",test);
}
消息发送确认
开启:
rabbitmq:
# 开启生产者发布确认,确认消息已发送到交换机 Exchange
publisher-confirm-type: correlated
# 开启发布者返回,确认消息已发送到队列 Queue
publisher-returns: true
ReturnsCallback触发条件
发送给指定交换机,(路由key错误)未找到队列,这个时候还会走ConfirmCallback
触发时机:消息被 RabbitMQ 接收到并尝试投递到交换机之后
并且返回ack=true
发送给未绑定队列的交换器
true:消息已成功到达指定的交换机。
false:消息未能到达交换机,例如因为交换机不存在或 RabbitMQ 内部异常导致失败。
ConfirmCallback
ack情况:
消息消费确认
rabbitmq:
listener:
simple:
#表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条
prefetch: 1
#消费被拒绝时 true:重回队列 false为否
default-requeue-rejected: false
retry:
#开枪
enabled: true
max-attempts: 3
max-interval: 1000ms
acknowledge-mode: AUTO #自动确认,抛出异常的时候消费失败,重新返回队列
达到最大次数的时候重新定义一个消息失败返处理策略路由到指定的交换机进行处理
@Bean
public RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error-exchange","error-routing-key");
}
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error-exchange",true,false);
}
@Bean
public Queue errorQueue(){
return new Queue("error-queue", true);
}