RabbitMq高级特性

1.消息如何保证100%投递成功

在学习队列的时候,我想很多朋友都在考虑这个问题。在讲RabbitMq的消息可靠性之前,我们要知道什么是生产端的可靠性投递:
1.保障消息的成功发出
2.保障MQ节点的成功接收
3.发送端收到MQ节点(Broker)确认应答
4.完善的消息进行补偿机制
目前BAT?TMD互联网大厂在保证生产端可靠性投递的解决方案有:
1.消息落库,对消息状态进行打表
2.消息的延迟投递,做二次确认,回调检查
我们看一下方案1,如下图所示:



第一步生产者向业务数据库和消息数据库落地数据。所谓的业务数据库就是比如类似于订单数据库这类数据库,消息数据库就是我们在投递消息之前要记录消息状态的数据库。在高并发下,一般我们不会对两者落地做事务处理,当然这也说明了在高并发场景下方案1确实有缺陷。
第二步生产者发送消息到RabbitMq
第三步Broker回传确认给生产者
第四步生产者将消息库对应的记录状态置为成功状态
这四步中,如果步骤2,步骤3甚至步骤4都有可能发生异常。所以此时我们需要一个定时任务不断的去轮询消息库,如果发现某一投递出去的消息在指定时间内还没有置为成功状态我们就要对这些消息做重新投递,让他重新做步骤1-4.这样就保证了消息的可靠性。
接下来我们看下方案2,如下图所示:



这种方案也是高并发下经常用的一种方式,Upstream service也就是生产者,它会在发送消息前业务数据库插入对应的业务数据(一定要先落库在发送消息)。DownStream监听到消息后做完对应的处理,会发送一个confirm到broker,这里的confirm不是ack,而是发送了一个消息给callback service的,它监听到这个消息后,会在msg db中落库,表示说消费者已经接受到消息并且处理完毕了。那么图中的二次延迟是神马意思呢?这个二次延迟消息是专门发送给callback service的,什么时候发送也是由生产者决定的,如果说callback service收到消息发现在msg db中没有查询到结果,此时callback service就会 RPC调用生产者再次发送一下消息,因为消费者那边失败了。
方案2相对于方案1就是说少了一步让生产者落地MSG DB的操作,这样会大大优化性能。

2.幂等性


在海量订单产生的业务高峰期,如何避免消息的重复消费问题?答案就是在消费端实现幂等性。目前业界主流的幂等性操作有两种:
1.唯一ID+指纹码机制,利用数据库主键去重
指纹码可能是外部传递进来的参数和内部的业务参数组合起来的指纹码。比如说我们在做消费的时候,指纹码可能是用户消费的时间戳,或者是时间戳加上业务内部的一些逻辑字段。select count(1) from t_order where id=唯一ID+指纹码,如果发现有的话说明已经支付过了,就不要做处理了,否则就往数据库中插入一条数据。这种方式的好处就是实现简单,坏处就是高并发下有数据库写入的性能瓶颈。解决方案就是根据ID进行分库分表提升写入性能。
2.利用Redis的原子性去实现
这种方式要考虑两个问题。

  • 我们如果需要进行数据落库的话,关键解决问题是如何保证数据库和redis做到原子性
  • 如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略。

3.Confirm确认消息详解




代码示例:

  • 生产者
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 获取C onnection
        Connection connection = connectionFactory.newConnection();
        
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        
        //4 指定我们的消息投递模式: 消息的确认模式 
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";
        
        //5 发送一条消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        
        //6 添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------no ack!-----------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });
    }
}
  • 消费者
public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 获取C onnection
        Connection connection = connectionFactory.newConnection();
        
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#";
        String queueName = "test_confirm_queue";
        
        //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 创建消费者 
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            
            System.err.println("消费端: " + msg);
        }
        
        
    }
}

4.Return返回消息详解



代码示例:
生产者:

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "abc.save";
        
        String msg = "Hello RabbitMQ Return Message";
        
        
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                    String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                
                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });
        
        
        channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
        
        //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
        
        
        
        
    }
}

消费者:

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费者: " + msg);
        }
    }
}

5.自定义消费者使用

在之前的例子中我们就是在代码中编写while循环,然后进行consumer.nextDelivery方法进行获取吓一跳消息,然后进行消费处理。但是我们也可以使用自定义的Consumer会更加方便,也是实际工作中最常用的使用方式。



代码示例:
生产者

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_consumer_exchange";
        String routingKey = "consumer.save";
        
        String msg = "Hello RabbitMQ Consumer Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}

消费者

public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        channel.basicConsume(queueName, true, new MyConsumer(channel));
        
        
    }
}

自定义消费者

public class MyConsumer extends DefaultConsumer {


    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }


}

6.限流策略

什么是消费端的限流,我们可以通过一个例子来说明:假设一个场景,rabbitmq服务器有上完条未处理的消息,我们随便打开一个消费者客户端,巨量的消息瞬间把消费端炸了。。。。
所以RabbitMq提供了一种qos(服务质量保证)功能,即在非自动确认的消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
prefetchSize 消息大小的限制,比如1MB,为0的时候,表示无限
prefetchCount:告诉rabbitMq不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,即该consumer将block掉,直到有消息ack。
global:true\false将设置应用于channel还是consumer
prefetchSize和global这两项,rabbitmq没有事先,暂且不研究。prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。
代码示例:
生产者:

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}

消费者:

public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //1 限流方式  第一件事就是 autoAck设置为 false
        
        channel.basicQos(0, 3, false);
        
        channel.basicConsume(queueName, false, new MyConsumer(channel));
        
        
    }
}

7.消费端ACK与重回队列机制

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!
消费端的重回队列是为了对没有处理成功的消息,把消息重新回递给Broker,但是在实际应用中,我们都会关闭重回队列,也就是设置为False。
代码示例:
生产者:

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        
        
        for(int i =0; i<5; i ++){
            
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }
        
    }
}

消费者:

public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        // 手工签收 必须要关闭 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

自定义消费者:

public class MyConsumer extends DefaultConsumer {
    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
        
    }
}

8.TTL消息详解


这里的代码我们只演示一下设置单个消息的过期时间,而不是整个队列中消息的过期时间,所以只需要看看生产者的代码就可以了:

public class Procuder {

    
    public static void main(String[] args) throws Exception {
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();
        
        Map<String, Object> headers = new HashMap<>();
        headers.put("my1", "111");
        headers.put("my2", "222");
        
        
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .headers(headers)
                .build();
        
        //4 通过Channel发送数据
        for(int i=0; i < 5; i++){
            String msg = "Hello RabbitMQ!";
            //1 exchange   2 routingKey
            channel.basicPublish("", "test001", properties, msg.getBytes());
        }

        //5 记得要关闭相关的连接
        channel.close();
        connection.close();
    }
}

9.死信队列






代码示例:
生产者:

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        
        String msg = "Hello RabbitMQ DLX Message";
        
        for(int i =0; i<1; i ++){
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("3000")
                    .build();
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }
        
    }
}

消费者

public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("49.234.231.49");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        // 这就是一个普通的交换机 和 队列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        
        Map<String, Object> agruments = new HashMap<String, Object>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //这个agruments属性,要设置到声明队列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //要进行死信队列的声明:
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");
        
        channel.basicConsume(queueName, true, new MyConsumer(channel));
        
        
    }
}

自定义消费者

public class MyConsumer extends DefaultConsumer {


    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }


}

实验结果


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容