本文目录:
1.RabbitMQ 简介
2.RabbitMQ 应用场景
3.RabbitMQ 系统架构
4.RabbitMQ 更多应用 (RPC、超时、延时)
Rabbit简介
大致介绍一下:
MQ:MQ是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
而RabbitMQ,是一个Erlang开发的AMQP(Advanced Message Queuing Protocol )的开源实现。AMQP是消息通讯的世界里有很多公开标准中的其中一支,也是最为被大众认可的一支。
RabbitMQ应用场景:
MQ是应用程序或组件之间可以进行可靠的异步通讯,从而降低系统之间的耦合度,提高系统的可扩展性和可用性。
典型的应用场景有:
1.典型的异步处理
2.应用解耦
3.流量削锋
4.消息通讯四个场景
RabbitMQ 系统架构
Broker: 简单来说就是消息队列服务器实体
Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
VHost: 虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
Producer: 消息生产者,就是投递消息的程序
Consumer: 消息消费者,就是接受消息的程序
Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
如下图所示:
接下来我们根据具体业务说明各个部分的作用:
例如:现有一个业务需求,在某个用户开卡完成之后,会有别的服务想要得知这个用户开卡成功(活动服务),于是开卡的服务就需要发送一条开卡成功的消息到消息中间件。而别的服务也订阅这个消息中间件中的某个事件,在开卡服务发出消息之后便能收到这个消息进行处理。
那这一条开卡消息在RabbitMQ中是怎么转移的呢:
1.Clients P (Producer)它在完成开卡的时候会发送消息到X,也就是 Exchanges
- Exchanges不是一个消息的保持者,只是一个消息的交换机,他会根据不同的规则把消息路由到Queues中。
3.Queues才是消息的实际承载者。它会把收到的消息发送给Client C(Consumer)
根据不同的应用场景,RabbitMQ提供了四种类型的Exchanges:
fanout、direct、topic、headers这四种
fanout:
fanout
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
此种应用场景很普遍,发送消息的生产者不需要理会他到底有多少个Queues,也不用关心Routing Key ,只要有Binding就会路由消息。
direct
如图所示,direct的路由规则是,只有完全匹配routing key 才能路由到指定的Queues中。某个消息的routing key 为 error ,那它就只能路由到amqp.gen-S9b, 以及amqp.gen-Agl , 如果routing key 为 info,就只能路由到amqp.gen-Agl.
topic
Topic Type这种类型的Exchange Type 介于Direct 以及 Fanout之间。他能够通过通配符来指定路由到不同的Queues中。Binding Key中可以存在两种特殊字符" * "与"#",用于做模糊匹配,其中" * "用于匹配一个单词,"#"用于匹配多个单词(可以是零个).图中routingKey=”quick.orange.rabbit”就会被路由到Q1中。
headers
headers类型的Exchange不依赖于Routing Key与Binding Key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。该类型的Exchange用处极少,就不做介绍了。
关于ACK
默认情况下,如果Message 已经被某个Consumer正确的接收到了,那么该Message就会被从Queue中移除。
如果一个Queue没被任何的订阅,当有数据到达时,这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer。这个数据被Consumer正确收到时,这个数据就被从Queue中删除(重发到死信队列)。
那么什么是正确收到呢?通过ACK。每个Message都要被acknowledged(确认,ACK)。我们可以显示的在程序中去ACK,也可以自动的ACK。如果有数据没有被ACK,那么RabbitMQ Server会把这个信息发送到下一个Consumer。
定义的时候显示指定手动ACK acknowledgeMode
<bean id="firstInvoiceListenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="queueNames" value="xxxxx"></property>
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="messageListener" ref="xxxxAdapter"></property>
<property name="concurrentConsumers" value="10"></property>
<property name="acknowledgeMode" value="MANUAL"></property>
</bean>
如果任务没有被ACK而是被Reject,Rabbit的Reject策略有两个,一种的Reject可以让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从Queue中立即删除该Message。
RabbitMQ 更多应用
RabbitMQ基于MQ消息队列之上还可以实现 RPC调用,消息超时,延迟队列
RPC
MQ本身是基于异步的消息处理,上面的示例中生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
客户端发送请求(消息)时,在消息的属性(Message Properties,在AMQP协议中定义了14种properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)。服务器端收到消息处理完后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
超时时间TTL
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);
只要给队列设置x-message-ttl 参数,就设定了该队列所有消息的存活时间,时间单位是毫秒,值必须大于等于0
RabbitMQ保证超时的消息不会被消费者获得,同时会尽快删除超时的消息。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。
死信队列 Dead Letter Exchange
在消息(队列)设置了TTL而过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:
1.消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
2.消息因为设置了TTL而过期。
3.消息进入了一条已经达到最大长度的队列。
如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。
延时队列
有了刚刚提到的TTL以及DLX,那么久可以将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。
如上图所示,生产者产生的消息首先会进入缓冲队列。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列,以此达到延迟消费的效果。
定义一个延时队列:
Queue delayQueuePerMessageTTL() {
return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange
.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
.withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间
.build();
}
其中,x-dead-letter-exchange声明了队列里的死信转发到的DLX名称,x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。
Priority 、Prefetch、Lazy-Queues
RabbitMQ还有一些特性在这里简要提一下。
Lazy Queues
3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
Priority Queue
RabbitMQ 3.5+版本中支持了队列优先级和消息优先级,在发送消息的时候指定Priority参数就能给消息定义某个优先级。
Prefetch
在开启手动ACK的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。
然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接收来自队列的消息。在这种场景下,我们可以通过设置prefetch_count来达到这种效果。
参考 https://mp.weixin.qq.com/s/OABseRR0BnbK9svIPyLKXw
参考 http://www.rabbitmq.com/direct-reply-to.html
参考 http://kissyu.org/2017/11/18/Spring%20Boot%E4%B8%8ERabbitMQ%E7%BB%93%E5%90%88%E5%AE%9E%E7%8E%B0%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97/?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io