本章节涉及,SpringBoot整合RabbitMQ的基本操作,spring环境下RabbitMQ生产者消息确认机制(Confirm和Return),TTL,死信队列,延时队列。
1、SpringBoot整合RabbitMQ,生产与消费的基本操作
加入RabbitMQ的依赖包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置类相关代码:
# 配置RabbitMQ的基本信息 ip 端口 username password..
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
username:
password:
virtual-host: /test
//定义交换机的名字
public static final String EXCHANGE_NAME = "topic_exchange";
//定义队列的名字
public static final String QUEUE_NAME = "queue_name";
//1、声明交换机
@Bean
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2、声明队列
@Bean
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3、队列与交换机进行绑定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing.#").noargs();
}
生产者伪代码:
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
rabbitTemplate.convertAndSend("topic_exchange","routing.key.aa","内容");
}
消费者伪代码:
//定义方法进行信息的监听 RabbitListener中的参数用于表示监听的是哪一个队列
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println("message:"+message.getBody());
}
2、Spring环境下生产者相关操作
Confirm:
public void testConfirm() {
//定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//System.out.println("confirm方法被执行了...."+correlationData.getId());
//ack 为 true表示 消息已经到达交换机
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
//进行消息发送
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");
}
//进行睡眠操作
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
Return:
public void testReturn() {
//设置交换机处理失败消息的模式 为true的时候,消息达到不了 队列时,会将消息重新返回给生产者
rabbitTemplate.setMandatory(true);
//定义回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println("message:"+message);
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
//处理业务逻辑
}
});
//进行消息发送
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");
//进行睡眠操作
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
2、TTL:过期时间:只需要在创建消费队列的时候添加超时时间即可,其他操作和上述类似
@Bean
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build();
}
3、死信队列:队列长度达到限制、消费者拒绝接收并且不重回队列、消息设置了过期时间。
@Bean
public Queue queue(){
Map<String,Object> argument = new HashMap<>();
//x-dead-letter-exchange:死信交换机名称
argument.put("x-dead-letter-exchange","exchange_dlx");
//x-dead-letter-routing-key:发送给死信交换机的routingkey
argument.put("x-dead-letter-routing-key","dlx.test");
//设置队列的过期时间 ttl
argument.put("x-message-ttl",10000);
//设置队列的长度限制 max-length
argument.put("x-max-length",10);
return QueueBuilder.durable(QUEUE_NAME).withArguments(argument).build();
}
4、延迟队列:需要超时时间配合死信队列实现,设置了超时时间的消息,在过期后会发送到死信队列中,消费者监听对应的死信队列进行消费,即可实现延迟队列的效果。