一、消息发送原理
应用与Mq Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。
信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。
为什么不通过TCP直接发送命令?
对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。
如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。
二、引入
pom:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
docker安装
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
web界面
http://localhost:15672/#/queues
默认账户密码:guest
三、作用
1:消峰 类似秒杀的场景大量请求进入放入队列,消费方恒定的速度消费,避免瞬间并发拖垮系统 时间空间双重消峰
2:解耦 后续的业务逻辑放到队列中处理,例如跨系统的调用
3:异步 异步执行,处理请求
四、消息模式
1.点对点简单模式
2.work模式:同一队列内一个消费者消费
根据routingKey寻找所有队列中相符的队列
3.订阅模式:发送到交换器,根据不同交换器发送到对应规则队列 队列内单一消费者消费
3.1 fanout广播模式
适用场景:
创建订单后发送消息,日志服务与订单服务都接到消息处理
投放到交换器所有绑定的队列都会收到消息
消息由队列内其一消费者消费
3.2 direct路由模式
默认的交换机模式(不写exchange参数时,直接使用routingKey对匹配的队列名匹配)
交换机内相同routingKey的队列接收到消息
3.3 topic主题模式
与direct模式对比支持模糊绑定
交换机内匹配的routingKey的队列接收到消息
"*"匹配一个分段(用“.”分割)的内容(替代表示某个字符串);
"#"匹配0和多个字符(通配符替代内容)
广播模式拓展
需求场景:通过消息服务对集群中每台实例发送消息,可通过fanout交换机实现
fanout交换机绑定AnonymousQueue 生成一个随机队列 每台实例生成一个队列 一个交换机对所有实例的队列发送消息,每台实例接收处理
五、消息丢失
发送方设置confirm回调
yml:
spring:
rabbitmq:
publisher-confirms: true
注入回调接口ConfirmCallback
@Configuration
public class MqConfig {
@Bean
public AmqpTemplate amqpTemplate(RabbitTemplate rabbitTemplate){
//设置消息发送回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if(ack){
System.out.println("消息: "+correlationData+",已经被ack成功");
}else{
System.out.println("消息: "+correlationData+",nack,失败原因是:"+cause);
}
});
return rabbitTemplate;
}
}
接收方开启ack 手动确认消息接收
yml:
spring:
rabbitmq:
listener:
simple
acknowledge-mode: MANUAL
手动确认:
deliveryTag
:channel中唯一代表了一次投递
multiple
:是否批处理
void channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)
拒绝返回队列:
deliveryTag
:
multiple
:是否批处理
requeue
:是否将消息返回队列
void channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
丢弃消息:
deliveryTag
:
requeue
:是否将返回队列 丢弃时为false
void channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
六、重复消费
业务保证接口幂等性
通过对Tagid上锁,维护一个map