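Elasticsearch: aggregations, filters, and basic queries

At work I ran into an analytics requirement: aggregate by a given field, then compute a series of metrics per group, such as the average, maximum, and minimum.
Problems encountered:

  1. Median: I do not yet know how to compute it directly in ES, so all the values are loaded into memory and the median is computed there.
  2. The grouping field stores several values separated by spaces, and the requirement is to aggregate by each individual value (not implemented yet).
  3. Counting the documents in each bucket that match a condition: use a filter aggregation and read its doc_count.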
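For problem 2, one possible in-memory workaround is sketched below, in the same spirit as the in-memory median: split the space-separated field of each record and add the record's score to the group of every token. The class name, the `String[] {fieldValue, score}` row shape, and the sample values are all hypothetical and only serve the illustration. If the field can be re-indexed as an array of keyword values, a terms aggregation would count a document once per value, which avoids this step entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TokenGroupingSketch {

    /**
     * Groups scores by every whitespace-separated token of the grouping field.
     * Each row is {fieldValue, score}; this row shape is an assumption for the sketch.
     */
    static Map<String, List<Double>> groupByToken(List<String[]> rows) {
        Map<String, List<Double>> groups = new TreeMap<>();
        for (String[] row : rows) {
            String fieldValue = row[0];
            double score = Double.parseDouble(row[1]);
            for (String token : fieldValue.trim().split("\\s+")) {
                if (token.isEmpty()) {
                    continue; // blank field value: nothing to group under
                }
                groups.computeIfAbsent(token, k -> new ArrayList<>()).add(score);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"sales support", "80"});
        rows.add(new String[] {"support", "90"});
        rows.add(new String[] {"sales", "70"});
        // A record with two tokens contributes to both groups.
        System.out.println(groupByToken(rows));
    }
}
```

Per-token metrics (count, average, median) can then be computed from each list. This only scales as far as the fetched records fit in memory.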
// 1. Get the ES client
 public PreBuiltTransportClient getClient() throws UnknownHostException {
        // Address to connect to
        TransportAddress address = new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300); // IP and transport port
        // Name of the cluster to connect to
        Settings settings = Settings.builder()
                .put("cluster.name", "elscluster") // the ES cluster name
                .put("client.transport.sniff", true) // enable sniffing
                .build();
        // Create the Elasticsearch client; it needs the settings above
        PreBuiltTransportClient client = new PreBuiltTransportClient(settings);
        client.addTransportAddress(address);
        return client;
    }
// The params argument mainly carries the conditions for the basic query
public List<Map<String, Object>> findAnalyseList(String params) throws Exception {
        // Get the client
        PreBuiltTransportClient client = getClient();

        List<Map<String, Object>> jsonArr = new ArrayList<>();
        try {
            // Build the basic query
            BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
            SearchSourceBuilder query = new SearchSourceBuilder();
            getQueryBuilder(params, queryBuilder);
            query.query(queryBuilder);
            // Build the grouping (terms) aggregation with its sub-aggregations.
            // `type` is the name of the field to group by; this snippet does not show where it is defined.
            AggregationBuilder termsBuilder = getAgg(type);
            query.aggregation(termsBuilder);
            // Search index my_index, type _doc
            SearchRequest request = Requests.searchRequest("my_index").types("_doc")
                    .searchType(SearchType.QUERY_THEN_FETCH)
                    .source(query);
            SearchResponse response = client.search(request).actionGet();
            // The rest of the method parses the results
            // Get the aggregation part of the response
            Aggregations aggregations = response.getAggregations();
            Terms teamSum = aggregations.get("term");
            // Iterate over each bucket of the terms aggregation
            for (Terms.Bucket bucket : teamSum.getBuckets()) {
                // Get the top_hits result named "result"
                TopHits topHits = bucket.getAggregations().get("result");
                SearchHits hits = topHits.getHits();
                SearchHit[] hitArray = hits.getHits();
                Map<String, Object> sourceMap = new HashMap<>();
                // Take the fixed fields from the first hit, and collect every score to compute the median
                List<Double> midScoreList = new ArrayList<>();
                for (int i = 0; i < hitArray.length; i++) {
                    if (i == 0) {
                          // Agent number, agent name, agent group
                          String repNo = StringHelper.toString(hitArray[i].getSourceAsMap().get("rep_no"));
                          String staffName = StringHelper.toString(hitArray[i].getSourceAsMap().get("staff_name"));
                          String repGroup = StringHelper.toString(hitArray[i].getSourceAsMap().get("rep_group"));
                          sourceMap.put("rep_no",repNo);
                          sourceMap.put("staff_name",staffName);
                          sourceMap.put("rep_group",repGroup);
                    }
                    String score = StringHelper.toString(hitArray[i].getSourceAsMap().get("score"));

                    // Skip missing or empty scores
                    if (score != null && !"".equals(score)) {
                        midScoreList.add(Double.parseDouble(score));
                    }
                }
                // Compute the median
                Double mid = 0.0;
                if (midScoreList.size() > 0) {
                    Collections.sort(midScoreList);
                    if (midScoreList.size() % 2 == 0) {
                        mid = (midScoreList.get(midScoreList.size() / 2) + midScoreList.get(midScoreList.size() / 2 - 1)) / 2;
                    } else {
                        mid =midScoreList.get(midScoreList.size() / 2);
                    }
                }
                sourceMap.put("scoreMid", mid);
                // Read the metric and filter sub-aggregation results
                Map<String, Aggregation> map = bucket.getAggregations().asMap();

                long voiceCount = bucket.getDocCount();
                double durationAll = ((InternalSum) map.get("durationAll")).value();
                double durationAvg = ((InternalAvg) map.get("durationAvg")).value();
                durationAvg = new BigDecimal(durationAvg).setScale(0,BigDecimal.ROUND_HALF_UP).doubleValue();
                // Number of risky recordings
                long riskVoiceCount =((InternalFilter) map.get("riskCount")).getDocCount();
                // Number of appealed recordings
                long appealedCount =((InternalFilter) map.get("appealedCount")).getDocCount();
                // Number of successfully appealed recordings
                long appealedSuccessCount =((InternalFilter) map.get("appealedSuccessCount")).getDocCount();
                // Total score
                double scoreSum = ((InternalSum) map.get("scoreSum")).value();
                // Number of recordings that have been scored
                long personScoreCount =((InternalFilter) map.get("personScoreCount")).getDocCount();
                double scoreAvg = 0.0;
                if (personScoreCount!=0){
                    scoreAvg = new BigDecimal(scoreSum/personScoreCount).setScale(2,BigDecimal.ROUND_HALF_UP).doubleValue();
                }
                //double scoreAvg = ((InternalAvg) map.get("scoreAvg")).value();
                // Guard against NaN (no scored documents in the bucket)
                //if (Double.isNaN(scoreAvg)) {
                //scoreAvg = 0;
                //}
                double scoreMax = ((InternalMax) map.get("scoreMax")).value();
                // max/min come back as infinite values when the bucket has no score at all
                if (Double.isInfinite(scoreMax)) {
                    scoreMax = 0;
                }
                double scoreMin = ((InternalMin) map.get("scoreMin")).value();
                if (Double.isInfinite(scoreMin)) {
                    scoreMin = 0;
                }
                if (voiceCount!=0){
                    // Divide by 3600 (seconds to hours, assuming duration is stored in seconds);
                    // stay in double instead of casting to float, which loses precision on large totals
                    durationAll = new BigDecimal(durationAll / 3600).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
                }
                sourceMap.put("voiceCount", voiceCount);
                sourceMap.put("durationAll", durationAll);
                sourceMap.put("durationAvg", durationAvg);
                sourceMap.put("scoreAvg", scoreAvg);
                sourceMap.put("scoreMax", scoreMax);
                sourceMap.put("scoreMin", scoreMin);
                sourceMap.put("riskVoiceCount", riskVoiceCount);
                sourceMap.put("appealedCount", appealedCount);
                sourceMap.put("appealedSuccessCount", appealedSuccessCount);

                DecimalFormat df = new DecimalFormat("0.00%");
                Double riskRate = 0.0;
                Double appealedCountRate = 0.0;
                Double appealedSuccessRate = 0.0;
                if (voiceCount != 0) {
                    // Cast to double rather than float so large counts keep their precision
                    riskRate = new BigDecimal((double) riskVoiceCount / voiceCount).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
                    appealedCountRate = new BigDecimal((double) appealedCount / voiceCount).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
                }
                if (appealedCount != 0) {
                    appealedSuccessRate = new BigDecimal((double) appealedSuccessCount / appealedCount).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
                }
                sourceMap.put("riskRate", df.format(riskRate));
                sourceMap.put("appealedCountRate", df.format(appealedCountRate));
                sourceMap.put("appealedSuccessRate", df.format(appealedSuccessRate));
                jsonArr.add(sourceMap);
            }
            System.out.println(jsonArr);
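        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // A new client is created on every call, so release its threads and connections here
            client.close();
        }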
        return jsonArr;
    }
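The median step inside the loop above can be pulled out into a small helper. The following is a minimal sketch using only the JDK; the class and method names are hypothetical. It keeps the same convention as the code above: the mean of the two middle values for an even count, and 0.0 for an empty list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class MedianSketch {

    /** Returns the median of the values, or 0.0 when the list is empty. */
    static double median(List<Double> values) {
        if (values.isEmpty()) {
            return 0.0;
        }
        // Sort a copy so the caller's list is left untouched
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int n = sorted.size();
        int mid = n / 2;
        if (n % 2 == 0) {
            // Even count: mean of the two middle values
            return (sorted.get(mid - 1) + sorted.get(mid)) / 2.0;
        }
        // Odd count: the single middle value
        return sorted.get(mid);
    }

    public static void main(String[] args) {
        System.out.println(median(Arrays.asList(3.0, 1.0, 2.0)));      // prints 2.0
        System.out.println(median(Arrays.asList(4.0, 1.0, 3.0, 2.0))); // prints 2.5
    }
}
```

With such a helper, the loop body would reduce to something like `sourceMap.put("scoreMid", MedianSketch.median(midScoreList))`.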
public void getQueryBuilder(String params, BoolQueryBuilder queryBuilder) {
        JSONObject jsonObject = JSONObject.fromObject(params);
        // Department / business area
        if (jsonObject.containsKey("department")) {
            String department = jsonObject.getString("department");
            if (null != department && !"".equals(department)) {
                department = new DesJs().decrypt(department);
            }
            if (null != department && !"".equals(department)) {
                // OR together one wildcard clause per comma-separated value
                BoolQueryBuilder shouldBuilder = QueryBuilders.boolQuery();
                String[] quality = department.split(",");
                for (String s : quality) {
                    shouldBuilder.should(QueryBuilders.wildcardQuery("area_of_job", "*" + s + "*"));
                }
                queryBuilder.must(shouldBuilder);
            }
        }
        // Time range
        String starTime = jsonObject.getString("startCreateDate");
        String endTime = jsonObject.getString("endCreateDate");
        if (null != starTime && !"".equals(starTime)){
            queryBuilder.must(QueryBuilders.rangeQuery("start_time").gte(starTime).lte(endTime));
        }
        // Duration: lower bound
        String durationS = jsonObject.getString("duration_s");
        if (null != durationS && !"".equals(durationS)) {
            queryBuilder.must(QueryBuilders.rangeQuery("duration").gte(durationS));
        }
        // Duration: upper bound
        String durationE = jsonObject.getString("duration_e");
        if (null != durationE && !"".equals(durationE)) {
            queryBuilder.must(QueryBuilders.rangeQuery("duration").lte(durationE));
        }
        // Exact match
        String fileunique = jsonObject.getString("fileunique");
        if (null != fileunique && !"".equals(fileunique)) {
            if ("B".equals(fileunique)) {
                queryBuilder.must(QueryBuilders.termQuery("fileunique", fileunique));
            } else {
                queryBuilder.mustNot(QueryBuilders.termQuery("fileunique", "B"));
            }
        }
        // Multi-value match
        String fileName = jsonObject.getString("filename");
        if (null != fileName && !"".equals(fileName)) {
            fileName = new DesJs().decrypt(fileName);
        }
        if (null != fileName && !"".equals(fileName)) {
            BoolQueryBuilder shouldBuilder = QueryBuilders.boolQuery();
            String[] quality = fileName.split(" ");
            for (String s : quality) {
                // Exact match
                //shouldBuilder.should(QueryBuilders.termQuery("filename",s));
                // Fuzzy (wildcard) match
                shouldBuilder.should(QueryBuilders.wildcardQuery("filename", "*" + s + "*"));
            }
            queryBuilder.must(shouldBuilder);
        }
    }
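One detail worth guarding in the multi-value branches above: `String.split` yields empty tokens when the input contains consecutive separators (for example two spaces in a row), and an empty token turns into the pattern `**`, which matches any value of the field. The sketch below shows one way to build the patterns while skipping blank tokens. The class and method names are hypothetical and only the JDK is used; the resulting strings would be passed to `QueryBuilders.wildcardQuery` as in the method above.

```java
import java.util.ArrayList;
import java.util.List;

final class WildcardTokenSketch {

    /** Splits raw input on the separator regex and wraps each non-blank token as *token*. */
    static List<String> containsPatterns(String raw, String separatorRegex) {
        List<String> patterns = new ArrayList<>();
        if (raw == null) {
            return patterns;
        }
        for (String token : raw.split(separatorRegex)) {
            String trimmed = token.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blanks so no "**" pattern is produced
            }
            patterns.add("*" + trimmed + "*");
        }
        return patterns;
    }

    public static void main(String[] args) {
        System.out.println(containsPatterns("a.wav  b.wav", " ")); // prints [*a.wav*, *b.wav*]
        System.out.println(containsPatterns("north,,south", ",")); // prints [*north*, *south*]
    }
}
```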
private AggregationBuilder getAgg(String type) {
        String[] includes = {"rep_no", "staff_name", "rep_group","score"};
        // Group by the given field; size is the maximum number of groups (buckets) returned
        TermsAggregationBuilder term = AggregationBuilders.terms("term").field(type).size(100000);
        // Number of risky recordings ("是" is the stored value meaning "yes")
        QueryBuilder riskBuilder = QueryBuilders.termQuery("is_risk","是");
        FilterAggregationBuilder riskCountFilter = AggregationBuilders.filter("riskCount", riskBuilder);
        // Number of appeals
        QueryBuilder appealedBuilder = QueryBuilders.termQuery("appealed","A");
        FilterAggregationBuilder appealedCountFilter = AggregationBuilders.filter("appealedCount", appealedBuilder);
        // Number of successful appeals
        QueryBuilder appealedSuccessBuilder = QueryBuilders.termQuery("appealed_success","A");
        FilterAggregationBuilder appealedSuccessCountFilter = AggregationBuilders.filter("appealedSuccessCount", appealedSuccessBuilder);
        // Number of recordings that have been scored
        QueryBuilder personScoreBuilder = QueryBuilders.boolQuery().must(QueryBuilders.existsQuery("score"));
        FilterAggregationBuilder personScoreCountFilter = AggregationBuilders.filter("personScoreCount", personScoreBuilder);

        // For each group, fetch the fixed fields of up to 1,000,000 hits; score is used to compute the median.
        // Note: top_hits size is bounded by the index setting index.max_inner_result_window (default 100),
        // so a size this large only works if that setting has been raised on the index.
        AggregationBuilder top = AggregationBuilders.topHits("result").fetchSource(includes, Strings.EMPTY_ARRAY).size(1000000);
        term.subAggregation(top);
        // Number of recordings (distinct filenames)
        AggregationBuilder voiceCountBuilder = AggregationBuilders.cardinality("voiceCount").field("filename");
        // Total duration
        AggregationBuilder durationAllBuilder = AggregationBuilders.sum("durationAll").field("duration");
        // Average duration
        AggregationBuilder durationAvgBuilder = AggregationBuilders.avg("durationAvg").field("duration");
        // Total score (the average score is computed as total score / number of manually scored recordings)
        AggregationBuilder scoreSumBuilder = AggregationBuilders.sum("scoreSum").field("score");
        // Average score
        AggregationBuilder scoreAvgBuilder = AggregationBuilders.avg("scoreAvg").field("score");
        // Median score: I do not know how to compute it directly in ES, so every score is fetched and the median is computed in memory
        //AggregationBuilder scoreMidBuilder = AggregationBuilders.avg("scoreMid").field("score");
        // Highest score
        AggregationBuilder scoreMaxBuilder = AggregationBuilders.max("scoreMax").field("score");
        // Lowest score
        AggregationBuilder scoreMinBuilder = AggregationBuilders.min("scoreMin").field("score");

        AggregationBuilder aggregation =
                term.subAggregation(voiceCountBuilder)
                        .subAggregation(durationAllBuilder)
                        .subAggregation(durationAvgBuilder)
                        .subAggregation(scoreSumBuilder)
                        .subAggregation(scoreAvgBuilder)
                        .subAggregation(scoreMaxBuilder)
                        .subAggregation(scoreMinBuilder)
                        .subAggregation(riskCountFilter)
                        .subAggregation(appealedCountFilter)
                        .subAggregation(appealedSuccessCountFilter)
                        .subAggregation(personScoreCountFilter)
                ;
        return aggregation;
    }
// The corresponding ES query
GET my_index/_doc/_search
{
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "start_time":{
                            "from":"2020-05-31 16:00:00",
                            "to":"2020-07-03 15:59:59",
                            "include_lower":true,
                            "include_upper":true,
                            "boost":1
                        }
                    }
                }
            ],
            "adjust_pure_negative":true,
            "boost":1
        }
    },
    "aggregations":{
        "term":{
            "terms":{
                "field":"<grouping field>",
                "size":100000,
                "min_doc_count":1,
                "shard_min_doc_count":0,
                "show_term_doc_count_error":false,
                "order":[
                    {
                        "_count":"desc"
                    },
                    {
                        "_key":"asc"
                    }
                ]
            },
            "aggregations":{
                "result":{
                    "top_hits":{
                        "from":0,
                        "size":1000000,
                        "version":false,
                        "explain":false,
                        "_source":{
                            "includes":[
                                "rep_no",
                                "staff_name",
                                "rep_group",
                                "score"
                            ],
                            "excludes":[

                            ]
                        }
                    }
                },
                "voiceCount":{
                    "cardinality":{
                        "field":"filename"
                    }
                },
                "durationAll":{
                    "sum":{
                        "field":"duration"
                    }
                },
                "durationAvg":{
                    "avg":{
                        "field":"duration"
                    }
                },
                "scoreSum":{
                    "sum":{
                        "field":"score"
                    }
                },
                "scoreAvg":{
                    "avg":{
                        "field":"score"
                    }
                },
                "scoreMax":{
                    "max":{
                        "field":"score"
                    }
                },
                "scoreMin":{
                    "min":{
                        "field":"score"
                    }
                },
                "riskCount":{
                    "filter":{
                        "term":{
                            "is_risk":{
                                "value":"是",
                                "boost":1
                            }
                        }
                    }
                },
                "appealedCount":{
                    "filter":{
                        "term":{
                            "appealed":{
                                "value":"A",
                                "boost":1
                            }
                        }
                    }
                },
                "appealedSuccessCount":{
                    "filter":{
                        "term":{
                            "appealed_success":{
                                "value":"A",
                                "boost":1
                            }
                        }
                    }
                },
                "personScoreCount":{
                    "filter":{
                        "bool":{
                            "must":{
                                "exists":{
                                    "field":"score"
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
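On problem 1 (the median): ES does have a `percentiles` metric aggregation, and its 50th percentile is an approximate median. I have not verified it against this data set, so treat the fragment below as a sketch. It assumes the entry is added next to `scoreMax` inside the `aggregations` of the `term` bucket; the name `scoreMid` is borrowed from the commented-out builder in `getAgg`.

```json
"scoreMid":{
    "percentiles":{
        "field":"score",
        "percents":[50]
    }
}
```

Because the result is approximate, it can differ slightly from the exact in-memory median on large buckets. In exchange, the large `top_hits` fetch would then only be needed for the fixed fields of a single hit.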