1.消息如何保证100%投递成功
在学习队列的时候,我想很多朋友都在考虑这个问题。在讲RabbitMq的消息可靠性之前,我们要知道什么是生产端的可靠性投递:
1.保障消息的成功发出
2.保障MQ节点的成功接收
3.发送端收到MQ节点(Broker)确认应答
4.完善的消息进行补偿机制
目前BAT?TMD互联网大厂在保证生产端可靠性投递的解决方案有:
1.消息落库,对消息状态进行打表
2.消息的延迟投递,做二次确认,回调检查
我们看一下方案1,如下图所示:
第一步生产者向业务数据库和消息数据库落地数据。所谓的业务数据库就是比如类似于订单数据库这类数据库,消息数据库就是我们在投递消息之前要记录消息状态的数据库。在高并发下,一般我们不会对两者落地做事务处理,当然这也说明了在高并发场景下方案1确实有缺陷。
第二步生产者发送消息到RabbitMq
第三步Broker回传确认给生产者
第四步生产者将消息库对应的记录状态置为成功状态
这四步中,如果步骤2,步骤3甚至步骤4都有可能发生异常。所以此时我们需要一个定时任务不断的去轮询消息库,如果发现某一投递出去的消息在指定时间内还没有置为成功状态我们就要对这些消息做重新投递,让他重新做步骤1-4.这样就保证了消息的可靠性。
接下来我们看下方案2,如下图所示:
这种方案也是高并发下经常用的一种方式,Upstream service也就是生产者,它会在发送消息前业务数据库插入对应的业务数据(一定要先落库在发送消息)。DownStream监听到消息后做完对应的处理,会发送一个confirm到broker,这里的confirm不是ack,而是发送了一个消息给callback service的,它监听到这个消息后,会在msg db中落库,表示说消费者已经接受到消息并且处理完毕了。那么图中的二次延迟是神马意思呢?这个二次延迟消息是专门发送给callback service的,什么时候发送也是由生产者决定的,如果说callback service收到消息发现在msg db中没有查询到结果,此时callback service就会 RPC调用生产者再次发送一下消息,因为消费者那边失败了。
方案2相对于方案1就是说少了一步让生产者落地MSG DB的操作,这样会大大优化性能。
2.幂等性
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?答案就是在消费端实现幂等性。目前业界主流的幂等性操作有两种:
1.唯一ID+指纹码机制,利用数据库主键去重
指纹码可能是外部传递进来的参数和内部的业务参数组合起来的指纹码。比如说我们在做消费的时候,指纹码可能是用户消费的时间戳,或者是时间戳加上业务内部的一些逻辑字段。select count(1) from t_order where id=唯一ID+指纹码,如果发现有的话说明已经支付过了,就不要做处理了,否则就往数据库中插入一条数据。这种方式的好处就是实现简单,坏处就是高并发下有数据库写入的性能瓶颈。解决方案就是根据ID进行分库分表提升写入性能。
2.利用Redis的原子性去实现
这种方式要考虑两个问题。
- 我们如果需要进行数据落库的话,关键解决问题是如何保证数据库和redis做到原子性
- 如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略。
3.Confirm确认消息详解
代码示例:
- 生产者
public class Producer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}
- 消费者
public class Consumer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
}
}
}
4.Return返回消息详解
代码示例:
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费者: " + msg);
}
}
}
5.自定义消费者使用
在之前的例子中我们就是在代码中编写while循环,然后进行consumer.nextDelivery方法进行获取吓一跳消息,然后进行消费处理。但是我们也可以使用自定义的Consumer会更加方便,也是实际工作中最常用的使用方式。
代码示例:
生产者
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ Consumer Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
自定义消费者
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
6.限流策略
什么是消费端的限流,我们可以通过一个例子来说明:假设一个场景,rabbitmq服务器有上完条未处理的消息,我们随便打开一个消费者客户端,巨量的消息瞬间把消费端炸了。。。。
所以RabbitMq提供了一种qos(服务质量保证)功能,即在非自动确认的消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize 消息大小的限制,比如1MB,为0的时候,表示无限
prefetchCount:告诉rabbitMq不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,即该consumer将block掉,直到有消息ack。
global:true\false将设置应用于channel还是consumer
prefetchSize和global这两项,rabbitmq没有事先,暂且不研究。prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。
代码示例:
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//1 限流方式 第一件事就是 autoAck设置为 false
channel.basicQos(0, 3, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
7.消费端ACK与重回队列机制
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!
消费端的重回队列是为了对没有处理成功的消息,把消息重新回递给Broker,但是在实际应用中,我们都会关闭重回队列,也就是设置为False。
代码示例:
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
自定义消费者:
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
8.TTL消息详解
这里的代码我们只演示一下设置单个消息的过期时间,而不是整个队列中消息的过期时间,所以只需要看看生产者的代码就可以了:
public class Procuder {
public static void main(String[] args) throws Exception {
//1 创建一个ConnectionFactory, 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
Map<String, Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.headers(headers)
.build();
//4 通过Channel发送数据
for(int i=0; i < 5; i++){
String msg = "Hello RabbitMQ!";
//1 exchange 2 routingKey
channel.basicPublish("", "test001", properties, msg.getBytes());
}
//5 记得要关闭相关的连接
channel.close();
connection.close();
}
}
9.死信队列
代码示例:
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
for(int i =0; i<1; i ++){
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("3000")
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("49.234.231.49");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
自定义消费者
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
实验结果