场景:定时任务查询全表数据进行处理推送时,采用的策略是分页每次查询100条数据,进行业务处理,直到所有数据都处理完成。因为数据量太大导致出现:
SELECT * FROM table_name ORDER BY id ASC
![]()
这样的分页sql 性能是非常低的,因为数据库会从第一条数据遍历到第3000000条,再进行分页。
public ReturnT<String> execute(String param) {
List<Person> persons;
for (int i = 0; ; i += 100) {
String limitSql = "limit " + i + ",100";
persons = personService.selectList(new QueryWrapper<Person>()
.lambda().orderByAsc(Person::getId).last(limitSql));
//=========================业务逻辑==========================================================
persons.forEach(e -> {
try {
rocketProducer.send(RocketMqConstant.PERSON_TOPIC, RocketMqConstant.PERSON_TAG, e);
} catch (Exception ex) {
log.error("异常, {}", ex);
}
});
//===========================================================================================
//终止循环
if (persons.size() < 100) break;
}
return ReturnT.SUCCESS;
}
从上面的代码可以看出来,除了分页sql的性能问题,分页和业务也耦合在了一起。类似的代码有13处,全是使用类似的形式编写,分页每次查询100条进行业务处理的逻辑。
分页与业务进行拆分
- 分页
public static <T> void selectByConPage(
Long start, Long size, BiFunction<Long, String, List<T>> function,
Function<List<T>, Long> functionMaxId, Consumer<List<T>> consumer) {
Long maxId = 0L;
for (; ;) {
// 拼接分页条件.
String limitSql = "limit " + size;
// 执行分页查询.
List<T> apply = function.apply(maxId, limitSql);
// 获取最大id.
maxId = functionMaxId.apply(apply);
// 处理业务逻辑.
consumer.accept(apply);
// 最后一页返回.
if (apply.size() < size) break;
}
}
因为全都是默认从0条开始,每页100行,因此重载分页方法
/**
* 默认从0开始,每页100条.
*
* @param function
* @param functionMaxId
* @param consumer
* @param <T>
*/
public static <T> void selectByConPage(
BiFunction<Long, String, List<T>> function,
Function<List<T>, Long> functionMaxId, Consumer<List<T>> consumer) {
selectByConPage(0L, 100L, function, functionMaxId, consumer);
}
- 业务
private void business(List<Person> persons) {
persons.forEach(e -> {
try {
rocketProducer.send(RocketMqConstant.PERSON_TOPIC, RocketMqConstant.PERSON_TAG, e);
} catch (Exception ex) {
log.error("异常, {}", ex);
}
});
}
- 查询
public List<Person> selectByCon(Long id, String limitSql) {
LambdaQueryWrapper<Person> wrapper = new QueryWrapper<Person>().lambda()
.gt(Person::getId, id)
.orderByAsc(Person::getId).last(limitSql);
return baseMapper.selectList(wrapper);
}
- 处理查询结果,获取最大id
public Long selectMaxId(List<Person> list) {
if (Optional.ofNullable(list).isPresent()) {
Long maxId = list.stream().map(Person::getId).max(Comparator.naturalOrder()).orElse(Long.MAX_VALUE);
return maxId;
}
return Long.MAX_VALUE;
}
- 改造完成之后
将查询和业务的函数以参数的形式传入,达到分页与业务处理逻辑的分离以及复用分页的目的。
public ReturnT<String> execute(String param) {
PageAllMsgUtil.selectByConPage((id, limitSql) -> personService.selectByCon(id, limitSql),
apply -> personService.selectMaxId(apply), list -> business(list));
return ReturnT.SUCCESS;
}