主流MQ对比
优点 | 缺点 | 适用场景 | |
---|---|---|---|
Kafka | 吞吐量非常大,性能非常好,技术生态完整 | 功能比较单一 | 分布式日志收集,大数据采集 |
RabbitMQ | 消息可靠性高,功能全面 | 吞吐量较低。消息积压会影响性能。 erlang语言比较小众 | 企业内部系统调用 |
RocketMQ | 高吞吐、高性能、高可用,高级功能非常全 | 技术生态相对没有那么完整 | 几乎全场景。尤其适合金融场景 |
分析各mq的优缺点,主要看各mq的诞生背景就能明白了
- rabbitmq是一个老牌的传统mq,它在设计之初并没有如今这么大的数据量,所以它的设计最初目标是消息可靠性,
功能全面,相对当前的新兴mq来说吞吐量较低,但作为一个老牌mq,能流行到现在必定也是不断再进步的,所以它也
在不断的突破自己的,比如引入了Streams等来解决积压问题。 - kafka最早是由LinkedIn开发的,最开始是为了做海量日志收集用的,所以它的设计目标是高吞吐量,高性能。
但它的功能就相对单一,因为它最初只需做好这一件事就好了,目前kafka的应用场景也是在大数据采集,分布式日
志收集等,它已成为大数据领域的一个核心组件,因此它的生态也是非常完整的。 - RocketMQ是由阿里开源的,那么阿里为什么不直接使用rabbit或kafka呢?很简单,因为这两个都不足以支撑
他们的业务,他们既要高吞吐量,高性能,又要高级功能全,所以他们只能自己开发一个mq,这就是rocketmq,所
以rocketmq几乎覆盖全场景,但它的缺点就是技术生态相对没有那么完整,
无论哪个mq产品,都在不断的发展竞争,当然也有跟不上发展的,比如activemq等
RabbitMQ核心概念
Connection
Connection是RabbitMQ中的一个概念,它相当于一个连接,它用来连接RabbitMQ服务器
Channel
旦客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。也可以理解为是客户端与RabbitMQ实际进行数据交互的通道,我们后续的大多数的数据操作都是在信道 Channel 这个层面展开的。
RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,所以在实际业务中,对于Connection和Channel的分配也需要根据实际情况进行考量
virtual host
virtual host是RabbitMQ中的一个概念,它相当于一个虚拟的目录,它用来隔离不同应用程序的资源,就好比数据库的库
queue
queue是RabbitMQ的内部对象,用于存储消息,它是消息的容器,这个Queue其实就是一个典型的FIFO的队列数据结构,生产方可以
推送数据到queue,消费方消费queue中的数据。
exchange
队列Queue即可以发消息,也可以收消息,那Exchange交换机是干什么的呢?其实他也是用来辅助发送消息的。Exchange与
Queue之间会建立一种绑定关系,通过绑定关系,Exchange交换机里发送的消息就可以分发到不同的Queue上
通过exchange就可以实现一些复杂的路由,比如我想把消息按照不同的类型分发到不同的队列,这个时候就可以
通过exchange,实际上生产者发送消息都是发送到exchange上,exchange再根据绑定关系将消息发送到queue上
那么基于什么规则进行路由转发呢,这就涉及到了routingKey
routingKey
routingKey是消息的路由规则,它也可以理解为消息的标签,它跟exchange绑定在一起,exchange根据routingKey
发送消息到queue上,这样就实现了消息的路由转发
核心概念图
Exchange类型
No Exchange
在 RabbitMQ 中,No Exchange 实际上是指使用默认的交换机,
默认交换机是一个特殊的 direct 交换机,其名称为空字符串 (""),
每个队列在创建时都会自动绑定到默认交换机,路由键为队列名称,
这意味着你可以直接将消息发送到队列,而不需要显式声明和绑定交换机
比如,队列名称是no-exchange,那么消费者发送消息时,指定exchange为"" ,
routingKey为队列名称,即"no-exchange",消息就会被发送到no-exchange队列
channel.basicPublish("", "no-exchange", null, message.getBytes());
Direct Exchange
DirectExchange是最简单的交换机,它可以绑定某个RoutingKey到某个队列
生产者负责发送消息到Exchange并指定RoutingKey,消费者负责监听队列并消费消息
Fanout Exchange
FanoutExchange 是一个广播模式的交换机,它会将消息发送给所有绑定到交换机的队列。
这种模式是一种广播模式,不需要路由键,只需要将队列绑定到交换机上。
使用广播模式中绑定队列到广播交换机,此时routingKey为空:
channel.queueBind(queueName, EXCHANGE_NAME, "");
Topic Exchange
TopicExchange相比于DirectExchange,它支持通配符模糊匹配,
比如发送日志信息,分为log.error,log.info两个routingKey,
我想有一个消费者可以消费所有得日志信息,还有另一个消费者仅消费error级别的日志信息,
那么可以创建两个队列,一个队列绑定到log.*,另一个队列绑定到log.error,
诸如这种场景,就可以使用TopicExchange
Headers Exchange
Headers Exchange是一个可以实现更复杂的路由交换机,
Headers Exchange 忽略了路由键的作用,完全依赖消息头进行路由,
适合于需要根据多个属性进行路由的场景
支持两种匹配策略:
- x-match=all:所有消息头键值对都必须匹配才能路由。
- x-match=any:只要有一个消息头键值对匹配即可路由。
Headers Exchange灵活性更高,但是性能会差一些,因为需要遍历所有的
消息头键值对进行匹配,此时发送代码如下:
Map<String, Object> headers1 = new HashMap<>();
headers1.put("type", "error");
headers1.put("priority", "high");
String message1 = "High-priority error log";
channel.basicPublish(EXCHANGE_NAME, "", getProperties(headers1), message1.getBytes());
此时消息会被路由到headers_queue1队列
Custom Exchange
自定义 Exchange 是 RabbitMQ 提供的一种扩展机制,允许用户通过插件创建和使用非标准的 Exchange 类型。
这些自定义 Exchange 可以提供更复杂和特定的路由逻辑
常见的第三方自定义 Exchange 插件:
- Consistent Hash Exchange(基于一致性哈希算法进行路由):
- Delayed Message Exchange(延迟消息传递)
- Sharding Exchange(将消息分片到多个队列)
Queue类型
rabbitmq目前支持三种queue类型:Classic经典队列,Quorum仲裁队列,Stream流式队列
RabbitMQ自3.8.x版本推出了Quorum仲裁队列,3.9.x版本推出了Stream流式队列。
这些新的队列类型都是RabbitMQ针对现代新的业务场景做出的大的改善。最明显的,以往的RabbitMQ版本,如果消息产生大量积累就会严重影响消息收发的性能。
而这两种新的队列可以极大的提升RabbitMQ的消息堆积性能
Classic经典队列
在RabbitMQ中,经典队列是一种非常传统的队列结构。消息以FIFO先进先出的方式存入队列。
消息被Consumer从队列中取出后就会从队列中删除,如果消息需要重新投递,就需要再次入队
这种队列都依靠各个Broker自己进行管理,在分布式场景下,管理效率是不太高的。
并且这种经典队列不适合积累太多的消息,因为消息都积压在内存中。
这种队列可以根据参数配置是否需要持久化
Quorum仲裁队列
相比Classic经典队列,
在分布式环境下对消息的可靠性保障更高,官方文档中表示,未来会使用Quorum仲裁队列代替传统Classic队列
Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的集群设计的。
简单理解就是quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。
这种方式可以保证消息在集群内部不会丢失。
同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。
Quorum队列大部分功能都是在Classic队列基础上做减法,比如不支持非持久化模式,不支持独占队列
Quorum相比于Classic功能增强比如支持Poison Message handling(处理有毒的消息)。
所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,
这样这些消息就成为了毒消息。这些读消息应该有保障机制进行标记并及时删除。Quorum队列会持续跟踪消息的失败投递尝试次数,
并记录在"x-delivery-count"这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。
当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。
当然,如果配置了死信队列的话,就会进入对应的死信队列。
Quorum队列更适合于队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景
例如 电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失。
Stream流式队列
Stream流式队列是RabbitMQ 3.9.x版本推出的新队列类型,它是一种新型的消息队列,专门为流式数据设计的。
Stream是持久化的,队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,
然后通过调整每个消费者的消费进度offset,来实现消息的多次分发(可以看出这种设计更像是当前的kafka和RocketMQ)。
Stream不支持死信交换机,不支持处理毒消息
这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点:
1、large fan-outs 大规模分发
当想要向多个订阅者发送相同的消息时,以往的队列类型必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这就会导致性能低下。而Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求。
2、Replay/Time-travelling 消息回溯
RabbitMQ已有的这些队列类型,在消费者处理完消息后,消息都会从队列中删除,因此,无法重新读取已经消费过的消息。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。
3、Throughput Performance 高吞吐性能
Strem队列的设计以性能为主要目标,对消息传递吞吐量的提升非常明显。
4、Large logs 大日志
RabbitMQ一直以来有一个让人诟病的地方,就是当队列中积累的消息过多时,性能下降会非常明显。但是Stream队列的设计目标就是以最小的内存开销高效地存储大量的数据。使用Stream队列可以比较轻松的在队列中积累百万级别的消息。
整体上来说,RabbitMQ的Stream队列,其实有很多地方借鉴了其他MQ产品的优点,在保证消息可靠性的基础上,着力提高队列的消息吞吐量以及消息转发性能。
因此,Stream也是在视图解决一个RabbitMQ一直以来,让人诟病的缺点,就是当队列中积累的消息过多时,性能下降会非常明显的问题。
RabbitMQ以往更专注于企业级的内部使用,但是从这些队列功能可以看到,Rabbitmq也在向更复杂的互联网环境靠拢,未来对于RabbitMQ的了解,也需要随着版本推进,不断更新。
Publisher Confirms机制
RabbitMQ的消息可靠性是⾮常⾼的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不
会丢失消息。
但是发送者发送消息是否成功是没有保证的。发送者发送消息的基础API:
Producer.basicPublish⽅法是没有返回值的,也就是说,⼀次发送消息是否成功,应⽤是不知道的,这在业务上就容
易造成消息丢失。
⽽这个模块就是通过给发送者提供⼀些确认机制,来保证这个消息发送的过程是成功的。
Publisher Confirms机制是RabbitMQ提供的⼀种异步确认机制,⽤于确认消息是否成功发送到Broker。
实现的⽅式也⽐较简单,Producer在channel中注册监听器来对消息进⾏确认。核⼼代码就是⼀个:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
按说监听只要注册一个就可以了,那为什么这里要注册两个呢?成功一个,失败一个
除此之外,rabbitmq还提供了很多监听机制,来确保数据的可靠性
死信队列
死信队列是RabbitMQ中非常重要的一个特性。
简单理解,他是RabbitMQ对于未能正常消费的消息进行的一种补救机制。死信队列也是一个普通的队列,同样可以在队列上声明消费者,继续对消息进行消费处理。
有以下三种情况,RabbitMQ会将一个正常消息转成死信
- 消息被消费者确认拒绝。消费者把requeue参数设置为true(false),并且在消费后,向RabbitMQ返回拒绝。channel.basicReject或者channel.basicNack。
- 消息达到预设的TTL时限还一直没有被消费。
- 消息由于队列已经达到最长长度限制而被丢掉
RabbitMQ中有两种方式可以声明死信队列,一种是针对某个单独队列指定对应的死信队列。另一种就是以策略的方式进行批量死信队列的配置
联邦插件
在企业中有很多大型的分布式场景,在这些业务场景下,希望服务也能够同样进行分布式部署。
这样即可以提高数据的安全性,也能够提升消息读取的性能。
例如,某大型企业,可能在北京机房和长沙机房分别搭建RabbitMQ服务,然后希望长沙机房需要同步北京机房的消息,这样可以让长沙的消费者服务可以直接连接长沙本地的RabbitMQ,而不用费尽周折去连接北京机房的RabbitMQ服务。
这时要如何进行数据同步呢?搭建一个跨度这么大的内部子网显然就不太划算。这时就可以考虑使用RabbitMQ的Federation插件,搭建联邦队列Federation。
通过Federation可以搭建一个单向的数据同步通道
集群模式
rabbitmq提供两种集群模式
普通集群
这种模式使用Erlang语言天生具备的集群方式搭建。这种集群模式下,集群的各个节点之间只会有相同的元数据,即队列结构,而消息不会进行冗余,只存在一个节点中。消费时,如果消费的不是存有数据的节点, RabbitMQ会临时在节点之间进行数据传输,将消息从存有数据的节点传输到消费的节点。
很显然,这种集群模式的消息可靠性不是很高。因为如果其中有个节点服务宕机了,那这个节点上的数据就无法消费了,需要等到这个节点服务恢复后才能消费,而这时,消费者端已经消费过的消息就有可能给不了服务端正确应答,服务起来后,就会再次消费这些消息,造成这部分消息重复消费。 另外,如果消息没有做持久化,重启就消息就会丢失。
并且,这种集群模式也不支持高可用,即当某一个节点服务挂了后,需要手动重启服务,才能保证这一部分消息能正常消费。
所以这种集群模式只适合一些对消息安全性不是很高的场景。而在使用这种模式时,消费者应该尽量的连接上每一个节点,减少消息在集群中的传输。
镜像集群
这种模式是在普通集群模式基础上的一种增强方案,这也就是RabbitMQ的官方HA高可用方案。需要在搭建了普通集群之后再补充搭建。其本质区别在于,这种模式会在镜像节点中间主动进行消息同步,而不是在客户端拉取消息时临时同步。
并且在集群内部有一个算法会选举产生master和slave,当一个master挂了后,也会自动选出一个来。从而给整个集群提供高可用能力。
这种模式的消息可靠性更高,因为每个节点上都存着全量的消息。而他的弊端也是明显的,集群内部的网络带宽会被这种同步通讯大量的消耗,进而降低整个集群的性能。这种模式下,队列数量最好不要过多