消息中间件
应用场景
- 异步通信
有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。 - 解耦
降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 - 过载保护
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 - 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 - 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。 - 缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。 - 数据流处理
分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。
常用协议
- AMQP协议
- MQTT协议
- STOMP协议
- XMPP协议
- 其他基于TCP/IP自定义的协议
常用消息中间件
Kafka
Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统,具有以下特性:
- 快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化;
- 高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;
- 高堆积:支持topic下消费者较长时间离线,消息堆积量大;
- 完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自动实现复杂均衡;
- 支持Hadoop数据并行加载:对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
RocketMQ
阿里巴巴的MQ中间件,在其多个产品下使用,并能够撑住双十一的大流量,他并没有实现JMS规范,使用起来很简单。部署由一个 命名服务(nameserver)和一个代理(broker)组成,nameserver和broker以及producer都支持集群,队列的容量受机器硬盘的限制,队列满后可以支持持久化到硬盘(也可以自己适配代码,将其持久化到NOSQL数据库中),队列满后会影响吞吐量,可以采用主备来保证稳定性,支持回溯消费,可以在broker端进行消息过滤.
RabbitMQ
使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。结合erlang语言本身的并发优势,性能较好,但是不利于做二次开发和维护。
ActiveMQ
Apache下的一个子项目。使用Java完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,少量代码就可以高效地实现高级应用场景。可插拔的传输协议支持,比如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby等。
Redis
使用C语言开发的一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
ZeroMQ
扩展性好,开发比较灵活,采用C语言实现,实际上他只是一个socket库的重新封装,如果我们做为消息队列使用,需要开发大量的代码,非持久队列。
RocketMQ
简介
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
网络架构图
组件特性
nameserver
- nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。
- nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
- 集群之间互不通信,降低了nameserver的复杂程度,对网络的要求也降低了不少,topic路由信息无须在集群之间保持强一致,追求最终一致。
broker
与nameserver关系
连接
- 单个broker和所有nameserver保持长连接
心跳
- 心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息
- 心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接
断开
- 时机:broker挂掉;心跳超时导致nameserver主动关闭连接
- 动作:一旦连接断开,nameserver会立即感知,更新topc与队列的对应关系,但不会通知生产者和消费者
负载均衡
- 一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系
- 如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力
- topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大
可用性
由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。
- 一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。
- 消费者得到master宕机通知后,转向slave消费,但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。
可用性
可靠性
消息清理
读写性能
系统特性
消费者
Consumer Group
用来表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊方式消费消息,如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。
与nameserver关系
连接
单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。
轮询时间
默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。
与broker关系
连接
心跳
断开
负载均衡
消费机制
消费进度存储
生产者
Producer Group
用来表示一个发送消息应用,一个Producer Group下包含多个Producer实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象。一个Producer Group可以发送多个Topic消息,Producer Group作用如下:
- 标识一类Producer
- 可以通过运维工具查询这个发送消息应用下有多个Producer实例
- 发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
与nameserver关系
连接
心跳
轮询时间
与broker关系
连接
心跳
断开连接
负载均衡
Broker集群配置方式及优缺点
单个 Master
多个 Master
多 Master 多 Slave 模式,异步复制
多 Master 多 Slave 模式,同步双写
演示
单机演示
启动nameserver
- 配置jvm参数
启动broker
- 配置jvm参数
- 配置自动创建主题参数(仅使用测试环境,生产环境不建议)
- 手动创建主体(命令或者直接修改store的配置文件,需要重启)
启动生产者
启动消费者
管控台演示
集群搭建及演示
源码构建
构建rocketmq
git clone https://github.com/apache/rocketmq.git
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
构建管控台:
注意:pom文件中,把<rocketmq.version>4.4.0</rocketmq.version>修改,4.4.0在maven中心仓库中有,自己本地可以安装rocketmq最新代码的快照版本,但是仍然会有问题
git clone https://github.com/apache/rocketmq-externals.git
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar