一、利用docker安装rabbitmq
https://www.cnblogs.com/yufeng218/p/9452621.html
rabbitmq技术文档:https://www.jianshu.com/p/78847c203b76
Broker:消息系统
Virtual host(虚拟地址,可以理解为命名空间)
Connection:(TCP生产者或者消费端和消息系统建立的长连接)
Channel:每一个连接里面可以划分出很多渠道,一个消息的发送通过一个渠道发送到交换机。
Exchange:交换机(根据路由健把消息分发到不同的队列)
Queue:队列,存储消息的队列
Binding:绑定器,绑定交换机的路由健和队列。
路由键有四种:
direct :
这种类型的交换机的路由规则是根据一个routingKey的标识,交换机通过一个routingKey与队列绑定 ,在生
产者生产消息的时候 指定一个routingKey 当绑定的队列的routingKey 与生产者发送的一样 那么交换机会吧
这个消息发送给对应的队列。
fanout:
这种类型的交换机路由规则很简单,只要与他绑定了的队列, 他就会吧消息发送给对应队列(与routingKey
没关系)
topic:
这种类型的交换机路由规则也是和routingKey有关 只不过 topic他可以根据:*,#( 号代表过滤一单词,#代
表过滤后面所有单词, 用.隔开)来识别routingKey 我打个比方 假设 我绑定的routingKey 有队列A和B A的
routingKey是:.user B的routingKey是: #.user
那么我生产一条消息routingKey 为: error.user 那么此时 2个队列都能接受到, 如果改为 topic.error.user
那么这时候 只有B能接受到了
发布方消息确认机制和失败回调机制
application.yml
rabbitmq:
host: 192.168.29.133
port: 5672
username: admin
password: admin
virtual-host: my_vhost
publisher-confirms: true
publisher-returns: true
package com.luban.mall.search.mq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
public DirectExchange directExchange2() {
return new DirectExchange("directExchange2");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Queue queue1() {
return new Queue("testQueue1", true);
}
@Bean
public Queue queue2() {
return new Queue("testQueue2", true);
}
@Bean
public Queue queue3() {
return new Queue("testQueue3", true);
}
@Bean
public Queue queue4() {
return new Queue("testQueue4", true);
}
@Bean
public Queue queue5() {
return new Queue("testQueue5", true);
}
@Bean
public Queue queue6() {
return new Queue("testQueue6", true);
}
@Bean
public Binding binding1(){
return BindingBuilder.bind(queue1()).to(directExchange()).with("rkey1");
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(queue2()).to(directExchange()).with("rkey2");
}
@Bean
public Binding binding3(){
return BindingBuilder.bind(queue3()).to(fanoutExchange());
}
@Bean
public Binding binding4(){
return BindingBuilder.bind(queue4()).to(fanoutExchange());
}
@Bean
public Binding binding5(){
return BindingBuilder.bind(queue5()).to(topicExchange()).with("*.user");
}
@Bean
public Binding binding6(){
return BindingBuilder.bind(queue6()).to(topicExchange()).with("#.user");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setReturnCallback(new MyReturnCallback());
template.setConfirmCallback(new MyPublisherConfirmCallback());
return template;
}
}
package com.luban.mall.search.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class MyPublisherConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//通过这个参数可以拿到我们发送的消息对象
Message message = correlationData.getReturnedMessage();
if(ack){
System.out.println("消息发送成功:"+correlationData+"---ack:"+ack+"---cause:"+cause);
}else{
System.out.println("消息发送失败:"+correlationData+"---ack:"+ack+"---cause:"+cause);
}
}
}
public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replycode, String replyText, String exchange, String routingkey) {
System.out.println("error return call back: message"+message+"--replycode:"+replycode+"-- replyText:"+replyText+"--exchange:"+exchange+"---routingkey:"+routingkey);
}
}
@RequestMapping(value = "/send5",method = RequestMethod.POST)
@ApiOperation("发送消息到消息队列")
@ResponseBody
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "用户名", defaultValue = "李四",paramType = "query"),
@ApiImplicitParam(name = "age", value = "年龄", defaultValue = "23", required = true,paramType = "query")
}
)
public CommonResult<String> send5(@RequestParam String name,@RequestParam String age){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(name,age,"majunweitest callback",correlationData);
return CommonResult.success("马","返回正确");
}
消费方ack机制
代码实现
package com.luban.mall.search.mq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitListenerContainerConif {
@Bean
public SimpleRabbitListenerContainerFactory
simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
//这个connectionFactory就是我们自己配置的连接工厂直接注入进来
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
//这边设置消息确认方式由自动确认变为手动确认
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return simpleRabbitListenerContainerFactory;
}
}
package com.luban.mall.search.controller;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TestListener {
//@RabbitListener(queues = {"testQueue5","testQueue6"})
public void reciveMessage(String msg){
System.out.println("listener....:"+msg);
}
@RabbitListener(queues = {"testQueue5","testQueue6"},containerFactory = "simpleRabbitListenerContainerFactory")
public void getMessage(Message message, Channel channel) throws Exception{
System.out.println("list manul ack:"+(new String(message.getBody(),"UTF-8")));
Thread.sleep(5000l);
//消息消费成功后调用第一个参数是消息的标识字段,第二个是否是批量确认:false不是批量,true是批量
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//消费失败后调用这个方法告知消息队列最后一个参数:是否返回原队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
消息预取
消息预取
扯完消息确认 我们来讲一下刚刚所说的批量处理的问题
什么情况下回遇到批量处理的问题呢?
在这里 就要先扯一下rabbitmq的消息发放机制了
rabbitmq 默认 他会最快 以轮询的机制吧队列所有的消息发送给所有客户端 (如果消息没确认的话 他会添加一个
Unacked的标识上图已经看过了)
那么 这种机制会有什么问题呢, 对于Rabbitmq来讲 这样子能最快速的使自己不会囤积消息而对性能造成影响,
但是 对于我们整个系统来讲, 这种机制会带来很多问题, 比如说 我一个队列有2个人同时在消费,而且他们处理
能力不同, 我打个最简单的比方 有100个订单消息需要处理(消费) 现在有消费者A 和消费者B , 消费者A消费一
条消息的速度是 10ms 消费者B 消费一条消息的速度是15ms ( 当然 这里只是打比方) 那么 rabbitmq 会默认给
消费者A B 一人50条消息让他们消费 但是 消费者A 他500ms 就可以消费完所有的消息 并且处于空闲状态 而 消费
}
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
者B需要750ms 才能消费完 如果从性能上来考虑的话 这100条消息消费完的时间一共是750ms(因为2个人同时在
消费) 但是如果 在消费者A消费完的时候 能把这个空闲的性能用来和B一起消费剩下的信息的话, 那么这处理速
度就会快非常多。
这个例子可能有点抽象, 我们通过代码来演示一下
我往Rabbitmq生产100条消息 由2个消费者来消费 其中我们让一个消费者在消费的时候休眠0.5秒(模拟处理业务
的延迟) 另外一个消费者正常消费 我们来看看效果:
正常的那个消费者会一瞬间吧所有消息(50条)全部消费完(因为我们计算机处理速度非常快) 下图是加了延迟
的消费者:
可能我笔记里面你看不出效果,这个你自己测试就会发现 其中一个消费者很快就处理完自己的消息了 另外一个消
费者还在慢慢的处理 其实 这样严重影响了我们的性能了。
其实讲了这么多 那如何来解决这个问题呢?
我刚刚解释过了 造成这个原因的根本就是rabbitmq消息的发放机制导致的, 那么我们现在来讲一下解决方案: 消
息预取
什么是消息预取? 讲白了以前是rabbitmq一股脑吧所有消息都均发给所有的消费者(不管你受不受得了) 而现在
是在我消费者消费之前 先告诉rabbitmq 我一次能消费多少数据 等我消费完了之后告诉rabbitmq rabbitmq再给
我发送数据
在代码中如何体现?
在使用消息预取前 要注意一定要设置为手动确认消息, 原因参考上面划重点的那句话。
因为我们刚刚设置过了 这里就不贴代码了, 完了之后设置一下我们预取消息的数量 一样 是在容器(Container)
里面设置:
@Configuration
public class MyRabbitListenerContainerConif {
@Bean
public SimpleRabbitListenerContainerFactory
simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
//这个connectionFactory就是我们自己配置的连接工厂直接注入进来
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
//这边设置消息确认方式由自动确认变为手动确认
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置消息预取得数量
simpleRabbitListenerContainerFactory.setPrefetchCount(2);
return simpleRabbitListenerContainerFactory;
}
}
@Component
public class TestListener {
//@RabbitListener(queues = {"testQueue5","testQueue6"})
public void reciveMessage(String msg){
System.out.println("listener....:"+msg);
}
@RabbitListener(queues = {"testQueue5","testQueue6"},containerFactory = "simpleRabbitListenerContainerFactory")
public void getMessage(Message message, Channel channel) throws Exception{
System.out.println("list manul ack:"+(new String(message.getBody(),"UTF-8")));
Thread.sleep(5000l);
//消息消费成功后调用第一个参数是消息的标识字段,第二个是否是批量确认:false不是批量,true是批量
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
//消费失败后调用这个方法告知消息队列最后一个参数:是否返回原队列
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
死信交换机
package com.luban.mall.search.mq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public DirectExchange deathExchange() {
return new DirectExchange("deathExchange");
}
@Bean
public Queue queue6() {
Map<String, Object> map = new HashMap();
//设置消息的过期时间 单位毫秒
map.put("x-message-ttl", 100000);
//设置附带的死信交换机
map.put("x-dead-letter-exchange", "deathExchange");
//指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定
map.put("x-dead-letter-routing-key", "drkey1");
return new Queue("testQueue6", true,false,false,map);
}
@Bean
public Queue queue7() {
return new Queue("deathQueue", true);
}
@Bean
public Binding binding7(){
return BindingBuilder.bind(queue7()).to(deathExchange()).with("drkey1");
}
@Bean
public Binding binding6(){
return BindingBuilder.bind(queue6()).to(topicExchange()).with("#.user");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setReturnCallback(new MyReturnCallback());
template.setConfirmCallback(new MyPublisherConfirmCallback());
return template;
}
}
怎么保证消息不丢失、不重复消费问题
https://www.cnblogs.com/jis121/p/11050202.html
rabbitmq集群搭建,配置
rabbbitmq由于是由erlang语言开发的 天生就支持分布式
rabbitmq 的集群分两种模式 一种是默认模式 一种是镜像模式
当然 所谓的镜像模式是基于默认模式加上一定的配置来的
在rabbitmq集群当中 所有的节点(一个rabbitmq服务器) 会被归为两类 一类是磁盘节点 一类是内存节点
磁盘节点会把集群的所有信息(比如交换机,队列等信息)持久化到磁盘当中,而内存节点只会将这些信息保存到内存
当中 讲白了 重启一遍就没了。
为了可用性考虑 rabbitmq官方强调集群环境至少需要有一个磁盘节点, 而且为了高可用的话, 必须至少要有2个
磁盘节点, 因为如果只有一个磁盘节点 而刚好这唯一的磁盘节点宕机了的话, 集群虽然还是可以运作, 但是不能
对集群进行任何的修改操作(比如 队列添加,交换机添加,增加/移除 新的节点等)
具体想让rabbitmq实现集群, 我们首先需要改一下系统的hostname (因为rabbitmq集群节点名称是读取
hostname的)
这里 我们模拟3个节点 :
rabbitmq1
rabbitmq2
rabbitmq3
linux修改hostname命令: hostnamectl set-hostname [name]
修改后重启一下 让rabbitmq重新读取节点名字
然后 我们需要让每个节点通过hostname能ping通(记得关闭防火墙) 这里 我们可以修改修改一下hosts文件
关闭防火墙:
关闭防火墙 systemctl stop firewalld.service 禁止开机自启 systemctl disable firewalld.service
接下来,我们需要将各个节点的.erlang.cookie文件内容保持一致(文件路径/var/lib/rabbitmq/.erlang.cookie)
因为我是采用虚拟机的方式来模拟集群环境, 所以如果像我一样是克隆的虚拟机的话 同步.erlang.cookie文件这个
操作在克隆的时候就已经完成了。
上面这些步骤完成之后 我们就可以开始来构建集群 了
我们先让rabbitmq2 加入 rabbitmq1与他构建为一个集群
执行命令( ram:使rabbitmq2成为一个内存节点 默认为:disk 磁盘节点):
rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbitmq1 --ram rabbitmqctl start_app
在构建的时候 我们需要先停掉rabbitmqctl服务才能构建 等构建完毕之后再启动
我们吧rabbitmq2添加完之后在rabbitmq3节点上也执行同样的代码 使他也加入进去 当然 我们也可以让
rabbitmq3也作为一个磁盘节点
当执行完操作以后我们来看看效果:
随便在哪个节点打开管理页面都能看到集群环境各节点的信息;
有关集群的其他命令:
rabbitmq-server -detached 启动RabbitMQ节点 rabbitmqctl start_app 启动RabbitMQ应用,而不是节点
rabbitmqctl stop_app 停止 rabbitmqctl status 查看状态 rabbitmqctl add_user mq 123456 rabbitmqctl
set_user_tags mq administrator 新增账户 rabbitmq-plugins enable rabbitmq_management 启用
RabbitMQ_Management rabbitmqctl cluster_status 集群状态 rabbitmqctl forget_cluster_node
rabbit@[nodeName] 节点摘除 rabbitmqctl reset application 重置
普通模式的rabbitmq集群搭建好后, 我们来说一下镜像模式
在普通模式下的rabbitmq集群 他会吧所有节点的交换机信息 和队列的元数据(队列数据分为两种 一种为队列里面
的消息, 另外一种是队列本身的信息 比如队列的最大容量,队列的名称,等等配置信息, 后者称之为元数据) 进
行复制 确保所有节点都有一份。
而镜像模式,则是吧所有的队列数据完全同步(当然 对性能肯定会有一定影响) 当对数据可靠性要求高时 可以使
用镜像模式
实现镜像模式也非常简单 有2种方式 一种是直接在管理台控制, 另外一种是在声明队列的时候控制
声明队列的时候可以加入镜像队列参数 在上方的参数列表当中有解释 我们来讲一下管理台控制
镜像队列配置命令解释:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置 Name: policy的名称 Pattern: queue的匹配模式(正
则表达式) Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode ha-mode:指明镜像队
列的模式,有效值为 all/exactly/nodes all:表示在集群中所有的节点上进行镜像 exactly:表示在指定个数的
节点上进行镜像,节点的个数由ha-params指定 nodes:表示在指定的节点上进行镜像,节点名称通过ha-
params指定 ha-params:ha-mode模式需要用到的参数 ha-sync-mode:进行队列中消息的同步方式,有效
值为automatic和manual
这里举个例子 如果想配置所有名字开头为 policy的队列进行镜像 镜像数量为1那么命令如下:
rabbitmqctl set_policy ha_policy "^policy_" '{"ha-mode":"exactly","ha-params":1,"ha-sync-
mode":"automatic"}'
怎么保证消息不丢失
消息发布方:
1.创建队列时,设置队列的持久化
2.发送消息是设置消息的模式deliveryMode=2,持久化
3.利用发送方ack机制,发送前在redis中维护id-发送状态[0-未发送,1-已发送,2-发送成功,3-发送失败],时间戳
2-发送成功的移除集合,对3-发送失败和1-已发送但超过规定时间未ack的消息,开启一个新的线程或任务定期扫描并发送。
消息接收方:
1.开启手动确认机制。
2.消费消息后还未手动确认,系统挂掉了(这种情况不用考虑,消息会从unacked状态重新入队改为redy状态)
3.消息消费时出现业务异常的情况,手动ack消息失败,放入原队列或者重定向到死信队列。
怎么保证消息的顺序性
1.要实现严格的顺序消息,简单且可行的办法就是:保证生产者 - MQServer - 消费者是一对一对一的关系
2.设定相关的路由键,把强相关的数据分配到同一个队列,一个队列对应一个消费者。
面试题解答:
1.rabbitmq
应用场景:
质控分组
怎么保证消息不丢失:
1.搭建高可用集群,设置多个磁盘节点保证元数据不丢失
2.创建持久化队列,创建镜像队列
3.发送消息时指定消息是持久化消息
4.利用消息发送端的消息确认回调机制,比如在redis中维护消息id与发送状态的记录,ack确认发送成功的清除掉,发送失败的,redis中更新失败状态
5.消费端开启手动确认,业务异常情况 存入数据库,存库失败,放入死信队列。
怎么保证消息不重复消费?
消息幂等性问题:消费消息时判断该消息是否已经处理过,处理过就不在处理。(比如你存库已经存了那这个时候 再过来这个消息 要么就不处理)
怎么保证消息的有序性?
一个消息队列只开启一个消费端:或者通过路由间把强关联的消息发送到同一个队列,然后每个消费端跟队列一一对应。
怎么解决消息堆积问题?
大量消息堆积:
造成消息堆积有两种原因:
1.很大原因是消费端挂了或者消费端的处理能力比较差?
临时借调10倍的机器,部署消费端进行消费
2.消费端bug问题
,如果是消费端bug问题,那就写一个临时的消费消息的应用 把消息释放掉,等bug修复之后,重新再服务端生成消息。