Spring集成ES的聚合查询

1. 在build.gradle中添加需要的jar包

elasticsearch:                  "org.elasticsearch:elasticsearch:7.5.1",
elasticsearch_client:           "org.elasticsearch.client:transport:7.5.1",
springboot_elasticsearch:       "org.springframework.boot:spring-boot-starter-data-elasticsearch:2.2.0.RELEASE"

请注意,SpringBoot是2.2.0.RELEASE才兼容elasticsearch 7.x

2. application.properties

在application.properties添加elasticsearch的配置

#es的默认名称,如果安装es时没有做特殊的操作名字都是此名称
spring.data.elasticsearch.cluster-name=my-application
# Elasticsearch 集群节点服务地址,用逗号分隔,如果没有指定其他就启动一个客户端节点,默认java访问端口9300
spring.data.elasticsearch.cluster-nodes=localhost:9300
# 设置连接超时时间
spring.data.elasticsearch.properties.transport.tcp.connect_timeout=120s

3. Aggregation基础实现

3.1 聚合基础

操作之前,我们首先建一个employee索引,employee索引类型包括8个字段:编号(id), 姓名(ename),性别(sex),年龄(age),籍贯(birthplace),部门名称(deptid),职位(job),工资(salary)。

生成的mapping如下:

(1)对单个字段group by

例如要计算每个部门的员工数,如果使用SQL语句,应表达如下:

select deptid, count(*) as emp_count 
from employee 
group by deptid 
where sex=0

es的java代码如下:

public static void groupByTest(TransportClient client) {
        SearchRequestBuilder requestBuilder = client.prepareSearch("esindex").setTypes("employee");
        TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("emp_count").field("deptid");
        requestBuilder.setQuery(QueryBuilders.termQuery("sex", 0)).addAggregation(termsAggregationBuilder);

        SearchResponse response = requestBuilder.execute().actionGet();
        
        Terms aggregation = response.getAggregations().get("emp_count");
        for (Terms.Bucket bucket : aggregation.getBuckets()) {
            System.out.println("部门编号=" + bucket.getKey() + ";员工数=" + bucket.getDocCount());
        }
    }

输出结果:

部门编号=8;员工数=31
部门编号=5;员工数=25
部门编号=6;员工数=22
部门编号=7;员工数=22

(2)group by多个field

例如要计算每个部门每个籍贯的员工数,如果使用SQL语句,应表达如下:

select deptid, birthplace, count(*) as emp_count 
from employee 
group by deptid, birthplace

es的java代码如下:

public static void groupByMutilFieldTest(TransportClient client) {
        SearchRequestBuilder requestBuilder = client.prepareSearch("esindex").setTypes("employee");
        TermsAggregationBuilder aggregationBuilder1 = AggregationBuilders.terms("emp_count").field("deptid");
        TermsAggregationBuilder aggregationBuilder2 = AggregationBuilders.terms("region_count").field("birthplace");
        requestBuilder.addAggregation(aggregationBuilder1.subAggregation(aggregationBuilder2));
        
        SearchResponse response = requestBuilder.execute().actionGet();
        
        Terms terms1 = response.getAggregations().get("emp_count");
        Terms terms2;
        for (Terms.Bucket bucket : terms1.getBuckets()) {
            System.out.println("部门编号=" + bucket.getKey());
            terms2 = bucket.getAggregations().get("region_count");
            for (Terms.Bucket bucket2 : terms2.getBuckets()) {
                System.out.println("籍贯=" + bucket2.getKey() + ";员工数=" + bucket2.getDocCount());
            }
        }
    }

输出结果:

部门编号=8
籍贯=广东佛山;员工数=9
籍贯=湖北武汉;员工数=8
籍贯=湖南长沙;员工数=5
籍贯=广东深圳;员工数=4
籍贯=广东广州;员工数=3
籍贯=湖南岳阳;员工数=2
部门编号=5
籍贯=湖北武汉;员工数=7
籍贯=广东佛山;员工数=4
籍贯=广东广州;员工数=4
籍贯=广东深圳;员工数=4
籍贯=湖南岳阳;员工数=3
籍贯=湖南长沙;员工数=3
部门编号=6
籍贯=广东广州;员工数=6
籍贯=广东佛山;员工数=4
籍贯=广东深圳;员工数=4
籍贯=湖南岳阳;员工数=4
籍贯=湖北武汉;员工数=2
籍贯=湖南长沙;员工数=2
部门编号=7
籍贯=广东广州;员工数=6
籍贯=湖南岳阳;员工数=6
籍贯=广东深圳;员工数=4
籍贯=湖北武汉;员工数=3
籍贯=广东佛山;员工数=2
籍贯=湖南长沙;员工数=1

(3)max/min/sum/avg

例如计算每个部门最高的工资,如果使用SQL语句,应表达如下:

select deptid, max(salary) as max_salary 
from employee 
group by deptid

es的java代码如下:

public static void maxTest(TransportClient client) {
        SearchRequestBuilder requestBuilder = client.prepareSearch("esindex").setTypes("employee");

        TermsAggregationBuilder aggregationBuilder1 = AggregationBuilders.terms("deptid").field("deptid");
        MaxAggregationBuilder aggregationBuilder2 = AggregationBuilders.max("maxsalary").field("salary");
        requestBuilder.addAggregation(aggregationBuilder1.subAggregation(aggregationBuilder2));
        SearchResponse response = requestBuilder.execute().actionGet();

        Terms aggregation = response.getAggregations().get("deptid");
        Max terms2 = null;
        for (Terms.Bucket bucket : aggregation.getBuckets()) {
            terms2 = bucket.getAggregations().get("maxsalary");  //class org.elasticsearch.search.aggregations.metrics.max.InternalMax
            System.out.println("部门编号=" + bucket.getKey() + ";最高工资=" + terms2.getValue());
        }
    }

输出结果:

部门编号=8;最高工资=8000.0
部门编号=5;最高工资=8000.0
部门编号=6;最高工资=8000.0
部门编号=7;最高工资=8000.0

(4)对多个field求max/min/sum/avg

例如要计算每个部门的平均年龄,同时又要计算总薪资,最后按平均年龄升序排序,如果使用SQL语句,应表达如下:

select deptid, avg(age) as avg_age, sum(salary) as max_salary 
from employee 
group by deptid 
order by avg_age asc

es的java代码如下:

public static void maxSumTest1(TransportClient client) {
        SearchRequestBuilder requestBuilder = client.prepareSearch("esindex").setTypes("employee");

        TermsAggregationBuilder aggregationBuilder1 = AggregationBuilders.terms("deptid").field("deptid")
                .order(BucketOrder.aggregation("avg_age",true)); //按平均年龄升序排序,
        AggregationBuilder aggregationBuilder2 = AggregationBuilders.avg("avg_age").field("age");
        AggregationBuilder aggregationBuilder3 = AggregationBuilders.sum("sum_salary").field("salary");
        requestBuilder.addAggregation(aggregationBuilder1.subAggregation(aggregationBuilder2).subAggregation(aggregationBuilder3));
        SearchResponse response = requestBuilder.execute().actionGet();

        Terms aggregation = response.getAggregations().get("deptid");
        Avg terms2 = null;
        Sum term3 = null;
        for (Terms.Bucket bucket : aggregation.getBuckets()) {
            terms2 = bucket.getAggregations().get("avg_age"); // org.elasticsearch.search.aggregations.metrics.avg.InternalAvg
            term3 = bucket.getAggregations().get("sum_salary"); // org.elasticsearch.search.aggregations.metrics.sum.InternalSum
            System.out.println("部门编号=" + bucket.getKey() + ";平均年龄=" + terms2.getValue() + ";总工资=" + term3.getValue());
        }
    }

输出结果:

部门编号=7;平均年龄=31.954545454545453;总工资=129000.0
部门编号=5;平均年龄=33.36;总工资=121000.0
部门编号=8;平均年龄=33.54838709677419;总工资=171000.0
部门编号=6;平均年龄=33.59090909090909;总工资=121000.0

3.2 ORDER 排序

1. 按桶的doc_count升序排序

AggregationBuilders
    .terms("genders")
    .field("gender")
    .order(BucketOrder.count(true))

2. 按桶的英文字母顺序,按升序排列

AggregationBuilders
    .terms("genders")
    .field("gender")
    .order(BucketOrder.key(true))

3.按单值 度量子聚合 (由聚合名称标识)对桶进行排序

AggregationBuilders
    .terms("genders")
    .field("gender")
    .order(BucketOrder.aggregation("avg_height", false))
    .subAggregation(
        AggregationBuilders.avg("avg_height").field("height")
    )

4. 按多个标准排序桶

AggregationBuilders
    .terms("genders")
    .field("gender")
    .order(BucketOrder.compound( // in order of priority:
        BucketOrder.aggregation("avg_height", false), // sort by sub-aggregation first
        BucketOrder.count(true))) // then bucket count as a tie-breaker
    .subAggregation(
        AggregationBuilders.avg("avg_height").field("height")
    )

3.3 Range Aggregation 范围聚合

AggregationBuilder aggregation =
        AggregationBuilders
                .range("agg")
                .field("height")
                .addUnboundedTo(1.0f)               // from -infinity to 1.0 (excluded)
                .addRange(1.0f, 1.5f)               // from 1.0 to 1.5 (excluded)
                .addUnboundedFrom(1.5f);            // from 1.5 to +infinity

3.4 Date Range Aggregation 日期范围聚合

AggregationBuilder aggregation =
        AggregationBuilders
                .dateRange("agg")
                .field("dateOfBirth")
                .format("yyyy")
                .addUnboundedTo("1950")    // from -infinity to 1950 (excluded)
                .addRange("1950", "1960")  // from 1950 to 1960 (excluded)
                .addUnboundedFrom("1960"); // from 1960 to +infinity

3.5 Ip Range Aggregation IP范围聚合

AggregationBuilder aggregation =
        AggregationBuilders
                .ipRange("agg")
                .field("ip")
                .addUnboundedTo("192.168.1.0")             // from -infinity to 192.168.1.0 (excluded)
                .addRange("192.168.1.0", "192.168.2.0")    // from 192.168.1.0 to 192.168.2.0 (excluded)
                .addUnboundedFrom("192.168.2.0");          // from 192.168.2.0 to +infinity

3.6 Histogram Aggregation 直方图聚合

AggregationBuilder aggregation =
        AggregationBuilders
                .histogram("agg")
                .field("height")
                .interval(1);

3.7 Date Histogram Aggregation 日期直方图句

AggregationBuilder aggregation =
        AggregationBuilders
                .dateHistogram("agg")
                .field("dateOfBirth")
                .dateHistogramInterval(DateHistogramInterval.YEAR);

或者你想要设置一个十天的间隔

AggregationBuilder aggregation =
        AggregationBuilders
                .dateHistogram("agg")
                .field("dateOfBirth")
                .dateHistogramInterval(DateHistogramInterval.days(10));

3.8 Geo Distance Aggregation 地理距离聚合

AggregationBuilder aggregation =
        AggregationBuilders
                .geoDistance("agg", new GeoPoint(48.84237171118314,2.33320027692004))
                .field("address.location")
                .unit(DistanceUnit.KILOMETERS)
                .addUnboundedTo(3.0)
                .addRange(3.0, 10.0)
                .addRange(10.0, 500.0);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容