RABBITMQ

题纲

  • RABBITMQ介绍
  • RABBITMQ重要概念
  • 传输信道CHANNEL,交换机,队列,虚拟机
  • RABBITMQ权限
  • RABBITMQ分布式介绍
  • RABBITMQ CLUSTER *
  • RABBITMQ资源限制
  • RABBITMQ优化

RABBITMQ介绍

rabbitmq是一种消息队列中间件,由erlang语言开发的amqp(advanved message queue protocol)的开源工具。它实现了代理broker的异步架构,

消息发送到消费者之前可以在节点上排队,解耦生产者和消费者。一个rabbitmq server也叫broker,它维护着从生产者producer到消费者consumer的线路。
broker内部比较重要的概念就是交换机(exchange)、队列(queue)、路由键(routing key)和绑定(binding)。将队列绑定到交 换机,绑定时可以带
上bingding_key参数,叫绑定键。生产者和消费者与broker建立tcp连接后,在连接内创建channel通道,通过channel发送消息给rabbitmq的
exchange(消息不直接发送给队列),exchange根据生产者的routingkey匹配bingdingkey,将消息路由到对应的队列,然后将消息推送到订阅该队列的消费者。

image.png

RABBITMQ重要概念

  • 传输信道CHANNEL
  • 交换机EXCHANGE
  • 队列QUEUE
  • 虚拟机VHOST

传输信道CHANNEL

客户端和rabbitmq服务之间连接会创建一个tcp连接,连接打开并通过认证后,应用程序可
以创建amqp协议的channel信道,是一个基于tcp的虚拟连接,发布、订阅、接收消息都是
通过信道来完成的。使用信道代替tcp连接的原因就是在高峰期,创建和销毁tcp连接的开
销是比较大的。使用信道后,服务端和客户端只建立一条tcp连接,每个线程启动后会在
这个连接上建立一条信道,可以在这条tcp连接上无限制的创建很多的信道而不影响操作
系统。
客户端与服务器之间通道通过amqp协议的heartbeat维护,客户端设置发送心跳包的间隔,
来保持连接正常,同时如果服务器关闭,客户端会通过heartbeat感知到,从而进行重连。
heartbeat的超时时间。broker的超时时间默认是60s,因此client一般设置timeout/2,两次心
跳检测不成功就认为链接不可达而开始重连。

交换机

RABBITMQ支持多种交换机引用于不通的工作场景,其中最常用的就是direct直连交换机,
topic主题交换机和fanout广播交换机。生产者不会将消息直接发给队列,而是发送给交换
机,交换机通过bingding key与队列绑定,交换机拿着生产者的routingkey去匹配bingdingkey,
把消息发给对应的队列。
三种常用交换机:

  • Direct
  • Topic
  • Fonout

DIRECT交换机

direct:默认交换机,处理路由键。需要将一个队列绑定到交换机上,交换机会对绑定键和路由键精确匹配,而确定发送给哪个队列。多个

绑定键可以绑定到一个队列,也可以一个绑定键绑定到多个队列形成了fanout交换机,资方的路由键以队列的名称命名,属于一对一。


image.png

TOPIC交换机

topic:将路由键和某种模式的绑定键进行匹配。此时队列需要绑定要一个模式上,模式既是匹配规则。路由键名称必须以"."来分割,绑定键也要有同样的规

则。Topic交换机和direct交换机很类似,只不过topic交换机是模糊匹配的,*表示一个单词,#代表任意数量(0或多个)单词。不带有这两个符号的绑定键,功
能和direct交换机一样。


image.png

FANOUT交换机

fanout:不处理路由键。只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会以广播的形式被转发到与该交换机绑定的所有队列上。每个
绑定的队里都得到一份复制的消息。fanout类型交换机转发消息是最快的。


image.png

VHOST

VHOST:
rabbitmq服务器都可以创建虚拟消息服务器,称为虚拟机vhost,vhost之间是逻辑隔离的,
避免不同业务之间队列和交换命名的冲突。rabbitmq默认的vhost /。用户权限是基于vhost授
予的,一个用户可以拥有多个vhost的权限,vhost之间的队列和交换器是无法进行绑定的。
vhost可以更细粒度的区分业务,可以根据vhost进行队列策略的设定(比如是否需要镜像、镜
像节点的个数等),vhost更方便用于元数据的迁移,可以在web ui管理界面的export defi
nitions选项选择vhost进行元数据信息的导出。

RABBITMQ权限

用户资源管理

权限控制是基于vhost的,rabbitmq权限位有三个,分别是配置、写、读权限,通过指令

rabbitmqctl set_permissions -p /zfpt zfpt_rabbitmq "." "." ".*"设置

  1. -p指定vhhost
  2. zfpt_rabbitmq被授予的用户
  3. "." "." "."是权限,映射配置,写,读。"."表示匹配任何的交换机或者队列,"zfpt.*"匹配以zfpt开头的交换机和队列,""空不匹配交换机和队列

RABBITMQ权限

用户角色管理当RABBITMQ启用rabbitmq_management插件就会启动远程web管理,在web管理端可以对

rabbitmq的状态进行监控,也可以对交换机、队列、vhost、policy、用户等的设置。启动
none、management、policymaker、monitoring、administrator

management:

用户可以通过amqp做的任何事外加:
1. 列出自己可以通过amqp登入的virtual hosts
2. 查看自己的virtual hosts中的queues, exchanges和   bindings
3. 查看和关闭自己的channels和   connections
4. 查看有关自己的virtual hosts的"全局"的统计信息,包含其他用户在这些virtual hosts中的活动。

policymaker

management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parameters

monitoring

management可以做的任何事外加:
1. 列出所有virtual hosts,包括他们不能登录的virtual hosts
2. 查看其他用户的connections和channels
3. 查看节点级别的数据如clustering和memory使用情况
4. 查看真正的关于所有virtual hosts的全局的统计信息

administrator

1. policymaker和monitoring可以做的任何事外加:
2. 创建和删除virtual hosts
3. 查看、创建和删除users
4. 查看创建和删除permissions

队列QUEUE

RABBITMQ中的队列用于存储和投递消息,队列在声明时可以设置不同的参数实现不同的

功能,如下一些常见的队列:

  1. 普通队列
  2. 惰性队列(LAZY QUEUE)
  3. 专用队列(EXCLUSIVE QUEUE)

普通队列

一般队列是应用中最常用的队列,既应用中声明中没有特殊定义的队列,普通队列可以是

非持久化、持久化和镜像队列。非持久化队列只存在内存中,持久化队列存在于内存和磁盘中。镜像队列是CLUSTER集群
中的实现。

惰性队列

惰性队列(lazy   queue)是在3.6.0以后才有的,该队列将消息尽可能快的存储到磁盘上,

只有在消费者请求的时候才被加载到内存。
惰性队列的主要目的是支持长队列,超过数百万消息,达到这个级别的队列有一下原因:
1 消费者下线或者宕机维护
2 突然有消息传入,速度超过了消费者消费速度
3 消费者比平时消费慢
消息page out to disk不仅会花费时间、增加磁盘的IO而且还会阻塞队列,使队列不能接收消息,影
响性能。因此除特殊情况,不建议存储如此大量的数据。

排他队列

排他队列(   exclusive queue)仅在消费者声明创建后,只有自己可以使用的队列,该队列

一但创建,另一个连接将无法创建相同的队列,因此叫排他队列,当连接断开,队列同时
被删除。其他连接上是无法请求到当前连接的排他队列的。由于排他队列会在声明它的连
接端口的时候被删除,因此这样的队列是没有必要做持久化的。
1 排他队列的特性:
2 只对首次声明它的连接可见
3 会在其连接断开的时候自动删除

RABBITMQ分布式

rabbitmq的分布式实现包括cluster、 federation、 shovel
1 cluster集群是连接多个节点(设备)组成一个单一逻辑的broker,使用erlang通信,因此所有
的节点都需要配置相同的erlang cookie,节点之间对于网络可靠性要求高,避免使用wan网
络。所有节点运行的rabbitmq版本必须一致。
2 federation模式允许在一个broker上的exchange或者queues接收由推送到其他broker的
exchange或者queue中的消息
3 shovel只是使用一个代理上的消息,将它们转发到另一个代理上的交换机。

CLUSTER介绍

rabbitmq的cluster,将一个网内的多个rabbitmq服务组成一个逻辑的broker。集群的作用就
是让生产者和消费者在rabbit目前节点崩溃的时候可以继续运行,多个节点分摊数据压力,
线性扩容提高吞吐量。

CLUSTER数据持久化

1 消息持久化:如果要保证消息在rabbitmq服务器节点崩溃和重启时不丢失,则需要客户端声明的交换

机、队列为持久化,同时设置发送的消息也为持久化,当某个节点崩溃重启后,该节点上队列的消息
仅会丢失没来得及写入磁盘的消息。如果仅仅设置消息持久化,那么在节点崩溃后,队列和交换机信
息丢失,持久化的消息将永远不会被消费。
2 元数据持久化:
无论是单一的节点还是集群,都会记录rabbitmq的元数据:
1 队列元数据(队列名称和他们的属性:是否可持久化、是否自动删除)
2 交换机元数据(交换机名称,类型和属性:可持久化等)
3 绑定元数据(记录交换机和队列的绑定关系)
4 vhost元数据(vhost内的队列交换机、绑定提供命名空间和安全)
元数据的持久化是设置集群的节点为disc节点实现的。一个集群可以有多个disc节点和ram节点。

DISC节点还是RAM节点

Cluster节点可以是disc节点,也可以是ram节点(单实例的rabbitmq只能是disc节点)

cluster的每个节点都会保存一份元数据,记录到磁盘或者内存中,
RAM节点将所有的队列、交换机、绑定、用户、权限和vhost的元数据都仅存储在内存中,
而DISC节点则将元数据存储在磁盘中。保存在磁盘中,会产生磁盘io,对性能有一定的影响,
处于安全和性能考虑,集群中应该至少有一个disc节点。如果disc节点故
障,那么对元数据的写和修改操作都不能执行了,因此,较大规模的集群需要增加disc节
点的数量保证高可用。
如果遇到特殊情况需要重启集群,disc节点应该是最后被关闭的,因为磁盘节点可以永久
保存集群元数据的最新状态。ram节点在关闭的时候,元数据会丢失。启动集群时先启动
最后关闭的disc节点,再启动ram节点,ram节点就可以从disc节点同步元数据信息,相反
先启动ram节点是无法启动成功的。

CLUSTER的消息处理

客户端连接到集群中的节点,会在连接的节点创建队列,同时会更新同步所有节点的元数据

信息,客户端声明队列绑定到交换机后,客户端即可发送和消费消息。
如果生产者连接的节点并不在队列存在的节点上,由于所连接的节点有所有的队列信息元
数据,则连接节点会将消息路由到队列存在的节点上进行存储。
如果消费者连接到不存在请求队列的节点,该节点会去队列节点取消息发送给消费者。因
此,客户端可以连接集群中任意一个节点且不会感知到所连接的节点是否存在请求的队列。
普通的CLUSTER的QUEUE只存在于一个节点,节点故障或重启后,队列将不可用,这种集
群优势是不需要等待数据同步,相应快,吞吐量高。

CLUSTER镜像队列

我们在生产环境中使用的是镜像模式集群,这种模式是在普通集群基础上,通过设置队列的

POLICY将队列配置为镜像队列,会在集群的多个节点上存在多个镜像拷贝,最初创建队列的节
点存在的队列称为master,其他节点的镜像拷贝称为mirror。
镜像队列解决集群中节点故障导致该节点的队列不可用的问题。当集群中一个节点宕机,如果
上面存在的是mirror队列,则只对连接在上面的客户端产生影响,需要客户端支持重连。如果
是master队列,与master的连接全部断开;rabbitmq会在mirror队列中选取一个状态最新的队列
提升为master;新的master会requeue所有unack(客户端确认在原master宕机的时候未及时同步
而丢失);其他mirror节点从新的master同步,生产者和消费者重新建立连接继续工作。
Rabbitmq集群中还会保留所有服务器的信息,会等待宕机的节点归队。
all queue operations go through the master first and then are replicated to mirrors,意思是在集群中
任意节点上的操作(生产和消费消息),都会被路由到master队列,然后master将状态同步到
mirror队列,这样做保证了消息的顺序性。
队列的镜像个数({"ha-mode":"exactly", "ha-params":2}),目前生存环境2个节点设置2,三个
节点也是2,有几个镜像,数据就会从master传输到几个mirror节点,镜像数越多,产生的网络
消耗和磁盘io消耗也越多。

CLUSTER镜像队列

ha-mode默认是all,所有的节点都创建镜像,exactly表示明确镜像个数,node表示指定节点创建,后两者
要结合ha-paramater配置
{"ha-sync-mode": "automatic"} ha-sync-mode是队列同步设置,默认是manual,新的队列加入的时候,只会接收新消息,如果在相对于新加队列时
间点之前的消息未消费完,master节点宕机就会导致数据丢失。因此要设置成automatic自动同步,会增加一部分流
量消耗,但是保证消息的安全性。
{"queue-master-locator":"min-masters"}也是比较关键的

  • client-local,默认模式,在客户端连接的节点上创建master队列
  • mini-masters,节点最小master数量分配,保证每个节点的master队列数量平均分配,可能会出现消息量大的队列的
    master分配到一个节点上,如果发现,可以使用官方的balance方法重新分配
  • random,随机分配
    完整队列创建指令:rabbitmqctl set_policy -p /zfpt zfpt_queue "^" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":
    "automatic","queue-master-locator":"min-masters"}'
    之前的遇到的问题就是,在迁移的时候,导入元数据信息,结果都落在一个几点,就是因为没有在导入前设置平均分配策略

CLUSTER负载均衡

目前负载均衡使用阿里云的elb实现,走tcp协议,需要客户端支持amqp协议,否则无法识

别heartbeat包。需要注意的是elb的超时时间,不能低于heartbeat的探测间隔(一般30s)。
rabbitmq支持代理协议(proxy protocol),需要负载均衡器支持amqp协议,否则rabbitmq
节点日志记录的是负载均衡的ip以及在web管理界面显示的也是负载均衡的地址(haporxy
和aws elb有这样的问题,阿里云slb没有),这种模式还没有试成功!

CLUSTER升级和迁移

有时候需要对cluster进行版本升级或者迁移,官方提供的方案基本上都是数据通过某种方

式传输,以保证数据在升级或迁移的过程中不丢失,但是操作比较复杂,比如基于
federation的蓝绿迁移,也不能保证数据完全迁移到新集群;cluster的主次版本升级,也是
需要停止服务一段时间才可以恢复的。经过资方平台的升级和迁移总结,可以新建一套目
标集群,切换生产者和一部分消费者到新集群中进行正常业务,保留一部分老的消费者继
续消费老集群的消息直到消息完全被消费掉,就可以将老集群下线。新集群在投入生产之
前,需要创建元数据信息,通过web ui界面的export出所有元数据信息(也可以是某个
vhost的),然后import到新集群中。
注意,要先设置好队列镜像的策略中的master队列分配为min-master或者随机,否则导入后
队列master会落到一个节点上。

RABBITMQ资源限制

Rabbitmq通过内存限制和磁盘剩余量限制保证节点不会crash,当达到限制值的时候,rabbitmq会阻塞消息入队。

内存限制:
默认当使用超过系统内存的40%,就会引起内存警告,阻塞所有的消息推送连接,直到服务把
内存数据刷到磁盘,或者分发给消费者,内存会被清理,服务恢复正常。默认设置的40%内存
门槛并不是保护rabbitmq服务使用不会超过40%,只是一个消息推送节流的点。erlang进行垃圾
收集的时候会使用两倍的内存即80%,官方强力建议开启swap。可以设置百分比:
vm_memory_high_watermark.relative = 0.4
也可以设置绝对值:vm_memory_high_watermark.absolute = 1024mb
在服务内存达到限制之前,服务会将内存中的数据刷盘,持久化和短暂消息将被page out到
磁盘以释放内存以保护服务正常,默认配置的是内存水位线的50%开始执行page out,
vm_memory_high_watermark_paging_ratio = 0.75可以适当增加到75%
磁盘限制:
磁盘默认使用到剩余50m时达到水位线,将阻塞所有生产者发布消息,目的是防止磁
盘被写满而导致服务rabbitmq服务停止。在内存不足的情况下,没有被持久化的消息(transient
messages)会被page out到磁盘,将耗尽剩余的磁盘空间,因此,磁盘水位线如果设置的太低,
且信息快速换入到磁盘,可能会在磁盘空间检查时间(默认10s)间隔内造成磁盘空间满导致
rabbitmq crash。
设置参数
配置文件: disk_free_limit.absolute = 1GB
动态设置: rabbitmqctl set_disk_free_limit 1GB
内存和磁盘的水位线限制,都是保护服务正常的策略,内存和磁盘告警是集群范围的,如果其中
一个节点达到水位线,整个集群的节点将全部阻塞生产者写入消息。

RABBITMQ的一些配置和优化

消息确认机制:

rabbitmq消费者完成消息消费后,需要给rabbitmq确认信息,rabbitmq收到确认信息后,将消息在master队

列上删除,并同步到mirror队列。消息确认方式包括自动确认no_ack=true和手动确认basic_ack,根据业务需
求选择不同的确认方式。可以使用rabbitmqctl list_queues name messages_ready指令查看未确认的消息。
在手动ack的时候,遇到过一种情况,web显示一直有消息在redelivered,在queue界面确定是哪个队列后,
经过研发排查分析的结果是,是消费者在处理过程中发生了异常,但不想丢掉这条异常消息,所以乐观地
认为重试多次是有可能成功的,于是告诉队列,该消息可以重新回到队列,导致该消息进入无限循环。

消息公平调度

队列消息通过round-robin方式分发给消费者,如果某个消费者正在处理耗时的消息,队列又继续分发消息给

它,会造成负载不均。消费者可以配置prefetch_count=n(资方设置为1)来解决这种问题,消费者们设置一
次从rabbitmq预取消息处理数量存到缓存区等待消费,rabbitmq将按照该值将消息投递给消费者,之后会判
断channel中unack的消息数量是否等于消费者数量*prefetch_count数量,如果相等,不推送消息,如果其中一
个消费者消费一条消息,则rabbitmq会发现这个消费者的消息数量少了,将会补充一条消息给它。
Best practice提供参数RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+HMQD OFF_HEAP"可以
是内存运行更稳定

开启hipe

开启erlang   hipe编译选项, hiPE是Erlang的即时编译器。这将以增加启动时间为代价提高服务器吞

吐量,能够提高性能20%以上。在erlang r17(目前3.7.7使用r21)后hipe已经相当稳定,rabbitmq
官方也建议开启此选项。
rabbitmq.conf中配置 hipe_compile = true

其他

限制消息体大小、限制队列长度,限制channel数量等等,这些参数需要通过实践来设置。

RABBITMQ压测工具

rabbitmq的压测工具官方推荐rabbitmq-perf-test(  https://rabbitmq.github.io/rabbitmq-perf-

test/stable/htmlsingle/)
压测命令举例:
bin/runjava com.rabbitmq.perf.perftest -f persistent -s 33000 -h
amqp://username:password@10.40.93.221:5672 --queue-pattern 'perf-test-%d' --queue-pattern-
from 1 --queue-pattern-to 600 --producers 200 --consumers 200 -r 5 -R 3

RABBITMQ常用指令

  • 开启web插件: rabbitmq-plugins enable rabbitmq_management
  • 创建用户: rabbitmqctl add_user wecash password
  • 设置用户角色:rabbitmqctl set_user_tags wecash administrator
  • 创建vhost: abbitmqctl add_vhost /zfpt
  • 授权用户权限: rabbitmqctl set_permissions -p /zfpt wecash "." "." ".*"
  • 创建队列Policy: rabbitmqctl set_policy -p /zfpt zfpt_queue "^" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":
    "automatic","queue-master-locator":"min-masters"}'
  • 列出队列信息:rabbtimqctl list_queues
  • 列出用户:rabbitmqctl list_users
  • 查看集群状态:rabbitmqctl cluster_status
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容