ES 的聚合主要有 Bucket、Metric、Pipeline 三类(Aggregations),本篇主要介绍 Bucket、Metric 这两种。
聚合:对数据进行分组并从中提取统计信息,类似于 SQL 中的 GROUP BY 和聚合函数。
聚合语法:
"aggregations" : {
"<聚合名称 1>" : {
"<聚合类型>" : {
<聚合体内容>
}
[,"meta" : { [<meta_data_body>] }]?
[,"aggregations" : { [<sub_aggregation>]+ }]?
}
[,"<聚合名称 2>" : { ... }]*
}
实例数据
Elasticsearch 示例数据 accounts.json 下载
kibana中执行
POST /bank/account/_bulk
示例 1:搜索 address 中包含 Lane 的所有人的年龄分布(前 10 条)、平均年龄以及平均薪资
DSL语句
GET bank/_search
{
"size": 0,
"query": {
"match": {
"address": "Lane"
}
},
"aggs": {
"ageAggs": {
"terms": {
"field": "age",
"size": 10
}
},
"avgAge": {
"avg": {
"field": "age"
}
},
"avgPrice": {
"avg": {
"field": "balance"
}
}
}
}
java代码
/**
 * Example 1: searches the "bank" index for documents whose address matches "Lane" and
 * prints the age distribution (top 10 term buckets), the average age and the average
 * balance of the matching documents.
 *
 * <p>This mirrors the DSL request of example 1. Unlike that request, the hit size is not
 * set to 0 here, so the hits returned by the search can be printed as well.
 *
 * @throws IOException propagated from {@code client.search}
 */
public void testSearchData() throws IOException {
    SearchRequest request = new SearchRequest();
    request.indices("bank");

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // 1.1) Everyone whose address contains "Lane" (same term as the heading and the DSL).
    sourceBuilder.query(QueryBuilders.matchQuery("address", "Lane"));
    // 1.2) Age distribution: terms aggregation on "age", limited to 10 buckets.
    TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age").size(10);
    sourceBuilder.aggregation(ageAgg);
    // 1.3) Average age (present in the DSL example, previously missing from the Java code).
    AvgAggregationBuilder ageAvg = AggregationBuilders.avg("ageAvg").field("age");
    sourceBuilder.aggregation(ageAvg);
    // 1.4) Average of the "balance" field.
    AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
    sourceBuilder.aggregation(balanceAvg);

    System.out.println("检索参数:" + sourceBuilder.toString());
    request.source(sourceBuilder);

    // 2) Execute the search.
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);

    // 3) Inspect the response.
    System.out.println(response.toString());
    // 3.1) Outer hits container.
    SearchHits hits = response.getHits();
    // 3.2) The actual matching documents.
    SearchHit[] searchHits = hits.getHits();
    // 3.3) Deserialize each hit's _source JSON into a BankMember and print it.
    for (SearchHit hit : searchHits) {
        String hitStr = hit.getSourceAsString();
        BankMember bankMember = JSON.parseObject(hitStr, BankMember.class);
        System.out.println(bankMember);
    }

    // 3.4) Read the aggregation results; each one is looked up by the name it was registered under.
    // Reference: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-search.html
    Aggregations aggregations = response.getAggregations();
    Terms ageAgg1 = aggregations.get("ageAgg");
    for (Terms.Bucket bucket : ageAgg1.getBuckets()) {
        String keyAsString = bucket.getKeyAsString();
        System.out.println("用户年龄: " + keyAsString + " 人数:" + bucket.getDocCount());
    }
    Avg ageAvg1 = aggregations.get("ageAvg");
    System.out.println("平均年龄:" + ageAvg1.getValue());
    Avg balanceAvg1 = aggregations.get("balanceAvg");
    System.out.println("平均薪资:" + balanceAvg1.getValue());
}
kibana查询结果
java执行结果
示例 2:按照年龄分组,然后将分组后的结果按照性别分组,然后查询出这些分组后的平均薪资
DSL语句
GET bank/_search
{
"size": 0,
"query": {
"match_all": {}
},
"aggs": {
"ageAggs": {
"terms": {
"field": "age",
"size": 10
},
"aggs": {
"genderAgg": {
"terms": {
"field": "gender.keyword",
"size": 10
},
"aggs": {
"avgBalance": {
"avg": {
"field": "balance"
}
}
}
}
}
}
}
}
java代码
/**
 * Example 2: buckets all documents of the "bank" index by age, splits every age bucket
 * by gender, and prints the average balance of each (age, gender) bucket.
 *
 * <p>Mirrors the DSL request of example 2: match_all query, hit size 0, a terms
 * aggregation on "age" containing a terms aggregation on "gender.keyword", which in turn
 * contains an avg aggregation on "balance".
 *
 * @throws IOException propagated from {@code client.search}
 */
@Test
public void testAgeAggs2() throws IOException {
    SearchRequest request = new SearchRequest("bank");

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    // Only the aggregations are read below, so request no hits (same as "size": 0 in the DSL).
    searchSourceBuilder.size(0);

    // Innermost metric: average balance inside each gender bucket.
    AvgAggregationBuilder balanceAvgAgg = AggregationBuilders.avg("balanceAvg").field("balance");
    // Middle level: gender buckets, each carrying the balance average.
    TermsAggregationBuilder genderAgg = AggregationBuilders.terms("genderAgg")
            .field("gender.keyword")
            .size(10);
    genderAgg.subAggregation(balanceAvgAgg);
    // Outer level: age buckets, each carrying the gender buckets.
    TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age").size(10);
    ageAgg.subAggregation(genderAgg);

    searchSourceBuilder.aggregation(ageAgg);
    request.source(searchSourceBuilder);

    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    Aggregations aggregations = response.getAggregations();

    // Outer buckets: one per age value.
    Terms ageAgg1 = aggregations.get("ageAgg");
    for (Terms.Bucket bucket : ageAgg1.getBuckets()) {
        String ageString = bucket.getKeyAsString();
        long ageDocCount = bucket.getDocCount();

        // Gender buckets nested inside the current age bucket.
        Aggregations genderAggregations = bucket.getAggregations();
        Terms genderTerm = genderAggregations.get("genderAgg");
        for (Terms.Bucket genderTermbucket : genderTerm.getBuckets()) {
            String genderString = genderTermbucket.getKeyAsString();
            long genderdocCount = genderTermbucket.getDocCount();
            // Balance average metric of the current (age, gender) bucket.
            Avg balanceAvg = genderTermbucket.getAggregations().get("balanceAvg");
            System.out.println("用户年龄: " + ageString + " 人数:" + ageDocCount + "用户性别: " + genderString + " 人数:" + genderdocCount+ "平均薪资:" + balanceAvg.getValue());
        }
    }
}
kibana查询视图
java查询结果