为什么要对消费端限流
- 场景一: Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
- 场景二: 当有抢购活动的时候, 所有的请求都发送到了某个接口, 这时候如果不做限流, 所有的请求都到了服务器, 服务器集群做的好, 勉强能抗住, 但是到达了数据库性能的瓶颈, 大量的请求打到数据库上, 造成数据的压力巨大甚至崩溃.
消费端限流
当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。
限流操作
- 消费端的确认模式一定为手动确认。acknowledge="manual"
- 在监听器上设置concurrency属性
代码演示
开启手动确认模式
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true
设置每次获取的消息数量
@RabbitListener(queues = "test_queue", concurrency = "2")
public void handler(Channel channel, Message message, Map<String, Object> msg) throws Exception{
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(msg);
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
channel.basicNack(deliveryTag, true, true);
}
}
concurrency = "2"
表示每次从队列里面取出两条消息