最近正好在看RocketMQ的源代码,本篇文章主要是用于公司内部的一次整理分享。整篇文章将以订单系统为出发点,对比分析一些市面上的MQ,最终具体到RocketMQ源码分析带领大家全局性的了解整个MQ的流程。
1 传统的订单系统,存在什么问题?
在之前公司的项目中有电商的经验。传统的订单下单流程分为上述的这样一些步骤,如图所示,以微信支付为例,在我们的原有系统里我们主要分为两个大流程:(1)创建订单并支付:如图中步骤1,2,3,4我们进行订单的创建(订单状态为created 创建),并发送订单的支付信息到微信。(2)接收支付成功回调并处理:如图中5,6,7,8 收到微信回调通知处理,(将订单状态更新为paid 已支付待发货 )然后进行发送优惠券和红包,发送通知,扣减库存,更新积分等操作。分别要与促销系统,通知系统,仓储系统,积分系统等进行交互。
换一个视角我们可以这样看这两个大流程:
在上述步骤串行的情况下,但是这样的步骤存在什么问题呢?图1 传统的订单下单流程 步骤8 中的操作可能出现耗时,以及各个子系统的不稳定因素等情况。导致用户的订单支付虽然付了钱,但是订单的某些关联的状态却始终没有更新。
这里“图1 传统的订单下单流程” 步骤8,也就是“图2 传统的订单下单流程v2”就是我们系统中的耗时任务,其实我们的用户对于这些耗时任务并并没有那么关心实时性,但是这些任务如果串行执行却可能导致系统性能等问题。这里也就引入了MQ的使用场景。
2 什么是MQ?MQ 解决了什么问题?
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。本篇文章将重点围绕RocketMq进行系统性的分析。
2.1 异步提升系统性能
这里单独以支付成功后发送通知为例,如果发送通知我们分为短信,邮件,微信通知等,假设步骤(1)耗时200ms,步骤(2)耗时200ms,那么整个系统至少要等待400ms以上。
我们引入了MQ之后系统变成了如下图所示,步骤变多了,但是对于系统来说却是优化了。
在我们将订单信息发送给MQ之后,我们不需要等待通知系统的事务,RPC调用邮件网关,发送邮件等操作。用户可以迅速的获取到订单支付成功状态。哪怕是10s,20s之后用户再收到支付成功的邮件通知等,也是完全可以接受的。
2.2 降低系统耦合
同样是考虑上面的场景,如果用户一直在同步等待发送通知,通知系统发送失败了,对于没有MQ的场景,我们肯定是要处理对应的错误的。但是在引入MQ的场景里,在通知系统拉取订单信息并消费,然后发送通知失败后,我们肯定可以有机制确保这条订单信息是保存在MQ里面的。于是MQ的引入,降低了系统的耦合程度。
2.3 流量削峰
在单机mysql的情况下,一般Mysql可以做到几百的qps是没问题的。但是如果我们部署多台机器,用户量上来的话,肯定会导致Mysql被数据压垮。
在引入MQ之后我们可以实现流量的削峰。特别是对于电商的秒杀场景或者做活动的时候。我们的瞬时用户订单量都会上来,而且持续时间不会太长。我们可以将这些操作写入MQ,从而实现了数据流量的“削峰填谷”。
上面说了MQ的三个优点,其实在实际的生产环境中,我们引入MQ之后也会存在:消息丢失,消息幂等性(不重复),消息积压故障,分布式消息的一致性等问题。下面文章中也会对这些问题进行剖析以及提供一些对应的解决方案。
3 各种MQ的对比
3.1 MQ对比
下面文章中也会提炼部分MQ内容进行讲解。
3.2 消息队列选择建议
1.Kafka
在上面的所有消息队列中,Kafka可以说是整体性能最高的,而且是天生支持分布式的。Kafka主要特点是基于Pull的模式基于消息的offset来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。
大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。
2.RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。
RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ。
3.RabbitMQ
RabbitMQ :结合erlang语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护。不过,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug。
如果你的数据量没有那么大,小公司优先选择功能比较完备的RabbitMQ。
4 整体系统架构
由于Kafka和RocketMQ其实可以作为性能最高的表现,所以下面的文章中将主要以Kafka和RocketMQ为核心对这几个系统的主要方面做提炼和对比介绍。
4.1 kafka架构
Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka。
Consumer:消费者,也就是接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。
Consumer Group:一个消费者组可以包含一个或多个消费者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度,同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消息消息时互不影响。Kafka 就是通过消费组的方式来实现消息 P2P 模式和广播模式。
Broker:服务代理节点。Broker 是 Kafka 的服务节点,即 Kafka 的服务器。
Topic:Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
Partition:Topic 是一个逻辑的概念,它可以细分为多个分区partition,每个分区Partition只属于单个主题topic。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。
Offset:offset 是消息在分区Partition中的唯一标识,Kafka 通过它来保证消息在分区Partition内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区Partition有序性而不是主题Topic有序性。
Replication:副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络一场,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。
Record:实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 timestamp。
我们这里假设TopicA有3个partition(对应上方的浅绿色,深绿色,黄色三种颜色),且每个partition都有2个副本(对应上方的浅绿色,深绿色,黄色同色的方块)kafka通过partition的设置到达了高可用的设计,解决了数据单点的问题。
在后面的内容中我们会再次对Kafka的整个系统架构的部分进行拆解分析。所以看不懂的可以继续往下看。
4.2 Kafka架构中,ZooKeeper扮演了什么角色?怎么理解这个角色?
- Broker 注册,Topic 注册,生产者负载均衡,消费者负载均衡
我们可以通过微服务架构中的服务发现来理解ZooKeeper扮演的类似于服务注册的角色
在引入服务发现注册中间角色前,各个服务实例之间通信会存在动态分配无法识别,负载均衡,耦合严重等问题。
在引入了Consul,ZooKeeper,Eureka等服务发现注册中间角色之后,我们可以这个中间角色感知到微服务中其他服务的变化,进行负载均衡,也降低了系统的耦合。
图片来源:https://www.nginx.com/blog/service-discovery-in-a-microservices-architecture/
在最新版的Kafka中,Kafka官方去除了对ZooKeeper的依赖,但是我们仍然需要这样一个角色。
2021年4月19日,Kafka官方发布了2.8.0版本,包含了很多新特性!包含一条“Kafka用自管理的Quorum代替ZooKeeper管理元数据。”
4.3 Kafka是一个存储系统
不同于ActiveMQ和RabbitMQ使用的时候,我们通过配置来确认消息是否需要持久化到磁盘,Kafka会将所有的消息持久化道磁盘。
通过让消费者指定消费的消息的offset(偏移量)来拉取消息,而不是将消息发送到对应的队列。
将复杂的消费问题又转嫁给消费者了,这样使得 Kafka 本身的复杂度大大降低,从而为它的高性能和高扩展打下了良好的基础。这在后面会继续再进行讲解。
4.4 RocketMQ架构
在看了Kafka架构之后我们再来看看RocketMQ的架构:
这边会发现RocketMQ和Kafka的系统架构是大同小异的,这是因为RocketMQ设计之初就参考了Kafka的很多设计思想。
RocketMQ架构上主要分为四部分,如上图所示:
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Kafka / Dubbo中的ZooKeeper,微服务中的Eureka,支持Broker的动态注册与发现。
我们在上面4.2中分析了Zookeeper在Kafka中解决的问题,其实也是NameServer解决的共性问题:动态分配,负载均衡,耦合等。
除此之外,NameServer主要包括两个功能: Broker管理:NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。
路由信息管理:每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServe集群部署: 各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
Broker:Broker主要负责消息的存储、投递和查询以及服务高可用保证。
5 MQ消息存储流程
在讲解RocketMQ的消息存储流程之前,我们先来了解一下Kafka的消息存储流程。因为RocketMQ设计开发之初,也是参考了很多kafka的思想的。这里部分内容参考了《吃透 MQ 系列》之 Kafka 存储选型的奥秘 进行Kafka存储的一个介绍。
5.1 高性能kafka之磁盘顺序写
高效率的存储其实最后利用的核心都是一样的:
1 随机写转换成顺序写 2 集中刷盘
为什么随机写要转换为顺序写?
1.现在主流的硬盘是机械硬盘
2.按照操作系统的知识,我们的硬盘物理结构分为盘面磁道扇区等概念,我们要读取对应的硬盘上的一次读写时间 = 寻道时间 + 旋转延迟 + 读取数据时间
那么寻道时间比较长,如果是顺序写,只需要一次寻道时间。
5.2 高性能kafka之mmap文件映射
在进程 的非堆内存开辟一块内存空间,和os cache 的一块内存进行映射,
kafka数据写入、是写入这块内存空间,但实际这块内存和os cache 有映射,也就是相当于写在内核内存空间了,且这块内核空间、内核直接能够访问到,直接落入磁盘。内核缓冲区的数据,flush就能完成落盘。
kafka 基于操作系统的页缓存(os cache)+磁盘顺序写的方式来实现文件的高性能写入。
5.3 高性能kafka之零拷贝技术
如下图所示,如果没有引入我们所谓的零拷贝技术,在整个系统数据的发送过程中,我们可以看到。操作系统进行了两次数据拷贝:(1)将操作系统os cache里的数据拷贝到我们的应用进程缓存里,(2)再将应用进程缓存里的的数据拷贝到socket中进行发送。其中会有操作系统的上下文切换,会比较耗时。
引入零拷贝技术之后:我们可以直接实现从操作系统os cache发送对应数据到网卡里:
数据直接在内核完成输入和输出,不需要拷贝到用户空间再写出去,极大的提升了系统的性能。
ElasticSearch底层也是大量基于os cache实现了海量数据的高性能检索的,跟Kafka原理类似。
5.4 kafka 采用稀疏哈希索引存储
kafka使用稀疏哈希索引存储对应消息:
Kafka 是一个「分区 + 分段 + 索引」的三层结构:
1、Topic 分成多个 Partition:Partition 从物理上可以理解成一个文件夹。
Partition 主要是为了解决 Kafka 存储上的水平扩展问题,如果一个 Topic 的所有消息都只存在一个 Broker,这个 Broker 必然会成为瓶颈。因此,将 Topic 内的数据分成多个 Partition,然后分布到整个集群是很自然的设计方式。
2、Partition分成多个 Segment:,Segment 从物理上可以理解成一个「数据文件 + 索引文件」,这两者是一一对应的。
有了 Partition 之后,为什么还需要 Segment?
如果不引入 Segment,一个 Partition 只对应一个文件,那这个文件会一直增大,势必造成单个 Partition 文件过大,查找和维护不方便。
此外,在做历史消息删除时,必然需要将文件前面的内容删除,不符合 Kafka 顺序写的思路。而在引入 Segment 后,则只需将旧的 Segment 文件删除即可,保证了每个 Segment 的顺序写。
5.4 RocketMQ消息存储流程
在Kafka中,每个 Topic 被分成多个 Partition。在RocketMQ中每个Topic被分为多个MessageQueue,如下图所示,假如我们的每个Topic被分为2个MessageQueue,那么如果有6w条消息,总共3台master机器,均匀写入的情况下,我们每台master机器会有2w条消息,每个MessageQueue会有1w条消息。
在RocketMQ中,我们的所有消息其实都是写入到了broker对应的一个commitLog文件中去,也就是说,多个消费队列(ConsumeQueue)对应一个CommitLog,我们把对应消息的物理位置写入到了ConsumeQueue中去。CommitLog其实是在os cache中有对应数据区域,采用同步/异步线程刷入物理存储的方式来保存对应消息。
6 高可用的MQ
系统的高可用性: 是一个系统稳定性的衡量指标。不能因为一台机器的宕机导致整个系统的崩溃。我们接下来可以看看这些MQ消息中间件是如何通过各种方式来保证系统的可用性的。
6.1 RabbitMQ如何保证高可用性
RabbitMQ的普通集群模式:只有一个queue中有元数据+实际数据。元数据可以当作是配置等信息,如下图所示当我们尝试从其中一个RabbitMQ拉取数据的时候,RabbitMQ可以通过元数据知道对应的数据在实例2上,也就可以进行消息的拉取。
缺点:也就是说我们的系统数据存在于一个节点上,我们整个系统相互之间会有很多的数据传输。如果queue所在的结点系统宕机了(上方实例2),数据也就丢失了。
RabbitMQ 镜像集群模式: 保证了系统的高可用性。如下图所示,可以理解为,每个MQ都有系统的全量数据。
缺点: 消息同步会带来网络开销。整个系统随着消息的增多,负载也会变重,横向拓展多增加实例其实也不能优化到系统性能。
6.2 Kafka如何保证高可用性
通过上面的6.1我们可以看到其实RabbitMQ的集群模式其实不是真正的分布式。我们再来看看作为分布式支持的Kafka是如何保证整个系统的高可用性的。
kafka在0.8之前,假设我们的系统数据有一个topic。分为三个不同的partition,存储到不同的机器上。如果其中一个机器的数据丢失了,也就丢失了1/3的数据。
在Kafka 0.8 之后我们系统引入的replica(复制)的概念。也就是我们会对不同的partition进行复制,并存储到不同的机器上。如果其中一台机器的数据丢失,那么整个系统的数据一般也不会有丢失的情况。如下图所示我们的写入消息和读消息都是操作的master/leader节点。
7 分布式事务以及RocketMQ中的对应解决方案
谈到事务,我们在都会想到银行转账的例子以及Mysql事务来确保同一个事务同时成功/失败的场景。这里来分析一下分布式系统中的事务以及RocketMQ中的对应解决方案。
7.1 分布式事务和解决方案
在对事务消息的了解和分析之前我们先来看一下,分布式事务的问题。对于分布式系统中,存在分布式事务的的问题。不同的系统存在数据不一致的可能性。如图所示:虽然订单支付成功了,但是其他事务存在不成功的可能性。
一般来说分布式事务存在这样四种解决方案:参考链接:分布式事务解决方案:两阶段提交(2PC),补偿事务(TCC),本地消息表(异步确保),MQ 事务消息
其中两阶段提交(2PC)方案不怎么被用到,我们来了解一下补偿事务(TCC)方案,
TCC是Try,confirm,cancel的缩写。所以整个方案的流程首先是预备阶段:
然后如果订单状态都更新了。那么我们可以对应调用Confirm阶段,失败了的话进入cancel阶段(这里省略了cancel阶段在图中的表示)。
- 国内开源的TCC分布式框架包含ByteTCC,himly,tcc-transaction等
第3种方案本地消息表(异步确保)方案:我们可以理解为将订单系统的消息只要本地事务执行成功后,就扔到我们的MQ中,让MQ去做消息的事务消息的保证,这也是一般公司比较喜欢使用的方案。避免了分布式事务,实现了最终一致性。
第4种方案事务消息方案:也就是我们接下来要讲的RocketMQ的事务消息方案。
7.2 消息丢失的场景
接着上面的问题,我们引入了MQ,我们的订单系统已经支付成功了,但是我们的积分系统没有增加积分。出现这个错误的原因是什么呢?
我们按照流程逻辑首先可以分析出这样三个地方出现消息丢失:
- 1 由于网络原因导致订单系统和MQ通信失败
- 2 由于MQ自身保存逻辑,导致数据丢失
- 3 由于消费者逻辑导致消息丢失。
下面我们先来解决第1个问题。
7.3 RocketMQ的解决方案
下面来看看RocketMQ的事务消息来保证:如果订单系统的本地事务成功,那么消息一定会投递到MQ里去。
为什么按照这样的流程可以解决消息丢失呢?下面根据上面的图来分析:
- 步骤1+2: 如果half 消息发送失败,那么可能是网络故障,订单系统的事务就可以不必执行。
- 步骤3:如果本地事务执行失败,订单系统可以发送对应的rollback消息到MQ,可以确保消息在MQ也被取消。
- 步骤4+步骤5:如果这个时候步骤3已经成功,但是步骤4失败,代表这个消息是需要被MQ的消费者消费的。 于是将执行步骤5回调消息调用订单系统,来确保这个消息最终到达MQ。
整个操作流程的关键点:在于MQ和订单系统producer在进行订单的本地事务之前已经协同了起来。
7.4 由于MQ自身保存逻辑,导致数据丢失
上面的RocketMQ事务消息方案只是保证了第1种场景的消息不丢失,确保消息准备到达了MQ。 我们来看看第2种场景对应的处理。
我们再来看看上面出现过的一张图,
这里我们可以看到RocketMQ有两种刷盘策略,如果我们消息刚好到达了os cache中的commitLog,没有写入磁盘的情况下,系统宕机则会出现消息丢失的情况,我们自然可以调整刷盘策略为同步刷盘来解决。
在RocketMQ中我们可以通过将flushDiskType调整为SYNC_FLUSH与ASYNC_FLUSH来保证消息的同步/异步刷盘。
同时我们可以通过master/slave的方式来确保master上数据的丢失后,slave上可找回已经同步的数据。这在前面“高可用的MQ”章节已经提及。
7.5 由于消费者逻辑导致消息丢失
在整个系统中我们是在消息被准确的消费处理之后再给系统返回消费成功,(在RocketMQ中对应ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 促销系统/积分系统/仓储系统/通知系统处理
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
综上也就完成了消息零丢失场景的处理,但是消息零丢失由于同步刷盘,master/slave数据同步,消息处理后再返回成功,也就导致了整个系统的性能的降低。所以我们一般只会在牵涉到金钱的场景(支付系统,订单系统)里对RocketMQ的事务消息进行使用。
8 Kafka与RocketMQ的区别
经过了上面的整体分析,由于主要介绍了Kafka和RocketMQ,所以这里做一个对比分析二者的区别:
1、Kafka通过ZooKeeper来进行协调,而RocketMQ通过自身的NameServer进行协调。
2、Kafka在消息存储过程中会根据topic和partition的数量创建物理文件,也就是说我们创建一个topic并指定了3个partition,那么就会有3个物理文件目录,也就说说partition的数量和对应的物理文件是一一对应的。
3、RocketMQ在消息存储方式选用commitLog,RocketMQ 的queue的数量其实是在consumeQueue里面体现的,在真正存储消息的commitLog其实就只有一个物理文件。
4、Kafka 的多文件并发写入 , RocketMQ 的单文件写入,性能差异kafka完胜可想而知。
5、Kafka 的大量文件存储会导致一个问题,也就说在partition特别多的时候,磁盘的访问会发生很大的瓶颈,毕竟单个文件看着是append操作,但是多个文件之间必然会导致磁盘的寻道。
9 如何进行MQ的设计
在综合了上面的集中MQ之后我们可以来思考一下,我们怎样设计一款消息中间件。
注册中心角色:我们也需要一个类似于Zookeeper在Kafka,NameServer在RocketMQ中的角色
mq 得支持可伸缩性:就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?
mq 数据落地磁盘:落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。
mq可用性: 具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
数据0丢失:参考我们之前说的那个 RocketMQ 数据零丢失方案。
10 RocketMQ的源代码实现
结合了上面对MQ的分析,这里我们再次深入RocketMQ源代码进行一探究竟,来看看整个消息的流转流程。
这里是之前阅读RocketMQ的源代码的一些流程图,在processon在线画的图 。并且添加了相应的代码注释版本到了我自己的Github:RocketMQ代码注释版本
RocketMQ流程图之消息发送与拉取流程图processon链接
RocketMQ流程图之事务消息流程图processon链接
RocketMQ作为一个高性能的消息中间件,使用Netty进行消息的发送和接收。我们可以看一下Netty的整体流程代码:
12 参考链接:
https://www.cnblogs.com/imstudy/p/11064589.html
【SpringBoot MQ 系列】RabbitListener 消费基本使用姿势介绍
书籍:《RocketMQ技术内幕》
公众号:《石杉的架构笔记》