RabbitMQ的学习笔记
关于RabbitMQ的几个角色如下:
关于名词的通俗解析:
首先我们肯定知道RabbitMQ就是消息队列的一种实现,那么围绕这个,我们就可以思考一个消息队列到底需要什么,当然是需要队列,那么这个队列就是Queue,那么其他的所有名词都是围绕这个Queue来拓展的。
首先,想要让队列不在本地运行,而在网络中运行,肯定会有连接这个概念,所以就会有Connection,我们发一条消息连接一次,这样很显然是浪费资源的,建立连接的过程也很耗时,所以我们就会做一个东西让他来管理连接,当我用的时候,直接从里边拿出来已经建立好的连接发信息,那么ConnectionFactory应运而生。
接下来,当程序开发时,可能不止用到一个队列,可能有订单的队列、消息的队列、任务的队列等等,那么就需要给不同的queue发信息,那么和每一个队列连接的这个概念,就叫Channel
再往下来,当我们开发的时候还有时候会用到这样一种功能,就是当我发送一条消息,需要让几个queue都收到,那么怎么解决这个问题呢,难道我要给每一个queue发送一次消息?那岂不是浪费带宽又浪费资源,我们能想到什么办法呢,当然是我们发送给RabbitMQ服务器一次,然后让RabbitMQ服务器自己解析需要给哪个Queue发,那么Exchange就是干这件事的
但是我们给Exchange发消息,他怎么知道给哪个Queue发呢?这里就用到了RoutingKey和BindingKey
BindingKey是Exchange和Queue绑定规则的描述。Exchange接收到的消息中会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue的过程
关于RabbitMQ Server的问题?
1问:如何申明的exchange和queue?如何发送和接收消息?
答:
exchange 申明方法为:
/**
* exchange: exchange的名称
* type: 四种类型fanout(分发),topic(匹配),direct(直连),header(主题)
* durable:是否持久化;默认为false不持久化
* autoDelete:是否自动删除;默认false不删除
* internal:默认为false;true表示:这个exchange不接收client推送来的消息,只接收exchange之间通信
* Map<String, Object> arguments:
*/
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
queue的申明方法:
/**
* queue: queue的名称
* durable:是否持久化;默认为false不持久化
* exclusive:排他队列;
* autoDelete:是否自动删除;默认true自动删除
* Map<String, Object> arguments:
*/
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- queue: queue的名称
- durable跟交换机方法的参数一样,true表示做持久化,当RabbitMQ服务重启时,队列依然存在。
- exclusive是排他队列,如果一个队列被声明为排他队列,那么这个队列只能被第一次声明他的连接所见,并在连接断开的时候自动删除。这里有三点需要说明,1、同一个连接的不同channel,是可以访问同一连接下创建的排他队列的。2、排他队列只能被声明一次,其他连接不允许声明同名的排他队列。3、即使排他队列是持久化的,当连接断开或者客户端退出时,排他队列依然会被删除。
- autoDelete是自动删除,为true时,当没有任何消费者订阅该队列时,队列会被自动删除。
- arguments:其它参数:
x-message-ttl
:创建queue时设置该参数可指定消息在该queue中待多久,当消息到期时,会主动删除(可根据x-dead-letter-routing-key和x-dead-letter-exchange生成可延迟的死信队列)x-expires
:queue存活时间,创建queue时参数arguments设置了x-expires参数,该queue会在x-expires到期后queue消息,亲身测试直接消失(哪怕里面有未消费的消息)。x-max-length
:queue消息条数限制,限制加入queue中消息的条数。先进先出原则,超过10条后面的消息会顶替前面的消息。x-max-length-bytes
:queue消息容量限制,该参数和x-max-length目的一样限制队列的容量,但是这个是靠队列大小(bytes)来达到限制。x-dead-letter-routing-key
与x-dead-letter-exchange
:创建queue时参数arguments设置了x-dead-letter-routing-key和x-dead-letter-exchange,会在x-message-ttl时间到期后把消息放到x-dead-letter-routing-key和x-dead-letter-exchange指定的队列中达到延迟队列的目的。x-max-priority
:消息优先级(版本限制3.5+ )创建queue时arguments可以使用x-max-priority参数声明优先级队列的最大优先级(整数,建议0-10之间),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。目前使用更多的优先级将消耗更多的资源(Erlang进程)。 设置该参数同时设置死信队列时或造成已过期的低优先级消息会在未过期的高优先级消息后面执行。 该参数会造成额外的CPU消耗。x-queue-mode
: Lazy;Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
发送消息
/**
* Publish a message.
*
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception, which closes the channel.
*
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
* <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
*
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>.
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param immediate true if the 'immediate' flag is to be
* set. Note that the RabbitMQ server does not support this flag.
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
- routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
- mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。默认false:出现上述情形broker会直接将消息扔掉
- immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。默认false
- BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
消费消息
在RabbitMQ中消费者有2种方式获取队列中的消息:
- 通过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息之后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,否则客户端将会一直接收队列的消息。
- 通过basic.get命令主动获取队列中的消息,但是绝对不可以通过循环调用basic.get来代替basic.consume,这是因为basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,然后检索第一条消息,然后再取消订阅。如果是高吞吐率的消费者,最好还是建议使用basic.consume。
简单总结一下就是说:
consume是只要队列里面还有消息就一直取。
get是只取了队列里面的第一条消息。
因为get开销大,如果需要从一个队列取消息的话,首选consume方式,慎用循环get方式。
问:什么是死信?
答:三种情况:
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
问:什么是死信交换机?
答:在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后
在业务队列出现死信的时候就会将数据发送到死信队列。
问:什么是死信队列?
答:死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已
死信交换机图解
问:什么场景下可以用延时队列?用RabbitMQ如何实现?
答:
电商系统中,支付订单未完成时,30分钟后提示用户,可以用延时队列来实现。
rabbitmq本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。
(rabbitmq-delayed-message-exchange 这个插件可以实现延迟队列的功能)
问:设置每条消息的失效时间,先设置一条30s的失效消息发送给MQ,再设置一条10s的失效消息发给MQ,那么什么收到消息的顺序是什么样的?什么时间能收到?
答:
申明队列队时设置消息的失效时间,方式为:
Map<String, Object> argsMap = new HashMap<String, Object>();
argsMap.put("x-message-ttl", 6000); // 消息6s后失效
argsMap.put("x-dead-letter-exchange", "some.dead.exchange.name");//设置死信交换机
argsMap.put("x-dead-letter-routing-key", "some-dead-routing-key");//设置死信routingKey
String myqueue = "myqueue2";
channel.queueDeclare(myqueue, false, false, false, argsMap);
设置每条消息的失效时间,方式为:
new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build();
问:两种设置失效方式有什么不一样呢?
经过试验得出,对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而第二种方法里,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到server之前判定的,为什么两者得处理方法不一致?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。
所以,会在30s之后,收到两条消息。
问:队列设置了消息失效时间TTL为10s,消息1设置了失效时间5s,消息2设置了失效时间15s,当无消费者时,消息1和2分别多久后会出现在死信队列里?
答:失效时间近的设置先生效。消息1在5s后,消息2在10s后。
关于RabbitMQ连接的建立、传输的问题
问:connection关闭后,channel是否会消失?
答:会。
问:相同的connection、channel能否申明已经申明过的exchange、queue?
答:可以。
问:相同的connection、channel能否再次申明已经申明过但属性不一样的exchange、queue,后申明的属性是否覆盖之前的属性?
答:不可以反复申明不同属性的exchange,会抛出异常。相同属性的exchange可以反复申明。
问:新建立的connection、channel能否申明已经被其他connection、channel申明过的exchange、queue?
答:可以,但那是属性必须和已经申明的exchange一致。
问:客户端主动断开connection,RabbitMQ服务端是否会删除这个connection所申明的exchange、queue?
答:exchange默认不会。queue默认会。在创建的时候可以设置这个属性。
问:生产者client发送消息后,服务器宕机,导致消息丢失。RabbitMQ是通过什么方式避免消息发送时丢失?
答:事务机制(性能消耗大)和confirm模式(消耗小)
事务机制
RabbitMQ提供了txSelect()、txCommit()和txRollback()三个方法对消息发送进行事务管理,txSelect用于将通道channel开启事务模式,txCommit用于提交事务,txRollback用户进行事务回滚操作。
示例代码:
try{
//channel开启事务模式
channel.txSelect();
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//模拟异常
int n = 1/0;
//提交事务
channel.txCommit();
}catch(Exception e){
e.printStackTrace();
channel.txRollback();
}
假如在txCommit之前发生了异常,那么就可以通过Rollback进行回滚操作。
以上是基于AMQP协议层的事务机制,确保了数据在生产者与RabbitMQ服务器之间的可靠性,但是性能开销较大。
Confirm模式
RabbitMQ提供了一种低消耗的事务管理方式,将channel设置成confirm模式。confirm模式的channel,通过该channel发出的消息会生成一个唯一的有序ID(从1开始),一旦消息成功发送到相应的队列之后,RabbitMQ服务端会发送给生产者一个确认标志,包含消息的ID,这样生产者就知道该消息已经发送成功了。如果消息和队列是持久化的,那么当消息成功写入磁盘之后,生产者会收到确认消息。此外服务端也可以设置basic.ack的mutiple域,表明是否是批量确认的消息,即该序号之前的所有消息都已经收到了。
confirm的机制是异步的,生产者可以在等待的同时继续发送下一条消息,并且异步等待回调处理,如果消息成功发送,会返回ack消息供异步处理,如果消息发送失败发生异常,也会返回nack消息。confirm的时间没有明确说明,并且同一个消息只会被confirm一次。
我们在生产者使用如下代码开启channel的confirm模式,**已经开启事务机制的channel是不能开启confirm模式的。**
channel.confirmSelect();
处理ack或者nack的方式有三种:
串行confirm:
每发送一条消息就调用waitForConfirms()方法等待服务端confirm
//开启confirm模式
channel.confirmSelect();
String message = "Hello World";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//判断是否回复
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}
其中waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间
批量confirm:
每发送一批次消息就调用waitForConfirms()方法等待服务端confirm
//开启confirm模式
channel.confirmSelect();
for(int i =0;i<1000;i++){
String message = "Hello World";
//发送消息
channel.basicPublish(EXCHANGE_NAME,
"",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
if(i%100==0){
//每发送100条判断一次是否回复
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}
}
}
批量的方法从数量级上降低了confirm的性能消耗,提高了效率,但是有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送。所以批量的confirm虽然性能提高了,但是消息的重复率也提高了。
异步confirm:
使用监听方法,当服务端confirm了一条或多条消息后,调用回调方法
//声明一个用来记录消息唯一ID的有序集合SortedSet
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//开启confirm模式
channel.confirmSelect();
//异步监听方法 处理ack与nack方法
channel.addConfirmListener(new ConfirmListener() {
//处理ack multiple 是否批量 如果是批量 则将比该条小的所有数据都移除 否则只移除该条
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
//处理nack 与ack相同
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
//获取消息confirm的唯一ID
long nextSeqNo = channel.getNextPublishSeqNo();
String message = "Hello World.";
//发送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//将ID加入到有序集合中
confirmSet.add(nextSeqNo);
}
每一个comfirm的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack或者nack的数据,集合删除一条。SortedSet是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合
7问:RabbitMQ如何知道消息已经被consumer消费?
答:ack机制。为了保证RabbitMQ能够感知消费者正确取到了消息,RabbitMQ提供了消息确认机制,与给生产者回复ACK的方式类似,当队列发送一条消息给消费者时,会记录一个unack标志,当消费者拿到消息之后,会回复一个ack标志,从而抵消了原来的unack标志。
8问:消费者中默认的ack机制是什么样的?
答:默认情况是:当消费者拿到消息之后立即回复ack而不管消息是否正确被处理,就回复。这个时间很快,以至于基本看不到unack的状态。
9问:消费者的默认ack机制,会有什么问题么?
答:消息在消费者端阻塞(消息重复或者大量的消息堆积)。假如消费者在接收消息后,业务处理的过程中发生异常crash了,那么这条消息就消失了,持久化也无法解决这个问题。这里就需要我们在日常的业务处理中,消费者要手动的确认消息。确认消息包括两种,一种是ack,另一种是unack,unack是表明我这条消息处理异常了,可以设置参数告诉MQ服务器是否需要将消息重新放入到队列中。同时,如果开启了手动回复确认的消费者,当消费者异常断开时,没有回复的消息会被重新放入队列供给其他消费者使用。所以程序员必须一定要记得回复消息确认,不然会导致消息重复或者大量的消息堆积。
10问:如何解决消费者的消息阻塞(消息重复或者大量的消息堆积)?
阻塞的问题的解决方案就是设置合理的prefetch大小,处理能力快的消费者设置数值大,处理更多的消息,处理能力慢的消费者设置数值小,少处理消息,也就不会发生阻塞。假设有两个消费者,消费者1处理业务时间是2s,消费者2处理业务时间是2ms,都设置prefetch大小为10,那么就不会出现消费者1积累大量的unack,这里最多的unack数目就是两个prefetch的大小之和20,同时,MQ分发消息是先塞满10个到消费者1,再塞满10个到消费者2,塞第21个的时候,先看消费者1的缓冲池有没有空位,没有的话去看消费者2,因为消费者2的处理速度比1快1000倍,所以1000条数据前10条塞给消费者1之后,后边的数据就都塞给消费者2了。设置prefetch大小的方法:在消费者中加入代码:channel.basicQos(10);