pom文件引入依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml文件写入配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated #开启交换机确认功能
publisher-returns: true #开启消息回退功能
listener:
simple:
acknowledge-mode: manual #开启手动应答机制,默认自动应答
prefetch: 1 # 消费者每次从队列获取的消息数量,此属性不设置时为:轮询分发,设置1时:公平分发,
#设置为其他数字时表示在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量),用以达到限流的目的
concurrency: 1 #消费者最小数量
max-concurrency: 10 #消费者最大数量
retry:
enabled: true #开启消费者重试
max-attempts: 3 #最大重试次数
initial-interval: 3000 #重试间隔时间 单位毫秒
配置队列,交换机,路由key等信息
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqCofig {
public static final String QUEUE_SMS="queue_sms"; //短信信息队列
public static final String EXCHANGE_SMS="exchange_sms";//短信交换机
public static final String ROUTING_SMS="routing_sms"; //短信路由ke
//声明队列
@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
//创建队列方法及其属性值
/*
* 队列参数
* durable() 入参队列名称,durable,默认为true 开启持久化
* exclusive 是否独享,默认false,不独享,true 独享 .exclusive()方法开启,队列独占连接,队列只允许在连接中访问
* autoDelete 是否自动删除,默认false 不自动删除, true 自动删除 .autoDelete()方法开启
* 其他参数,例如队列长度,ttl等 .withArguments()开启,入参 map
* 其他参数例如ttl时长等,也可通过内置方法.ttl()来设置
* */
//方法一
return QueueBuilder.durable(QUEUE_SMS).build();
/*
* 队列参数
* String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
* 1.队列名称
* 2.是否进行持久化,默认true 开启持久化,false 不开启
* 3.是否独享,默认false,不独享, true 独享,队列独占连接,队列只允许在连接中访问
* 4.是否自动删除,默认false 不自动删除, true 自动删除
* 5.其他参数,例如队列长度,ttl等
arguments:队列的其他属性参数,
(1)x-message-ttl:消息的过期时间,单位:毫秒;
(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
* */
//方法二
//return new Queue(QUEUE_SMS);
}
//声明交换机
@Bean(EXCHANGE_SMS)
public Exchange EXCHANGE_SMS(){
//声明交换机两种方法
/*
* 交换机类型 fanout、topic、direct、headers
* fanout 对应的rabbit mq的工作模式是 Publish/Subscribe 发布订阅
* topic 对应 Topics 通配符
* direct 对应 Routing 路由
* headers 对应 Header转发器
* */
//方法一
return ExchangeBuilder.directExchange(EXCHANGE_SMS).build();
//方法二
//return new DirectExchange(EXCHANGE_SMS);
}
//绑定交换机与队列
@Bean
public Binding queueBingExchangeSms(@Qualifier(QUEUE_SMS) Queue queue,@Qualifier(EXCHANGE_SMS) Exchange exchange){
/*
* 如果声明交换机的时候没有指定交换机类型则需添加.noargs()方法
* 如果声明交换机指定返回类型 如:public DirectExchange EXCHANGE_SMS(){}
* */
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_SMS).noargs();
}
}
controller发送消息
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RequestMapping("mq")
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendSms/{message}")
public void sendSms(@PathVariable String message) {
//交换机,路由key,信息体
rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS, message);
System.out.println("当前时间:"+new Date()+",发送一条短信:"+message);
}
}
监听消息
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Component
public class QueueConsumer {
//个人整合代码
@RabbitListener(queues = RabbitMqCofig.QUEUE_SMS) //监听要收消息的队列,参数队列名称
public void receiveSms(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("当前时间:"+new Date().toString()+",收到信息:"+msg);
}
}
高级特性
死信队列+TTL延迟消息
在config添加死信队列,交换机,路由key,并生成绑定
public static final String QUEUE_DEAD="queue_dead"; //死信队列
public static final String EXCHANGE_DEAD="exchange_dead"; //死信交换机
public static final String ROUTING_DEAD="routing_dead"; //死信路由key
@Bean(QUEUE_DEAD)
public Queue QUEUE_DEAD(){
return QueueBuilder.durable(QUEUE_DEAD).build();
}
@Bean(EXCHANGE_DEAD)
public Exchange EXCHANGE_DEAD(){
return ExchangeBuilder.directExchange(EXCHANGE_DEAD).build();
}
@Bean
public Binding queueBingExchangeDead(@Qualifier(QUEUE_DEAD) Queue queue,@Qualifier(EXCHANGE_DEAD) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_DEAD).noargs();
}
延迟消息可以有三种方式生成
注意:如不重新创建队列,交换机测试需在重启系统之前在客户端页面删除交换机,队列,不然启动会报错
取消短信监听方法,改为监听死信队列,由于消息到期没有被消费就会转到死信队列
1.设置队列过期时间
此处把短信队列改为以下代码,发送消息代码不变
@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
/*
//两种设置方法,二选其一
//key是默认固定值固定值
Map<String, Object> args = new HashMap<>();
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_SMS).withArguments(args).build();
*/
return QueueBuilder.durable(QUEUE_SMS)
.deadLetterExchange(EXCHANGE_DEAD) //设置死信交换机
.deadLetterRoutingKey(ROUTING_DEAD) //设置死信路由key
.ttl(10000) //设置过期时间,单位毫秒,这里设置10秒
.build();
}
优点:时间准确,过期即立马推送到死信队列
缺点:时间固定,不够灵活多变,如果同一个功能需要多个不同的时间,则需要创建多个队列,不友好,除非时间固定
2.设置消息过期时间
config配置短信队列取消时间设定,只保留死信队列,路由信息
@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
return QueueBuilder.durable(QUEUE_SMS)
.deadLetterExchange(EXCHANGE_DEAD) //设置死信交换机
.deadLetterRoutingKey(ROUTING_DEAD) //设置死信路由key
.build();
}
controller发送消息时设置消息过期时间
@GetMapping("sendSms/{message}/{ttlTime}")
public void sendSms(@PathVariable String message,@PathVariable String ttlTime) {
//交换机,路由key,信息体
rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS,message,(Message msg) ->{
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
System.out.println("当前时间:"+new Date()+",发送一条延迟短信:"+message);
}
优点:时间灵活多变,可以随意设置消息过期时间
缺点:如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
即先发送一个三十秒过期的消息,在发送一个五秒过期的消息,结果是同时收到两条消息,因为第一条过期时间没到,把第二条给堵塞住了。所以即便第二条过期时间很短,也不会先发送。
3.安装延迟插件
插件下载地址
https://www.rabbitmq.com/community-plugins.html
Linux安装
解压放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Windows安装
把 rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 文件拷贝到RabbitMQ安装目 录下的 plugins 目录。
进入RabbitMQ安装目录下的 sbin目录,在cmd窗口下执行如下命令使插件生效
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
congfig配置新增自定交换机,队列,路由key
取消死信队列监听,改为监听自定义队列
public static final String QUEUE_CUSTOM="queue_custom"; //自定义队列
public static final String EXCHANGE_CUSTOM="exchange_custom"; //自定义交换机
public static final String ROUTING_CUSTOM="routing_custom"; //自定义路由key
@Bean(QUEUE_CUSTOM)
public Queue QUEUE_CUSTOM(){
return QueueBuilder.durable(QUEUE_CUSTOM).build();
}
@Bean(EXCHANGE_CUSTOM)
public Exchange EXCHANGE_CUSTOM(){
//由于ExchangeBuilder没有封装自定义交换机方法所以使用new方法
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); //交换机发送消息的工作模式 direct,topic,fanout,headers
/*
* 参数明细
* String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
* 1.交换机名称
* 2.交换机类型
* 3.是否持久化
* 4.是否自动删除
* 5.其他参数
* */
return new CustomExchange(EXCHANGE_CUSTOM,"x-delayed-message",true,false,args);
}
@Bean
public Binding queueBingExchangeCustom(@Qualifier(QUEUE_CUSTOM) Queue queue,@Qualifier(EXCHANGE_CUSTOM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_CUSTOM).noargs();
}
发送方法
@GetMapping("sendSms/{message}/{ttlTime}")
public void sendSms(@PathVariable String message,@PathVariable Integer ttlTime) {
//交换机,路由key,信息体
rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_CUSTOM, RabbitMqCofig.ROUTING_CUSTOM,message,(Message msg) ->{
//设置时长方法.setExpiration(ttlTime) 改为.setDelay(ttlTime)
msg.getMessageProperties().setDelay(ttlTime);
return msg;
});
System.out.println("当前时间:"+new Date()+",发送一条延迟短信:"+message);
}
监听方法
@RabbitListener(queues = RabbitMqCofig.QUEUE_CUSTOM) //监听要收消息的队列,参数队列名称
public void receiveDead(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("当前时间:"+new Date().toString()+",收到延迟信息:"+msg);
}
优点:发送的延迟消息统一在交换机处理,等待ttl时间,不会有被上一条覆盖住的问题,可以自定义设置消息时间。
缺点:需要安装额外扩展插件
发布确认
开启发布确认功能,pom文件添加
spring.rabbitmq.publisher-confirm-type=correlated //交换机应答
spring.rabbitmq.publisher-returns= true //队列应答
.NONE
禁用发布确认模式,是默认值
.CORRELATED
发布消息成功到交换器后会触发回调方法
.SIMPLE
经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
消息信息封装类,用于重发消息
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CorrelationData;
//消息信息封装类,用于重发消息
@Data
public class GmallCorrelationData extends CorrelationData {
//消息内容
private Object message;
//交换机
private String exchange;
//路由key
private String routingKey;
//重试次数
private int retryCount=0;
//是否延迟消息,true 延迟 false 不延迟
private Boolean isDelay=false;
//延迟时长
private int delayTime;
}
config短信队列
@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
return QueueBuilder.durable(QUEUE_SMS).build();
}
创建交换机,队列回调方法类
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
//rabbitTemplate 注入之后就设置该值,把本类注入到rabbitTemplate中,不然调用rabbitTemplate的这个方法没有具体实现
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
/**
* true:
* 交换机无法将消息进行路由时,会将该消息返回给生产者
* false:
* 如果发现消息无法进行路由,则直接丢弃
*/
rabbitTemplate.setMandatory(true); //yml:publisher-returns: true配置true同等效果,二选一
//设置回退消息交给谁处理
rabbitTemplate.setReturnsCallback(this);
}
/*
* 参数列表 @NonNull CorrelationData correlationData, boolean ack, @Nullable String cause
* 1.回调消息id及其余信息
* 2.是否发送成功,交换机是否收到消息,true 成功 false 失败
* 3.原因,b为true则s为空,b为false则s为失败原因
* */
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
//此处能否拿到CorrelationData中的ReturnedMessage消息信息,跟交换机时候接收成功无关,而是跟后续的队列,路由有关
//如果绑定的路由不对或者队列不存在,此处可以得到消息其余信息,因为此处只是监听交换机是否收到消息
//路由或队列正确的话,消息会被发送出去所以ReturnedMessage为空,不对消息发送不出去,所以有值,消息还停留在交换机中
//交换机不正确,此处也拿不到ReturnedMessage,因为消息根本没发送进来
//总结,交换机不正确ReturnedMessage空值,交换机正确,绑定的路由或队列正确,空值,交换机正确绑定的路由或队列不正确,有值
if (!b){
//可以在此处写入对失败消息的处理逻辑,重新发送,还是存入数据库,等待后续发送
sendRetry((GmallCorrelationData) correlationData);
}
//如果发送的是延迟消息(测试使用的插件延迟消息)correlationData是null,b是true
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//此处监听消息是否被发送到队列上等待消费,如果没有通知生产者,进行下一步逻辑处理
String id = returnedMessage.getMessage().getMessageProperties().getHeader("spring_returned_message_correlation");
//此处取到id,如果GmallCorrelationData以id为key存到redis中可以根据id获取消息参数,然后更改重试次数,从新放到redis中
//闲麻烦也可以在此处直接发送消息,不过有两个问题
// 1。如果一直发送失败,会进行一个死循环,一直发,一直失败
//2.重发之后恰巧在交换机处失败,因为没有GmallCorrelationData的消息信息所以无法进行重发,消息会丢失
//最好的方法还是发送消息的时候存进缓存,此处根据id取值
//如果发送的是延迟消息(测试使用的插件延迟消息)此处也可以监听到,所以延迟消息要过滤掉不重新发送
/* GmallCorrelationData correlationData=new GmallCorrelationData();
if (correlationData.getIsDelay()){
return;
}*/
//测试了一下延迟消息如果路由不正确,好像在消息没送达之前(ttl没过期)监听到一次,等ttl时间到期也监听不到了,直接丢失
}
//重新发送消息
public void sendRetry(GmallCorrelationData gmal){
//消息发送失败从新发送三次,(加原有的发送一共三次)或者一直从新发送
if (gmal.getRetryCount()<2) {
gmal.setRetryCount(gmal.getRetryCount()+1);
System.out.println("消息发送失败,第"+gmal.getRetryCount()+"次发送");
rabbitTemplate.convertAndSend(gmal.getExchange(), gmal.getRoutingKey(), gmal.getMessage(), gmal);
}
}
}
消息发送方法
交换机写错confirm方法处理
路由写错returnedMessage处理
@GetMapping("sendSms/{message}")
public void sendSms(@PathVariable String message) {
//封装消息参数用以confirm失败的时候从发消息,延迟消息记得添加延迟参数
//根据消息id是取不到消息的参数的,所以可以把correlationData存储到redis缓存中,id作为key 在returnedMessage方法中根据id去redis中取值
//如果存放到redis中可以设置一个过期时间,一般五分钟就够了,足够消息重新发送了
GmallCorrelationData correlationData=new GmallCorrelationData();
correlationData.setId("1"); //实际开发可以uuid
correlationData.setExchange(RabbitMqCofig.EXCHANGE_SMS+"1");
correlationData.setRoutingKey(RabbitMqCofig.ROUTING_SMS);
correlationData.setMessage(message);
//交换机,路由key,信息体
rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS,message,correlationData);
System.out.println("当前时间:"+new Date()+",发送一条短信短信1:"+message);
}
手动应答
yml配置开启acknowledge-mode
acknowledge-mode: manual #开启手动应答机制,默认自动应答
消费者代码
@RabbitListener(queues = RabbitMqCofig.QUEUE_CUSTOM) //监听要收消息的队列,参数队列名称
public void receiveDead(Message message, Channel channel) throws IOException {
try {
String msg = new String(message.getBody());
System.out.println("当前时间:"+new Date().toString()+",收到信息:"+msg);
//手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
//判断是否已经处理过一次消息
if (message.getMessageProperties().isRedelivered()){
System.out.println("消息已处理过一次,拒绝再次接收");
//拒收消息
//true 消息重新进入队列,false不在重新进入队列,如果配置了死信交换机,则进入死信
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
System.out.println("消息没有处理过,即将再次进入队列");
//不确认消息
//参数二 是否批量 true批量,false单个处理
//参数三 是否从新进入队列,true重新进入队列,false不进入
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
备份交换机
当正常交换机无法处理消息时,消息发送到备份交换机,并推送到队列
eg:短信交换机接收到消息,无法发送到队列(测试发送时路由key写错),消息转发到备份交换机
config添加备份交换机,备份队列,报警队列,路由key并绑定
public static final String QUEUE_BACKUP="queue_backup"; //备份队列
public static final String QUEUE_WARNING="queue_warning"; //备份报警信息队列
public static final String EXCHANGE_BACKUP="exchange_backup"; //备份交换机
public static final String ROUTING_BACKUP="routing_backup"; //备份路由key
@Bean(QUEUE_BACKUP)
public Queue QUEUE_BACKUP(){
return QueueBuilder.durable(QUEUE_BACKUP).build();
}
@Bean(QUEUE_WARNING)
public Queue QUEUE_WARNING(){
return QueueBuilder.durable(QUEUE_WARNING).build();
}
@Bean(EXCHANGE_BACKUP)
public Exchange EXCHANGE_BACKUP(){
return ExchangeBuilder.fanoutExchange(EXCHANGE_BACKUP).build();
}
@Bean
public Binding queueBingExchangeBack(@Qualifier(QUEUE_BACKUP) Queue queue,@Qualifier(EXCHANGE_BACKUP) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_BACKUP).noargs();
}
@Bean
public Binding queueWarBingExchangeBack(@Qualifier(QUEUE_WARNING) Queue queue,@Qualifier(EXCHANGE_BACKUP) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_BACKUP).noargs();
}
短信交换机
@Bean(EXCHANGE_SMS)
public Exchange EXCHANGE_SMS(){
//return ExchangeBuilder.directExchange(EXCHANGE_SMS).withArgument("alternate-exchange",EXCHANGE_BACKUP()).build();
return ExchangeBuilder.directExchange(EXCHANGE_SMS).alternate(EXCHANGE_BACKUP).build();
}
发送消息,监听消息代码不变,监听队列改下就行
mandatory (消息回退)参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。
幂等性,队列优先级,集群,镜像队列功能(用到的场景少,幂等性消息全局id+redis防止重复消费)略,太多了,不想复制了
完结撒花。。。。。。