批量发送:https://www.zhihu.com/question/415003202/answer/1421422716
实验
配置
# 设置批量消费
spring.kafka.listener.type=batch
# 批量大小
spring.kafka.producer.batch-size=2
# 提交延时, 设置实验10s
spring.kafka.producer.properties.linger.ms=10000
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
生产者
@RequestMapping("/apk/{id}")
public String proAPK(@PathVariable("id") String id) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("topic.apkinfo1", id);
RecordMetadata recordMetadata = send.get().getRecordMetadata();
return recordMetadata.toString();
}
@RequestMapping("/apk02/{id}")
public String proAPK02(@PathVariable("id") String id) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("haha", id);
RecordMetadata recordMetadata = send.get().getRecordMetadata();
return recordMetadata.toString();
}
实验结果
当只有一个生产者proAPK时,向topic "topic.apkinfo1"只发送一条消息,等待linger.ms=10s后消息发出
当有多个生产者和proAPK一样,向同一个topic "topic.apkinfo1"发送消息时,当超过batch-size=2消息立马发送,未等待linger.ms=10s
-
proAPK向 "topic.apkinfo1"发送一条消息
proAPK02 向topic "haha"发送消息
proAPK02在linger.ms=10s内,向haha发送超过batch-size=2条消息,topic "topic.apkinfo1"的消息也跟着topic "haha"的消息一起发出去了
结论
设置批量发送且设置时延,当某个topic的消息超过batch-size,会把accumulator的消息全部发出去,即其他topic的也跟着一起发出去。
延申
dosend方法详解:https://blog.csdn.net/mhbsoft/article/details/104313241