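Preface:
This is a relatively simple report that could also be produced with SQL alone; it is used here mainly to make it easier to understand how to build reports with ES aggregations.
I previously wrote the article "java实现日报表、月报表统计,没数据补0" (https://blog.csdn.net/JRocks/article/details/113841913) on daily and monthly report statistics in Java with zero-filling. That approach queries the database with SQL and then uses Java code to fill dates that have no data with 0, which is simple and convenient.
It has drawbacks, however: some report data needs SQL joins that are cumbersome and slow, some cannot be expressed in SQL at all, some is awkward even with SQL plus Java code, and as business data grows quickly, complex join queries make pages respond slowly. We therefore considered ES for two reasons: it can improve query efficiency, and it can fill empty date buckets with 0 automatically.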
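Product requirements:
Requirement description:
- Think of the data as an order table that contains two dictionary fields, "purchase period" and "purchase source";
- The report can be viewed by year, month, or day, and the filter range may span years, months, and days. The data must be summarized according to the query conditions along three dimensions: total, purchase period, and purchase source. Note in particular that the "number of people" field must count each user only once;
- The project is already live, so historical data has to be pushed to ES, and every new order has to be pushed to ES when it is purchased;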
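To make the three dimensions concrete, here is a minimal sketch, in plain Java with no ES involved, of how one order could be expanded into three report rows with a per-day "first time this user is seen" flag. The class name, the `Row` fields, and the id suffixes are hypothetical and only illustrate the idea; they are not the project's actual classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: turns one order into three report rows (total, period, source). */
class ReportRowSketch {

    /** One report row; the field names are illustrative. */
    static final class Row {
        final String id;
        final String day;
        final String dimension;
        final long qty;
        final long people;

        Row(String id, String day, String dimension, long qty, long people) {
            this.id = id;
            this.day = day;
            this.dimension = dimension;
            this.qty = qty;
            this.people = people;
        }
    }

    // Remembers "day|dimension|userId" so a user is counted once per day and dimension value
    private final Set<String> seen = new HashSet<>();

    List<Row> expand(String orderId, String day, String userId, int deadline, int orderType) {
        List<Row> rows = new ArrayList<>();
        rows.add(row(orderId + "_total", day, "total", userId, true));
        rows.add(row(orderId + "_deadline", day, "deadline:" + deadline, userId, true));
        // Assumption: the source dimension only reports order count and amount, so people stays 0
        rows.add(row(orderId + "_source", day, "orderType:" + orderType, userId, false));
        return rows;
    }

    private Row row(String id, String day, String dimension, String userId, boolean countPeople) {
        // Set.add returns true only the first time the key is inserted
        boolean first = countPeople && seen.add(day + "|" + dimension + "|" + userId);
        return new Row(id, day, dimension, 1L, first ? 1L : 0L);
    }
}
```

Summing `people` over the rows of one day then gives the number of distinct users for that day. Because the flag is kept per day, a monthly or yearly sum counts a user once for every day on which they ordered.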
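Steps (important):
- Create the ES index in Kibana;
- Write a method that bulk-adds data to the ES index;
- Initialize the index by pushing historical data to it;
- Create the ES query template in Kibana;
- Create the ES totals query template in Kibana (note: this feeds the totals fields shown at the bottom of the table in the product requirements; it shows the totals for everything that matches the current query conditions, not just the rows on the current page);
- Aggregate the results of the ES query templates and return JSON data to the front end;
- Push each individual order to ES when it is purchased;
Note: this is a microservice project. Two modules are involved; the ES-related code lives in its own module, called dg-search, which is listed in the code walkthrough.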
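Code walkthrough: following the steps above one by one
1: Create the ES index in Kibana
ES must be set up first (the setup is not covered here). The request run in Kibana is as follows:
# 1. Create the index for the deposit purchase statistics report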
PUT dg_financial_order_report
{
"mappings": {
"properties": {
"orderTime":{
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 20
}
}
},
"totalType":{
"type": "integer"
},
"deadline":{
"type": "integer"
},
"orderType":{
"type": "integer"
},
"qty":{
"type": "long"
},
"orderMoney":{
"type": "float"
},
"people":{
"type": "long"
}
}
}
}
2: Write a method that bulk-adds data to the ES index (dg-search project)
// -------------------------------------------------------------------------------------------
// Feign interface
package com.dg.mall.search.feign.feign;
@FeignClient(name = "dg-search", path = "/dg-search/api/bzjFinanceReportEs")
public interface BzjFinanceReportFeignService {
/**
* Bulk-add deposit purchase report data
*/
@PostMapping("/batchAddOrUpdateDepositOrder")
void batchAddOrUpdateDepositOrderEsMapping(@RequestBody List<DepositOrderMappingReq> list);
/**
* Query deposit purchase report data
* @param depositBucketsReq
* @return
*/
@GetMapping("/listDepositOrder")
List<DepositOrderBucketsRes> listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);
/**
* Deposit purchase report: totals
* @param depositBucketsReq
* @return
*/
@GetMapping("/getDepositOrderTotal")
DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);
}
// -------------------------------------------------------------------------------------------
// Feign interface implementation
@RestController
@RequestMapping("/api/bzjFinanceReportEs")
public class BzjFinanceReportFeignProvider implements BzjFinanceReportFeignService {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* Bulk-add deposit purchase report data
*/
@Override
public void batchAddOrUpdateDepositOrderEsMapping(List<DepositOrderMappingReq> list) {
try {
if (CollectionUtils.isEmpty(list)) {
return;
}
BulkRequest bulkRequest = new BulkRequest();
for (DepositOrderMappingReq req : list) {
// The IndexRequest constructor argument is the index name, not an HTTP method
IndexRequest request = new IndexRequest("dg_financial_order_report");
// A document with the same id overwrites the existing one, so re-pushing the same row is safe
request.id(req.getId());
request.source(JSON.toJSONString(req), XContentType.JSON);
bulkRequest.add(request);
}
// bulkListener is assumed to be an ActionListener<BulkResponse> field of this class (not shown in the article)
restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkListener);
} catch (Exception e) {
LogUtil.error("Failed to bulk-add deposit order ES data! message:{} --", e, e.getMessage());
throw new ServiceException(SearchExceptionEnum.BATCH_ADD_BZJ_ORDER_ES_FAIL);
}
}
}
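Entity classes
/**
* <p>
* Deposit purchase statistics report: request entity for bulk-adding ES data
* <p>
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DepositOrderMappingReq {
/**
* Deposit ID
*/
private String id;
/**
* Purchase date
*/
private String orderTime;
/**
* Total
*/
private Integer totalType;
/**
* Purchase period
*/
private Integer deadline;
/**
* Purchase source
*/
private Integer orderType;
/**
* Number of orders (Lombok's builder ignores field initializers unless @Builder.Default is present)
*/
@Builder.Default
private Long qty = 0L;
/**
* Amount
*/
@Builder.Default
private BigDecimal orderMoney = BigDecimal.ZERO;
/**
* Number of people (already de-duplicated)
*/
@Builder.Default
private Long people = 0L;
}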
/**
* <p>
* Deposit purchase statistics report: request entity for ES queries
* <p>
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DepositBucketsReq {
/**
* Start time
*/
private String gteTime;
/**
* End time
*/
private String lteTime;
/**
* Statistics interval (year, month, or day)
*/
private String interval;
/**
* Date format
*/
private String format;
}
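3: Initialize the ES index with historical data
Note: this is implemented as a scheduled job (left disabled) so that the data initialization is easy to operate; the job is only triggered manually once after go-live.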
package com.dg.mall.financial.jobhandle;
/**
* <p>
* Scheduled job that pushes data to ES
* <p>
*/
@Service
public class SyncDepositOrderToEsHandle extends IJobHandler {
@Resource
private DepositOrderService orderService;
@Resource
private BzjFinanceReportFeignService bzjFinanceReportFeignService;
@Resource
private DepositRollReportFeignService depositRollReportFeignService;
@Resource
private FinancialProducer financialProducer;
@XxlJob("syncDepositOrderToEsHandle")
@Override
public ReturnT<String> execute(String param) throws Exception {
// Run the work in background threads so that the scheduler gets a result immediately.
// param is passed to the worker through its constructor: a value stored in a ThreadLocal
// by this thread would not be visible inside the newly started thread.
Thread thread = new Thread(new SynExecute(param));
thread.start();
// SynExpiredExecute is not shown in this article
Thread expiredThread = new Thread(new SyncDepositOrderToEsHandle.SynExpiredExecute());
expiredThread.start();
return ReturnT.SUCCESS;
}
class SynExecute implements Runnable {
private final String param;
SynExecute(String param) {
this.param = param;
}
@Override
public void run() {
// The data set may be large, so push it page by page, 100 rows at a time
int current = 1, size = 100;
for (; ; ) {
PageVO pageVO = new PageVO<>(current, size);
pageVO.setSearchCount(false);
// Query all order data, split into total, purchase source, and purchase period rows (see the mapping.xml below)
PageVO<DepositOrderMappingRes> res = orderService.getSyncPushOrderDataToEs(pageVO, param);
if (ObjectUtil.isEmpty(res.getRecords()) || res.getRecords().size() < 1) {
break;
}
final List<DepositOrderMappingReq> orderMappingReqs = Lists.newArrayList();
for (DepositOrderMappingRes mappingRes : res.getRecords()) {
DepositOrderMappingReq reportReq = orderService.batchAddOrderData(mappingRes);
orderMappingReqs.add(reportReq);
}
if (CollectionUtils.isNotEmpty(orderMappingReqs)) {
// Call the bulk-add ES method in dg-search
bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(orderMappingReqs);
}
current++;
}
}
}
}
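The page-by-page push loop above can be sketched in isolation as follows. This is a simplified illustration using only the JDK: `pushAll`, `fetchPage`, and `slice` are hypothetical names, the page source is an in-memory list rather than a database query, and the sink stands in for the Feign call that bulk-adds to ES.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Hypothetical sketch of pushing a large data set in fixed-size pages. */
class PagedPushSketch {

    /**
     * Reads 1-based pages from fetchPage until an empty or short page is returned
     * and hands every non-empty page to sink. Returns the number of pages pushed.
     */
    static <T> int pushAll(BiFunction<Integer, Integer, List<T>> fetchPage,
                           int size,
                           Consumer<List<T>> sink) {
        int pushed = 0;
        for (int current = 1; ; current++) {
            List<T> page = fetchPage.apply(current, size);
            if (page == null || page.isEmpty()) {
                break;
            }
            sink.accept(page);
            pushed++;
            if (page.size() < size) {
                break; // a short page is the last page, so one extra query is saved
            }
        }
        return pushed;
    }

    /** In-memory stand-in for a paged database query. */
    static <T> List<T> slice(List<T> all, int current, int size) {
        int from = Math.min((current - 1) * size, all.size());
        int to = Math.min(from + size, all.size());
        return new ArrayList<>(all.subList(from, to));
    }
}
```

With 250 rows and a page size of 100, the sink is called three times (100, 100, and 50 rows).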
mapping.xml for the orderService.getSyncPushOrderDataToEs() method
<select id="getSyncPushOrderDataToEs"
resultType="com.dg.mall.financial.vo.res.deposit.DepositOrderMappingRes">
-- 1. Total
SELECT
o.id AS id,
DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
0 AS totalType,
NULL AS deadline,
NULL AS orderType,
count( * ) AS qty,
sum( o.order_money ) AS orderMoney,
o.user_id AS userId
FROM
dg_deposit_order o
JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
WHERE
<![CDATA[ o.order_status < 4 ]]>
<if test="param != null and param !=''">
<![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
</if>
GROUP BY
orderTime,
userId
UNION ALL
-- 2. Purchase period
SELECT
o.id AS id,
DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
NULL AS totalType,
dos.deadline AS deadline,
NULL AS orderType,
count( * ) AS qty,
sum( o.order_money ) AS orderMoney,
o.user_id AS userId
FROM
dg_deposit_order o
JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
WHERE
<![CDATA[ o.order_status < 4 ]]>
<if test="param != null and param !=''">
<![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
</if>
GROUP BY
orderTime,
userId,
deadline
UNION ALL
-- 3. Purchase source
SELECT
o.id AS id,
DATE_FORMAT( o.created_time, '%Y-%m-%d' ) AS orderTime,
NULL AS totalType,
NULL AS deadline,
o.order_type AS orderType,
count( * ) AS qty,
sum( o.order_money ) AS orderMoney,
o.user_id AS userId
FROM
dg_deposit_order o
JOIN dg_deposit_order_subsidy dos on dos.order_id = o.order_id
WHERE
<![CDATA[ o.order_status < 4 ]]>
<if test="param != null and param !=''">
<![CDATA[ and DATE_FORMAT( o.created_time, '%Y-%m-%d' ) = #{param} ]]>
</if>
GROUP BY
orderTime,
userId,
orderType
</select>
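4: Create the ES query template in Kibana
Some readers may ask: why use a search template, and what are its benefits?
- First, I find it easier to get started with, which builds confidence; the word "template" itself suggests something you simply fill in and reuse;
- Second, it is more direct and reduces the amount of Java code, so you can concentrate on writing the ES aggregation. If you have ever hand-written a complex ES aggregation in Java, you will find that a template really is more convenient;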
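To show what "filling in a template" means, here is a minimal sketch of `{{name}}` placeholder substitution in plain Java. It is only an illustration of the idea and not how ES renders templates internally (ES uses a real Mustache engine with sections, escaping, and more); the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: substitutes {{name}} placeholders with parameter values. */
class TemplateParamSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{(\\w+)\\}\\}");

    /** Replaces each {{name}} with params.get(name); unknown names become an empty string. */
    static String render(String template, Map<String, String> params) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = params.getOrDefault(matcher.group(1), "");
            // quoteReplacement keeps '$' and '\' in values from being treated as group references
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

The Java side then only has to supply a parameter map such as `gteTime`, `lteTime`, `interval`, and `format`, while the aggregation itself stays in the stored template.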
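# Query using the deposit purchase template (scoped to the report index, as the Java code does)
GET dg_financial_order_report/_search/template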
{
"id":"search_order_collect_template",
"params":{
"gteTime":"2021-02-02",
"lteTime":"2021-02-02",
"interval":"day",
"format":"yyyy-MM-dd"
}
}
# Deposit purchase template
POST _scripts/search_order_collect_template
{
"script": {
"lang": "mustache",
"source": {
"size": 0,
"query": {
"range": {
"orderTime": {
"gte": "{{gteTime}}",
"lte": "{{lteTime}}",
"format": "yyyy-MM-dd"
}
}
},
"aggs": {
"group_date_histogram_data": {
"date_histogram": {
"field": "orderTime",
"calendar_interval": "{{interval}}",
"format": "{{format}}"
},
"aggs": {
"group_totalType_data": {
"terms": {
"field": "totalType"
},
"aggs": {
"qty": {
"sum": {
"field": "qty"
}
},
"orderMoney": {
"sum": {
"field": "orderMoney"
}
},
"people": {
"sum": {
"field": "people"
}
}
}
},
"group_orderType": {
"terms": {
"field": "orderType"
},
"aggs": {
"qty": {
"sum": {
"field": "qty"
}
},
"orderMoney": {
"sum": {
"field": "orderMoney"
}
},
"people": {
"sum": {
"field": "people"
}
}
}
},
"group_deadline": {
"terms": {
"field": "deadline"
},
"aggs": {
"qty": {
"sum": {
"field": "qty"
}
},
"orderMoney": {
"sum": {
"field": "orderMoney"
}
},
"people": {
"sum": {
"field": "people"
}
}
}
}
}
}
}
}
}
}
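As a mental model of what the outer `date_histogram` contributes, the following plain-Java sketch groups rows by day and emits a bucket with 0 for every day in the range that has no data. It is a simplification under stated assumptions: only daily buckets, a single `qty` value, and hypothetical class and method names. As far as I understand, ES itself fills gaps only between the first and last non-empty bucket by default, and the `extended_bounds` option of `date_histogram` is what forces buckets for the whole requested range.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: daily buckets with zero-filling over an inclusive date range. */
class DayHistogramSketch {

    /** Each doc is (day as yyyy-MM-dd, qty). Returns day -> summed qty, with 0 for empty days. */
    static Map<String, Long> histogram(List<Map.Entry<String, Long>> docs, LocalDate from, LocalDate to) {
        Map<String, Long> buckets = new LinkedHashMap<>();
        // Create one bucket per day first so that empty days are present with 0
        for (LocalDate d = from; !d.isAfter(to); d = d.plusDays(1)) {
            buckets.put(d.toString(), 0L);
        }
        // Add each doc to its bucket; docs outside the range are ignored
        for (Map.Entry<String, Long> doc : docs) {
            if (buckets.containsKey(doc.getKey())) {
                buckets.merge(doc.getKey(), doc.getValue(), Long::sum);
            }
        }
        return buckets;
    }
}
```

The nested `terms` aggregations in the template then play the role of a second grouping inside each date bucket.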
Template query result
Java code (dg-search project)
// Feign interface
/**
* Query deposit purchase report data
* @param depositBucketsReq
* @return
*/
@GetMapping("/listDepositOrder")
List<DepositOrderBucketsRes> listDepositOrder(@SpringQueryMap DepositBucketsReq depositBucketsReq);
// Implementation class
@Override
public List<DepositOrderBucketsRes> listDepositOrder(DepositBucketsReq depositBucketsReq) {
final Map<String, Object> params = (Map<String, Object>) JSON.toJSON(depositBucketsReq);
String templateName = elasticsearchConfig.bzjConfig.getSearch_order_collect_template();
LogUtil.info("Template name: {} request params: {}", templateName, params.toString());
SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName,
params, restHighLevelClient);
Aggregations aggregations = searchResponse.getAggregations();
// This is where it differs from the totals template: the results are grouped by date first
ParsedDateHistogram dateHistogram = aggregations.get("group_date_histogram_data");
if (dateHistogram.getBuckets().size() < 1) {
return null;
}
return getDepositOrderCollect(dateHistogram.getBuckets());
}
private List<DepositOrderBucketsRes> getDepositOrderCollect(List<? extends Histogram.Bucket> buckets){
List<DepositOrderBucketsRes> res = Lists.newLinkedList();
buckets.stream().forEach(dateBucket -> {
String dateString = dateBucket.getKeyAsString();
DepositOrderBucketsRes build = DepositOrderBucketsRes.builder()
.orderTime(dateString)
.build();
Aggregations aggregations = dateBucket.getAggregations();
// Read the totals aggregation
getOrderSumAggsData(aggregations, build);
// Read the purchase period aggregation
getOrderDeadlineAggsData(build, aggregations);
// Read the purchase source aggregation
getOrderSourceAggsData(aggregations, build);
res.add(build);
});
return res;
}
/**
* Read the totals aggregation
* @param aggregations
* @param build
*/
public void getOrderSumAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {
ParsedTerms totalTypeData = aggregations.get("group_totalType_data");
List<? extends Terms.Bucket> totalTypeDataBuckets = totalTypeData.getBuckets();
totalTypeDataBuckets.stream().forEach(bucket -> {
Aggregations agg = bucket.getAggregations();
ParsedSum sumQty = agg.get("qty");
ParsedSum sumMoney = agg.get("orderMoney");
ParsedSum sumPeople = agg.get("people");
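build.setSumQty(new BigDecimal(sumQty.getValueAsString()).intValue());
// Assuming NumberUtil is Hutool's: the second argument of round(BigDecimal, int) is the scale, and
// BigDecimal.ROUND_CEILING happens to equal 2, so this rounds to 2 decimal places rather than selecting a ceiling rounding mode
build.setSumMoney(NumberUtil.round(new BigDecimal(sumMoney.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
build.setSumPeople(new BigDecimal(sumPeople.getValueAsString()).intValue());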
});
}
/**
* Read the purchase period aggregation
* @param build
* @param aggregations
*/
private void getOrderDeadlineAggsData(DepositOrderBucketsRes build, Aggregations aggregations) {
ParsedTerms deadlineData = aggregations.get("group_deadline");
List<? extends Terms.Bucket> deadlineDataBuckets = deadlineData.getBuckets();
deadlineDataBuckets.stream().forEach(bucket -> {
Integer deadline = new BigDecimal(bucket.getKey().toString()).intValue();
Aggregations agg = bucket.getAggregations();
ParsedSum qty = agg.get("qty");
ParsedSum money = agg.get("orderMoney");
ParsedSum people = agg.get("people");
DepositOrderDeadlineEnum deadlineEnum = DepositOrderDeadlineEnum.getDeadlineEnum(deadline);
if (ObjectUtil.isEmpty(deadlineEnum)) {
return;
}
switch (deadlineEnum) {
case THIRTY_DAYS:
build.setThirtyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());
build.setThirtyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
build.setThirtyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());
break;
case NINETY_DAYS:
build.setNinetyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());
build.setNinetyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
build.setNinetyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());
break;
case THREE_HUNDRED_SIXTY_DAYS:
build.setThreeHundredSixtyDaysQty(new BigDecimal(qty.getValueAsString()).intValue());
build.setThreeHundredSixtyDaysMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
build.setThreeHundredSixtyDaysPeople(new BigDecimal(people.getValueAsString()).intValue());
break;
default:
break;
}
});
}
/**
* Read the purchase source aggregation
* @param aggregations
* @param build
*/
public void getOrderSourceAggsData(Aggregations aggregations, DepositOrderBucketsRes build) {
ParsedTerms sourceData = aggregations.get("group_orderType");
List<? extends Terms.Bucket> sourceDataBuckets = sourceData.getBuckets();
sourceDataBuckets.stream().forEach(bucket -> {
Integer orderType = new BigDecimal(bucket.getKey().toString()).intValue();
Aggregations agg = bucket.getAggregations();
ParsedSum qty = agg.get("qty");
ParsedSum money = agg.get("orderMoney");
ParsedSum people = agg.get("people");
DepositOrderSourceEnum orderSourceEnum = DepositOrderSourceEnum.getOrderSourceEnum(orderType);
if (ObjectUtil.isEmpty(orderSourceEnum)) {
return;
}
switch (orderSourceEnum) {
case DEPOSIT:
build.setDepositQty(new BigDecimal(qty.getValueAsString()).intValue());
build.setDepositMoney(NumberUtil.round(new BigDecimal(money.getValueAsString()), BigDecimal.ROUND_CEILING).toString());
break;
// These sources are all reported together as "store onboarding"
case CHANNEL_STORE:
case FIRM:
case CONSIGNMENT_STORE:
case IDLE_STORE:
BigDecimal newQty = new BigDecimal(qty.getValueAsString());
BigDecimal oldQty = new BigDecimal(build.getStoreQty());
BigDecimal newMoney = new BigDecimal(money.getValueAsString());
BigDecimal oldMoney = new BigDecimal(build.getStoreMoney());
build.setStoreQty((oldQty.add(newQty)).intValue());
build.setStoreMoney(NumberUtil.round(oldMoney.add(newMoney), BigDecimal.ROUND_CEILING).toString());
break;
default:
break;
}
});
}
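5: Create the ES totals query template in Kibana
# Query using the deposit totals template (scoped to the report index, as the Java code does)
GET dg_financial_order_report/_search/template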
{
"id":"search_order_total_template",
"params":{
"gteTime":"2008-01-01",
"lteTime":"2021-12-31",
"interval":"day",
"format":"yyyy-MM-dd"
}
}
# Deposit totals template
POST _scripts/search_order_total_template
{
"script": {
"lang": "mustache",
"source": {
"size": 0,
"query": {
"range": {
"orderTime": {
"gte": "{{gteTime}}",
"lte": "{{lteTime}}",
"format": "yyyy-MM-dd"
}
}
},
"aggs": {
"group_totalType_data": {
"terms": {
"field": "totalType"
},
"aggs": {
"qty": {
"sum": {
"field": "qty"
}
},
"orderMoney": {
"sum": {
"field": "orderMoney"
}
},
"people": {
"sum": {
"field": "people"
}
}
}
},
"group_orderType": {
"terms": {
"field": "orderType"
},
"aggs": {
"qty": {
"sum": {
"field": "qty"
}
},
"orderMoney": {
"sum": {
"field": "orderMoney"
}
},
"people": {
"sum": {
"field": "people"
}
}
}
},
"group_deadline": {
"terms": {
"field": "deadline"
},
"aggs": {
"qty": {
"sum": {
"field": "qty"
}
},
"orderMoney": {
"sum": {
"field": "orderMoney"
}
},
"people": {
"sum": {
"field": "people"
}
}
}
}
}
}
}
}
Template query result
Java code (dg-search project)
// Feign interface
/**
* Deposit purchase report: totals
* @param depositBucketsReq
* @return
*/
@GetMapping("/getDepositOrderTotal")
DepositOrderBucketsRes getDepositOrderTotal(@SpringQueryMap DepositBucketsReq depositBucketsReq);
// Implementation class
@Override
public DepositOrderBucketsRes getDepositOrderTotal(DepositBucketsReq depositBucketsReq) {
final Map<String, Object> params = (Map<String, Object>) JSON.toJSON(depositBucketsReq);
String templateName = elasticsearchConfig.bzjConfig.getSearch_order_total_template();
LogUtil.info("Template name: {} request params: {}", templateName, params.toString());
SearchResponse searchResponse = EsTemplateUtil.getEsByTemplate(elasticsearchConfig.bzjConfig.getDgFinancialOrderReportIndex(), templateName,
params, restHighLevelClient);
Aggregations aggregations = searchResponse.getAggregations();
DepositOrderBucketsRes build = DepositOrderBucketsRes.builder().build();
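// Read the totals aggregation (same code as above)
getOrderSumAggsData(aggregations, build);
// Read the purchase period aggregation (same code as above)
getOrderDeadlineAggsData(build, aggregations);
// Read the purchase source aggregation (same code as above)
getOrderSourceAggsData(aggregations, build);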
return build;
}
6: Aggregate the ES query template results and return JSON data to the front end
Controller:
/**
* Deposit purchase statistics report: query
*/
@GetMapping("/order/list")
public ResponseData listPageDepositOrderByReq(@Valid DepositCollectReq depositCollectReq){
return ResponseData.success(financeReportService.listPageDepositOrderByReq(depositCollectReq));
}
/**
* Deposit purchase statistics report: totals
*/
@GetMapping("/order/total")
public ResponseData getDepositOrderTotal(@Valid DepositCollectReq depositCollectReq){
return ResponseData.success(financeReportService.getDepositOrderTotal(depositCollectReq));
}
/**
* Deposit purchase statistics report: Excel export
*/
@GetMapping("/order/export")
public ResponseData getDepositOrderExportFile(@Valid DepositCollectReq depositCollectReq){
return ResponseData.success(financeReportService.getDepositOrderExportFile(depositCollectReq));
}
Front-end request entity:
package com.dg.mall.financial.vo.req.report.collect;
import com.dg.mall.core.page.PageVO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* <p>
* Request entity
* <p>
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DepositCollectReq extends PageVO {
/**
* Report type (0: day, 1: month, 2: year)
*/
@NotNull(message = "Report type must not be empty!")
private Integer reportType;
/**
* Start time
*/
@NotBlank(message = "Start date must not be empty!")
private String gteTime;
/**
* End time
*/
@NotBlank(message = "End date must not be empty!")
private String lteTime;
}
Service:
/**
* Deposit purchase statistics report: paged query
* @param depositCollectReq
* @return
*/
PageVO<DepositOrderBucketsRes> listPageDepositOrderByReq(DepositCollectReq depositCollectReq);
/**
* Deposit purchase statistics report: Excel export
* @param depositCollectReq
* @return
*/
String getDepositOrderExportFile(DepositCollectReq depositCollectReq);
/**
* Deposit purchase statistics report: totals
* @param depositCollectReq
* @return
*/
DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq);
ServiceImpl:
// Query
@Override
public PageVO<DepositOrderBucketsRes> listPageDepositOrderByReq(DepositCollectReq depositCollectReq) {
PageVO<DepositOrderBucketsRes> pageVO = new PageVO<>();
// Convert the query conditions into the ES request entity and add default parameters
DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
List<DepositOrderBucketsRes> resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);
if (CollectionUtils.isEmpty(resList)){
return pageVO;
}
List<DepositOrderBucketsRes> collect = resList.stream().sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed()).collect(Collectors.toList());
final PageUtils pageUtils = new PageUtils(Integer.valueOf(depositCollectReq.getCurrent() + ""), Integer.valueOf(depositCollectReq.getSize() + ""), collect);
pageVO.setRecords(pageUtils.getCurrentList());
pageVO.setTotal(pageUtils.getAllList().size());
pageVO.setSize(depositCollectReq.getSize());
pageVO.setCurrent(depositCollectReq.getCurrent());
return pageVO;
}
// Totals
@Override
public DepositOrderBucketsRes getDepositOrderTotal(DepositCollectReq depositCollectReq) {
DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
return bzjFinanceReportFeignService.getDepositOrderTotal(depositBucketsReq);
}
// Excel export (easyPoi)
@Override
public String getDepositOrderExportFile(DepositCollectReq depositCollectReq) {
DepositBucketsReq depositBucketsReq = collectReportService.getDepositBucketsReq(depositCollectReq);
List<DepositOrderBucketsRes> resList = bzjFinanceReportFeignService.listDepositOrder(depositBucketsReq);
if (CollectionUtils.isEmpty(resList)){
return null;
}
List<DepositOrderBucketsRes> collect = resList.stream().sorted(Comparator.comparing(DepositOrderBucketsRes::getOrderTime).reversed()).collect(Collectors.toList());
final Map<String, Object> map = Maps.newHashMap();
TemplateExportParams params = new TemplateExportParams("template/excel/导出保证金购买报表模板.xlsx");
map.put("mapList", collect);
Workbook workbook = ExcelExportUtil.exportExcel(params, map);
return uploadUtils.uploadFile(workbook, "保证金购买报表");
}
// Convert the query conditions into the ES request entity and add default parameters
@Override
public DepositBucketsReq getDepositBucketsReq(DepositCollectReq depositCollectReq) {
String gteTime = depositCollectReq.getGteTime();
String lteTime = depositCollectReq.getLteTime();
Integer type = depositCollectReq.getReportType();
ReportDateEnum reportDateEnum = ReportDateEnum.getReportDateEnum(type);
switch (reportDateEnum) {
case MONTH_FORMAT: {
gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();
lteTime = DateUtil.endOfMonth(TimeUtils.getDateTime(lteTime, type)).toDateStr();
break;
}
case YEAR_FORMAT: {
gteTime = TimeUtils.getDateTime(gteTime, type).toDateStr();
lteTime = DateUtil.endOfYear(TimeUtils.getDateTime(lteTime, type)).toDateStr();
break;
}
default:
break;
}
return DepositBucketsReq.builder()
.gteTime(gteTime)
.lteTime(lteTime)
.format(reportDateEnum.getFormat())
.interval(reportDateEnum.getInterval())
.build();
}
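The range expansion in `getDepositBucketsReq` (a month or year selection widened to its first and last day) can be sketched with `java.time` alone. This is a hedged illustration: the input formats `yyyy-MM` and `yyyy` for month and year reports are assumptions, since `TimeUtils.getDateTime` is not shown, and the class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.Year;
import java.time.YearMonth;

/** Hypothetical sketch: widens month/year selections to full date ranges. */
class ReportRangeSketch {

    /**
     * reportType: 0 = day (yyyy-MM-dd), 1 = month (yyyy-MM), 2 = year (yyyy); formats are assumed.
     * Returns a two-element array {gteTime, lteTime} in yyyy-MM-dd format.
     */
    static String[] expand(int reportType, String gte, String lte) {
        switch (reportType) {
            case 1:
                return new String[] {
                    YearMonth.parse(gte).atDay(1).toString(),
                    YearMonth.parse(lte).atEndOfMonth().toString() // handles 28/29/30/31 days
                };
            case 2:
                return new String[] {
                    Year.parse(gte).atDay(1).toString(),
                    LocalDate.of(Year.parse(lte).getValue(), 12, 31).toString()
                };
            default:
                // Day reports are passed through after validating the format
                return new String[] {
                    LocalDate.parse(gte).toString(),
                    LocalDate.parse(lte).toString()
                };
        }
    }
}
```

For example, a month report from `2021-01` to `2021-02` becomes the range `2021-01-01` to `2021-02-28`, which is what the `range` query on `orderTime` in the template expects.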
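In-memory paging: ES's from/size paging syntax pages through search hits rather than aggregation buckets, and we did not implement bucket paging in the query template (it may be possible, but we did not do it), so we wrapped a small in-memory paging class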
package com.dg.mall.financial.utils;
import com.google.common.collect.Lists;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* <p>
* In-memory paging utility
* <p>
*/
@Data
public class PageUtils implements Serializable {
/**
* Current page
*/
private Integer current;
/**
* Page size
*/
private Integer size;
/**
* Data of the current page
*/
private List currentList;
/**
* All data
*/
private List allList;
public PageUtils(Integer current, Integer size, List allList) {
final List arrayList = Lists.newArrayList();
this.current = current;
this.size = size;
this.allList = allList;
if (allList.size() <= size) {
this.currentList = allList;
return;
}
int start = (current - 1) * size;
for (int i = 0; i < size; i++) {
if (start + i >= allList.size()) {
break;
}
arrayList.add(allList.get(start + i));
}
this.currentList = arrayList;
}
}
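For comparison, a generic variant of the same in-memory paging idea can be written with `subList`. This is only a sketch with hypothetical names. One behavioral difference worth noting: the class above returns the full list for any page number when the list fits in one page, whereas this sketch returns an empty list for pages past the end.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: type-safe in-memory paging. */
class PageSliceSketch {

    /** Returns the items of the 1-based page `current`, or an empty list when out of range. */
    static <T> List<T> page(List<T> all, int current, int size) {
        if (all == null || current < 1 || size < 1) {
            return Collections.emptyList();
        }
        long start = (long) (current - 1) * size; // long avoids int overflow for large page numbers
        if (start >= all.size()) {
            return Collections.emptyList();
        }
        int end = (int) Math.min(start + size, all.size());
        // Copy so the result does not keep a view onto the full list
        return new ArrayList<>(all.subList((int) start, end));
    }
}
```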
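7: Push each individual order to ES
Two things to note:
- Remember to de-duplicate users for the people count; Redis is used for this here;
- The id passed to req.setId() must be unique, because it is stored as the ES document id, and a document with the same id overwrites the existing one;
Note: single documents are pushed through a message queue here; you can follow that approach, or call the bulk-insert ES method directly;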
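The people de-duplication is a "first time this key is seen" check. The sketch below uses an in-memory `ConcurrentHashMap` as a stand-in for Redis to show the check and the write as one atomic step, so two concurrent orders from the same user cannot both count as the first. The class and method names are hypothetical. With Spring Data Redis, the corresponding single call would, as far as I know, be `opsForValue().setIfAbsent(key, value, timeout, unit)`, available in recent versions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: atomic "first seen" check, with a map standing in for Redis. */
class FirstSeenSketch {

    private final ConcurrentMap<String, Boolean> seen = new ConcurrentHashMap<>();

    /** Returns 1 the first time a (day, dimension, userId) combination is seen, otherwise 0. */
    long peopleIncrement(String day, String dimension, String userId) {
        String key = day + "_" + dimension + "_" + userId;
        // putIfAbsent returns null only for the caller that actually inserted the key
        return seen.putIfAbsent(key, Boolean.TRUE) == null ? 1L : 0L;
    }
}
```

The returned value is what would be stored in the `people` field of the pushed document.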
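/**
* Push purchase data to ES (split by purchase period and purchase source)
*
* @param depositOrder
*/
public void sendOrderToEs(DepositOrder depositOrder) {
// Total: key is date_type_userId
addOrderByTotalTypeData(depositOrder);
// Purchase period (returns true when no subsidy record exists, in which case the source row is skipped as well)
if (addOrderByDeadlineData(depositOrder)) return;
// Purchase source
addOrderBySourceData(depositOrder);
}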
/**
* Add the total row for an order and flag whether the user is new for that day
*
* @param depositOrder
*/
private void addOrderByTotalTypeData(DepositOrder depositOrder) {
DepositOrderMappingReq req = DepositOrderMappingReq.builder().build();
String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");
// De-duplicate users for the people count
String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(SUM_TYPE.getType()).concat("_").concat(depositOrder.getUserId());
if (!redisTemplate.hasKey(key)) {
req.setPeople(1L);
redisTemplate.opsForValue().set(key, key, 1, TimeUnit.DAYS);
}
req.setId(String.valueOf(depositOrder.getId()).concat("_").concat(SUM_TYPE.getType()));
req.setOrderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"));
req.setTotalType(SUM_TYPE.getCode());
req.setQty(1L);
req.setOrderMoney(depositOrder.getOrderMoney());
ReportReq reportReq = ReportReq.builder()
.shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)
.data(JSON.toJSONString(req)).build();
financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));
}
/**
* Add the purchase period row for an order
*
* @param depositOrder
* @return true if no subsidy record was found and nothing was sent
*/
private boolean addOrderByDeadlineData(DepositOrder depositOrder) {
DepositOrderMappingReq req = DepositOrderMappingReq.builder().build();
String formatDate = DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd");
DepositOrderSubsidy orderSubsidy = depositOrderSubsidyService.getOne(new LambdaQueryWrapper<DepositOrderSubsidy>()
.eq(DepositOrderSubsidy::getOrderId, depositOrder.getOrderId()));
if (ObjectUtil.isEmpty(orderSubsidy)) {
return true;
}
// De-duplicate users for the people count
String key = DEPOSIT_ORDER_COUNT.concat(formatDate).concat("_").concat(DEADLINE_TYPE.getType()).concat("_").concat(String.valueOf(orderSubsidy.getDeadline())).concat("_").concat(depositOrder.getUserId());
if (!redisTemplate.hasKey(key)) {
req.setPeople(1L);
redisTemplate.opsForValue().set(key, key, 1, TimeUnit.DAYS);
}
req.setId(String.valueOf(depositOrder.getId()).concat("_").concat(DEADLINE_TYPE.getType()));
req.setOrderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"));
req.setDeadline(orderSubsidy.getDeadline());
req.setQty(1L);
req.setOrderMoney(depositOrder.getOrderMoney());
ReportReq reportReq = ReportReq.builder()
.shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)
.data(JSON.toJSONString(req)).build();
financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));
return false;
}
/**
* Add the purchase source row for an order
*
* @param depositOrder
*/
private void addOrderBySourceData(DepositOrder depositOrder) {
DepositOrderMappingReq req = DepositOrderMappingReq.builder()
.id(String.valueOf(depositOrder.getId()).concat("_").concat(ORDER_SOURCE_TYPE.getType()))
.orderTime(DateUtil.format(depositOrder.getCreatedTime(), "yyyy-MM-dd"))
.orderType(depositOrder.getOrderType())
.qty(1L)
.orderMoney(depositOrder.getOrderMoney())
.build();
ReportReq reportReq = ReportReq.builder()
.shortName(ReportTypeConstants.FINANCIEL_BZJGMTJ)
.data(JSON.toJSONString(req)).build();
financialProducer.sendMessage(MQ_DEPOSIT_EXCHANGE, DEPOSIT_FINANCEL_REPORT_QUEUE.getRoutingKey(), JSON.toJSONString(reportReq));
}
/**
* Send a message to the queue
*
* @param exchange
* @param routingKey
* @param content
*/
public void sendMessage(String exchange, String routingKey, Object content) {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData(String.valueOf(IdWorker.getId()));
rabbitTemplate.convertAndSend(exchange, routingKey, getMessage(content),
message -> {
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}, correlationData);
LogUtil.info("Message sent to the queue, message: " + content);
}
Message queue consumer: pushing a single document to ES
package com.dg.mall.financial.rabbitmq.consumer;
/**
* <p>
* Financial report consumer
* <p>
*/
@Service
public class DepositFinancelReportConsumer {
@Autowired
private BzjFinanceReportFeignService bzjFinanceReportFeignService;
@Resource
private DepositRenewalReportFeignService depositRenewalReportFeignService;
@Resource
private DepositRollReportFeignService depositRollReportFeignService;
@Resource
private DepositAwardEsFeignService depositAwardEsFeignService;
@Resource
private DepositFinanceReportService financeReportService;
@Resource
private SubsidyAwardService subsidyAwardService;
@Resource
private RedisTemplate redisTemplate;
@RabbitListener(queues = "deposit-financel-report-queue")
@RabbitHandler
@Transactional(rollbackFor = Exception.class)
public void financelReportQueue(Message msg, Channel channel) throws Exception {
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
String resMsg = new String(msg.getBody(), "utf-8");
try {
// Dispatch by report type
ReportReq req = JSON.parseObject(resMsg, new TypeReference<ReportReq>() {});
String shortName = req.getShortName();
switch (shortName) {
case ReportTypeConstants.FINANCIEL_BZJGMTJ:
// Bulk-add ES data for the deposit purchase statistics report
batchAddOrUpdateDepositOrderEsMapping(channel, deliveryTag, req);
break;
// ……
default:
throw new IllegalStateException("Unexpected value: " + shortName);
}
} catch (Exception e) {
// Requeue on failure; note that a message that can never succeed will be redelivered again and again
channel.basicNack(deliveryTag, false, true);
// Keep the original exception as the cause so the failure can be diagnosed
throw new RuntimeException(e);
}
// Acknowledge only this delivery (multiple = false), and only once
channel.basicAck(deliveryTag, false);
}
/**
* Bulk-add deposit order data (split by "purchase period" and "purchase source")
*
* @param channel
* @param deliveryTag
* @param req
* @throws IOException
*/
public void batchAddOrUpdateDepositOrderEsMapping(Channel channel, long deliveryTag, ReportReq req) throws IOException {
DepositOrderMappingReq orderMappingReq = JSON.parseObject(req.getData(), new TypeReference<DepositOrderMappingReq>() {
});
if (ObjectUtil.isEmpty(orderMappingReq.getOrderTime())) {
// Nothing to index. The caller acknowledges the message, so it must not be acknowledged here as well:
// acknowledging the same delivery tag twice is a protocol error
return;
}
ArrayList<DepositOrderMappingReq> list = new ArrayList<>();
list.add(orderMappingReq);
bzjFinanceReportFeignService.batchAddOrUpdateDepositOrderEsMapping(list);
}
}
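Summary:
With that, the feature is complete. For me, the key points were how to design the ES index data structure and how to use aggregations in the query templates.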