RabbitMQ 消费端的限流策略

假设一个场景,由于我们的消费端突然全部不可用了,导致 rabbitMQ 服务器上有上万条未处理的消息,这时候如果没做任何现在,随便开启一个消费端客户端,就会导致巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多的数据,就会导致消费端变得巨卡,有可能直接崩溃不可用了。所以在实际生产中,限流保护是很重要的。

rabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 QOS 的值)未被确认前,不进行消费新的消息。关键代码就是在声明消费者代码里面的

void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
  1. prefetchSize:0

  2. prefetchCount:会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack

  3. global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别

备注:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。

代码演示:

代码地址:    https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 项目下

生产端代码基本没变化,改了 exchange 和 routingKey 而已

public class Procuder {

    private static final Logger log = LoggerFactory.getLogger(Procuder.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String msg = "Hello RabbitMQ limit Message";
        for(int i = 0; i < 5; i ++){
            log.info("生产端发送:{}", msg + i);
            channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
        }
    }
}

消费端代码需要修改一下 autoAck 设置为 false **
增加 ** channel.basicQos(0, 1, false);

完整的消费端代码如下

/**
 * 使用自定义消费者
 */
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    
    public static final String EXCHANGE_NAME = "test_qos_exchange";
    public static final String EXCHANGE_TYPE = "topic";
    public static final String ROUTING_KEY_TYPE = "qos.#";
    public static final String ROUTING_KEY = "qos.save";
    public static final String QUEUE_NAME = "test_qos_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 获取C onnection
        Connection connection = connectionFactory.newConnection();
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
        
        /**
         * prefetchSize:0
         prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,限速N个
            即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack 回来,你再发送 N 个过来
         global:true\false 是否将上面设置应用于channel级别,false是consumer级别
         prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究
         */
        channel.basicQos(0, 1, false);

        //使用自定义消费者
        //1 限流方式  第一件事就是 autoAck设置为 false
      //使用自定义消费者
        channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
        log.info("消费端启动成功");
    }
}

自定义消费者

public class MyConsumer extends DefaultConsumer {

    private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
    
     private Channel channel;
     
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag,  //消费者标签
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        
        log.info("------limit-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
        //一定要手动ACK回去
       //channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

然后启动消费端,上管控台查看 test_qos_exchange 和 test_qos_queue 是否生成了


image

确认 test_qos_exchange 上绑定了 test_qos_queue

image

启动生产端发送 5 条消息

image

发现消费端只打印了一条消息

image

从管控台上也看到总共 5 条消息,有 4 条等待着,一条消费了但是没有 ack 回去


image

修改自定义消费者里面的代码,如下所示

public class MyConsumer extends DefaultConsumer {

    private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
    
     private Channel channel;
     
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag,  //消费者标签
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        
        log.info("------limit-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
        //一定要手动ACK回去
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

重启消费端,看到消费端就按照一条一条消费,并且 ACK 回去了

image
image

如上所示就是简单的RabbitMQ消费端的限流策略

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357

推荐阅读更多精彩内容