Keywords: Elasticsearch, Java
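These are my notes on Chapter 7 of the book 《Elasticsearch搜索引擎构建入门与实战》 (roughly, "Elasticsearch Search Engine Construction: Getting Started and Practice").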
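Contents
- Computing field statistics over selected documents (sum/mean/max/count)
- Filling missing values in statistics
- Grouped statistics (groupBy)
- Sorting aggregation results (order)
- Top N per group (window functions)
- Post-filtering aggregation results (having)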
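Computing field statistics over selected documents
(1) sum, max, min, value_count, stats
ES aggregations use the aggs keyword, which sits at the top level of the search DSL. Taking sum as an example, the DSL that computes a statistic over the documents is: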
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"sum": {
"field": "model_1"
}
}
}
}
The response:
"hits" : {
"total" : 13,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"my_agg" : {
"value" : 2.240000009536743
}
}
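This request calls _search with GET and sets size: 0, so the response contains no documents. my_agg is a name we choose; it becomes the key of the result in the response, and the sum aggregation writes its result into my_agg's value. Note that the returned sum has a small decimal error even though every original addend has two decimal places. This is most likely because model_1 is stored as a single-precision float (the stats output below shows the same effect). avg, max, and min work the same way; for a count, use value_count: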
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"value_count": {
"field": "model_1"
}
}
}
}
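The response (value_count counts the values of the field, so documents without model_1 are not included):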
"aggregations" : {
"my_agg" : {
"value" : 4
}
}
Specifying stats in the aggregation returns a preset group of statistics (count, min, max, avg, sum) in one request:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"stats": {
"field": "model_1"
}
}
}
}
The response:
"aggregations" : {
"my_agg" : {
"count" : 4,
"min" : 0.0,
"max" : 0.8299999833106995,
"avg" : 0.5600000023841858,
"sum" : 2.240000009536743
}
}
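The decimal noise is easiest to see in max: 0.8299999833106995 is what 0.83 becomes when it is stored as a 32-bit float and read back as a double. That suggests, although the mapping is not shown here, that model_1 is mapped as float. Below is a minimal stdlib-only sketch of stats over such a field, assuming float storage. The class name and the sample values are hypothetical. A null stands for a document without the field, which is skipped as ES does:

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Objects;

// Illustration only: an in-memory analogue of the stats aggregation over a
// float-mapped field (an assumption about model_1, not a confirmed mapping).
// null stands for a document without the field; such documents are skipped.
class FloatFieldStats {
    static DoubleSummaryStatistics stats(List<Float> storedValues) {
        return storedValues.stream()
                .filter(Objects::nonNull)          // missing field -> not counted
                .mapToDouble(Float::doubleValue)   // widen each stored float to double
                .summaryStatistics();              // count, min, max, average, sum
    }

    public static void main(String[] args) {
        // Hypothetical two-decimal values; nulls are documents without model_1.
        List<Float> modelOne = Arrays.asList(0.0f, null, 0.83f, 0.58f, null, 0.83f);
        DoubleSummaryStatistics s = stats(modelOne);
        System.out.println("count=" + s.getCount()); // 4: the nulls are skipped
        System.out.println("max=" + s.getMax());     // 0.83f widened, not exactly 0.83
        System.out.println("sum=" + s.getSum());     // close to, but not exactly, 2.24
    }
}
```

Mapping such a field as double, or as scaled_float with a scaling_factor of 100, would likely reduce this noise.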
(2) Using aggregations with a query
When combined with a query, the aggregation is computed only over the documents that match:
GET /hotel/_doc/_search
{
"query": {
"term": {
"city": "HK"
}
},
"size": 0,
"aggs": {
"my_agg": {
"sum": {
"field": "model_1"
}
}
}
}
The aggregation above is computed only over the documents matched by the query (here, hotels whose city is HK).
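The same "filter first, then aggregate" order can be sketched in plain Java. This is only an in-memory analogue, not the ES client. The Hotel class, its fields, and the sample data are hypothetical stand-ins for documents in the hotel index:

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: "query then aggregate" on in-memory data.
// Hotel and its fields are hypothetical stand-ins for hotel documents.
class QueryThenSum {
    static final class Hotel {
        final String city;
        final Double model1; // null = field missing

        Hotel(String city, Double model1) {
            this.city = city;
            this.model1 = model1;
        }
    }

    // Analogous to: term query on city + sum aggregation on model_1.
    static double sumModel1InCity(List<Hotel> hotels, String city) {
        return hotels.stream()
                .filter(h -> city.equals(h.city))   // the query restricts the document set
                .filter(h -> h.model1 != null)      // sum skips documents without the field
                .mapToDouble(h -> h.model1)
                .sum();
    }

    public static void main(String[] args) {
        List<Hotel> hotels = Arrays.asList(
                new Hotel("HK", 0.25), new Hotel("HK", null),
                new Hotel("HK", 0.5), new Hotel("SZ", 1.0));
        System.out.println(sumModel1InCity(hotels, "HK")); // 0.75
    }
}
```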
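(3) Using the Java client
The code below runs the query and then computes the sum. The aggregation is built at the same level as the query, with AggregationBuilders setting the metric and the field. To read the response, first call getAggregations to get the aggregations, then get the named metric object, and finally its value. This follows the three levels aggregations -> my_agg -> value: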
public Double getHotelSum() {
    // Assumes the enclosing class has fields `client` (a RestHighLevelClient) and `index` (e.g. "hotel")
    double res = 0.0;
    SearchRequest searchRequest = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .query(QueryBuilders.termQuery("city", "HK"))
            .aggregation(AggregationBuilders.sum("my_agg").field("model_1"));
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            // ES 6.x client: getTotalHits() returns a long (7.x returns a TotalHits object; use .value)
            if (searchResponse.getHits().getTotalHits() != 0) {
                Aggregations aggregations = searchResponse.getAggregations();
                Sum sum = aggregations.get("my_agg");
                res = sum.getValue();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return res;
}
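Note: if the query matches no documents, or none of the matched documents has the aggregated field, the metric comes back as a default value instead of a real result. Max returns -Infinity, Min returns Infinity, Sum returns 0, and Avg returns Infinity. That is why the code also checks the number of hits with searchResponse.getHits().getTotalHits(). Next, a value_count example: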
public long getModel1Count() {
    long res = 0;
    SearchRequest searchRequest = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .aggregation(AggregationBuilders.count("my_agg").field("model_1"));
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            // value_count is simply 0 when nothing matches, so no hit-count check is needed
            Aggregations aggregations = searchResponse.getAggregations();
            ValueCount count = aggregations.get("my_agg");
            res = count.getValue();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return res;
}
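Checking the hit count does not cover the second case in the note above, where documents match but none of them has the field. Below is a minimal stdlib-only sketch of an extra guard. It assumes the caller passes in the double read from the client (for example max.getValue()). The class and method names are hypothetical:

```java
import java.util.DoubleSummaryStatistics;
import java.util.OptionalDouble;

// Illustration only: treat the sentinel values returned for "no data"
// (+/-Infinity, NaN) as absent instead of as real results.
class MetricGuard {
    static OptionalDouble finiteOrEmpty(double metricValue) {
        return Double.isFinite(metricValue) ? OptionalDouble.of(metricValue) : OptionalDouble.empty();
    }

    public static void main(String[] args) {
        // An empty DoubleSummaryStatistics shows the same kind of sentinels:
        // min = Infinity, max = -Infinity.
        DoubleSummaryStatistics empty = new DoubleSummaryStatistics();
        System.out.println(finiteOrEmpty(empty.getMax())); // OptionalDouble.empty
        System.out.println(finiteOrEmpty(2.5));            // OptionalDouble[2.5]
    }
}
```

This catches the Infinity results for Max, Min, and Avg, but not Sum, because an empty Sum is a normal finite 0. For Sum, the hit-count check, or a separate value_count on the same field, is still needed.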
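Filling missing values in statistics
Missing values are skipped when computing statistics. For example, documents without the field are not counted in the denominator of an average. ES lets you specify a fill value for missing fields, for example to treat every document whose model_1 is missing as 0. First, the average without filling: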
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"avg": {
"field": "model_1"
}
}
}
}
Output:
"aggregations" : {
"my_agg" : {
"value" : 1.9825000315904617
}
}
Now add the fill value:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"avg": {
"field": "model_1" ,
"missing": 0
}
}
}
}
The average drops, because documents without model_1 now count as 0:
"aggregations" : {
"my_agg" : {
"value" : 0.49562500789761543
}
}
In Java, just call missing on the aggregation builder:
public Double getHotelAvg() {
    double res = 0.0;
    SearchRequest searchRequest = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            // missing(0.0): documents without model_1 count as 0, like "missing": 0 in the DSL
            .aggregation(AggregationBuilders.avg("my_agg").field("model_1").missing(0.0));
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            if (searchResponse.getHits().getTotalHits() != 0) {
                Aggregations aggregations = searchResponse.getAggregations();
                Avg avg = aggregations.get("my_agg");
                res = avg.getValue();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return res;
}
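The effect of missing on avg can be reproduced with plain Java. This is a sketch on hypothetical data, not the ES implementation: a null is a document without the field, and the optional fill value plays the role of "missing":

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.OptionalDouble;

// Illustration only: avg with and without a fill value for missing fields.
// null in the list = a document without the field; the data is hypothetical.
class AvgWithMissing {
    // missing == null: skip documents without the field (the default behaviour)
    // missing != null: use that value instead, like "missing": 0
    static OptionalDouble avg(List<Double> values, Double missing) {
        return values.stream()
                .map(v -> {
                    if (v != null) {
                        return v;
                    }
                    return missing;
                })
                .filter(Objects::nonNull)
                .mapToDouble(Double::doubleValue)
                .average();
    }

    public static void main(String[] args) {
        List<Double> modelOne = Arrays.asList(2.0, null, 4.0, null);
        System.out.println(avg(modelOne, null)); // OptionalDouble[3.0]  (6 / 2)
        System.out.println(avg(modelOne, 0.0));  // OptionalDouble[1.5]  (6 / 4)
    }
}
```

The document's own output follows the same arithmetic. 1.9825000315904617 / 0.49562500789761543 = 4 exactly: the sum is unchanged, and the denominator grows fourfold once missing documents count as 0.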
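Grouped statistics (groupBy)
Besides plain aggregates, ES supports grouped statistics, i.e. the GROUP BY of structured data, as well as more complex cross-tab (pivot) statistics.
The grouping field must take discrete values. keyword fields work directly, while continuous numeric fields usually need to be binned first (range), with the bins then used as the grouping column.
a. Grouping with terms
Put terms directly under the custom aggregation name and set the field. If no metric is specified, each group returns its document count: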
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"terms": {
"field": "city"
}
}
}
}
The output is below. Again, documents without the field are not counted, and each group's count is in doc_count:
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "HK",
"doc_count" : 2
},
{
"key" : "SZ",
"doc_count" : 1
}
]
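Note that terms counts can be approximate. On an index with several shards, each shard returns only its own top terms, so doc_count is not guaranteed to be exact. ES therefore also returns two extra values. doc_count_error_upper_bound is an upper bound on the number of documents that may have been missed for a term. sum_other_doc_count is the total number of documents in the buckets that were not returned.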
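A single-shard analogue makes sum_other_doc_count concrete. The sketch below counts values per key, keeps the top size buckets, and reports how many counted documents fell outside them. It cannot reproduce the multi-shard error bound. The ordering (count descending, ties by key ascending) is assumed to mirror the ES default. The class name and the data are hypothetical:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustration only: a single-shard analogue of the terms aggregation.
// Buckets are ordered by doc count descending, ties by key ascending
// (assumed to mirror the ES default), and only the top `size` are kept.
class TermsSketch {
    static Map<String, Long> topBuckets(List<String> fieldValues, int size) {
        // TreeMap gives key-ascending order, which the stable sort below keeps for ties
        Map<String, Long> counts = fieldValues.stream()
                .filter(Objects::nonNull) // documents without the field are not counted
                .collect(Collectors.groupingBy(v -> v, TreeMap::new, Collectors.counting()));
        Map<String, Long> top = new LinkedHashMap<>();
        counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(size)
                .forEach(e -> top.put(e.getKey(), e.getValue()));
        return top;
    }

    // Documents that fall into buckets not returned, like sum_other_doc_count.
    static long sumOtherDocCount(List<String> fieldValues, int size) {
        long counted = fieldValues.stream().filter(Objects::nonNull).count();
        long shown = topBuckets(fieldValues, size).values().stream()
                .mapToLong(Long::longValue).sum();
        return counted - shown;
    }

    public static void main(String[] args) {
        // Hypothetical city values; null = document without city.
        List<String> cities = Arrays.asList("HK", "SZ", null, "HK", "BJ");
        System.out.println(topBuckets(cities, 2));       // {HK=2, BJ=1}
        System.out.println(sumOtherDocCount(cities, 2)); // 1 (the SZ document)
    }
}
```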
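b. Grouping with range
Bin a numeric field and use each bin as a group. from and to set each bin's start and end. from is inclusive and to is exclusive, so each bin is the half-open interval [from, to):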
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"range": {
"field": "degree3",
"ranges": [
{
"from": 1, "to":3
},
{
"from": 3
}
]
}
}
}
}
The response below confirms that the buckets are half-open [from, to). ES also generates the keys automatically:
"aggregations" : {
"my_agg" : {
"buckets" : [
{
"key" : "1.0-3.0",
"from" : 1.0,
"to" : 3.0,
"doc_count" : 2
},
{
"key" : "3.0-*",
"from" : 3.0,
"doc_count" : 3
}
]
}
}
You can also name the bucket keys yourself:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"range": {
"field": "degree3",
"ranges": [
{
"from": 1, "to":3, "key": "low_degree"
},
{
"from": 3,"key": "high_degree"
}
]
}
}
}
}
The response uses the custom keys:
"aggregations" : {
"my_agg" : {
"buckets" : [
{
"key" : "low_degree",
"from" : 1.0,
"to" : 3.0,
"doc_count" : 2
},
{
"key" : "high_degree",
"from" : 3.0,
"doc_count" : 3
}
]
}
}
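Here is a minimal sketch of the half-open bucketing. It uses the degree3 values implied by the Java terms output later in this section (3:2, 1:1, 2:1, 4:1, i.e. 1, 2, 3, 3, 4), and it reproduces the 2/3 split above. The class and method names are hypothetical. Keys follow the auto-generated from-to style:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: half-open [from, to) buckets as in the range aggregation.
// A null bound leaves that side open; buckets are counted independently,
// so a value may land in several buckets if the ranges overlap.
class RangeSketch {
    static boolean inBucket(double v, Double from, Double to) {
        return (from == null || v >= from)   // from is inclusive
                && (to == null || v < to);   // to is exclusive
    }

    // ranges[i] = {from, to}; keys look like the auto-generated "1.0-3.0" / "3.0-*"
    static Map<String, Long> bucketCounts(double[] values, Double[][] ranges) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Double[] r : ranges) {
            String key = (r[0] == null ? "*" : r[0].toString()) + "-"
                    + (r[1] == null ? "*" : r[1].toString());
            long n = 0;
            for (double v : values) {
                if (inBucket(v, r[0], r[1])) {
                    n++;
                }
            }
            counts.put(key, n);
        }
        return counts;
    }

    public static void main(String[] args) {
        double[] degree3 = {1, 2, 3, 3, 4};
        Double[][] ranges = {{1.0, 3.0}, {3.0, null}};
        System.out.println(bucketCounts(degree3, ranges)); // {1.0-3.0=2, 3.0-*=3}
    }
}
```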
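c. Adding a metric to each group
So far each group only returns doc_count. To compute other metrics, add another aggs at the same level as the grouping clause (terms, range). In it, set the field, the metric, missing-value handling, and so on. Using avg as an example: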
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"range": {
"field": "degree3",
"ranges": [
{
"from": 1, "to":3, "key": "low_degree"
},
{
"from": 3,"key": "high_degree"
}
]
},
"aggs": {
"my_avg": {
"avg": {
"field": "price"
}
}
}
}
}
}
This groups by degree3 and averages price within each group. The response:
"aggregations" : {
"my_agg" : {
"buckets" : [
{
"key" : "low_degree",
"from" : 1.0,
"to" : 3.0,
"doc_count" : 2,
"my_avg" : {
"value" : 1514.0
}
},
{
"key" : "high_degree",
"from" : 3.0,
"doc_count" : 3,
"my_avg" : {
"value" : 1486990.6666666667
}
}
]
}
}
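"Bucket first, then compute a metric per bucket" is the same shape as grouping with a downstream collector in Java streams. Below is a minimal sketch with the same two bins as the DSL. The Hotel class and the prices are hypothetical. Unlike ES, the map here does not keep the order in which the ranges were declared:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only: group into range buckets, then avg(price) per bucket,
// the in-memory analogue of range + a nested avg aggregation.
// Hotel and the sample data are hypothetical.
class BucketAvgSketch {
    static final class Hotel {
        final double degree3;
        final double price;

        Hotel(double degree3, double price) {
            this.degree3 = degree3;
            this.price = price;
        }
    }

    // Same bins as the DSL: low_degree = [1, 3), high_degree = [3, *).
    // Returns null for values outside every bin (they join no bucket).
    static String bucketOf(double degree3) {
        if (degree3 >= 3) {
            return "high_degree";
        }
        if (degree3 >= 1) {
            return "low_degree";
        }
        return null;
    }

    static Map<String, Double> avgPriceByBucket(List<Hotel> hotels) {
        return hotels.stream()
                .filter(h -> bucketOf(h.degree3) != null)
                .collect(Collectors.groupingBy((Hotel h) -> bucketOf(h.degree3),
                        Collectors.averagingDouble((Hotel h) -> h.price)));
    }

    public static void main(String[] args) {
        List<Hotel> hotels = Arrays.asList(
                new Hotel(1, 100), new Hotel(2, 300),
                new Hotel(3, 1000), new Hotel(4, 2000), new Hotel(0.5, 50));
        Map<String, Double> avg = avgPriceByBucket(hotels);
        System.out.println(avg.get("low_degree"));  // 200.0
        System.out.println(avg.get("high_degree")); // 1500.0
    }
}
```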
d. Java client code for groupBy
Below are examples of terms grouping, range grouping, and grouping with a metric.
public void getTermsBucket() {
    SearchRequest searchRequest = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .aggregation(AggregationBuilders.terms("my_agg").field("degree3"));
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            if (searchResponse.getHits().getTotalHits() != 0) {
                Aggregations aggregations = searchResponse.getAggregations();
                Terms terms = aggregations.get("my_agg");
                // One bucket per distinct degree3 value: print key and document count
                for (Terms.Bucket bucket : terms.getBuckets()) {
                    String bucketKey = bucket.getKeyAsString();
                    long docCount = bucket.getDocCount();
                    System.out.println(bucketKey + ":" + docCount);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
3:2
1:1
2:1
4:1
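For range grouping, use addRange with RangeAggregator.Range to set each bin's start and end. The three constructor arguments are the key, from, and to, with null for an open end. from and to are Double values, so use double literals such as 3d: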
public void getRangeBucket() {
    SearchRequest searchRequest = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .aggregation(AggregationBuilders.range("my_agg").field("degree3")
                    // Range(key, from, to): null leaves that end open
                    .addRange(new RangeAggregator.Range("low_degree", null, 3d))
                    .addRange(new RangeAggregator.Range("high_degree", 3d, null)));
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            if (searchResponse.getHits().getTotalHits() != 0) {
                Aggregations aggregations = searchResponse.getAggregations();
                Range range = aggregations.get("my_agg");
                for (Range.Bucket bucket : range.getBuckets()) {
                    String bucketKey = bucket.getKeyAsString();
                    long docCount = bucket.getDocCount();
                    System.out.println(bucketKey + ":" + docCount);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
low_degree:2
high_degree:3
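To group and also compute a metric, call subAggregation on the grouping builder. This creates a sub-aggregation with its own name, field, and metric. To read the response, call getAggregations on each bucket to get the value: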
public void getRangeBucketAvg() { // renamed: a second getRangeBucket() in the same class would not compile
    SearchRequest searchRequest = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .aggregation(AggregationBuilders.range("my_agg").field("degree3")
                    .addRange(new RangeAggregator.Range("low_degree", null, 3d))
                    .addRange(new RangeAggregator.Range("high_degree", 3d, null))
                    // avg(price) computed inside each range bucket
                    .subAggregation(AggregationBuilders.avg("my_avg").field("price")));
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            if (searchResponse.getHits().getTotalHits() != 0) {
                Aggregations aggregations = searchResponse.getAggregations();
                Range range = aggregations.get("my_agg");
                for (Range.Bucket bucket : range.getBuckets()) {
                    String bucketKey = bucket.getKeyAsString();
                    Avg avg = bucket.getAggregations().get("my_avg");
                    double value = avg.getValue();
                    System.out.println(bucketKey + ":" + value);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Output:
low_degree:1514.0
high_degree:1486990.6666666667
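Truncating and sorting aggregation results (order by)
After aggregating, buckets can be sorted by key or by metric value. By default, terms buckets are sorted by doc_count in descending order, and only the top 10 buckets are returned unless size is set in terms. Specifying _count explicitly gives the same result. The order clause goes inside terms: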
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"terms": {
"field": "city",
"order":{
"_count": "desc"
}
}
}
}
}
You can also sort by _key, i.e. by the bucket key:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"terms": {
"field": "city",
"order":{
"_key": "desc"
}
}
}
}
}
The most useful case is sorting by a metric value. To do this, reference the name of the custom sub-aggregation in order:
GET /hotel/_doc/_search
{
"size": 0,
"aggs": {
"my_agg": {
"terms": {
"field": "city",
"order":{
"my_avg": "desc"
}
},
"aggs": {
"my_avg": {
"avg": {
"field": "price"
}
}
}
}
}
}
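Java example: call order on the terms builder (the same builder that receives the sub-aggregation, not the sub-aggregation itself). Pass BucketOrder.aggregation("my_avg", false) to sort by the custom metric in descending order (false means descending):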
public void getTermsBucketOrderByAvg() { // renamed: a second getTermsBucket() in the same class would not compile
    SearchRequest searchRequest = new SearchRequest(index);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .aggregation(AggregationBuilders.terms("my_agg").field("degree3")
                    .subAggregation(AggregationBuilders.avg("my_avg").field("price"))
                    // order is set on the terms builder; false = descending by my_avg
                    .order(BucketOrder.aggregation("my_avg", false)));
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            if (searchResponse.getHits().getTotalHits() != 0) {
                Aggregations aggregations = searchResponse.getAggregations();
                Terms terms = aggregations.get("my_agg");
                for (Terms.Bucket bucket : terms.getBuckets()) {
                    String bucketKey = bucket.getKeyAsString();
                    Avg avg = bucket.getAggregations().get("my_avg");
                    double value = avg.getValue();
                    System.out.println(bucketKey + ":" + value);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
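The three orders in this section (_count, _key, and a metric such as my_avg) can be written as plain Java comparators over already-computed buckets. In the client they correspond to BucketOrder.count(false), BucketOrder.key(false), and BucketOrder.aggregation("my_avg", false). The Bucket class and the data below are hypothetical, and the count tie-break on key ascending is an assumption about the ES default:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: the three orders above (_count, _key, a metric) written as
// comparators over already-computed buckets. Bucket is a hypothetical stand-in
// for a terms bucket with an avg sub-aggregation.
class BucketOrderSketch {
    static final class Bucket {
        final String key;
        final long docCount;
        final double avgPrice;

        Bucket(String key, long docCount, double avgPrice) {
            this.key = key;
            this.docCount = docCount;
            this.avgPrice = avgPrice;
        }
    }

    // "order": {"_count": "desc"}, ties broken by key ascending
    static final Comparator<Bucket> BY_COUNT_DESC =
            Comparator.comparingLong((Bucket b) -> b.docCount).reversed()
                    .thenComparing(Comparator.comparing((Bucket b) -> b.key));
    // "order": {"_key": "desc"}
    static final Comparator<Bucket> BY_KEY_DESC =
            Comparator.comparing((Bucket b) -> b.key).reversed();
    // "order": {"my_avg": "desc"}
    static final Comparator<Bucket> BY_AVG_DESC =
            Comparator.comparingDouble((Bucket b) -> b.avgPrice).reversed();

    static List<String> keysInOrder(List<Bucket> buckets, Comparator<Bucket> order) {
        return buckets.stream().sorted(order).map(b -> b.key).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Bucket> buckets = Arrays.asList(
                new Bucket("HK", 2, 900.0), new Bucket("SZ", 1, 800.0), new Bucket("BJ", 2, 300.0));
        System.out.println(keysInOrder(buckets, BY_COUNT_DESC)); // [BJ, HK, SZ]
        System.out.println(keysInOrder(buckets, BY_KEY_DESC));   // [SZ, HK, BJ]
        System.out.println(keysInOrder(buckets, BY_AVG_DESC));   // [HK, SZ, BJ]
    }
}
```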
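Top N per group (window functions)
ES can emulate a window function such as top 1 per group. In this example we want, for each entity (each security_code), the full record with the latest date. First insert some data: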
PUT /stock
POST /stock/_doc/_mapping
{
"properties": {
"security_code": {"type": "keyword"},
"stock_price": {"type": "double"},
"date": {"type": "date", "format": "yyyy-MM-dd"}
}
}
POST /_bulk
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300124","stock_price":3.14,"date":"2021-01-01"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300124","stock_price":9.14,"date":"2021-01-02"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300124","stock_price":4.14,"date":"2021-01-03"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"002334","stock_price":2.97,"date":"2021-01-02"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"002334","stock_price":3.54,"date":"2021-01-03"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"002334","stock_price":7.84,"date":"2021-01-04"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300198","stock_price":9.26,"date":"2021-01-02"}
{ "index": { "_index": "stock", "_type": "_doc"}}
{"security_code":"300198","stock_price":3.14,"date":"2021-01-01"}
Now query the latest price record for each security. In the sub-aggregation, give top_hits a sort condition and a size. Here each group returns its top 1:
GET /stock/_doc/_search
{
"size": 0,
"aggs": {
"type": {
"terms": {
"field": "security_code"
},"aggs": {
"latest_price": {
"top_hits": {
"sort": [{
"date": {"order": "desc"}
}],
"size": 1
}
}
}
}
}
}
Output:
"buckets" : [
{
"key" : "002334",
"doc_count" : 3,
"latest_price" : {
"hits" : {
"total" : 3,
"max_score" : null,
"hits" : [
{
"_index" : "stock",
"_type" : "_doc",
"_id" : "8-d9HIABDkVv6XsnfJUi",
"_score" : null,
"_source" : {
"security_code" : "002334",
"stock_price" : 7.84,
"date" : "2021-01-04"
},
"sort" : [
1609718400000
]
}
]
}
}
},
{
"key" : "300124",
"doc_count" : 3,
"latest_price" : {
"hits" : {
"total" : 3,
"max_score" : null,
"hits" : [
{
"_index" : "stock",
"_type" : "_doc",
"_id" : "8Od9HIABDkVv6XsnfJUi",
"_score" : null,
"_source" : {
"security_code" : "300124",
"stock_price" : 4.14,
"date" : "2021-01-03"
},
"sort" : [
1609632000000
]
}
]
}
}
},
{
"key" : "300198",
"doc_count" : 2,
"latest_price" : {
"hits" : {
"total" : 2,
"max_score" : null,
"hits" : [
{
"_index" : "stock",
"_type" : "_doc",
"_id" : "9Od9HIABDkVv6XsnfJUi",
"_score" : null,
"_source" : {
"security_code" : "300198",
"stock_price" : 9.26,
"date" : "2021-01-02"
},
"sort" : [
1609545600000
]
}
]
}
}
}
]
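The same "latest row per group" can be checked in plain Java on the _bulk data above. This is an in-memory analogue of terms plus top_hits (sort date desc, size 1), not how ES executes it. The Price class and method names are hypothetical:

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustration only: "latest row per security_code", the in-memory analogue of
// terms(security_code) + top_hits(sort date desc, size 1), on the _bulk data above.
class LatestPerGroup {
    static final class Price {
        final String code;
        final double price;
        final LocalDate date;

        Price(String code, double price, String date) {
            this.code = code;
            this.price = price;
            this.date = LocalDate.parse(date); // yyyy-MM-dd is ISO-8601
        }

        @Override
        public String toString() {
            return code + " " + price + " " + date;
        }
    }

    static List<Price> sampleRows() {
        return Arrays.asList(
                new Price("300124", 3.14, "2021-01-01"),
                new Price("300124", 9.14, "2021-01-02"),
                new Price("300124", 4.14, "2021-01-03"),
                new Price("002334", 2.97, "2021-01-02"),
                new Price("002334", 3.54, "2021-01-03"),
                new Price("002334", 7.84, "2021-01-04"),
                new Price("300198", 9.26, "2021-01-02"),
                new Price("300198", 3.14, "2021-01-01"));
    }

    // One entry per code; maxBy(date) plays the role of sort date desc + size 1.
    static Map<String, Optional<Price>> latestByCode(List<Price> rows) {
        return rows.stream().collect(Collectors.groupingBy((Price p) -> p.code, TreeMap::new,
                Collectors.maxBy(Comparator.comparing((Price p) -> p.date))));
    }

    public static void main(String[] args) {
        // Prints 002334 7.84 2021-01-04, 300124 4.14 2021-01-03, 300198 9.26 2021-01-02
        latestByCode(sampleRows()).forEach((code, latest) -> System.out.println(latest.get()));
    }
}
```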
Java client implementation. The top_hits sort must be descending, as in the DSL. SortOrder is org.elasticsearch.search.sort.SortOrder:
public void getLatestStockPrice() {
    SearchRequest searchRequest = new SearchRequest("stock");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
            .aggregation(AggregationBuilders.terms("my_agg").field("security_code")
                    // latest first: sort by date descending, keep 1 hit per bucket
                    .subAggregation(AggregationBuilders.topHits("latest_price")
                            .sort("date", SortOrder.DESC)
                            .size(1)));
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            if (searchResponse.getHits().getTotalHits() != 0) {
                Aggregations aggregations = searchResponse.getAggregations();
                Terms terms = aggregations.get("my_agg");
                for (Terms.Bucket bucket : terms.getBuckets()) {
                    TopHits topHits = bucket.getAggregations().get("latest_price");
                    topHits.getHits().forEach(s -> System.out.println(s.getSourceAsMap()));
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Watch the sort order. Calling .sort("date") without an order sorts ascending, which returns the earliest record in each group instead, for example:
{date=2021-01-02, security_code=002334, stock_price=2.97}
{date=2021-01-01, security_code=300124, stock_price=3.14}
{date=2021-01-01, security_code=300198, stock_price=3.14}
With SortOrder.DESC as above, the code should return the same records as the DSL query: 2021-01-04 / 7.84 for 002334, 2021-01-03 / 4.14 for 300124, and 2021-01-02 / 9.26 for 300198.