ps:
1、本文示例使用的消息中间件为Rabbitmq
。
2、示例代码是以测试用例的形式给出。
3、使用@ActiveProfiles( active_profile(s) )
让指定配置生效。
多消费者
Spring Cloud Stream
消费消息时,默认只启动一个消费者,`spring.cloud.stream.binding
可以简单类比为单线程,所以最简单的提高消费端吞吐量的方式就是增加消费者数量。
消费者数量的配置为:spring.cloud.stream.bindings.<channelName>.consumer
,例如:spring.cloud.stream.bindings.input.consumer.concurrency=3
。
示例
以下代码可在 源码 查看。
配置
spring:
application:
name: scas-data-collection
profiles:
active:
default
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
packetUplinkOutput:
destination: packetUplinkTopic
content-type: application/json
binder: rabbit
packetUplinkInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}
binder: rabbit
consumer:
concurrency: 10 # 初始/最少/空闲时 消费者数量。默认1
上述配置,应用启动后会创建10个消费者。可以看到,spring.cloud.stream.bindings.<channelName>.consumer
的默认配置为1,所以若没有显式配置,Spring Cloud Stream
初始只会帮我们创建一个消费者。注意,这里使用的是 初始 而不是 最终,至于为什么,请允许我卖个关子。
代码
消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
/**
* 设备 eui
*/
private String devEui;
/**
* 数据
*/
private String data;
// 省略其他字段
}
测试用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("concurrency")
@EnableBinding({ScasConcurrencyTest.MessageSink.class, ScasConcurrencyTest.MessageSource.class})
public class ScasConcurrencyTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 100000; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(1000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("发布上行数据包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("packetUplinkInput")
public void handle(PacketModel model) throws InterruptedException {
Thread.sleep(10);
log.info("消费上行数据包消息. model: [{}].", model);
}
}
public interface MessageSink {
@Input("packetUplinkInput")
SubscribableChannel packetUplinkInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
}
运行测试用例
运行测试用例后,访问 Rabbitmq可视化页面 可以看到类似下图的页面:
若把 spring.cloud.stream.bindings.packetUplinkInput.consumer
改成1,即只启动一个消费者,然后再次启动测试用例,可以看到:
上图中,Message rates
展示了消息的发布(incoming)及消费的速率(ack),即本文所说的吞吐量。
结合上面两个测试用例,在大量消息堆积的情况下,增加消费者数量能大幅度提高消费端的吞吐量。消费者数量从 1 增加到 10后,吞吐量提高接近10倍。
各种情景下不同消费者数量的吞吐量
上述的测试用例,为了模拟真实环境并控制消费速度,消费消息时会睡眠 10ms
,而接下来的测试需要放开消费速度的控制,所以需要先把 Thread.sleep(10);
注释掉。
消息发布速度 >= 消息消费速度
上述所示的图片,在 消息发布速度 >> 消息消费速度 的情况下,增加消费者数量能大幅度提高消费端的吞吐量,一直到 消息发布速度=消息消费速度 吞吐量达到最大,可以看到,吞吐量的增长,基本呈指数级增长。当然,理论上肯定没这么夸张的,会出现这种情况,很大程度是因为消息的消费端和发布端都在同一台机器上,而且消费消息时基本没有其他时间消耗。以后有条件的话,会把消费端和发布端部署在不同机器上,再做测试。
消费大量堆积消息
上述所示的图片,消费端的吞吐量并没有随着消费者数量的增加而成倍增长,甚至增加到10的时候,约等于5个消费者时的吞吐量。出现这种情况,是因为随着消费者数量的增加,I/O逐渐达到饱和,即I/O成为了瓶颈,所以增加消费者数量并没有达到预期效果。而之所以I/O会成为了瓶颈,是因为Spring Cloud Stream 默认会创建持久化队列,即消息会除了会保存在内存,还会序列化到磁盘,而相应的,消息被成功消费后,也需要从内存和磁盘清除。
结论
无论怎样,毋庸置疑的是,适当增加消费者数量,肯定可以提高消费端的吞吐量。重点就在于 适当 二字,但是如何做到适当呢?好在 Spring Cloud Stream
已经帮我们考虑到这种情况了,并给出了解决方案——动态增加消费者,并且可以控制最大消费者数量。所以前文才会说:应用启动时创建的消费者数量可能只是暂时的,并不是最终的数量。至于如何配置,可参考 Spring Cloud Stream 进阶配置——高吞吐量(二)
推荐阅读
Spring Cloud Stream 进阶配置——高吞吐量(二)——弹性消费者数量
Spring Cloud Stream 进阶配置——高吞吐量(三)——批量预取消息(prefetch)