kafka与RabbitMQ的区别?
1、确认机制不同
在RabbitMQ中,消息确认是指生产者发送消息到RabbitMQ后,等待RabbitMQ返回确认信息,确认消息已经被正确地接收并处理。通常情况下,【消息确认是同步进行的】即必须等到 【消息队列】 接收到和处理了消息后,才会返回确认信息。这样可以确保消息队列完全处理了消息,避免消息的丢失,保证了数据的一致性和可靠性。
而在Kafka中,为了保证高吞吐率,没有提供消息确认机制(at-most-once和at-least-once除外),数据是异步推送到broker中的,一旦生产者把消息推送到broker后,就不会再对该消息进行确认处理,消费者需要自己对消息的处理情况进行确认。在Kafka中,也有一些同步的模式(例如producer.send()方法中的同步调用),可以确保发送的消息被同步复制到Kafka的所有副本,并在接收到确认后才会返回。
2、实施部署不同
RabbitMQ 稳定可靠,数据⼀致,⽀持多协议,有消息确认,基于erlang语⾔
Kafka ⾼吞吐,⾼性能,快速持久化,⽆消息确认,⽆消息遗漏,可能会有有重复消息,依赖于zookeeper,成本高,“成本高”的含义是指 Kafka 部署和运维所需要的成本相比 RabbitMQ更高需要更加复杂的基础设施来支持它的运行,这些基础设施包括:
一、 集群环境成本高:Kafka 集群需要依赖 ZooKeeper 来实现节点的选举和协调工作,而 ZooKeeper 集群的部署和配置都比 RabbitMQ 更加复杂,需要更多的服务器资源和维护成本
二. 数据持久化成本高:Kafka 依赖磁盘的高速读写特性来实现其快速的消息持久化,因此需要更高的成本来维护磁盘的健康状态和数据备份和恢复等工作。
总的来说,Kafka 的高吞吐、高性能的特点是与高成本密切相关的,如果一个应用场景对性能要求很高,且具备一定的技术架构能力,那么可以选择使用 Kafka。而如果应用场景对可靠性和稳定性要求较高,对【性能要求不太高,且业务规模不太大 ,那么选择 RabbitMQ 更为合适。
逻辑结构:
用一栋大楼类比说明
1、Broker 是指 RabbitMQ 服务器实例,并根据预先定义的规则将消息发送到合适的队列或 Exchange
2、exchange是 RabbitMQ 中用于接收和分发消息的中间组件,它负责接收来自生产者的消息,并将其路由到与之相关的1个或多个队列 (生产者到交换机)
3、队列:房间是存放消息的地方(每个房间)
4、绑定:绑定规定了交换机和队列之间的连接(楼层之间的通道)
5、虚拟主机:虚拟主机则是 RabbitMQ 中用于【逻辑隔离】的重要概念,它类似于一个命名空间(每层楼)
可以在一个 Broker 内部创建多个不同的虚拟主机,每个虚拟主机都有自己的命名空间,并且可以独立配置和管理其中Exchange、Queue、Binding、用户权限等
在 RabbitMQ 中是可以设置多个
交换机的。通过设置不同类型的交换机以及它们之间的绑定关系,可以实现复杂的消息路由和处理逻辑
交换机类型和工作模式都涉及消息在 RabbitMQ 中的传递和处理,但它们的焦点和作用有所不同。交换机类型主要关注消息的路由和分发,决定消息如何被发送到队列
中;而工作模式主要关注消费者对消息的接收和处理方式,决定消息如何被消费
消息发送模式:
1、简单模式:一个队列只有一个消费者
2、工作模式:多个消费者监听同一个队列,但消费者中只有一个消费者会成功消费消息(负载均衡)
3、发布/订阅模式:一个交换机绑定多个队列,消息同时被所有队列消费(集体通知)
4、路由模式:它通过在交换机和队列之间建立绑定关系,并定义路由键来实现消息的有选择性
地路由传递(订阅广告)
有一个即时消息系统或者新闻订阅系统需要将消息或新闻推送给多个用户,这时可以采用发布/订阅模式。生产者将消息发送到交换机 (exchange),交换机将消息广播给多个订阅者,订阅者通过绑定交换机来接收消息。在这种模式下,每个订阅者都会接收到相同的消息
,可以采用不同的队列来对订阅者进行分组,以实现订阅者之间的隔离和不同级别的消息推送
工作模式和发布订阅模式的区别?
在工作模式中,多个消费者可以并行地处理来自同一个队列的任务
每个消息只能由一个消费者处理,但是多个消息可以同时被多个消费者处理
在发布订阅模式中,虽然消息会被广播到多个订阅者,但是每个订阅者独立地接收和处理消息。因此,不同的订阅者可以并行地处理自己接收到的消息路由模式和发布订阅模式的区别?
发布订阅Fanout(广播)模式:在 Fanout 模式中,交换机会将消息广播到绑定的所有队列。无论队列是否有不同的路由键或其他绑定条件,交换机都会将消息传递给所有绑定的队列。
路由Redirect(重定向)模式:在 Redirect 模式中,交换机会将消息发送到1个特定的队列而不是广播到所有绑定的队列。这种模式适用于需要将消息定向发送到1个特定的队列
创建用户和创建交换机
创建用户:创建用户名、密码
创建交换机:
1、选择虚拟主机
2、选择交换机类型
3、持久性……
在创建交换机时,durability
是一个可选的参数,用于配置交换机的持久性。持久性指的是当消息代理(如 RabbitMQ)重新启动
时,交换机是否仍然存在。如果将 durability 设置为 true,交换机将被标记为持久的,即使消息代理重新启动,交换机也将保留
。这意味着在消息代理恢复正常运行后,交换机将保持不变
如果你的应用程序需要频繁地更改或删除交换机,并且你不希望在每次更改时手动删除或重新创建交换机,那么可以选择非持久化交换机
4、交换机绑定队列
发布消息并设置消息属性
在 RabbitMQ 的 Java 客户端 API 中,可以在发布消息时设置这些消息属性。
下面是一个示例代码,展示了如何设置消息的持久性、优先级和过期时间
:
// 创建连接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 创建消息属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 设置消息的持久性
.priority(5) // 设置消息的优先级
.expiration("10000") // 设置消息的过期时间
.build();
// 发布消息
String exchangeName = "exchange1";
String routingKey = "routingKey1";
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
// 关闭连接和通道
channel.close();
connection.close();
在上述代码中,首先创建了连接和通道。然后使用 AMQP.BasicProperties.Builder
类来创建消息属性对象,通过链式调用方法来设置属性,例如使用 deliveryMode
方法设置消息的持久性(1非持久2持久),使用 priority
方法设置消息的优先级,使用 expiration
方法设置消息的过期时间。最后,调用 basicPublish
方法发布消息时,将消息属性对象作为参数传递,即可在消息中包含相应的属性。
使用RabbitMQ传递对象
发送和接收的都是字符串/字节数组类型的消息
- 使用
序列化
对象
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods){
//消息队列可以发送 字符串、字节数组、序列化对象
byte[] bytes = SerializationUtils.serialize(goods);
amqpTemplate.convertAndSend("","queue1",bytes);
}
}
消息消费者
@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {
@RabbitHandler
public void receiveMsg(byte[] bs){
Goods goods = (Goods) SerializationUtils.deserialize(bs);
System.out.println("byte[]---"+goods);
}
}
PS:为什么要实现序列化接口?
1、易于传输和存储:将 Java 对象序列化为字节数组
后,可以将其转换为不同的格式,例如 JSON 或 XML,用于传输到不同的系统之间。字节数组还可以在数据库中进行持久化存储,以便以后检索和使用。
2、平台无关性:通过使用序列化对象,可以将 Java 对象转换为平台无关的字节序列,这样它们可以在不同的系统
之间进行传输和存储,包括跨语言和跨平台的场景。
3、可扩展性:可以在对象中添加新字段和属性,并保证可以与旧版本兼容,以便在不同版本的应用程序之间进行传输和存储。这可以通过维护对象的序列化版本来实现。
4、高效性:通过序列化对象,可以减少在网络或磁盘 I/O 期间传输数据的大小,从而提高传输效率并减少网络流量。这是因为序列化过程通常会去除对象中的一些额外信息或元数据,只保留必要的数据以及用于恢复对象的必要信息。
- 使用
JSON
字符串传递
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods) throws
JsonProcessingException {
//消息队列可以发送 字符串、字节数组、序列化对象
ObjectMapper objectMapper = new ObjectMapper();
String msg = objectMapper.writeValueAsString(goods);
amqpTemplate.convertAndSend("","queue1",msg);
}
}
RabbitMQ事务、消息确认和return机制、手动ACK
RabbitMQ事务指的是基于客户端实现的事务管理,当在消息发送过程中添加了事务,处理效率降低
几十倍甚至上百倍
Connection connection = RabbitMQUtil.getConnection(); //connection 表
示与 host1的连接
Channel channel = connection.createChannel();
channel.txSelect(); //开启事务
try{
channel.basicPublish("ex4", "k1", null, msg.getBytes());
channel.txCommit(); //提交事务
}catch (Exception e){
channel.txRollback(); //事务回滚
}finally{
channel.close();
connection.close();
}
同步等待:为了实现事务的原子性
,事务提交是一个同步操作,即发送端会等待事务提交的结果,只有在返回提交成功的响应后才会继续发送下一条消息。
重复操作:如果发送端在发送消息的过程中出现错误或异常,事务会回滚
并且消息会重发
,这种方式可以保证消息的可靠性传递
所以,在实际场景中,如果对消息的实时性要求较高或对消息处理的吞吐量有较高要求,建议尽量避免使用事务
PS:消息吞吐量(Message Throughput):消息吞吐量指的是系统在单位时间
内处理的消息量,对于需要高吞吐量的应用来说,高效地传递和处理消息是非常重要的。使用事务机制会对消息发送的性能造成一定的影响,因为事务机制需要等待事务提交的确认
,会增加发送消息的延迟。
事务如何确保消息发送的可靠性?
消息的可靠性:从 生产者发送消息 —— 消息队列存储消息 —— 消费者消费消息的整个过程中消息的安全性及可控性
RabbitMQ提供了消息确认机制及return机制
意义:消息确认机制可以帮助发送方确保消息成功发送到交换机,并可以跟踪
消息是否被一个或多个消费者成功消费
。Return 机制则是确保了消息成功从交换机分发到队列的过程,当消息无法
成功路由到队列时,消息会被 Return
给消息提供者
- 消息确认机制:当消息提供者将消息发送到交换机时,交换机会对消息提供者进行反馈(到达交换机)
- return机制:当消息达到交换机之后,交换机会将消息分发到队列,MQ会将分发的结果也会反馈给消息提供者(到达队列)
在 RabbitMQ 中使用消息确认机制可以确保消息被成功发送到交换机,并被一个或多个消费者成功消费。
RabbitMQ 提供了同步和异步两种消息确认方式
同步消息确认指的是,在消息发送之后,发送方会阻塞等待确认结果。具体来说,发送方通过 waitForConfirms()
方法等待 RabbitMQ 发送确认结果,直到接收到确认结果或者超时时间到达。如果确认结果为消息成功发送到交换机并被一个或多个消费者消费,则 waitForConfirms()
方法返回 true,否则返回 false。
异步消息确认指的是,在消息发送之后,发送方不会阻塞等待确认结果。相反,它通过添加ConfirmListener
来异步处理确认回调
。确认回调会在消息成功发送到交换机并被一个或多个消费者消费或者发送失败时触发。
异步消息确认相较于同步消息确认,其对消息发送方更加友好。使用异步确认的优点是可以提高发送消息的吞吐量,因为发送方能够继续
发送下一条
消息,而不需要等待每条消息的确认结果。另外,异步确认还可以通过监听确认回调在消息发送失败
时及时发现并处理这种情况,而同步确认则只能通过超时等待
的方式判断消息是否发送成功。
普通maven项目的消息确认机制
下面是 RabbitMQ 中 Java 客户端使用消息确认和批量发送消息的示例代码,以及使用确认监听器异步处理确认结果:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MessageSender {
private static final String EXCHANGE_NAME = "ex1";
private static final String ROUTING_KEY = "a";
private static final String MESSAGE = "Hello, RabbitMQ!";
private static final int MESSAGE_COUNT = 10;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//开启消息确Z
channel.confirmSelect();
//批量发送消息
for (int i=0 ; i<MESSAGE_COUNT ; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
}、
//假如发送消息需要10s,waitForConfirms会进⼊阻塞状态(同步)
//boolean b = channel.waitForConfirms();
//使用监听器异步处理确认结果
channel.addConfirmListener(new ConfirmListener() {
//参数1:long deliveryTag 返回消息的标识
//参数2:boolean multiple 是否为批量confirm
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("~~~~~消息成功发送到交换机");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("~~~~~消息发送到交换机失败");
}
});
}
}
当调用 channel.waitForConfirms()
时,它会阻塞当前线程,直到 RabbitMQ 确认消息是否成功发送到队列或交换机。
换句话说,在以下情况下,waitForConfirms()
会阻塞程序继续执行:
- 当你发送一条消息后,还没有收到 RabbitMQ 的确认消息;
- 当发送消息时发生异常,如网络中断或发送超时;
- 当你设置了批量发送消息的模式,并且尚未收到足够数量的消息确认,或者
超过
了设置的超时时间
。
简而言之,waitForConfirms()
会等待确认消息的返回,确保消息成功发送到 RabbitMQ。
消息确认机制不光监听成功
的消息同时也监听失败
的消息
spring项目的return机制
@Component
public class MyReturnListener implements RabbitTemplate.ReturnsCallback {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息从交换机分发到队列失败");
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
String msg = returnedMessage.getMessage().toString();
amqpTemplate.convertAndSend(exchange,routingKey,msg);
}
}
在上述代码中,returnedMessage
方法是用于处理消息从交换机分发到队列失败的回调方法。当消息无法路由到指定的队列时,会触发该方法。
下面是代码中各个参数的含义:
returnedMessage
对象:表示被退回的消息的相关信息。
getExchange()
:获取退回消息所使用的交换机名称。
getRoutingKey()
:获取退回消息时所使用的路由键。
getMessage()
:获取退回的消息对象,可以通过toString()
方法将其转换为字符串。
在该方法中,它首先打印了一条消息提示,表示消息从交换机分发到队列失败。然后,通过 amqpTemplate.convertAndSend(exchange,routingKey, msg)
方法将消息重新发送到指定的交换机和路由键,以便重新分发消息
RabbitMQ消费者手动应答(手动ACK)
@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {
@RabbitHandler
public void process(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("成功接收到消息 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.err.println("接收消息失败 msg = " + msg);
}
}
}
在上述代码中,channel.basicAck
是用于手动确认消息的方法,用于告知 RabbitMQ 已经成功处理(消费)了一个消息。下面是 channel.basicAck
方法的各个参数的详细说明:
deliveryTag
:这是一个消息的唯一标识符。每个消息都有一个独特的deliveryTag
,用于标识消息的顺序。通过message.getMessageProperties().getDeliveryTag()
可以获取当前处理消息的deliveryTag
。
PS:每条消息在 RabbitMQ 中都会被分配一个唯一
的deliveryTag
,用于标识消息的顺序。deliveryTag 是一个连续递增的正整数
,每次消费一条消息时,deliveryTag 会被分配并随消息一起传递给消费者。消费者在处理完一条消息后,需要对该消息进行手动 ACK 才能告知 RabbitMQ 已经成功处理了该消息,此时会将该消息的 deliveryTag 返回给 RabbitMQ 作为确认-
multiple
:这是一个布尔值,用于指定是否确认多个消息。对于简单地确认单个消息,可以将其设置为false
。如果要确认多个消息,可以将其设置为true
,这将确认所有比当前deliveryTag
小(包括当前deliveryTag
)的未确认消息。
PS:在消息队列系统中,通常会有多个消息同时被发送到队列中等待消费。当消费者处理消息时,通常会以一批消息的方式进行确认,而非逐条确认。因此,multiple参数的存在是为了提供一种批量确认
消息的机制在这种情况下,如果设置为
true
,则表示当前消息之前的所有消息也被认为已经处理完成,全部移除。如果设置为false
,则仅确认当前deliveryTag
的消息,然后将其从队列中移除。通常设置为false。 requeue
:这是一个布尔值,用于指定当确认消息时,是否将消息重新加入到队列。如果设置为false
,则代表不重新加入队列,即认为消息已被完全处理。如果设置为true
,则代表将消息重新加入队列,以便重新被消费。通常情况下,如果消息处理成功,则设置为false
,如果消息处理失败,则设置为true
。
在上述代码中,第一个 channel.basicAck
用于确认消息已经成功处理,同时使用了 message.getMessageProperties().getDeliveryTag()
来获取当前消息的 deliveryTag
值。
而第二个 channel.basicAck
是在处理消息出现异常时的回退机制。它使用与第一个 channel.basicAck
相同的参数,以确认消息处理失败,然后将消息重新加入队列(requeue
设置为 true
),以便重新消费。同时,使用了 message.getMessageProperties().getDeliveryTag()
获取当前消息的 deliveryTag
值。
手动ACK的意义体现在以下几个方面:
1、确保可靠性:消费者在处理消息时,可能会发生异常或出现错误的情况。如果没有手动ACK机制,当消费者在处理消息过程中出现异常时,消息队列将无法得知消息是否被成功处理,可能会导致消息的丢失
或重复
消费。手动ACK可以确保消费者在处理消息成功后再发送确认,从而确保消息的可靠性。
2、提高消息处理效率:手动ACK机制可以确保消息队列在收到消费者的ACK确认后,【将该消息从队列中移除
】,从而减少队列的负荷
。通过手动ACK,消费者可以在处理完一条消息后立即发送确认,减少了消息在队列中滞留的时间,提高了消息的处理效率
。
手动ACK与消息确认机制和return机制之间的区别?
手动 ACK 与消息确认机制:手动 ACK 是消费者在处理完一条消息后,向 RabbitMQ 发送确认信号,告知 RabbitMQ 已成功处理该消息的机制。通过手动 ACK,消费者
可以明确地通知 RabbitMQ 消息已经被处理,避免消息丢失
或重复消费
的情况发生,手动ACK是消费端
的机制。消息确认机制则是生产者端
的机制,用于确认消息已经被发送到交换机并被消费者接收,确保消息的可靠传递。手动 ACK 和消息确认机制共同保证了消息在生产者和消费者之间的可靠
传递。
手动 ACK 与消息返回机制:消息返回机制会在消息无法被路由到队列时将消息返回给生产者。在这种情况下,消费者可能无法接收到消息,因此消费者不会发送 ACK。手动 ACK 可以确保消费者在成功处理消息后再发送确认信号,而不是直接丢弃未处理的消息。通过手动 ACK,消费者能够在实际处理消息后再确认消息的处理情况,提高了消息传递的完整性和可靠性
消息的幂等性问题
要在 RabbitMQ 中实现幂等性,可以使用 Redis 的 SETNX`(Set if Not Exists)命令来进行辅助。
- 意义:
确保消息被处理一次而不会重复处理
确保消息处理的唯一性和正确性
SETNX 是 Redis 提供的一种原子性命令
,用于设置指定键的值,仅当该键不存在时才会设置成功。在 RabbitMQ 的消息处理中,可以将消息的唯一标识
或关键信息作为键,将已处理消息的标记作为值存储在 Redis 中,并利用 SETNX 来保证只有第一次
处理消息时才会成功设置标记,从而保证消息的幂等性
下面是一个示例代码:
@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final String LOCK_PREFIX = "messageLock:";
private final long LOCK_EXPIRE_TIME = 60 * 1000; // 锁的过期时间,单位毫秒
@RabbitHandler
public void process(String msg, Channel channel, Message message) throws IOException {
// 使用消息的唯一标识作为锁的 key
String lockKey = LOCK_PREFIX + message.getMessageProperties().getDeliveryTag();
Boolean isLocked = false;
try {
// 尝试获取锁
isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, "");
// 设置锁的过期时间
redisTemplate.expire(lockKey, LOCK_EXPIRE_TIME, TimeUnit.MILLISECONDS);
// 如果成功获取到了锁,则处理消息
if (isLocked) {
System.out.println("成功获取到锁并处理消息 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("未获取到锁,消息已被其他消费者处理 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.err.println("处理消息失败 msg = " + msg);
} finally {
// 释放锁
if (isLocked) {
redisTemplate.delete(lockKey);
}
}
}
}
- 添加了
RedisTemplate<String, Object>
对象的自动装配,该对象用于与 Redis 进行交互。 - 声明了一个以
messageLock:
作为前缀的锁定键,使用消息的唯一标识作为锁的唯一标识。 - 使用
RedisTemplate
的opsForValue().setIfAbsent()
方法尝试获取锁,如果成功获取到锁则继续处理该消息,否则说明该消息已被其他消费者处理。 - 使用
redisTemplate.expire()
设置锁的过期时间,避免死锁。 - 在
finally
块中,如果成功获取到了锁,则释放锁。
延迟机制
未完待续