MQ消息队列
- MQ全称为message queue,即消息队列.MQ是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无须专用连接来链接它们
MQ消息队列简介
- 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构.主流消息队列包括RabbitMQ、Kafka等
- 消息传递指的是应用程序之间通过在消息中发送数据进行通信,而不是直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术
- 排队指的是应用程序通过队列来通信,队列的使用除去了接收和发送应用程序同时执行的要求
MQ消息队列作用
- 程序解耦
- 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
- 冗余
- 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险
- 许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕
- 峰值处理能力
- 使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
- 可恢复性
- 系统的一部分组件失效时,不会影响到整个系统
- 消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
- 顺序保证
- 大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理
- 缓冲
- 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况
- 异步通信
- 消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们
RabbitMQ安装
# 安装erlang
yum -y install erlang
# 安装RabbitMQ
yum -y install rabbitmq-server
# 启动(无用户名密码):
systemctl start/stop/restart/status rabbitmq-server
# 设置rabbitmq账号密码,以及角色权限设置
# 设置新用户test 密码123
rabbitmqctl add_user test 123
# 设置用户为administrator角色
rabbitmqctl set_user_tags test administrator
# 设置权限,允许对所有的队列都有权限
# 对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令如下:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" test ".*" ".*" ".*"
# 开启web界面rabbitmq
rabbitmq-plugins enable rabbitmq_management
# 访问web界面
http://server-name:15672/
# 重启服务生效设置
service rabbitmq-server start/stop/restart
相关命令
# 新建用户
rabbitmqctl add_user {用户名} {密码}
# 设置权限
rabbitmqctl set_user_tags {用户名} {权限}
# 查看用户列表
rabbitmqctl list_users
# 为用户授权
添加 Virtual Hosts :
rabbitmqctl add_vhost <vhost>
# 删除用户
rabbitmqctl delete_user Username
# 修改用户的密码
rabbitmqctl change_password Username Newpassword
# 删除 Virtual Hosts :
rabbitmqctl delete_vhost <vhost>
# 添加 Users :
rabbitmqctl add_user <username> <password>
rabbitmqctl set_user_tags <username> <tag> ...
rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read>
# 删除 Users :
delete_user <username>
# 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*'
# 查看权限
rabbitmqctl list_user_permissions user1
rabbitmqctl list_permissions -p vhost1
# 清除权限
rabbitmqctl clear_permissions [-p VHostPath] User
# 清空队列步骤
rabbitmqctl reset
需要提前关闭应用rabbitmqctl stop_app ,
然后再清空队列,启动应用
rabbitmqctl start_app
此时查看队列rabbitmqctl list_queues
查看所有的exchange: rabbitmqctl list_exchanges
查看所有的queue: rabbitmqctl list_queues
查看所有的用户: rabbitmqctl list_users
查看所有的绑定(exchange和queue的绑定信息): rabbitmqctl list_bindings
查看消息确认信息:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
查看RabbitMQ状态,包括版本号等信息:rabbitmqctl status
# 开启web界面rabbitmq
rabbitmq-plugins enable rabbitmq_management
# 访问web界面
http://server-name:15672/
- 访问本地:15672端口,输入账号/密码:guest
RabbitMQ消息测试
- RabbitMQ完整的消息通信包括
- 发布者/生产者(producer)是发布消息的应用程序
- 队列(queue)用于消息存储的缓冲
- 消费者(consumer)是接收消息的应用程序
- RabbitMQ消息模型核心理念是发布者(producer)不会直接发送任何消息给队列,甚至不知道消息是否已经被投递到队列,而只需要把消息发送给一个交换器(exchange),交换器可以一边从发布者接收消息,一边把消息推入队列.交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列还是多个队列,或是直接忽略消息
RabbitMQ组件
AMQP
-
AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件
Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。
Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
Message Queue:消息队列,用于存储还未被消费者消费的消息。
Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。
Exchange模型
rabbitmq发送消息首先是发给exchange,然后再通过exchange发送消息给队列(queue)
-
exchange有四种模式
fanout:exchange将消息发送给和该exchange连接的所有queue;也就是所谓的广播模式;此模式下忽略routing_key;
-
direct:路由模式,通过routing_key将消息发送给对应的queue; 如下面这句即可设置exchange为direct模式,只有routing_key为“black”时才将其发送到队列queue_name;
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')
- Q1和Q2可以绑定同一个key,如绑定routing_key=‘KeySame’,那么收到routing_key为KeySame的消息时将会同时发送给Q1和Q2,退化为广播模式
- top:topic模式类似于direct模式,只是其中的routing_key变成了一个有“.”分隔的字符串,“.”将字符串分割成几个单词,每个单词代表一个条件;
- headers:headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配
关键字发布Exchange
- 发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列