一. 认识消息队列
1. 队列
队列(queue)是只允许在一端进行插入操作,而在另一端进行删除操作的线性表(数据结构)。
队列是一种先进先出(First in First Out)的线性表
,简称FIFO。允许插入的一端称为队尾,允许删除的一端称为队头。假设队列是 q=(a1,a2,…,an),那么a1就是队头元素,而an是队尾元素。这样我们就可以删除时,总是从a1开始,而插入时,列在最后。
2. jdk中的实现
常见队列:
- ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列。
- LinkedTransferQueue:由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
ArrayBlockingQueue(公平、非公平):
用数组
实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则
对元素进行排序。默认情况下
不保证访问者公平的访问队列
,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当
队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入
元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐
量。我们可以使用以下代码创建一个公平的阻塞队列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
LinkedBlockingQueue(两个独立锁提高并发)
基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对
元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据
,还因为其对于生产者
端和消费者端分别采用了独立的锁来控制数据同步
,这也意味着在高并发的情况下生产者和消费
者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
PriorityBlockingQueue(compareTo 排序实现优先)
是一个支持优先级
的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现 compareTo()方法
来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造 参数 Comparator
来对元素进行排序。需要注意的是不能保证同优先级元素的顺序
。
SynchronousQueue(不存储数据、可用于传递数据)
是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线
程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给
另 外 一 个 线 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和
ArrayBlockingQueue。
LinkedBlockingDeque
是一个由链表
结构组成的双向阻塞队列
。所谓双向队列指的你可以从队列的两端插入和移出元素
。
双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其
他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,
peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队
列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另
外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同
于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。
在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在
“工作窃取”模式中。
LinkedTransferQueue
是 一 个 由 链 表 结 构 组 成 的 无 界 阻 塞 TransferQueue 队 列 。 相 对 于 其 他 阻 塞 队 列 ,
LinkedTransferQueue 多了 tryTransfer
和 transfer
方法。
- transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的
poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者
。如
果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素 被消费者消费了才返回
。 - tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费
者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否
接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。
对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传
入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时
还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。
DelayQueue(缓存失效、定时任务 )
是一个支持延时获取元素
的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实
现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才
能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:
缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询
DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从
DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。
3. 消息队列(Message Queue):
维基百科:
在计算机科学领域,消息队列和邮箱都是软件工程组件,通常用于进程间或同一进程内的线程通信。它们通过队列来传递消息-传递控制信息或内容,群组通信系统提供类似的功能。
简单的概括下上面的定义:消息队列就是一个使用队列来通信的组件。
上面的定义没有错,但就现在而言我们日常所说的消息队列常常指代的是消息中间件,它的存在不仅仅只是为了通信这个问题。
可以认为是在消息的传输过程中保存消息的容器。
二. 消息队列的两种模式
1. 点对点模式(一对一
,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到 Queue 中,消费者从 Queue 中取出消费消息,消息被消费以后,Queue中不在存储该消息,所以消费者不可能消费已经消费过的消息。Queue支持有多个消费者,但是对于一条消息而言,只会被一个消费者消费。
生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者, 但是消费者之间是竞争关系,即每条消息只能被一个消费者消费。
2)发布/订阅模式(一对多
,消费者消费数据后不会清除数据)
消息生产者(发布)生产消息到Topic中,多个消息消费者(订阅)消费该消息。发布到Topic的消息可以同时被多个订阅者消费。
其实可以这么理解,发布/订阅模型等于我们都加入了一个群聊中,我发一条消息,加入了这个群聊的人都能收到这条消息。那么一对一模式就是一对一聊天,我发给你的消息,只能在你的聊天窗口弹出,是不可能弹出到别人的聊天窗口中的。
讲到这有人说,那我一对一聊天对每个人都发同样的消息不就也实现了一条消息被多个人消费了嘛。
是的,通过多队列全量存储相同的消息,即数据的冗余可以实现一条消息被多个消费者消费。RabbitMQ 就是采用队列模型,通过 Exchange 模块来将消息发送至多个队列,解决一条消息需要被多个消费者消费问题。
总结:点对点模式每条消息只能被一个消费者消费,而发布/订阅模型就是为让一条消息可以被多个消费者消费而生的,当然点对点模式也可以通过消息全量存储至多个队列来解决一条消息被多个消费者消费问题,但是会有数据的冗余。
三. 为什么要使用消息队列?
从本质上来说是因为互联网的快速发展,业务不断扩张,促使技术架构需要不断的演进。
从以前的单体架构到现在的微服务架构,成百上千的服务之间相互调用和依赖。从互联网初期一个服务器上有 100 个在线用户已经很了不得,到现在坐拥10亿日活跃量的微信。我们需要有一个「东西」来解耦服务之间的关系、控制资源合理合时的使用以及缓冲流量洪峰等等。
消息队列就应运而生了。它常用来实现:异步处理、服务解耦、流量控制。
1. 异步
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
随着业务复杂度、架构的演变,微服务已经广泛应用在各个系统中。
比如一个购物网站,涉及到很多系统:订单系统、支付系统、库存系统、积分系统、短信系统....
传统的业务流程:
如果按照传统的流程,真正的电商系统在下单支付后会涉及到更多的系统,这个链路这样下去,耗时是非常巨大的,不可能买个东西花费十几秒(用户体验感极差)
其实从上边的流程中可以看出支付后,积分增减的同时也可以发短信,这时候可以考虑异步
异步处理,为什么不用多线程去做?这就涉及到下面第二点解耦
:
如果一个订单流程,增减积分,发短信,扣库存都在支付系统中进行调用,如果再增加一个优惠券系统,又需要调用扣优惠券的接口,如果再增加,又需要增加或修改代码....每次增加业务时支付系统都需要相应的修改和重新部署,耦合性非常高(一个友好系统的设计应该满足低耦合、高内聚的特点),而且所有流程都写在一起,排查问题的复杂度也会上升。
所以,使用消息队列是有必要的,下单后把支付成功的消息发送到MQ,其他系统通过监听消息去处理各自的业务,面要再接入系统,直接订阅发送的支付成功消息即可。
2. 解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
3. 削峰(灵活性 & 峰值处理能力)
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
比如:秒杀活动、双十一.....流量突然暴增,而平时都是低流量,如果承受能力不够,可能就会突然崩掉
所以:可以把请求放到队列里面,然后至于每秒消费多少请求,就看自己的服务器处理能力,能处理多多少QPS就消费这么多少,可能会比正常的慢一点,但是不至于打挂服务器,等流量高峰下去了,服务也就没压力了。
阿里双十一12:00的时候这么多流量瞬间涌进去,他有时候是会慢一点,但是不会直接挂掉,或者降级给你个友好的提示页面。
2. 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
3. 缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
四. 使用消息队列有什么问题?
1. 系统复杂性
本来只有一个系统,现在接入一个中间件,需要去维护他,而且使用的过程中是不是要考虑各种问题,比如消息重复消费、消息丢失、消息的顺序消费等等。
(1) 重复消费
消息重复消费是使用消息队列之后,必须考虑的一个问题,也是比较严重和常见的问题。
特别是对于交易支付的场景
,重复的操作和行为会导致重复扣款,重复支付等资金风险
就比如有这样的一个场景,用户下单成功后我需要去一个活动页面给他加GMV(销售总额),最后根据他的GMV去给他发奖励,这是电商活动很常见的玩法。
一般消息队列的使用,我们都是有重试机制
的,就是说我下游的业务发生异常了,我会抛出异常并且要求你重新发一次。
但是可能下游有多个系统,某个系统业务处理失败要求重发,但是其他系统处理成功了,会再次消费,导致不幂等
幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。
在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。
例如,“setTrue()”函数就是一个幂等函数,无论多次执行,其结果都是一样的.更复杂的操作幂等保证是利用唯一交易号(流水号)实现.
怎么保证幂等?
一般幂等,分场景去考虑,看是强校验
还是弱校验
,比如跟金钱相关的场景那就很关键,就做强校验,其他不是很重要的场景做弱校验。
- 强校验:
比如你监听到用户支付成功的消息,你监听到了去加GMV是不是要调用加钱的接口,那加钱接口下面再调用一个加流水的接口,两个放在一个事务,成功一起成功失败一起失败。
每次消息过来都要拿着订单号+业务场景这样的唯一标识(比是天猫双十一活动)去流水表查,看看有没有这条流水,有就直接return不要走下面的流程了,没有就执行后面的逻辑。
之所以用流水表,是因为涉及到金钱这样的活动,有啥问题后面也可以去流水表对账,还有就是帮助开发人员定位问题。
- 弱校验
这个简单,一些不重要的场景,比如给谁发短信啥的,我就把这个id+场景唯一标识作为Redis的key,放到缓存里面失效时间看你场景,一定时间内的这个消息就去Redis判断。
用KV就算消息丢了可能这样的场景也没关系,反正丢条无关痛痒的通知短信嘛(你敢说你没验证码短信丢失的情况?)。
因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。
- 可以通过上面我那条 SQL 一样,做了个前置条件判断,即money = 100情况,并且直接修改,更通用的是做个version即版本号控制,对比消息中的版本号和数据库中的版本号。
- 或者通过数据库的约束例如唯一键,例如insert into update on duplicate key...。
- 或者记录关键的key,比如处理订单这种,记录订单ID,假如有重复的消息过来,先判断下这个ID是否已经被处理过了,如果没处理再进行下一步。当然也可以用全局唯一ID等等。
基本上就这么几个套路,真正应用到实际中还是得看具体业务细节。
(2)消息有序
一般都是同个业务场景下不同几个操作的消息同时过去,本身顺序是对的,但是你发出去的时候同时发出去了,消费的时候却乱掉了,这样就有题问了。
生产者消费者一般需要保证顺序消息的话,可能就是一个业务场景下的,比如订单的创建、支付、发货、收货。
全局有序
如果要保证消息的全局有序,首先只能由一个生产者往Topic发送消息,并且一个Topic内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!
不过一般情况下我们都不需要全局有序,即使是同步MySQL Binlog也只需要保证单表消息有序即可。
部分有序
因此绝大部分的有序需求是部分有序,部分有序我们就可以将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。
RocketMQ:一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制,他有三种实现:
使用Hash取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样,我们保证了发送有序。
RocketMQ的topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可。
RocketMQ仅保证顺序发送,顺序消费由消费者业务保证!!!
(3)数据丢失
就我们市面上常见的消息队列而言,只要配置得当,我们的消息就不会丢。
1)生产者:开启confirm模式
产者发送消息至Broker,需要处理Broker的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好try-catch,妥善的处理响应,如果Broker返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等。
这样就能保证在生产消息阶段消息不会丢失。
2)MQ:消息持久化
存储消息阶段需要在消息刷盘(持久化)之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。
如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台还在呢(假如怕两台都挂了..那就再多些)。
3)消费者:关闭自动ack
这里经常会有同学犯错,有些同学当消费者拿到消息之后直接存入内存队列中就直接返回给Broker消费成功,这是不对的。
你需要考虑拿到消息放在内存之后消费者就宕机了怎么办。所以我们应该在消费者真正执行完业务逻辑之后,再发送给Broker消费成功,这才是真正的消费了。
所以只要我们在消息业务逻辑处理完成之后再给Broker响应,那么消费阶段消息就不会丢失。
保证消息的可靠性需要三方配合:
生产者:需要处理好Broker的响应,出错情况下利用重试、报警等手段。
MQ:需要控制响应的时机,单机情况下是消息刷盘后返回响应,集群多副本情况下,即发送至两个副本及以上的情况下再返回响应。
消费者:需要在执行完真正的业务逻辑之后再返回响应给Broker。
但是要注意消息可靠性增强了,性能就下降了,等待消息刷盘、多副本同步后返回都会影响性能。因此还是看业务,例如日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应。
(4)消息堆积
消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。
因此我们需要先定位消费慢的原因,如果是bug则处理 bug ,如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。
假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。
当然你消费者内部是单线程还是多线程消费那看具体场景。不过要注意上面提高的消息丢失的问题,如果你是将接受到的消息写入内存队列之后,然后就返回响应给Broker,然后多线程向内存队列消费消息,假设此时消费者宕机了,内存队列里面还未消费的消息也就丢了。
2. 数据一致性
这个其实是分布式服务本身就存在的一个问题,不仅仅是消息队列的问题,但是放在这里说是因为用了消息队列这个问题会暴露得比较严重一点。就像我开头说的,你下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,他们成功还是失败你就不管了?我说了保证自己的业务数据对的就好了,其实还是比较不负责任的一种说法。(分布式事务)
分布式事务
事务:
一般是指要做的或所做的事情。
在计算机术语中是指访问并可能更新数据库中各种数据项的一个程序执行单元(unit)。
事务通常由高级数据库操纵语言或编程语言(如SQL,C++或Java)书写的用户程序用户程序的执行所引起,并用形如begin transaction和end transaction语句(或函数调用)来界定。
事务由事务开始(begin transaction)和事务结束(end transaction)之间执行的全体操作组成。
特性
事务是恢复和并发控制的基本单位。
事务应该具有4个属性:原子性、一致性、隔离性、持久性
。这四个属性通常称为ACID
特性。
原子性(atomicity)
:一个事务是一个不可分割的工作单位,事务中包括的操作要么都做,要么都不做。
一致性(consistency)
:事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
隔离性(isolation)
:一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
持久性(durability):持久性也称永久性(permanence)
,指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。
总结起来就是:事务就是一系列操作,要么同时成功,要么同时失败。事务具有的特性 ACID (原子性、一致性、隔离性、持久性)。
分布式事务:
我接触和了解到的分布式事务大概分为:
- 2pc(两段式提交)
- 3pc(三段式提交)
- TCC(Try、Confirm、Cancel)
- 最大努力通知
- XA
- 本地消息表(ebay研发出的)
- 半消息/最终一致性(RocketMQ)
介绍下最简单的2pc(两段式)
,以及大家以后可能比较常用的半消息事务
也就是最终一致性
,目的是让大家理解下分布式事务里面消息中间件的作用,别的事务都大同小异,都有很多优点。
当然也都有种种弊端:
- 例如
长时间锁定数据库资源
,导致系统的响应不快,并发上不去
。 - 网络抖动出现
脑裂
情况,导致事物参与者,不能很好地执行协调者的指令,导致数据不一致。 - 单点故障:例如事物协调者,在某一时刻宕机,虽然可以通过选举机制产生新的Leader,但是这过程中,必然出现问题,而TCC,只有强悍的技术团队,才能支持开发,成本太高。
2pc(两段式提交):
2pc(两段式提交)可以说是分布式事务的最开始的样子了,像极了媒婆,就是通过消息中间件协调多个系统,在两个系统操作事务的时候都锁定资源但是不提交事务,等两者都准备好了,告诉消息中间件,然后再分别提交事务。
是的你可能已经发现了,如果A系统事务提交成功了,但是B系统在提交的时候网络波动或者各种原因提交失败了,其实还是会失败的。
最终一致性:
整个流程中,我们能保证是:
- 业务主动方本地事务提交失败,业务被动方不会收到消息的投递。
- 只要业务主动方本地事务执行成功,那么消息服务一定会投递消息给下游的业务被动方,并最终保证业务被动方一定能成功消费该消息(消费成功或失败,即最终一定会有一个最终态)。
不过呢技术就是这样,各种极端的情况我们都需要考虑,也很难有完美的方案,所以才会有这么多的方案三段式、TCC、最大努力通知等等分布式事务方案,大家只需要知道为啥要做,做了有啥好处,有啥坏处,在实际开发的时候都注意下就好好了,系统都是根据业务场景设计出来的,离开业务的技术没有意义,离开技术的业务没有底气。
3. 高可用
无论是我们使用消息队列来做解耦、异步还是削峰,消息队列肯定不能是单机的。试着想一下,如果是单机的消息队列,万一这台机器挂了,那我们整个系统几乎就是不可用了。
所以,当我们项目中使用消息队列,都是得集群/分布式的。要做集群/分布式就必然希望该消息队列能够提供现成的支持,而不是自己写代码手动去实现。
五. 消息队列中间件
目前使用较多的消息队列有Kafka、ActiveMQ、RabbitMQ、RocketMQ,我们后面会一一对比这些消息队列。
ActiveMQ和RabbitMQ这两着因为吞吐量还有GitHub的社区活跃度的原因,在各大互联网公司都已经基本上绝迹了,业务体量一般的公司会是有在用的,但是越来越多的公司更青睐RocketMQ这样的消息中间件了。
Kafka和RocketMQ一直在各自擅长的领域发光发亮,不过写这篇文章的时候我问了蚂蚁金服,字节跳动和美团的朋友,好像大家用的都有点不一样,应该都是各自的中间件,可能做过修改,也可能是自研的,大多没有开源。
1. 对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比RocketMQ、Kafka低一个数量级 | 万级 | 10万级,支撑高吞吐量 | 10万级,高吞吐,一般配合大数据类系统进行实时数据计算、日志采集等场景 |
topic数量对吞吐量的影响 | topic可以达到几百/几千的级别,吞吐量会有较小幅度下降,这是RocketMQ的一大优势,在同等机器下可以支撑大量的Topic | topic从几十到几百个,吞吐量会大幅下降,在同等机器下,Kafka尽量保证topic数量不要过多,如需支撑大规模Topic需要增加更多的机器资源 | ||
时效性 | ms级 | 微秒级,延迟最低,这是RabbitMQ的一大特性 | ms级 | 延迟在ms级内 |
可用性 | 高,基于主从架构实现高可用 | 同ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不会丢失 | 经过参数优化配置,可以做到0丢失 | 同RocketMQ |
功能支持 | MQ领域的功能机器完备 | 基于erlang开发,并发能力很强,性能极好,延时很低 | MQ功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集中被大规模的使用 |
社区活跃度 | 低 | 中 | 高 | 高 |
2. 选择
从上面对比可以看出其中的一些差距:
吞吐量:早期比较活跃的ActiveMQ 和RabbitMQ基本上不是后两者的对手了,在现在这样大数据的年代吞吐量真的很重要。
比如现在突然爆发了一个超级热点新闻,你的APP注册用户高达亿数,你要想办法第一时间把突发全部推送到每个人手上,你没有大吞吐量的消息队列中间件用啥去推?
部署方式:后面两个天然分布式架构,都是高可用的分布式架构,而且数据多个副本的数据也能做到0丢失。
RabbitMQ 这个中间件其实还行,但是这玩意开发语言居然是erlang,我敢说绝大部分工程师肯定不会为了一个中间件去刻意学习一门语言的,开发维护成本你想都想不到,出个问题查都查半天。
RocketMQ(阿里开源的)git活跃度还可以,基本上你push了自己的bug确认了有问题都有阿里大佬跟你试试解答并修复的,我个人推荐的也是这个,他的架构设计部分跟同样是阿里开源的一个RPC框架是真的很像(Dubbo)。
Kafka:大数据领域,公司的日志采集,实时计算等场景,都离不开他的身影,他基本上算得上是世界范围级别的消息队列标杆了。
没有最好的技术,只有最适合的技术,不要为了用而用
RocketMQ
1. 介绍
RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
2. 发展历程
2007年:淘宝实施了“五彩石”项目,“五彩石”用于将交易系统从单机变成分布式,也是在这个过程中产生了阿里巴巴第一代消息引擎——Notify。
2010年:阿里巴巴B2B部门基于ActiveMQ的5.1版本也开发了自己的一款消息引擎,称为Napoli,这款消息引擎在B2B里面广泛地被使用,不仅仅是在交易领域,在很多的后台异步解耦等方面也得到了广泛的应用。
2011年:业界出现了现在被很多大数据领域所推崇的Kafka消息引擎,阿里巴巴在研究了Kafka的整体机制和架构设计之后,基于Kafka的设计使用Java进行了完全重写并推出了MetaQ 1.0版本,主要是用于解决顺序消息和海量堆积的问题。
2012年:阿里巴巴开源其自研的第三代分布式消息中间件——RocketMQ。
经过几年的技术打磨,阿里称基于RocketMQ技术,目前双十一当天消息容量可达到万亿级。
2016年11月:阿里将RocketMQ捐献给Apache软件基金会,正式成为孵化项目。
阿里称会将其打造成顶级项目。这是阿里迈出的一大步,因为加入到开源软件基金会需要经过评审方的考核与观察。
坦率而言,业界还对国人的代码开源参与度仍保持着刻板印象;而Apache基金会中的342个项目中,暂时还只有Kylin、CarbonData、Eagle 、Dubbo和 RocketMQ 共计五个中国技术人主导的项目。
2017年2月20日:RocketMQ正式发布4.0版本,专家称新版本适用于电商领域,金融领域,大数据领域,兼有物联网领域的编程模型。
以上就是RocketMQ的整体发展历史,其实在阿里巴巴内部围绕着RocketMQ内核打造了三款产品,分别是MetaQ、Notify和Aliware MQ。
这三者分别采用了不同的模型,MetaQ主要使用了拉模型,解决了顺序消息和海量堆积问题;Notify主要使用了推模型,解决了事务消息;而云产品Aliware MQ则是提供了商业化的版本。
3.功能
Apache RocketMQ是一个分布式消息传递和流平台,具有低延迟,高性能和可靠性,万亿级容量和灵活的可伸缩性。
它具有多种功能:
- 消息模式包括发布/订阅,请求/答复和流式传输
- 财务级交易信息
- 基于DLedger的内置容错和高可用性配置选项
- 各种跨语言客户端,例如Java,C / C ++,Python,Go
- 可插拔的传输协议,例如TCP,SSL,AIO
- 内置消息跟踪功能,还支持开放式跟踪
- 多功能的大数据和流生态系统集成
- 按时间或偏移量追溯消息
- 可靠的FIFO和严格的有序消息传递在同一队列中
- 高效的推拉消费模型
- 单个队列中的百万级消息累积容量
- 多种消息传递协议,例如JMS和OpenMessaging
- 灵活的分布式横向扩展部署架构
- 快如闪电的批量消息交换系统
- 各种消息过滤器机制,例如SQL和Tag
- 用于隔离测试和云隔离群集的Docker映像
- 功能丰富的管理仪表板,用于配置,指标和监视
- 认证与授权
- 免费的开源连接器,适用于源和接收器
4. 核心模块
- rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息
- rocketmq-client:提供发送、接受消息的客户端API。
- rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。
- rocketmq-common:通用的一些类,方法,数据结构等。
- rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议。
- rocketmq-store:消息、索引存储等。
- rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!(一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件)。
- rocketmq-tools:命令行工具。
5. 四大核心组成部分
主要有四大核心组成部分:
NameServer
Broker
Producer
Consumer
我们可以看到RocketMQ啥都是集群部署的,这是他吞吐量大,高可用的原因之一,集群的模式也很花哨,可以支持多master 模式、多master多slave异步复制模式、多 master多slave同步双写模式。
而且这个模式好像Kafka啊!(我这里是废话,本身就是阿里基于Kafka的很多特性研发的)。
(1)NameServer
- 其角色类似Dubbo中的Zookeeper,但NameServer与Zookeeper相比更轻量。主要是因为每个NameServer节点互相之间是独立的,没有任何信息交互。
- NameServer压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。
- 但有一点需要注意,Broker向NameServer发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话, 网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败。
- NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。
- 每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
(2)Broker
- 消息中转角色,负责存储消息,转发消息。
- Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。
- Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
- 官网上有数据显示:具有上亿级消息堆积能力,同时可严格保证消息的有序性。
(3)Producer
- 消息生产者,负责产生消息,一般由业务系统负责产生消息。
- Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
- RocketMQ 提供了三种方式发送消息:同步、异步和单向
(1)同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
(2)异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
(3)单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
(4)Consumer
- 消息消费者,负责消费消息,一般是后台系统负责异步消费。
- Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制。
- Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
- Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。
6. 优缺点
RocketMQ优点:
- 单机吞吐量:十万级
- 可用性:非常高,分布式架构
- 消息可靠性:经过参数优化配置,消息可以做到0丢失
- 功能支持:MQ功能较为完善,还是分布式的,扩展性好
- 支持10亿级别的消息堆积,不会因为堆积导致性能下降
- 源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
- 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
- RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ
RocketMQ缺点:
- 支持的客户端语言不多,目前是java及c++,其中c++不成熟
- 社区活跃度不是特别活跃那种
- 没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
Kafka介绍:
Kafka是天然分布式的
现在我们已经知道了往topic里边放数据,实际上这些数据会分到不同的partition上,这些partition存在不同的broker上。分布式肯定会带来问题:“万一其中一台broker(Kafka服务器)出现网络抖动或者挂了,怎么办?”
Kafka是这样做的:我们数据存在不同的partition上,那kafka就把这些partition做备份。比如,现在我们有三个partition,分别存在三台broker上。每个partition都会备份,这些备份散落在不同的broker上。
生产者往topic丢数据,是与主分区交互,消费者消费topic的数据,也是与主分区交互。
备份分区仅仅用作于备份,不做读写。如果某个Broker挂了,那就会选举出其他Broker的partition来作为主分区,这就实现了高可用。
当生产者把数据丢进topic时,我们知道是写在partition上的,那partition是怎么将其持久化的呢?(不持久化如果Broker中途挂了,那肯定会丢数据嘛)。
Kafka是将partition的数据写在磁盘的(消息日志),不过Kafka只允许追加写入(顺序访问),避免缓慢的随机 I/O 操作。
参考公众号:三太子敖丙