QMQ 设计背景、理念、和目标
- QMQ 设计背景、理念、和目标
1.1 设计背景
QMQ是2012年就开始开发的,在这个时期其实消息中间件并没有太多的选择,那个时候Kafka还不太成熟,而RocketMQ也没有出现,大部分公司都会采用ActiveMQ或RabbitMQ。首先RabbitMQ的开发语言是erlang,这是一门略小众的语言,我们担心无法完全掌控,所以没有选择。而ActiveMQ其实在公司内部已有很长一段时间使用历史,但是ActiveMQ太过于复杂,在使用过程中经常出现消息丢失或者整个进程hang住的情况,并且难以定位,所以Qunar 就自己造了个轮子,QMQ 在后续的运维中也学习和借鉴了很多kafka 和 RocketMQ 的设计理念,但是QMQ 仍然有自己的独特的特点,这也是QMQ 值得开源,值得学的的地方。
1.2 设计理念
QMQ 的 设计也是基于主题的发布与订阅模式,核心功能就是消息的发送、消息存储、消息消费。
整体设计主要是根据qunar 自身业务形态实现不同的发布、订阅类型消息,主要体现在支持任意秒级的延迟/定时消息,consumer 和 server 可以很容易的扩容、缩容,consumer 端幂等处理,丰富的监控指标等QMQ 的特色特点。
1.3 设计目标
- 异步实时消息
- 延迟/定时消息(支持任意秒级)
- 广播消息(每个Consumer都收到相同消息,比如本地cache更新)
- 基于Tag的服务端过滤
- Consumer端幂等处理支持
- Consumer端filter
- 消费端支持按条ack消息
- 死信消息
- 结合Spring annotation使用的简单API
- 提供丰富的监控指标
- 接入OpenTracing
- 事务消息
- Consumer的处理能力也可以方便扩容缩容
- Server可以随心所欲扩容缩容
- Java Client, .NET Client
- 读写分离
- 消息投递轨迹
- 历史消息的自动备份
- 有序消息(即将开源)
-
QMQ 架构以及组件介绍
- meta server提供集群管理和集群发现的作用
- server 提供实时消息服务
- delay server 提供延时/定时消息服务,延时消息先在delay server排队,时间到之后再发送给server
- producer 消息生产者
- consumer 消息消费者
交互过程:
1). delay server 向meta server注册
2). 实时server 向meta server注册
3). producer在发送消息前需要询问meta server获取server list
4). meta server返回server list给producer(根据producer请求的消息类型返回不同的server list)
5). producer发送延时/定时消息
6). 延时时间已到,delay server将消息投递给实时server
7). producer发送实时消息
8). consumer需要拉取消息,在拉取之前向meta server获取server list(只会获取实时server的list)
9). meta server返回server list给consumer
10). consumer向实时server发起pull请求
11). 实时server将消息返回给consumer
-
消息数据存储模型
3.1 实时消息存储模型
在实时Server存储模型中有三种重要的log:
- message log 所有subject的消息进入该log,消息的主存储
- consume log consume log存储的是message log的索引信息
-
pull log 每个consumer拉取消息的时候会产生pull log,pull log记录的是拉取的消息在consume log中的sequence
3.2 延时/定时消息存储模型
在延时/定时消息里也存在三种log:
- message log 和实时消息里的message log类似,收到消息后append到该log就返回给producer,相当于WAL。
- schedule log 按照投递时间组织,每个小时一个。该log是回放message log后根据延时时间放置对应的log上,这是上面描述的两层hash wheel的第一层,位于磁盘上。schedule log里是包含完整的消息内容的,因为消息内容从message log同步到了schedule log,所以历史message log都可以删除(所以message log只需要占用极小的存储空间,所以我们可以使用低容量高性能的ssd来获取极高的吞吐量,比如采用100G极好的SSD只需要RMB2000左右)。另外,schedule log是按照延时时间组织的,所以延时时间已过的schedule log文件也可以删除。
- dispatch log 延时/定时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log里写入的是消息的offset,不包含消息内容。当延时server中途重启时,我们需要判断出当前这个刻度(比如一个小时)里的消息有哪些已经投递了则不重复投递。
- QMQ 特色
4.1 快速的扩容缩容能力
Kafka和RocketMQ都是基于partition的存储模型,也就是每个subject分为一个或多个partition,而Server收到消息后将其分发到某个partition上,而Consumer消费的时候是与partition对应的。比如,我们某个subject a分配了3个partition(p1, p2, p3),有3个消费者(c1, c2, c3)消费该消息,则会建立c1 - p1, c2 - p2, c3 - p3这样的消费关系。
由于集群模式下,消息消费进度控制的问题,consumer 和partition 的关系:
那么如果我们的consumer个数比partition个数多呢?则有的consumer会是空闲的。
而如果partition个数比consumer个数多呢?则可能存在有的consumer消费的partition个数会比其他的consumer多的情况
那么合理的分配策略只有是partition个数与consumer个数成倍数关系。
以上都是基于partition的MQ所带来的负载均衡问题。因为这种静态的绑定的关系,还会导致Consumer扩容缩容麻烦。也就是使用Kafka或者RocketMQ这种基于partition的消息队列时,如果遇到处理速度跟不上时,光简单的增加Consumer并不能马上提高处理能力,需要对应的增加partition个数,而特别在Kafka里partition是一个比较重的资源,增加太多parition还需要考虑整个集群的处理能力;当高峰期过了之后,如果想缩容Consumer也比较麻烦。
跟扩容相关的另外一个问题是,已经堆积的消息是不能快速消费的。比如开始的时候我们分配了2个partition,由2个Consumer来消费,但是突然发送方大量发送消息(这个在日常运维中经常遇到),导致消息快速的堆积,这个时候我们如何能快速扩容消费这些消息呢?其实增加partition和Consumer都是没有用的,增加的Consumer爱莫能助,因为堆积的那2个partition只能由2个Consumer来消费,这个时候你只能纵向扩展,而不能横向扩展,而我们都知道纵向扩展很多时候是不现实的,或者执行比较重的再均衡操作。
基于这些考虑我们并没有直接采用Kafka等基于partition存储模型的消息队列,我们的设计考虑是消费和存储模型是完全解耦的关系,Consumer需要很容易的扩容缩容,从现在来看这个选择也是正确的。现在去哪儿网的系统架构基本上呈现为基于消息驱动的架构,在我们内部系统之间的交互大部分都是以消息这种异步的方式来进行。比如我们酒店的订单变更消息就有接近70个不同的消费组订阅(可以将消费组理解为不同的应用),整个交易流程都是靠消息来驱动,那么从上面对基于partition模型的描述来看,要在70个不同应用之间协调partition和Consumer的均衡几乎是不可能的。
QMQ 是如何做到快速扩容缩容的呢? 后续篇章会做详细介绍。
4.2 可以精确时刻的延时/定时消息
开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RocketMQ延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持2天的延迟,修改最后一个level的值为2d,这个时候依然是18个level;也可以增加一个2d,这个时候总共就有19个level。
RabbitMQ的话需要安装一个rabbitmq_delayed_message_exchange插件。
QMQ 根据上文提到的内容采用双重时间轮实现,可以实现任意秒级别的延时/定时消息。
其实 Broker端内置延迟消息处理能力,核心实现思路都是一样:将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中,这个会在后续的文章中详细说明。