RabbitMQ扫盲

本文目录:

1.RabbitMQ 简介
2.RabbitMQ 应用场景
3.RabbitMQ 系统架构
4.RabbitMQ 更多应用 (RPC、超时、延时)

Rabbit简介

大致介绍一下:
MQ:MQ是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
而RabbitMQ,是一个Erlang开发的AMQP(Advanced Message Queuing Protocol )的开源实现。AMQP是消息通讯的世界里有很多公开标准中的其中一支,也是最为被大众认可的一支。

RabbitMQ应用场景:

MQ是应用程序或组件之间可以进行可靠的异步通讯,从而降低系统之间的耦合度,提高系统的可扩展性和可用性。
典型的应用场景有:
1.典型的异步处理
2.应用解耦
3.流量削锋
4.消息通讯四个场景

RabbitMQ 系统架构

Broker: 简单来说就是消息队列服务器实体
Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
VHost: 虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
Producer: 消息生产者,就是投递消息的程序
Consumer: 消息消费者,就是接受消息的程序
Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

如下图所示:


RabbitMQ消息模型

接下来我们根据具体业务说明各个部分的作用:

例如:现有一个业务需求,在某个用户开卡完成之后,会有别的服务想要得知这个用户开卡成功(活动服务),于是开卡的服务就需要发送一条开卡成功的消息到消息中间件。而别的服务也订阅这个消息中间件中的某个事件,在开卡服务发出消息之后便能收到这个消息进行处理。
那这一条开卡消息在RabbitMQ中是怎么转移的呢:

1.Clients P (Producer)它在完成开卡的时候会发送消息到X,也就是 Exchanges

  1. Exchanges不是一个消息的保持者,只是一个消息的交换机,他会根据不同的规则把消息路由到Queues中。

3.Queues才是消息的实际承载者。它会把收到的消息发送给Client C(Consumer)

根据不同的应用场景,RabbitMQ提供了四种类型的Exchanges:
fanout、direct、topic、headers这四种
fanout:


fanout

fanout

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
此种应用场景很普遍,发送消息的生产者不需要理会他到底有多少个Queues,也不用关心Routing Key ,只要有Binding就会路由消息。

direct

direct

如图所示,direct的路由规则是,只有完全匹配routing key 才能路由到指定的Queues中。某个消息的routing key 为 error ,那它就只能路由到amqp.gen-S9b, 以及amqp.gen-Agl , 如果routing key 为 info,就只能路由到amqp.gen-Agl.

topic

topic

Topic Type这种类型的Exchange Type 介于Direct 以及 Fanout之间。他能够通过通配符来指定路由到不同的Queues中。Binding Key中可以存在两种特殊字符" * "与"#",用于做模糊匹配,其中" * "用于匹配一个单词,"#"用于匹配多个单词(可以是零个).图中routingKey=”quick.orange.rabbit”就会被路由到Q1中。

headers

headers类型的Exchange不依赖于Routing Key与Binding Key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。该类型的Exchange用处极少,就不做介绍了。

关于ACK

默认情况下,如果Message 已经被某个Consumer正确的接收到了,那么该Message就会被从Queue中移除。
如果一个Queue没被任何的订阅,当有数据到达时,这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer。这个数据被Consumer正确收到时,这个数据就被从Queue中删除(重发到死信队列)。
那么什么是正确收到呢?通过ACK。每个Message都要被acknowledged(确认,ACK)。我们可以显示的在程序中去ACK,也可以自动的ACK。如果有数据没有被ACK,那么RabbitMQ Server会把这个信息发送到下一个Consumer。
定义的时候显示指定手动ACK acknowledgeMode

    <bean id="firstInvoiceListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="queueNames" value="xxxxx"></property>
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="messageListener" ref="xxxxAdapter"></property>
        <property name="concurrentConsumers" value="10"></property>
        <property name="acknowledgeMode" value="MANUAL"></property>
    </bean>

如果任务没有被ACK而是被Reject,Rabbit的Reject策略有两个,一种的Reject可以让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从Queue中立即删除该Message。

RabbitMQ 更多应用

RabbitMQ基于MQ消息队列之上还可以实现 RPC调用,消息超时,延迟队列

RPC

MQ本身是基于异步的消息处理,上面的示例中生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。


image.png

客户端发送请求(消息)时,在消息的属性(Message Properties,在AMQP协议中定义了14种properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)。服务器端收到消息处理完后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。

超时时间TTL

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

只要给队列设置x-message-ttl 参数,就设定了该队列所有消息的存活时间,时间单位是毫秒,值必须大于等于0
RabbitMQ保证超时的消息不会被消费者获得,同时会尽快删除超时的消息。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。

死信队列 Dead Letter Exchange

在消息(队列)设置了TTL而过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:
1.消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
2.消息因为设置了TTL而过期。
3.消息进入了一条已经达到最大长度的队列。
如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。

延时队列

有了刚刚提到的TTL以及DLX,那么久可以将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。


延时队列

如上图所示,生产者产生的消息首先会进入缓冲队列。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列,以此达到延迟消费的效果。
定义一个延时队列:

Queue delayQueuePerMessageTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
                       .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间
                       .build();
}

其中,x-dead-letter-exchange声明了队列里的死信转发到的DLX名称,x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。

Priority 、Prefetch、Lazy-Queues

RabbitMQ还有一些特性在这里简要提一下。

Lazy Queues

3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

Priority Queue

RabbitMQ 3.5+版本中支持了队列优先级和消息优先级,在发送消息的时候指定Priority参数就能给消息定义某个优先级。

Prefetch

在开启手动ACK的情况下,对接收到的消息可以根据业务的需要异步对消息进行确认。
然而在实际使用过程中,由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接收来自队列的消息。在这种场景下,我们可以通过设置prefetch_count来达到这种效果。

参考 https://mp.weixin.qq.com/s/OABseRR0BnbK9svIPyLKXw
参考 http://www.rabbitmq.com/direct-reply-to.html
参考 http://kissyu.org/2017/11/18/Spring%20Boot%E4%B8%8ERabbitMQ%E7%BB%93%E5%90%88%E5%AE%9E%E7%8E%B0%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97/?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,899评论 2 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • RabbitMQ 简介 MQ 消息队列,上承生产者,下接消费者。从生产者侧获取消息,然后将消息转发给消费者。由此可...
    2205阅读 3,491评论 1 11
  • 利用RabbitMQ集群横向扩展能力,均衡流量压力,让消息集群的秒级服务能力达到百万,Google曾做过此类实验;...
    有货技术阅读 3,463评论 0 1