-
业务背景
为了对订单进行统计分析,需要将订单表中的数据查询并导出来。
但是,问题来了,订单表动辄成百上千万数据,加上字段繁多,使用sql来查询,很容易就连接超时了。那么,换一种手段,将订单数据同步到定位为搜索的Elasticsearch(es)中去,再从es中查询出来。可是,问题又来了,从官方文档中介绍,从es中查询出来的数据默认限制了一万条,这个将直接影响查询的深度分页以及查询总数。官方文档介绍如下:
- 解决方式
- 调大index.max_result_window的参数,如果太大会影响性能和效率,并且很有可能导致OOM
- 如果是进行深度分页的话,可以使用滚动查询Scroll。如果是使用from+size,假设现在有5个节点,那么当一个请求分发到A节点,那么A节点将作为协调节点,然后将请求分发到其它节点,其它节点都需返回from+size条数据给A节点(可以由主分片或者副分片处理),然后再由协调节点进行排序,截取出第from页的size条数据,返回客户端,数据量大的话性能比较低。
-
如果是查询订单,可以采用切割查询的策略,流程如下:
核心代码如下:
//先从数据库中查出一部分订单id
List<Integer> orderIds = wholesaleOrderDAO.getOrderIds(stime, etime, providerId, timeType);
if (CollectionUtils.isEmpty(orderIds)){
return Pair.of(Collections.EMPTY_LIST,BigDecimal.ZERO);
}
//按照9000的容量对订单进行切割
List<List<Integer>> partition = Lists.partition(orderIds, 9000);
List<ExportOrderDataItem> orders = new ArrayList<>();
//使用多线程从es中进行订单查询
List<List<ExportOrderDataItem>> resultsAsync = taskCommonService.getResultsAsync(partition, e -> {
try {
return orderIndexService.getOrderDetail(e,drugStore,phone,providerId);
} catch (Exception exception) {
log.error("从es中查询订单数据异常",exception);
}
return new ArrayList<>();
});
for(List<ExportOrderDataItem> orderList : resultsAsync){
orders.addAll(orderList);
}
if (CollectionUtils.isEmpty(orders)){
return Pair.of(Collections.EMPTY_LIST,BigDecimal.ZERO);
}
/**
* 处理异步结果20s超时
*/
private static final long DEFAULT_TIME_OUT = 20000;
private final ThreadPoolTaskExecutor fileCommonExecutor;
/**
* 异步获取结果
* @param items 需要异步分页的参数
* @param <T> 方法参数
* @param <R> 结果
*/
public <R, T> List<R> getResultsAsync(Collection<T> items, Function<T, R> function) throws Exception {
List<R> results = new ArrayList<>(items.size());
List<Future<R>> futureList = new ArrayList<>();
for (T param : items) {
futureList.add(fileCommonExecutor.submit(() -> function.apply(param)));
}
long remainingTime = DEFAULT_TIME_OUT;
long cos = 0;
for (Future<R> future : futureList) {
remainingTime -= cos;
long beginTime = System.currentTimeMillis();
R result = future.get(remainingTime, TimeUnit.MILLISECONDS);
cos = System.currentTimeMillis() - beginTime;
results.add(result);
}
return results;
}