RabbitMQ可靠性保障

来一幅图:


image.png

说明:可靠性和效率是不可兼得的,保证可靠得牺牲一部分效率。

为了保障消息成功从生产者投递到broker:
采用comfirm确认消息机制,如果Broker端接受到消息,那么就会回送相应,然后生产者会监听Broker给的应答,流程图:


image.png

实现方式:


image.png

代码如下:
生产者:

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 获取C onnection
        Connection connection = connectionFactory.newConnection();
        
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        
        //4 指定我们的消息投递模式: 消息的确认模式 
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";
        
        //5 发送一条消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        
        //6 添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------no ack!-----------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });

    }
}

队列收到消息之后会自动ACK(或者在消费者端可以手动选择channel的NCK),那么生产者端的监听就会收到回复。

2.如果消息路由不到指定的队列,处理路由不到的问题,那么方法一就可以使用Return Listener.

流程图如下:

image.png

如果发送消息的时候,可能因为routingkey错误,或者队列不存在,或者队列名称错误导致路由失败。
使用方式:
使用mandatory参数(即发送消息时候的第三个参数设置为true)和ReturnListener,可以实现消息无法路由的时候返回给生产者。
核心代码:

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                    String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                
                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });
        
        
        channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

此时如果消息路由不到,生产者配置的监听会拿到该消息。

方法二是采用备份交换机(alternate-exchange),无法路由的消息会发送到这个交换机上。
代码如下:

Map<String,Object> arguments = new HashMap<String,Object>(); 
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); // 指定交换机的备份交换机 
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);

3.确保消息成功从队列投到消费者(注意是队列到消费者):
采用ACK机制,在channel操作队列和消费者时候即首先关闭autoAck,

    // 手工签收 必须要关闭 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

这里我使用的是自定义消费者,自定义消费者代码:


public class MyConsumer extends DefaultConsumer {


    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
        
    }


}

如果是第0条消息,那么就拒绝并将此消息重回队列尾部,如果不是第0条那么就ACK。

生产者的代码如下,每次发送消息都有num标识:


public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        
        
        for(int i =0; i<5; i ++){
            
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }
        
    }
}

开启消费者和生产者,控制台打印如下:


image.png

可以看出第0条消息一直被循环消费(因为这个队列只绑定了一个消费者,此消费者设置了第0条消息重回队列,那么就循环消费)。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 16,048评论 2 11
  • 0 相关源码 1 你将学到 如何保证消息百分百投递成功 幂等性 如何避免海量订单生成时消息的重复消费 Confir...
    JavaEdge阅读 1,424评论 0 9
  • 《罗梦幽门》 屈犁缘创新生篇, 饮是城双闩赶门。 唯家以诺乱仗渊, 亲景罗莎集乌恬。 支朱锁那圈层赛, 意语频追命...
    春城怡景阅读 394评论 4 15
  • 在我的世界分大快乐和小快乐。 吃饭逛街看电影是小快乐。 和喜欢的人吃饭谈心,和家人逛街,和朋友讨论电影情节是大快乐...
    思思有片海阅读 569评论 0 0
  • 如果你想培养一个新习惯,微习惯基本上就是它的大幅度缩减版,把每天100个俯卧撑缩减为每天1个,每天写3000字缩减...
    周筠桐阅读 177评论 0 1