RabbitMQ消费者的几个参数

分布式消息中间件

RabbitMQ是用Erlang语言编写的分布式消息中间件,常常用在大型网站中作为消息队列来使用,主要目的是各个子系统之间的解耦和异步处理。消息中间件的基本模型是典型的生产者-消费者模型,生产者发送消息到消息队列,消费者监听消息队列,收到消息后消费处理。

在使用RabbitMQ做消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。

Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:

  • fanout
  • topic
  • direct

Exchange为fanout时,生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用;Exchange为topic时,生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息;direct类型的Exchange是最直接最简单的,生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(通常如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)

Concurrency与Prefetch

在通常的使用中(Java项目),我们一般会结合spring-amqp框架来使用RabbitMQ,spring-amqp底层调用RabbitMQ的java client来和Broker交互,比如我们会用如下配置来建立RabbitMQ的连接池、声明Queue以及指明监听者的监听行为:

<rabbit:connection-factory id="connectionFactory" />

<!-- template非必须,主要用于生产者发送消息-->
<rabbit:template id="template" connection-factory="connectionFactory" />

<rabbit:queue name="remoting.queue" />
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3">
    <rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>

listener-container可以设置消费者在监听Queue的时候的各种参数,其中concurrency和prefetch是本篇文章比较关心的两个参数,以下是spring-amqp文档的解释:

prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSize

concurrentConsumers(concurrency)

The number of concurrent consumers to initially start for each listener.

简单解释下就是concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数,prefetch是每次从一次性从broker里面取的待消费的消息的个数,上面的配置在监控后台看到的效果如下:


图中可以看出有两个消费者同时监听Queue,但是注意这里的消息只有被一个消费者消费掉就会自动ack,另外一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值3,意味着每个消费者每次会预取3个消息准备消费。每个消费者对应的listener有个Exclusive参数,默认为false, 如果设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。

源码剖析

这里concurrency的实现方式不看源码也能猜到,肯定是用多线程的方式来实现的,此时同一进程下打开的本地端口都是56278.下面看看listener-contaner对应的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源码:

protected int initializeConsumers() {
  int count = 0;
  synchronized (this.consumersMonitor) {
    if (this.consumers == null) {
        this.cancellationLock.reset();
        this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers);
        for (int i = 0; i < this.concurrentConsumers; i++) {
            BlockingQueueConsumer consumer = createBlockingQueueConsumer();
            this.consumers.put(consumer, true);
            count++;
        }
    }
  }
  return count;
}

container启动的时候会根据设置的concurrency的值(同时不超过最大值)创建n个BlockingQueueConsumer。

protected void doStart() throws Exception {
  //some code
  synchronized (this.consumersMonitor) {
    int newConsumers = initializeConsumers();

    //some code
    Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
    for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
        AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
        processors.add(processor);
        this.taskExecutor.execute(processor);
    }
    //some code
  }
}

在doStart()方法中调用initializeConsumers来初始化所有的消费者,AsyncMessageProcessingConsumer作为真实的处理器包装了BlockingQueueConsumer,而AsyncMessageProcessingConsumer其实实现了Runnable接口,由this.taskExecutor.execute(processor)来启动消费者线程。

private final class AsyncMessageProcessingConsumer implements Runnable {
  private final BlockingQueueConsumer consumer;
  private final CountDownLatch start;
  private volatile FatalListenerStartupException startupException;

  private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
    this.consumer = consumer;
    this.start = new CountDownLatch(1);
  }

  //some code
  @Override
  public void run() {
     //some code
  }
}

那么prefetch的值意味着什么呢?其实从名字上大致能看出,BlockingQueueConsumer内部应该维护了一个阻塞队列BlockingQueue,prefetch应该是这个阻塞队列的长度,看下BlockingQueueConsumer内部有个queue,这个queue不是对应RabbitMQ的队列,而是Consumer自己维护的内存级别的队列,用来暂时存储从RabbitMQ中取出来的消息:

private final BlockingQueue<Delivery> queue;

public BlockingQueueConsumer(ConnectionFactory connectionFactory,
            MessagePropertiesConverter messagePropertiesConverter,
            ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
            boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
            Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
  //some code
  this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
}

BlockingQueueConsumer的构造函数清楚说明了每个消费者内部的队列大小就是prefetch的大小。

业务问题

前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其他场景,比如用户注册后给用户发个提示短信,是不太在意哪个消息先被消费,哪个消息后被消费,因为每个消息是相对独立的,后注册的用户先收到短信也并没有太大影响。

设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个RabbitMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency和prefetch值。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容