ES的一些聚合,过滤及基本查询

工作中遇到一个分析需求:按某个字段聚合,然后计算平均值、最大值、最小值等一系列指标
遇到的问题:

  1. 中位数计算:暂不会在ES中直接计算,所以把数据全部加载到内存中计算
  2. 分组字段内存储的是多个值,中间用空格分隔,需求是按每个值分别聚合(暂未实现)
  3. 聚合后统计符合某条件的数据量:利用filter聚合,然后取doc_count
//1.获取es客户端
 /**
  * Creates a transport client for the Elasticsearch cluster named {@code elscluster},
  * seeded with the single node address {@code 127.0.0.1:9300} and with sniffing enabled.
  *
  * <p>A new client is built on every call; this method does not close it.
  *
  * @return a newly created client with the seed address already registered
  * @throws UnknownHostException if the seed host cannot be resolved
  */
 public PreBuiltTransportClient getClient() throws UnknownHostException {
        // Resolve the seed node first (host and transport port), before any client exists.
        InetAddress seedHost = InetAddress.getByName("127.0.0.1");
        TransportAddress seedNode = new TransportAddress(seedHost, 9300);

        // Cluster name plus sniffing of further nodes through the seed node.
        Settings.Builder settingsBuilder = Settings.builder();
        settingsBuilder.put("cluster.name", "elscluster");
        settingsBuilder.put("client.transport.sniff", true);
        Settings clusterSettings = settingsBuilder.build();

        // The client is constructed from the settings and then pointed at the seed node.
        PreBuiltTransportClient transportClient = new PreBuiltTransportClient(clusterSettings);
        transportClient.addTransportAddress(seedNode);
        return transportClient;
    }
//此处参数主要用于基本查询
/**
 * Runs the filtered, grouped analysis query against index {@code my_index} and flattens
 * every bucket of the {@code term} aggregation into one result row.
 *
 * <p>Each row contains the {@code rep_no}, {@code staff_name} and {@code rep_group} of the
 * bucket's first top hit, the median of all returned {@code score} values (computed in
 * memory), the bucket document count, duration and score statistics, and the risk / appeal
 * counts together with their percentage-formatted rates.
 *
 * <p>Query and parsing failures are printed and swallowed (best effort): the rows collected
 * up to that point are returned. The transport client obtained for this call is always
 * closed before returning.
 *
 * @param params JSON string with the filter conditions, forwarded to
 *               {@code getQueryBuilder(String, BoolQueryBuilder)}
 * @return one map per bucket; empty when nothing matched or the query failed
 * @throws Exception if the client cannot be created
 */
public List<Map<String, Object>> findAnalyseList(String params) throws Exception {
    // Obtained outside the try block so that a connection-setup failure still propagates.
    PreBuiltTransportClient client = getClient();

    List<Map<String, Object>> jsonArr = new ArrayList<>();
    try {
        // Basic filter query.
        BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
        SearchSourceBuilder query = new SearchSourceBuilder();
        getQueryBuilder(params, queryBuilder);
        query.query(queryBuilder);

        // Grouping plus per-group metrics.
        // NOTE(review): `type` is not declared in this method; presumably a field of the
        // enclosing class naming the group-by field - confirm.
        AggregationBuilder termsBuilder = getAgg(type);
        query.aggregation(termsBuilder);

        // Index my_index, mapping type _doc.
        SearchRequest request = Requests.searchRequest("my_index").types("_doc")
                .searchType(SearchType.QUERY_THEN_FETCH)
                .source(query);
        SearchResponse response = client.search(request).actionGet();

        // One row per bucket of the "term" aggregation.
        Aggregations aggregations = response.getAggregations();
        Terms teamSum = aggregations.get("term");
        for (Terms.Bucket bucket : teamSum.getBuckets()) {
            jsonArr.add(buildAnalyseRow(bucket));
        }
        System.out.println(jsonArr);
    } catch (Exception e) {
        // Deliberate best effort: report the failure and return what was collected so far.
        e.printStackTrace();
    } finally {
        // A new client is created for every call, so it has to be released here.
        client.close();
    }
    return jsonArr;
}

/** Converts one terms bucket (top hits plus metric sub-aggregations) into a result row. */
private Map<String, Object> buildAnalyseRow(Terms.Bucket bucket) {
    Aggregations bucketAggs = bucket.getAggregations();
    Map<String, Object> row = new HashMap<>();

    // Top hits named "result": identifying fields come from the first hit, scores from all.
    TopHits topHits = bucketAggs.get("result");
    SearchHit[] hitArray = topHits.getHits().getHits();
    List<Double> scores = new ArrayList<>();
    for (int i = 0; i < hitArray.length; i++) {
        Map<String, Object> source = hitArray[i].getSourceAsMap();
        if (i == 0) {
            row.put("rep_no", StringHelper.toString(source.get("rep_no")));
            row.put("staff_name", StringHelper.toString(source.get("staff_name")));
            row.put("rep_group", StringHelper.toString(source.get("rep_group")));
        }
        String score = StringHelper.toString(source.get("score"));
        if (score != null && !"".equals(score)) {
            scores.add(Double.parseDouble(score));
        }
    }
    row.put("scoreMid", medianOf(scores));

    long voiceCount = bucket.getDocCount();

    // Durations: the sum is divided by 3600 and kept to two decimals, the average to zero.
    double durationAll = ((InternalSum) bucketAggs.get("durationAll")).value();
    if (voiceCount != 0) {
        durationAll = roundHalfUp(durationAll / 3600, 2);
    }
    double durationAvg = roundHalfUp(((InternalAvg) bucketAggs.get("durationAvg")).value(), 0);

    // Filter sub-aggregations only contribute their document counts.
    long riskVoiceCount = ((InternalFilter) bucketAggs.get("riskCount")).getDocCount();
    long appealedCount = ((InternalFilter) bucketAggs.get("appealedCount")).getDocCount();
    long appealedSuccessCount =
            ((InternalFilter) bucketAggs.get("appealedSuccessCount")).getDocCount();
    long personScoreCount = ((InternalFilter) bucketAggs.get("personScoreCount")).getDocCount();

    // Average score = score sum / number of documents that have a score.
    double scoreSum = ((InternalSum) bucketAggs.get("scoreSum")).value();
    double scoreAvg = 0.0;
    if (personScoreCount != 0) {
        scoreAvg = roundHalfUp(scoreSum / personScoreCount, 2);
    }

    // An infinite max/min is reported as 0.
    double scoreMax = ((InternalMax) bucketAggs.get("scoreMax")).value();
    if (Double.isInfinite(scoreMax)) {
        scoreMax = 0;
    }
    double scoreMin = ((InternalMin) bucketAggs.get("scoreMin")).value();
    if (Double.isInfinite(scoreMin)) {
        scoreMin = 0;
    }

    row.put("voiceCount", voiceCount);
    row.put("durationAll", durationAll);
    row.put("durationAvg", durationAvg);
    row.put("scoreAvg", scoreAvg);
    row.put("scoreMax", scoreMax);
    row.put("scoreMin", scoreMin);
    row.put("riskVoiceCount", riskVoiceCount);
    row.put("appealedCount", appealedCount);
    row.put("appealedSuccessCount", appealedSuccessCount);

    // Rates are rounded to four decimals and rendered as percentages with two decimals.
    double riskRate = 0.0;
    double appealedCountRate = 0.0;
    double appealedSuccessRate = 0.0;
    if (voiceCount != 0) {
        riskRate = roundHalfUp((double) riskVoiceCount / voiceCount, 4);
        appealedCountRate = roundHalfUp((double) appealedCount / voiceCount, 4);
    }
    if (appealedCount != 0) {
        appealedSuccessRate = roundHalfUp((double) appealedSuccessCount / appealedCount, 4);
    }
    DecimalFormat df = new DecimalFormat("0.00%");
    row.put("riskRate", df.format(riskRate));
    row.put("appealedCountRate", df.format(appealedCountRate));
    row.put("appealedSuccessRate", df.format(appealedSuccessRate));
    return row;
}

/** Returns the median of {@code values} (sorting the list in place), or 0.0 when it is empty. */
private double medianOf(List<Double> values) {
    if (values.isEmpty()) {
        return 0.0;
    }
    Collections.sort(values);
    int middle = values.size() / 2;
    if (values.size() % 2 == 0) {
        return (values.get(middle) + values.get(middle - 1)) / 2;
    }
    return values.get(middle);
}

/**
 * Rounds {@code value} half-up to {@code scale} decimals; NaN and infinite inputs yield 0.0
 * because {@link BigDecimal} cannot represent them and would throw.
 */
private double roundHalfUp(double value, int scale) {
    if (Double.isNaN(value) || Double.isInfinite(value)) {
        return 0.0;
    }
    // valueOf uses the canonical decimal form of the double, so rounding is applied to the
    // number as written rather than to its binary expansion.
    return BigDecimal.valueOf(value).setScale(scale, java.math.RoundingMode.HALF_UP).doubleValue();
}
/**
 * Translates the JSON filter parameters into {@code must} / {@code mustNot} clauses on
 * {@code queryBuilder}. Every filter is optional; a missing or empty parameter adds nothing.
 *
 * <ul>
 *   <li>{@code department} (decrypted, comma separated): any-of wildcard match on
 *       {@code area_of_job}</li>
 *   <li>{@code startCreateDate} / {@code endCreateDate}: inclusive range on
 *       {@code start_time}; either bound may be given on its own</li>
 *   <li>{@code duration_s} / {@code duration_e}: inclusive lower / upper bound on
 *       {@code duration}</li>
 *   <li>{@code fileunique}: {@code "B"} requires {@code fileunique == B}, any other
 *       non-empty value excludes {@code B}</li>
 *   <li>{@code filename} (decrypted, space separated): any-of wildcard match on
 *       {@code filename}</li>
 * </ul>
 *
 * @param params       JSON object string holding the filter values
 * @param queryBuilder bool query that receives the clauses (modified in place)
 */
public void getQueryBuilder(String params, BoolQueryBuilder queryBuilder) {
    JSONObject jsonObject = JSONObject.fromObject(params);

    // Department / business area.
    String department = optionalParam(jsonObject, "department");
    if (hasText(department)) {
        department = new DesJs().decrypt(department);
    }
    if (hasText(department)) {
        addWildcardAnyOf(queryBuilder, "area_of_job", department.split(","));
    }

    // Time window: each bound is applied only when it is actually supplied.
    String starTime = optionalParam(jsonObject, "startCreateDate");
    String endTime = optionalParam(jsonObject, "endCreateDate");
    if (hasText(starTime) && hasText(endTime)) {
        queryBuilder.must(QueryBuilders.rangeQuery("start_time").gte(starTime).lte(endTime));
    } else if (hasText(starTime)) {
        queryBuilder.must(QueryBuilders.rangeQuery("start_time").gte(starTime));
    } else if (hasText(endTime)) {
        queryBuilder.must(QueryBuilders.rangeQuery("start_time").lte(endTime));
    }

    // Duration bounds.
    String durationS = optionalParam(jsonObject, "duration_s");
    if (hasText(durationS)) {
        queryBuilder.must(QueryBuilders.rangeQuery("duration").gte(durationS));
    }
    String durationE = optionalParam(jsonObject, "duration_e");
    if (hasText(durationE)) {
        queryBuilder.must(QueryBuilders.rangeQuery("duration").lte(durationE));
    }

    // Exact match: "B" selects B, every other non-empty value selects everything except B.
    String fileunique = optionalParam(jsonObject, "fileunique");
    if (hasText(fileunique)) {
        if ("B".equals(fileunique)) {
            queryBuilder.must(QueryBuilders.termQuery("fileunique", fileunique));
        } else {
            queryBuilder.mustNot(QueryBuilders.termQuery("fileunique", "B"));
        }
    }

    // Multi-value match on the file name (substring match per space-separated token).
    String fileName = optionalParam(jsonObject, "filename");
    if (hasText(fileName)) {
        fileName = new DesJs().decrypt(fileName);
    }
    if (hasText(fileName)) {
        addWildcardAnyOf(queryBuilder, "filename", fileName.split(" "));
    }
}

/** Returns the string stored under {@code key}, or {@code ""} when the key is absent. */
private String optionalParam(JSONObject json, String key) {
    return json.containsKey(key) ? json.getString(key) : "";
}

/** Tells whether {@code value} is neither {@code null} nor the empty string. */
private boolean hasText(String value) {
    return value != null && !"".equals(value);
}

/**
 * Adds a {@code must} clause that matches when {@code field} contains at least one of
 * {@code tokens} as a substring. Empty tokens are skipped because {@code "**"} would match
 * every value; nothing is added when no usable token remains.
 */
private void addWildcardAnyOf(BoolQueryBuilder queryBuilder, String field, String[] tokens) {
    BoolQueryBuilder anyOf = QueryBuilders.boolQuery();
    boolean hasToken = false;
    for (String token : tokens) {
        if ("".equals(token)) {
            continue;
        }
        anyOf.should(QueryBuilders.wildcardQuery(field, "*" + token + "*"));
        hasToken = true;
    }
    if (hasToken) {
        queryBuilder.must(anyOf);
    }
}
/**
 * Builds the {@code term} terms aggregation on the given field (up to 100000 buckets) with
 * all per-bucket sub-aggregations attached:
 *
 * <ul>
 *   <li>{@code result}: top hits (up to 1000000 per bucket) restricted to {@code rep_no},
 *       {@code staff_name}, {@code rep_group} and {@code score}</li>
 *   <li>{@code voiceCount}: cardinality of {@code filename}</li>
 *   <li>{@code durationAll} / {@code durationAvg}: sum / average of {@code duration}</li>
 *   <li>{@code scoreSum}, {@code scoreAvg}, {@code scoreMax}, {@code scoreMin}: statistics
 *       of {@code score}</li>
 *   <li>{@code riskCount}, {@code appealedCount}, {@code appealedSuccessCount},
 *       {@code personScoreCount}: filter aggregations used for their document counts</li>
 * </ul>
 *
 * @param type name of the field to group by
 * @return the terms aggregation carrying every sub-aggregation listed above
 */
private AggregationBuilder getAgg(String type) {
    TermsAggregationBuilder groupBy = AggregationBuilders.terms("term").field(type).size(100000);

    // Raw hits per group: fixed identifying fields plus every score.
    // NOTE(review): a top_hits size this large presumably requires a raised
    // index.max_inner_result_window on the index - confirm.
    String[] sourceFields = {"rep_no", "staff_name", "rep_group", "score"};
    groupBy.subAggregation(
            AggregationBuilders.topHits("result")
                    .fetchSource(sourceFields, Strings.EMPTY_ARRAY)
                    .size(1000000));

    // Number of recordings (distinct file names).
    groupBy.subAggregation(AggregationBuilders.cardinality("voiceCount").field("filename"));

    // Total and average duration.
    groupBy.subAggregation(AggregationBuilders.sum("durationAll").field("duration"));
    groupBy.subAggregation(AggregationBuilders.avg("durationAvg").field("duration"));

    // Score statistics: sum, average, highest and lowest.
    groupBy.subAggregation(AggregationBuilders.sum("scoreSum").field("score"));
    groupBy.subAggregation(AggregationBuilders.avg("scoreAvg").field("score"));
    groupBy.subAggregation(AggregationBuilders.max("scoreMax").field("score"));
    groupBy.subAggregation(AggregationBuilders.min("scoreMin").field("score"));

    // Recordings flagged as risky.
    groupBy.subAggregation(
            AggregationBuilders.filter("riskCount", QueryBuilders.termQuery("is_risk","是")));

    // Appealed recordings.
    groupBy.subAggregation(
            AggregationBuilders.filter("appealedCount", QueryBuilders.termQuery("appealed","A")));

    // Successfully appealed recordings.
    groupBy.subAggregation(
            AggregationBuilders.filter(
                    "appealedSuccessCount", QueryBuilders.termQuery("appealed_success","A")));

    // Recordings that have a score at all.
    QueryBuilder hasScore = QueryBuilders.boolQuery().must(QueryBuilders.existsQuery("score"));
    groupBy.subAggregation(AggregationBuilders.filter("personScoreCount", hasScore));

    return groupBy;
}
//对应的ES语句
GET my_index/_doc/_search
{
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "start_time":{
                            "from":"2020-05-31 16:00:00",
                            "to":"2020-07-03 15:59:59",
                            "include_lower":true,
                            "include_upper":true,
                            "boost":1
                        }
                    }
                }
            ],
            "adjust_pure_negative":true,
            "boost":1
        }
    },
    "aggregations":{
        "term":{
            "terms":{
                "field":"分组字段",
                "size":100000,
                "min_doc_count":1,
                "shard_min_doc_count":0,
                "show_term_doc_count_error":false,
                "order":[
                    {
                        "_count":"desc"
                    },
                    {
                        "_key":"asc"
                    }
                ]
            },
            "aggregations":{
                "result":{
                    "top_hits":{
                        "from":0,
                        "size":1000000,
                        "version":false,
                        "explain":false,
                        "_source":{
                            "includes":[
                                "rep_no",
                                "staff_name",
                                "rep_group",
                                "score"
                            ],
                            "excludes":[

                            ]
                        }
                    }
                },
                "voiceCount":{
                    "cardinality":{
                        "field":"filename"
                    }
                },
                "durationAll":{
                    "sum":{
                        "field":"duration"
                    }
                },
                "durationAvg":{
                    "avg":{
                        "field":"duration"
                    }
                },
                "scoreSum":{
                    "sum":{
                        "field":"score"
                    }
                },
                "scoreAvg":{
                    "avg":{
                        "field":"score"
                    }
                },
                "scoreMax":{
                    "max":{
                        "field":"score"
                    }
                },
                "scoreMin":{
                    "min":{
                        "field":"score"
                    }
                },
                "riskCount":{
                    "filter":{
                        "term":{
                            "is_risk":{
                                "value":"是",
                                "boost":1
                            }
                        }
                    }
                },
                "appealedCount":{
                    "filter":{
                        "term":{
                            "appealed":{
                                "value":"A",
                                "boost":1
                            }
                        }
                    }
                },
                "appealedSuccessCount":{
                    "filter":{
                        "term":{
                            "appealed_success":{
                                "value":"A",
                                "boost":1
                            }
                        }
                    }
                },
                "personScoreCount":{
                    "filter":{
                        "bool":{
                            "must":{
                                "exists":{
                                    "field":"score"
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。