RabbitMQ 介绍
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 安装
$ sudo apt-get update
$ sudo apt-get install libsctp1
$ wget https://packages.erlang-solutions.com/erlang/,version: 20.0-1_ubuntu_xenial
$ sudo dpkg -i esl-erlang_20.0-1_ubuntu_xenial_amd64.deb
$ wget http://packages.erlang-solutions.com/site/esl/esl-erlang/FLAVOUR_1_general/esl-erlang_20.0-1~ubuntu~xenial_amd64.deb
$ sudo apt-get install -f -y
$ sudo dpkg -i esl-erlang_20.0-1_ubuntu_xenial_amd64.deb
$ wget https://packagecloud.io/rabbitmq/rabbitmq-server/packages/ubuntu/xenial/rabbitmq-server_3.7.6-1_all.deb/download.deb
$ sudo dpkg -i rabbitmq-server_3.7.6-1_all.deb
RabbitMQ 配置
支持跨域访问
$ sudo chmod 777 -R /etc/rabbitmq/
$ cd /etc/rabbitmq/
$ sudo vim rabbitmq.conf
$ sudo sed -i "s/# loopback_users.guest = false/loopback_users.guest = false/g" ./rabbitmq.conf
$ sudo chmod 777 ./rabbitmq.conf
$ sudo service rabbitmq-server restart
RabbitMQ 命令
- /etc/init.d/rabbitmq-server start|stop|restart|reload
- rabbitmqctl add_vhost vhostname 创建 Vhost
- rabbitmqctl delete_vhost vhostname 删除 Vhost
- rabbitmqctl list_vhost 遍历所有虚拟主机信息
- rabbitmqctl add_user username password 添加用户名、密码
- rabbitmqctl change_password username password 修改用户密码
- rabbitmqctl set_permissions -p v_host user “."".”".*" 绑定权限,并且具备读写的权限
- rabbitmqctl list_queues 显示所有队列
RabbitMQ 模式
1. fanout模式
模式特点: 可以理解他是一个广播模式不需要routing key它的消息发送时通过Exchange binding进行路由的~~在这个模式routing key失去作用这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定如果接收到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
2. Direct 模式
模式特点: 任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue。一个routing_key可以绑定一个Queue,同时一个routing_key也可以绑定多个队列。
- 一般情况可以使用rabbitMQ自带的Exchange:”” (该Exchange的名字为空字符串), 也可以自定义Exchange。
- 这种模式下不需要将Exchange进行任何绑定(bind)操作。当然也可以进行绑定。可以将不同的routing_key与不同的queue进行绑定,不同的queue与不同exchange进行绑定。
- 消息传递时需要一个“routing_key”。
- 如果消息中不存在routing_key中绑定的队列名,则该消息会被抛弃。
3. topic类型
模式特点: copic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同如下所述:
- routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key与routing key一样也是句点号“. ”分隔的字符串
- binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
RabbitMQ实现RPC机制
- 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14种properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
- 服务器端收到消息并处理
- 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
- 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理
Correlation id
一般我们在设置RPC请求时给每个请求设置一个唯一值,最后当我们收到消息在这个callback Queue中,我们查看这个属性和请求的属性(Correlation_id)进行匹配如果没有匹配上,我们将拒绝这个消息它不是我们的。
为什么我们应该忽略回调队列中的未知消息,而不是错误地失败呢?这是由于服务器端可能出现竞态条件。虽然不太可能,但是RPC服务器可能在发送了答案后才会死亡,但在发送请求消息之前。
如果发生这种情况,重新启动的RPC服务器将再次处理此请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而RPC应该是幂等()的。
RPC幂等性:
f(x)=f(f(x))
如果消息具有操作幂等性,也就是一个消息被应用多次与应用一次产生的效果是一样的。
博客著作权归本作者所有,任何形式的转载都请联系作者获得授权并注明出处。