1.在生产端,可以通过comfirm机制,让生产端获悉消息已被MQ接收(接收不成功则重发)。
2.消息被消费者领了,并且消费者设置了autoAsk(我拿了数据,你队列就把数据删了吧),但消费者运行一半之后崩了,导致的消息丢失。
解决办法:
//消费者消费消息
public static void main(String[] args) throws Exception {
//获取连接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明通道
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//这里设置成true的话,那在队列取消息后,队列会马上将消息从队列删掉
//这个自己按场景判断要不要设置为true
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag1",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
try{}
catch (Exception e){}
finally {
long deliveryTag = envelope.getDeliveryTag();
//autoAck设置为false的话,这里就一定要回应一下队列,否则消息会一直驻留在消息队列不会删除,导致内存泄露
channel.basicAck(deliveryTag, false);
}
System.out.println(consumerTag+"接收消息 : "+new String(body));
}
});
}
另外说明一点,如果消息消费完了,准备执行channel.basicAck的时候,消费端崩了。此时MQ发现这个消费者崩了,把消息发给另一个消费者消费,导致了重复消费问题。这个需要根据自身的场景判断用什么解决方案,比如我这边是生产者提交订单,消费者进行一系列动作“创建订单”,消费者多个,我在数据库保存有deliveryTag参数(消息的唯一标识 ID),每个消费消息前,都看看这个ID有没被insert到数据库,有则说明该消息已被消费,直接channel.basicAck即可。
3.rabbitMQ服务器宕机,导致队列里面的数据丢失
使用消息持久化:
在上述代码的channel.queueDeclare里面的第二个参数durable设置为true,那么队列就会被持久化,消息宕机重启也能恢复。
案例场景:
订单系统有订单,需要传给仓库系统做拣配。
这里不搞消息确认那一套,直接用代码解决问题。
订单发送给MQ时,订单状态设为“待拣配”,仓库系统拣配完成后,发消息到MQ,订单系统将订单状态改为“已拣配”。系统后台维护一个线程,如果发现一个订单1小时还未拣配,就再发送消息给MQ,如此保证消息绝对会被处理,不会丢失。
这个机制就像是:只要你仓库系统一天不回我信息,我就打爆你手机。
这么一来,整个消息过程不需要任何ack机制,直接跑就行。这么做,也间接解决了,消息在MQ宕机丢失的问题(连消息持久化都不用搞了)。
这套方案,在一对一的消息传递还是好使的,一对多的情况(订单系统对仓库系统+支付系统)就比较麻烦,也许这些时候,就要老老实实地ack,并且思考MQ宕机导致消息丢失的补偿方案了。