RabbitMQ可以设置basicQoS(Consumer Prefetch)来对consumer进行流控,从而限制未ack的消息数量。
示例
每个consumer单独流控
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
多个consumer共享流控
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
分析
官方Java客户端提供了DefaultConsumer和QueueingConsumer。 其中QueueingConsumer内部维护了一个阻塞队列BlockingQueue,此队列就是用来缓存从queue获取的message。
private final BlockingQueue<QueueingConsumer.Delivery> _queue;
Spring amqp提供了类似的BlockingQueueConsumer,但是默认的prefetchCount是1。
链接
Consumer Prefetch
Some queuing theory: throughput, latency and bandwidth