工作中遇到一个分析需求：按某个字段分组聚合，然后计算平均值、最大值、最小值、中位数等一系列指标。
遇到的问题:
- 中位数：暂时不知道如何在 ES 中直接计算，所以把每个分组的 score 全部取出，在内存中计算。（ES 的 percentiles 聚合取 50 百分位可以得到近似的中位数。）
- 分组字段中存储的是以空格分隔的多个值，需求是按其中的每个值分别聚合（暂未实现）。
- 聚合后统计符合某条件的数据量：在分组下添加 filter 子聚合，然后读取其 doc_count。
/**
 * Step 1: creates an Elasticsearch transport client for the node at 127.0.0.1:9300.
 *
 * <p>The client joins cluster "elscluster" with "client.transport.sniff" enabled. A brand-new
 * client is built on every call and this method never closes it, so the caller must.
 *
 * @return a new client with the local transport address registered
 * @throws UnknownHostException if the node address cannot be resolved
 */
public PreBuiltTransportClient getClient() throws UnknownHostException {
    // Node to connect to (ip and port).
    TransportAddress localNode =
            new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300);

    // cluster.name must be the server's cluster name; sniffing is switched on.
    Settings clientSettings = Settings.builder()
            .put("cluster.name", "elscluster")
            .put("client.transport.sniff", true)
            .build();

    PreBuiltTransportClient esClient = new PreBuiltTransportClient(clientSettings);
    esClient.addTransportAddress(localNode);
    return esClient;
}
/**
 * Runs the grouped analysis query and flattens every terms bucket into one result row.
 *
 * <p>{@code params} is a JSON string with the basic filter conditions; it is turned into query
 * clauses by {@code getQueryBuilder}. Grouping and the per-group metrics come from
 * {@code getAgg}. Each row holds:
 * <ul>
 *   <li>{@code rep_no}, {@code staff_name}, {@code rep_group}: taken from the group's first hit;</li>
 *   <li>{@code scoreMid}: median of the scores in the group's top_hits, computed in memory;</li>
 *   <li>{@code voiceCount} (bucket doc count), {@code durationAll} (sum / 3600, 2 decimals),
 *       {@code durationAvg} (0 decimals), {@code scoreAvg} (score sum / number of docs that have
 *       a score, 2 decimals), {@code scoreMax}, {@code scoreMin} (0 when not finite);</li>
 *   <li>{@code riskVoiceCount}, {@code appealedCount}, {@code appealedSuccessCount} (filter
 *       doc counts) and {@code riskRate}, {@code appealedCountRate}, {@code appealedSuccessRate}
 *       formatted as "0.00%".</li>
 * </ul>
 *
 * <p>Errors while searching or parsing are printed and the rows built so far are returned
 * (best-effort). The client created for this call is always closed.
 *
 * @param params JSON string with the basic query conditions
 * @return one row per group; partial or empty if an error occurred
 * @throws Exception if the Elasticsearch client cannot be created
 */
public List<Map<String, Object>> findAnalyseList(String params) throws Exception {
    PreBuiltTransportClient client = getClient();
    List<Map<String, Object>> jsonArr = new ArrayList<>();
    try {
        // Basic query conditions.
        BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
        getQueryBuilder(params, queryBuilder);

        SearchSourceBuilder query = new SearchSourceBuilder();
        query.query(queryBuilder);
        // Only the aggregations are read, so don't fetch top-level hits.
        query.size(0);
        // NOTE(review): `type` is not declared in this method — presumably a field naming the
        // group-by field; confirm against the enclosing class.
        query.aggregation(getAgg(type));

        // Index my_index, type _doc.
        SearchRequest request = Requests.searchRequest("my_index").types("_doc")
                .searchType(SearchType.QUERY_THEN_FETCH)
                .source(query);
        SearchResponse response = client.search(request).actionGet();

        Aggregations aggregations = response.getAggregations();
        Terms groups = aggregations.get("term");
        for (Terms.Bucket bucket : groups.getBuckets()) {
            jsonArr.add(toRow(bucket));
        }
    } catch (Exception e) {
        // Best-effort: report and return whatever rows were built so far.
        e.printStackTrace();
    } finally {
        // getClient() builds a new client per call; release it.
        client.close();
    }
    return jsonArr;
}

/** Builds the result row for one group: identity fields, median score and aggregated metrics. */
private static Map<String, Object> toRow(Terms.Bucket bucket) {
    Map<String, Object> sourceMap = new HashMap<>();
    Aggregations subAggs = bucket.getAggregations();

    // Identity fields come from the first hit; every hit's score feeds the median.
    // NOTE(review): the median only covers the hits top_hits returned, which can be fewer than
    // the group's documents if the top_hits size limit is reached.
    TopHits topHits = subAggs.get("result");
    SearchHit[] hitArray = topHits.getHits().getHits();
    List<Double> scores = new ArrayList<>();
    for (int i = 0; i < hitArray.length; i++) {
        Map<String, Object> source = hitArray[i].getSourceAsMap();
        if (i == 0) {
            sourceMap.put("rep_no", StringHelper.toString(source.get("rep_no")));
            sourceMap.put("staff_name", StringHelper.toString(source.get("staff_name")));
            sourceMap.put("rep_group", StringHelper.toString(source.get("rep_group")));
        }
        String score = StringHelper.toString(source.get("score"));
        if (score != null && !score.isEmpty()) {
            scores.add(Double.parseDouble(score));
        }
    }
    sourceMap.put("scoreMid", median(scores));

    long voiceCount = bucket.getDocCount();
    InternalSum durationSum = subAggs.get("durationAll");
    InternalAvg durationMean = subAggs.get("durationAvg");
    InternalSum scoreTotal = subAggs.get("scoreSum");
    InternalMax scoreHigh = subAggs.get("scoreMax");
    InternalMin scoreLow = subAggs.get("scoreMin");
    long riskVoiceCount = filterCount(subAggs, "riskCount");
    long appealedCount = filterCount(subAggs, "appealedCount");
    long appealedSuccessCount = filterCount(subAggs, "appealedSuccessCount");
    // Documents that have a score; the average is taken over these only.
    long personScoreCount = filterCount(subAggs, "personScoreCount");

    // Divided by 3600 (presumably seconds -> hours), in double precision.
    double durationAll = roundHalfUp(durationSum.value() / 3600, 2);
    // The avg is NaN when no document in the group has a duration; BigDecimal would throw on it.
    double durationAvg = roundHalfUp(finiteOrZero(durationMean.value()), 0);
    double scoreAvg = personScoreCount == 0
            ? 0.0
            : roundHalfUp(scoreTotal.value() / personScoreCount, 2);
    // max/min are infinite when the group has no score value.
    double scoreMax = finiteOrZero(scoreHigh.value());
    double scoreMin = finiteOrZero(scoreLow.value());

    sourceMap.put("voiceCount", voiceCount);
    sourceMap.put("durationAll", durationAll);
    sourceMap.put("durationAvg", durationAvg);
    sourceMap.put("scoreAvg", scoreAvg);
    sourceMap.put("scoreMax", scoreMax);
    sourceMap.put("scoreMin", scoreMin);
    sourceMap.put("riskVoiceCount", riskVoiceCount);
    sourceMap.put("appealedCount", appealedCount);
    sourceMap.put("appealedSuccessCount", appealedSuccessCount);

    DecimalFormat percent = new DecimalFormat("0.00%");
    sourceMap.put("riskRate", percent.format(rate(riskVoiceCount, voiceCount)));
    sourceMap.put("appealedCountRate", percent.format(rate(appealedCount, voiceCount)));
    sourceMap.put("appealedSuccessRate", percent.format(rate(appealedSuccessCount, appealedCount)));
    return sourceMap;
}

/** Returns the doc_count of the filter sub-aggregation called {@code name}. */
private static long filterCount(Aggregations aggs, String name) {
    InternalFilter filter = aggs.get(name);
    return filter.getDocCount();
}

/**
 * Median of {@code values}: the middle value, or the mean of the two middle values for an even
 * count; 0.0 when empty. Sorts the list in place.
 */
private static double median(List<Double> values) {
    if (values.isEmpty()) {
        return 0.0;
    }
    Collections.sort(values);
    int middle = values.size() / 2;
    if (values.size() % 2 == 0) {
        return (values.get(middle - 1) + values.get(middle)) / 2;
    }
    return values.get(middle);
}

/** Returns {@code part / whole} rounded half-up to 4 decimals, or 0.0 when {@code whole} is 0. */
private static double rate(long part, long whole) {
    return whole == 0 ? 0.0 : roundHalfUp((double) part / whole, 4);
}

/**
 * Rounds half-up to {@code scale} decimals, starting from the value's shortest decimal
 * representation ({@code BigDecimal.valueOf}) rather than its exact binary expansion.
 * The value must be finite.
 */
private static double roundHalfUp(double value, int scale) {
    return BigDecimal.valueOf(value).setScale(scale, java.math.RoundingMode.HALF_UP).doubleValue();
}

/** Returns {@code value}, or 0.0 when it is NaN or infinite. */
private static double finiteOrZero(double value) {
    return Double.isFinite(value) ? value : 0.0;
}
/**
 * Adds the basic filter conditions from {@code params} to {@code queryBuilder}.
 *
 * <p>Recognised keys. Every key is optional, and an absent or empty value adds no clause:
 * <ul>
 *   <li>{@code department}: decrypted with {@code DesJs}, split on ","; {@code area_of_job}
 *       must wildcard-contain at least one of the values;</li>
 *   <li>{@code startCreateDate} / {@code endCreateDate}: inclusive bounds on
 *       {@code start_time}; each bound is applied independently;</li>
 *   <li>{@code duration_s} / {@code duration_e}: inclusive bounds on {@code duration};</li>
 *   <li>{@code fileunique}: "B" requires {@code fileunique} = B, any other value excludes B;</li>
 *   <li>{@code filename}: decrypted with {@code DesJs}, split on " "; {@code filename} must
 *       wildcard-contain at least one of the values.</li>
 * </ul>
 * NOTE(review): values are inserted into wildcard patterns unescaped, so "*" or "?" inside a
 * value act as wildcards — confirm that is intended.
 *
 * @param params       JSON object string with the filter conditions
 * @param queryBuilder bool query that receives the clauses (mutated in place)
 */
public void getQueryBuilder(String params, BoolQueryBuilder queryBuilder) {
    // fromObject is a static factory.
    JSONObject jsonObject = JSONObject.fromObject(params);

    // Organisation/area: any of the comma-separated values, matched as a substring.
    addContainsAny(queryBuilder, "area_of_job", decryptedParam(jsonObject, "department"), ",");

    // Creation time.
    addInclusiveRange(queryBuilder, "start_time",
            stringParam(jsonObject, "startCreateDate"), stringParam(jsonObject, "endCreateDate"));

    // Duration.
    addInclusiveRange(queryBuilder, "duration",
            stringParam(jsonObject, "duration_s"), stringParam(jsonObject, "duration_e"));

    // Exact match: "B" selects B, anything else excludes B.
    String fileunique = stringParam(jsonObject, "fileunique");
    if (hasText(fileunique)) {
        if ("B".equals(fileunique)) {
            queryBuilder.must(QueryBuilders.termQuery("fileunique", fileunique));
        } else {
            queryBuilder.mustNot(QueryBuilders.termQuery("fileunique", "B"));
        }
    }

    // File name: any of the space-separated values, matched as a substring
    // (use termQuery instead of wildcardQuery for an exact match).
    addContainsAny(queryBuilder, "filename", decryptedParam(jsonObject, "filename"), " ");
}

/** Returns the value of {@code key} as a string, or null when the key is absent. */
private static String stringParam(JSONObject json, String key) {
    return json.containsKey(key) ? json.getString(key) : null;
}

/** Returns the value of {@code key} decrypted with DesJs; null or empty values are returned as is. */
private static String decryptedParam(JSONObject json, String key) {
    String value = stringParam(json, key);
    return hasText(value) ? new DesJs().decrypt(value) : value;
}

/** True when {@code value} is neither null nor empty. */
private static boolean hasText(String value) {
    return value != null && !value.isEmpty();
}

/**
 * Adds a must clause requiring {@code field} to match {@code *token*} for at least one token of
 * {@code joined} split on the regex {@code separator}. Empty tokens are skipped, because a
 * "**" pattern would match every document and disable the filter.
 */
private static void addContainsAny(BoolQueryBuilder target, String field, String joined, String separator) {
    if (!hasText(joined)) {
        return;
    }
    BoolQueryBuilder anyOf = QueryBuilders.boolQuery();
    boolean hasClause = false;
    for (String token : joined.split(separator)) {
        if (!token.isEmpty()) {
            anyOf.should(QueryBuilders.wildcardQuery(field, "*" + token + "*"));
            hasClause = true;
        }
    }
    if (hasClause) {
        target.must(anyOf);
    }
}

/** Adds an inclusive range clause on {@code field} for whichever of the bounds is non-empty. */
private static void addInclusiveRange(BoolQueryBuilder target, String field, String from, String to) {
    if (!hasText(from) && !hasText(to)) {
        return;
    }
    // A null bound leaves that side of the range unbounded.
    target.must(QueryBuilders.rangeQuery(field)
            .gte(hasText(from) ? from : null)
            .lte(hasText(to) ? to : null));
}
/**
 * Builds the group-by aggregation together with all of its per-group metrics.
 *
 * <p>Documents are grouped by the terms of {@code type} (aggregation "term", size 100000).
 * Each group carries these sub-aggregations:
 * <ul>
 *   <li>"result": top_hits (size 1000000) whose source is limited to rep_no, staff_name,
 *       rep_group and score; the scores are meant for an in-memory median;</li>
 *   <li>"voiceCount": cardinality of filename;</li>
 *   <li>"durationAll" / "durationAvg": sum / avg of duration;</li>
 *   <li>"scoreSum" / "scoreAvg" / "scoreMax" / "scoreMin": sum / avg / max / min of score;</li>
 *   <li>"riskCount" (is_risk = "是"), "appealedCount" (appealed = "A"),
 *       "appealedSuccessCount" (appealed_success = "A"), "personScoreCount" (score exists):
 *       filter sub-aggregations.</li>
 * </ul>
 *
 * <p>NOTE(review): a top_hits size of 1000000 is far above Elasticsearch's default
 * {@code index.max_inner_result_window} (100); presumably that index setting was raised — confirm.
 *
 * @param type name of the field to group by
 * @return the terms aggregation with every sub-aggregation attached
 */
private AggregationBuilder getAgg(String type) {
    // Source fields copied out of each hit.
    String[] hitFields = {"rep_no", "staff_name", "rep_group", "score"};
    AggregationBuilder groupHits = AggregationBuilders.topHits("result")
            .fetchSource(hitFields, Strings.EMPTY_ARRAY)
            .size(1000000);

    // Conditional counts per group.
    AggregationBuilder riskyFilter = AggregationBuilders.filter("riskCount",
            QueryBuilders.termQuery("is_risk", "是"));
    AggregationBuilder appealFilter = AggregationBuilders.filter("appealedCount",
            QueryBuilders.termQuery("appealed", "A"));
    AggregationBuilder appealWonFilter = AggregationBuilders.filter("appealedSuccessCount",
            QueryBuilders.termQuery("appealed_success", "A"));
    AggregationBuilder scoredFilter = AggregationBuilders.filter("personScoreCount",
            QueryBuilders.boolQuery().must(QueryBuilders.existsQuery("score")));

    return AggregationBuilders.terms("term").field(type).size(100000)
            .subAggregation(groupHits)
            .subAggregation(AggregationBuilders.cardinality("voiceCount").field("filename"))
            .subAggregation(AggregationBuilders.sum("durationAll").field("duration"))
            .subAggregation(AggregationBuilders.avg("durationAvg").field("duration"))
            .subAggregation(AggregationBuilders.sum("scoreSum").field("score"))
            .subAggregation(AggregationBuilders.avg("scoreAvg").field("score"))
            .subAggregation(AggregationBuilders.max("scoreMax").field("score"))
            .subAggregation(AggregationBuilders.min("scoreMin").field("score"))
            .subAggregation(riskyFilter)
            .subAggregation(appealFilter)
            .subAggregation(appealWonFilter)
            .subAggregation(scoredFilter);
}
//对应的ES语句
GET my_index/_doc/_search
{
"query":{
"bool":{
"must":[
{
"range":{
"start_time":{
"from":"2020-05-31 16:00:00",
"to":"2020-07-03 15:59:59",
"include_lower":true,
"include_upper":true,
"boost":1
}
}
}
],
"adjust_pure_negative":true,
"boost":1
}
},
"aggregations":{
"term":{
"terms":{
"field":"分组字段",
"size":100000,
"min_doc_count":1,
"shard_min_doc_count":0,
"show_term_doc_count_error":false,
"order":[
{
"_count":"desc"
},
{
"_key":"asc"
}
]
},
"aggregations":{
"result":{
"top_hits":{
"from":0,
"size":1000000,
"version":false,
"explain":false,
"_source":{
"includes":[
"rep_no",
"staff_name",
"rep_group",
"score"
],
"excludes":[
]
}
}
},
"voiceCount":{
"cardinality":{
"field":"filename"
}
},
"durationAll":{
"sum":{
"field":"duration"
}
},
"durationAvg":{
"avg":{
"field":"duration"
}
},
"scoreSum":{
"sum":{
"field":"score"
}
},
"scoreAvg":{
"avg":{
"field":"score"
}
},
"scoreMax":{
"max":{
"field":"score"
}
},
"scoreMin":{
"min":{
"field":"score"
}
},
"riskCount":{
"filter":{
"term":{
"is_risk":{
"value":"是",
"boost":1
}
}
}
},
"appealedCount":{
"filter":{
"term":{
"appealed":{
"value":"A",
"boost":1
}
}
}
},
"appealedSuccessCount":{
"filter":{
"term":{
"appealed_success":{
"value":"A",
"boost":1
}
}
}
},
"personScoreCount":{
"filter":{
"bool":{
"must":{
"exists":{
"field":"score"
}
}
}
}
}
}
}
}
}