1. 在build.gradle中添加需要的jar包
elasticsearch: "org.elasticsearch:elasticsearch:7.5.1",
elasticsearch_client: "org.elasticsearch.client:transport:7.5.1",
springboot_elasticsearch: "org.springframework.boot:spring-boot-starter-data-elasticsearch:2.2.0.RELEASE"
请注意,SpringBoot是2.2.0.RELEASE才兼容elasticsearch 7.x
2. application.properties
在application.properties添加elasticsearch的配置
# es集群名称,必须与es配置文件elasticsearch.yml中的cluster.name保持一致;若安装时未修改,es默认的集群名称为elasticsearch(my-application只是elasticsearch.yml中被注释掉的示例值)
spring.data.elasticsearch.cluster-name=my-application
# Elasticsearch 集群节点服务地址,用逗号分隔,如果没有指定其他就启动一个客户端节点,默认java访问端口9300
spring.data.elasticsearch.cluster-nodes=localhost:9300
# 设置连接超时时间
spring.data.elasticsearch.properties.transport.tcp.connect_timeout=120s
3. Aggregation基础实现
3.1 聚合基础
操作之前,我们首先建一个名为esindex的索引,用于存放员工(employee)数据,包括8个字段:编号(id)、姓名(ename)、性别(sex)、年龄(age)、籍贯(birthplace)、部门编号(deptid)、职位(job)、工资(salary)。
生成的mapping如下:
(1)对单个字段group by
例如要计算每个部门中性别为0的员工数,如果使用SQL语句,应表达如下:
select deptid, count(*) as emp_count
from employee
where sex=0
group by deptid
es的java代码如下:
/**
 * Counts employees with {@code sex == 0} per department.
 *
 * <p>Elasticsearch equivalent of
 * {@code SELECT deptid, COUNT(*) FROM employee WHERE sex = 0 GROUP BY deptid}.
 * Prints one line per department bucket with the department id and its document count.
 *
 * @param client connected transport client used to search the "esindex" index
 */
public static void groupByTest(TransportClient client) {
    // No setTypes(...): in Elasticsearch 7.x an index holds a single mapping type and type
    // filtering is deprecated; for an index created in 7.x (type "_doc") filtering by
    // "employee" would match no documents at all.
    SearchRequestBuilder requestBuilder = client.prepareSearch("esindex");
    // NOTE: a terms aggregation returns only the top buckets (10 by default), unlike SQL
    // GROUP BY; call size(...) if there can be more departments than that.
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("emp_count").field("deptid");
    requestBuilder.setQuery(QueryBuilders.termQuery("sex", 0)).addAggregation(termsAggregationBuilder);
    // get() is shorthand for execute().actionGet(): run the request and block for the response.
    SearchResponse response = requestBuilder.get();
    Terms aggregation = response.getAggregations().get("emp_count");
    for (Terms.Bucket bucket : aggregation.getBuckets()) {
        System.out.println("部门编号=" + bucket.getKey() + ";员工数=" + bucket.getDocCount());
    }
}
输出结果:
部门编号=8;员工数=31
部门编号=5;员工数=25
部门编号=6;员工数=22
部门编号=7;员工数=22
(2)group by多个field
例如要计算每个部门每个籍贯的员工数,如果使用SQL语句,应表达如下:
select deptid, birthplace, count(*) as emp_count
from employee
group by deptid, birthplace
es的java代码如下:
/**
 * Counts employees per (department, birthplace) pair.
 *
 * <p>Elasticsearch equivalent of
 * {@code SELECT deptid, birthplace, COUNT(*) FROM employee GROUP BY deptid, birthplace}.
 * Implemented as a terms aggregation on {@code deptid} with a nested terms aggregation on
 * {@code birthplace}. Prints each department id followed by its birthplace buckets.
 *
 * @param client connected transport client used to search the "esindex" index
 */
public static void groupByMutilFieldTest(TransportClient client) {
    // No setTypes(...): mapping types are deprecated in Elasticsearch 7.x (one type per index),
    // and a type filter that does not match the index's type would return nothing.
    SearchRequestBuilder requestBuilder = client.prepareSearch("esindex");
    // NOTE: each terms level returns only its top buckets (10 by default); raise size(...) if needed.
    TermsAggregationBuilder aggregationBuilder1 = AggregationBuilders.terms("emp_count").field("deptid");
    TermsAggregationBuilder aggregationBuilder2 = AggregationBuilders.terms("region_count").field("birthplace");
    // Nesting the birthplace terms under the deptid terms yields one birthplace breakdown per department.
    requestBuilder.addAggregation(aggregationBuilder1.subAggregation(aggregationBuilder2));
    SearchResponse response = requestBuilder.get();
    Terms terms1 = response.getAggregations().get("emp_count");
    for (Terms.Bucket bucket : terms1.getBuckets()) {
        System.out.println("部门编号=" + bucket.getKey());
        Terms terms2 = bucket.getAggregations().get("region_count");
        for (Terms.Bucket bucket2 : terms2.getBuckets()) {
            System.out.println("籍贯=" + bucket2.getKey() + ";员工数=" + bucket2.getDocCount());
        }
    }
}
输出结果:
部门编号=8
籍贯=广东佛山;员工数=9
籍贯=湖北武汉;员工数=8
籍贯=湖南长沙;员工数=5
籍贯=广东深圳;员工数=4
籍贯=广东广州;员工数=3
籍贯=湖南岳阳;员工数=2
部门编号=5
籍贯=湖北武汉;员工数=7
籍贯=广东佛山;员工数=4
籍贯=广东广州;员工数=4
籍贯=广东深圳;员工数=4
籍贯=湖南岳阳;员工数=3
籍贯=湖南长沙;员工数=3
部门编号=6
籍贯=广东广州;员工数=6
籍贯=广东佛山;员工数=4
籍贯=广东深圳;员工数=4
籍贯=湖南岳阳;员工数=4
籍贯=湖北武汉;员工数=2
籍贯=湖南长沙;员工数=2
部门编号=7
籍贯=广东广州;员工数=6
籍贯=湖南岳阳;员工数=6
籍贯=广东深圳;员工数=4
籍贯=湖北武汉;员工数=3
籍贯=广东佛山;员工数=2
籍贯=湖南长沙;员工数=1
(3)max/min/sum/avg
例如计算每个部门最高的工资,如果使用SQL语句,应表达如下:
select deptid, max(salary) as max_salary
from employee
group by deptid
es的java代码如下:
/**
 * Prints the highest salary in each department.
 *
 * <p>Elasticsearch equivalent of
 * {@code SELECT deptid, MAX(salary) FROM employee GROUP BY deptid}: a terms aggregation on
 * {@code deptid} with a max sub-aggregation on {@code salary}.
 *
 * @param client connected transport client used to search the "esindex" index
 */
public static void maxTest(TransportClient client) {
    // No setTypes(...): mapping types are deprecated in Elasticsearch 7.x (one type per index),
    // and a type filter that does not match the index's type would return nothing.
    SearchRequestBuilder requestBuilder = client.prepareSearch("esindex");
    // NOTE: the terms aggregation returns only the top buckets (10 by default); raise size(...) if needed.
    TermsAggregationBuilder aggregationBuilder1 = AggregationBuilders.terms("deptid").field("deptid");
    MaxAggregationBuilder aggregationBuilder2 = AggregationBuilders.max("maxsalary").field("salary");
    requestBuilder.addAggregation(aggregationBuilder1.subAggregation(aggregationBuilder2));
    SearchResponse response = requestBuilder.get();
    Terms aggregation = response.getAggregations().get("deptid");
    for (Terms.Bucket bucket : aggregation.getBuckets()) {
        // The per-bucket result of the "maxsalary" sub-aggregation is read through the Max interface.
        Max maxSalary = bucket.getAggregations().get("maxsalary");
        System.out.println("部门编号=" + bucket.getKey() + ";最高工资=" + maxSalary.getValue());
    }
}
输出结果:
部门编号=8;最高工资=8000.0
部门编号=5;最高工资=8000.0
部门编号=6;最高工资=8000.0
部门编号=7;最高工资=8000.0
(4)对多个field求max/min/sum/avg
例如要计算每个部门的平均年龄,同时又要计算总薪资,最后按平均年龄升序排序,如果使用SQL语句,应表达如下:
select deptid, avg(age) as avg_age, sum(salary) as sum_salary
from employee
group by deptid
order by avg_age asc
es的java代码如下:
/**
 * Prints each department's average age and total salary, ordered by average age ascending.
 *
 * <p>Elasticsearch equivalent of
 * {@code SELECT deptid, AVG(age) AS avg_age, SUM(salary) AS sum_salary FROM employee
 * GROUP BY deptid ORDER BY avg_age ASC}.
 *
 * @param client connected transport client used to search the "esindex" index
 */
public static void maxSumTest1(TransportClient client) {
    // No setTypes(...): mapping types are deprecated in Elasticsearch 7.x (one type per index),
    // and a type filter that does not match the index's type would return nothing.
    SearchRequestBuilder requestBuilder = client.prepareSearch("esindex");
    // Buckets are ordered by the "avg_age" sub-aggregation, ascending (true).
    // NOTE: only the top buckets are returned (10 by default); raise size(...) if needed.
    TermsAggregationBuilder aggregationBuilder1 = AggregationBuilders.terms("deptid").field("deptid")
            .order(BucketOrder.aggregation("avg_age", true));
    AggregationBuilder aggregationBuilder2 = AggregationBuilders.avg("avg_age").field("age");
    AggregationBuilder aggregationBuilder3 = AggregationBuilders.sum("sum_salary").field("salary");
    requestBuilder.addAggregation(aggregationBuilder1.subAggregation(aggregationBuilder2).subAggregation(aggregationBuilder3));
    SearchResponse response = requestBuilder.get();
    Terms aggregation = response.getAggregations().get("deptid");
    for (Terms.Bucket bucket : aggregation.getBuckets()) {
        Avg avgAge = bucket.getAggregations().get("avg_age");
        Sum sumSalary = bucket.getAggregations().get("sum_salary");
        System.out.println("部门编号=" + bucket.getKey() + ";平均年龄=" + avgAge.getValue() + ";总工资=" + sumSalary.getValue());
    }
}
输出结果:
部门编号=7;平均年龄=31.954545454545453;总工资=129000.0
部门编号=5;平均年龄=33.36;总工资=121000.0
部门编号=8;平均年龄=33.54838709677419;总工资=171000.0
部门编号=6;平均年龄=33.59090909090909;总工资=121000.0
3.2 ORDER 排序
1. 按桶的doc_count升序排序
// Terms aggregation on "gender" whose buckets are sorted by doc_count, ascending (true).
AggregationBuilders
.terms("genders")
.field("gender")
.order(BucketOrder.count(true))
2. 按桶的key(词项)的字典序升序排列
// Terms aggregation on "gender" whose buckets are sorted by their key, ascending (true).
AggregationBuilders
.terms("genders")
.field("gender")
.order(BucketOrder.key(true))
3. 按单值度量子聚合(由聚合名称标识)对桶进行排序
// Terms aggregation on "gender" whose buckets are sorted by the "avg_height" sub-aggregation,
// descending (false). The name passed to BucketOrder.aggregation must match the sub-aggregation's name.
AggregationBuilders
.terms("genders")
.field("gender")
.order(BucketOrder.aggregation("avg_height", false))
.subAggregation(
AggregationBuilders.avg("avg_height").field("height")
)
4. 按多个标准排序桶
// Terms aggregation on "gender" sorted by several criteria: first by the "avg_height"
// sub-aggregation (descending), then by doc_count (ascending) to break ties.
AggregationBuilders
.terms("genders")
.field("gender")
.order(BucketOrder.compound( // in order of priority:
BucketOrder.aggregation("avg_height", false), // sort by sub-aggregation first
BucketOrder.count(true))) // then bucket count as a tie-breaker
.subAggregation(
AggregationBuilders.avg("avg_height").field("height")
)
3.3 Range Aggregation 范围聚合
// Range aggregation on "height" with three buckets: below 1.0, 1.0 up to 1.5, and 1.5 and above.
// Each range includes its lower bound and excludes its upper bound.
AggregationBuilder aggregation =
AggregationBuilders
.range("agg")
.field("height")
.addUnboundedTo(1.0f) // from -infinity to 1.0 (excluded)
.addRange(1.0f, 1.5f) // from 1.0 to 1.5 (excluded)
.addUnboundedFrom(1.5f); // from 1.5 to +infinity
3.4 Date Range Aggregation 日期范围聚合
// Date range aggregation on "dateOfBirth". The bound strings are parsed with the "yyyy" format,
// giving buckets before 1950, 1950 up to 1960, and 1960 onwards (upper bounds excluded).
AggregationBuilder aggregation =
AggregationBuilders
.dateRange("agg")
.field("dateOfBirth")
.format("yyyy")
.addUnboundedTo("1950") // from -infinity to 1950 (excluded)
.addRange("1950", "1960") // from 1950 to 1960 (excluded)
.addUnboundedFrom("1960"); // from 1960 to +infinity
3.5 Ip Range Aggregation IP范围聚合
// IP range aggregation on the "ip" field: addresses below 192.168.1.0, the 192.168.1.x block,
// and 192.168.2.0 and above (upper bounds excluded).
AggregationBuilder aggregation =
AggregationBuilders
.ipRange("agg")
.field("ip")
.addUnboundedTo("192.168.1.0") // from -infinity to 192.168.1.0 (excluded)
.addRange("192.168.1.0", "192.168.2.0") // from 192.168.1.0 to 192.168.2.0 (excluded)
.addUnboundedFrom("192.168.2.0"); // from 192.168.2.0 to +infinity
3.6 Histogram Aggregation 直方图聚合
// Histogram aggregation on "height" with fixed-width buckets of 1 unit each.
AggregationBuilder aggregation =
AggregationBuilders
.histogram("agg")
.field("height")
.interval(1);
3.7 Date Histogram Aggregation 日期直方图聚合
// Date histogram on "dateOfBirth" with one bucket per calendar year.
// calendarInterval(...) replaces dateHistogramInterval(...), which is deprecated since
// Elasticsearch 7.2; YEAR is a calendar-aware unit, so it belongs in calendarInterval.
AggregationBuilder aggregation =
        AggregationBuilders
                .dateHistogram("agg")
                .field("dateOfBirth")
                .calendarInterval(DateHistogramInterval.YEAR);
或者,如果你想要设置十天的间隔:
// Date histogram on "dateOfBirth" with fixed 10-day buckets.
// fixedInterval(...) replaces dateHistogramInterval(...), which is deprecated since
// Elasticsearch 7.2; multi-unit intervals such as 10 days must be fixed, not calendar, intervals.
AggregationBuilder aggregation =
        AggregationBuilders
                .dateHistogram("agg")
                .field("dateOfBirth")
                .fixedInterval(DateHistogramInterval.days(10));
3.8 Geo Distance Aggregation 地理距离聚合
// Geo distance aggregation on "address.location", measured from the given origin point in
// kilometers. Buckets: up to 3 km, 3 to 10 km, and 10 to 500 km.
AggregationBuilder aggregation =
AggregationBuilders
.geoDistance("agg", new GeoPoint(48.84237171118314,2.33320027692004))
.field("address.location")
.unit(DistanceUnit.KILOMETERS)
.addUnboundedTo(3.0)
.addRange(3.0, 10.0)
.addRange(10.0, 500.0);