1、AMQP
AMQP(Advanced Message Queueing Protocol)协议是一个开放的标准的的协议,它定义了系统之间如何传递消息。AMQP不仅定义了consumer/producer/broker之间如何交互,也定义了消息的格式和命令的交换。
Broker:代表着一个中间件应用,负责接收消息生产者的消息,然后将消息发送至消息接受者或者其他的broker。
Virtual host:这是对broker的虚拟化分,主要用于对consumer、producer和他们依赖的AMQP相关结构进行隔离。通常是处于安全因素的考虑。
Connection:代表着producer、consumer和broker之间的物理网络(TCP),connection只有在客户端断开连接或者网络问题的时候会断开。
Channel:代表着producer、consumer和broker之间的逻辑连接,一个Connection可以包含多个Channel。Channel使得基同一连接的不同进程之间与broker之间的交互相互隔离,不干扰。而不需要重新建立连接,channel在发生协议错误的时候会被关闭。
Exchange:这是所有被发送的消息首先到达的目的地,Exchange负责根据路由规则将消息路由到不同的目的地。路由规则包括下面几种:direct(point-to-point)、topic(publish-subscribe)和fanout(multicast)。
Queue:这是消息到达的最终目的地,到达queue的消息是已经准备好被消费的消息,一个消息可以被exchange copy发送至多个queue。
Binding:这是queue和exchange之间的虚拟连接,使得消息从哪个exchange路由到Queue。routing key可以通过binding和exchange routing规则关联。
2、rabbitmq安装启动
准备:
yum install
build-essential openssl openssl-devel unixODBC unixODBC-devel
make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
下载:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
配置文件:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
常用命令:
服务启动和停止:
启动 rabbitmq-server start &
停止 rabbitmqctl app_stop
lsof(list open files)是一个列出当前系统打开文件的工具
列出所有的网络连接 lsof -i:5672
启动 rabbitmqctl start_app
状态 rabbitmqctl status
添加用户 rabbitmqctl add_user username password
查看用户 rabbitmqctl list_users
删除用户 rabbitmqctl delete_user username
清楚权限 rabbitmqctl clear_permissions -p vhostpath username
列出权限 rabbitmqctl list_user_permissions username
修改密码 rabbitmqctl change_password username newpassword
设置权限 rabbitmqctl set_permissions -p vhostpath username
管理插件 rabbitmq-plugins enable rabbitmq_management
访问地址 http://192.168.162.200:15672/
添加虚拟主机 rabbitmqctl add_vhost vhostpath
查看虚拟主机 rabbitmqctl list_vhosts
权限查看虚拟主机 rabbitmqctl list_permissions -p vhostpath
删除虚拟主机 rabbitmqctl delete_vhost vhostpath
查看队列 rabbitmqctl list_queues
移除所有数据 rabbitmqctl reset 需要先rabbitmqctl stop_app
组成集群 rabbitmqctl join_cluster <clusternode> [--ram]
修改集群节点存储形式 rabbitmqctl change_cluster_node_type disc | ram
忘记节点(摘除节点) rabbitmqctl forget_cluster_node [--offline]
修改节点名称 rabbitmqctl rename_cluster_node oldnode1 newmode2
3、消息的可靠性投递
保障消息的成功发出
保障MQ节点的成功接收
发送端收到MQ节点(Broker)确认应答
完善的消息进行补偿机制
方案:
1、消息落库,对消息状态进行打标
2、消息的延迟投递,做二次确认,回调检查
4、消息端-幂等性保障
1、唯一ID+指纹码 机制,利用数据库主键去重
select count(*) from tablename where id = 唯一ID + 指纹码 为0插入
优点 简单
坏处 高并发下有数据库写入的性能问题
解决方案 跟进ID进行分库分表 进行算法路由
2、利用redis的原子性去实现
使用redis进行幂等性,需要考虑的问题
5、Return 消息机制
处理一些不可路由的消息
如当exchange不存在或指定的路由key路由不到的情况,这时我们需要监听这种不可达的消息就要使用return Listener
Mandatory true,则监听器会接收到路由不到的消息,然后进行后续处理
false,broker端自动删除该消息
6、消费端ACK与重回队列
手工ACK与NACK
1、消费端进行消费的时候,若由于业务的异常我们可以进行的日志记录,然后进行补偿
2、如果由于服务器宕机等问题,需要手动ACK保障消费端消费成功
重回队列
消费消费没有成功,消息重新给Broker
一般在实际应用中,都会关闭重回队列,设置为false
7、TTL(time to live)队列/消息
可以指定过期时间
过期时间从消息入队列开始计算,超时消息会自动消除
8、死信队列:DLX Dead-Letter-Exchange
利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
消息被拒绝(basic.reject/basic.nack)并且requeue=false
消息TTL过期
队列达到最大长度
DLX是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性
当这个队列中有死信时,rabbitmq就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列
可以监听这个队列中消息做相应的处理
9、rabbitmq集群
HA模式
镜像模式 haproxy + keepalived
多活模式(远程) federation插件
10、SET(单元化)架构及策略
业务:解决业务遇到的扩展性和容灾等需求,支撑业务的高速发展
通用性:架构侧形成统一通用的解决方案,方便各业务线接入使用
流量路由:
按照特殊的key(通常为userid)进行路由,判断某次请求该路由到中心集群还是单元化集群
中心集群:
未进行单元化改造的服务(通常不在核心交易链路)称为中心集群
单元化集群:
每个单元化集群只负责本单元内的流量处理,以实现流量拆分及故障隔离
每个单元化集群前期只存储本单元产生的交易数据,后续会做双向数据同步,实现容灾切换需求
中间件(RPC、KV、MQ):
RPC:对于SET服务,调用封闭在set内;对于非set服务,沿用现有路由逻辑
KV:支持分SET的数据生成和查询
MQ:支持分SET的消息生成和消费
数据同步:
全局数据数据变化不大的部署在中心集群,其他单元化集群同步全局数据到本单元化内
演变为异地多话架构时,各单元化集群数据需要进行双向同步来实现容灾需要
SET化路由策略及其能力:
异地容灾
流量调度能力,将set分别部署在不同地区的数据中心,实现跨地区容灾支持
高效的本地化服务:利用前段位置信息采集和域名解析策略,将流量路由到最近的set,提供最高效的本地化服务
集装箱式扩展:set的封装性支持更灵活的部署扩展性,比如set一键创建/下线、一键发布
federation插件:集群间数据同步(通信)
11、MQ组件实现功能点基本
支持消息高性能的序列化转换,异步化发送消息
支持消息生成实例与消费实例的链接池化缓存化,提升性能
支持可靠性投递消息,保障消息的100%不丢失
支持消费端的幂等操作,避免消费端重复消费的问题
支持迅速消息发送模式,在一些日志收集/统计分析等需求下可保证高性能,超高吞吐量
支持延迟消息模式,消息可以延迟发送,指定延迟时间,用于某些延迟检查、服务限流场景
支持事务消息,且100%保障可靠性投递,在金融行业单笔大金额操作时会有此类需求
支持顺序消息,保证消息送达消费端的前后顺序,列如下订单等复活性操作
支持消息补偿,重试,以及快速定位异常、失败消息
支持集群消息负载均衡,保障消息落到具体SET集群的负载均衡
支持消息路由策略,指定某些消息路由到指定的set集群