elasticsearch
- java聚合api使用(多字段分组统计、聚合,最大最小值)
// select sum(event_count),sourceip,destip,min(storagetime),max(storagetime) from index where event_rule_id = 'xxx' group by sourceip,destip
Client client = ESClient.esClient();
String index = "indexName";
TermsBuilder sourceipTermsBuilder = AggregationBuilders.terms("sourceip").field("sourceip");
TermsBuilder destipTermsBuilder = AggregationBuilders.terms("destip").field("destip");
SumBuilder alarmtimesBuilder = AggregationBuilders.sum("alarmtimes").field("event_count");
MinBuilder firstalarmtimeBuilder = AggregationBuilders.min("firstalarmtime").field("storagetime");
MaxBuilder lastalarmtimeBuilder = AggregationBuilders.max("lastalarmtime").field("storagetime");
SearchResponse response = client.prepareSearch(index).setTypes(index)
.setQuery(QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("event_rule_id", "VBD_NFireWall_MV2.0_003_001")).must(QueryBuilders.rangeQuery("storagetime").from(1530273600000L).to(1530281360470L)))
.addAggregation(sourceipTermsBuilder.subAggregation(destipTermsBuilder.subAggregation(alarmtimesBuilder).subAggregation(firstalarmtimeBuilder).subAggregation(lastalarmtimeBuilder)))
.execute()
.actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().asMap();
StringTerms sourceipAgg = (StringTerms) aggMap.get("sourceip");
List<Terms.Bucket> sourceipBucket = sourceipAgg.getBuckets();
for (int i = 0; i < sourceipBucket.size(); i++) {
//源ip
String sourceip = sourceipBucket.get(i).getKeyAsString();
//得到所有子聚合
Map destAggMap = sourceipBucket.get(i).getAggregations().asMap();
StringTerms destipAgg = (StringTerms)destAggMap.get("destip");
List<Terms.Bucket> buckets = destipAgg.getBuckets();
for (Terms.Bucket bucket : buckets) {
String destip = bucket.getKeyAsString();
Map<String, Aggregation> aggregationMap = bucket.getAggregations().asMap();
long alarmtimes = new Double(((InternalSum) (aggregationMap.get("alarmtimes"))).getValue()).longValue();
long firstalarmtime = new Double(((InternalMin) (aggregationMap.get("firstalarmtime"))).getValue()).longValue();
long lastalarmtime = new Double(((InternalMax) (aggregationMap.get("lastalarmtime"))).getValue()).longValue();
System.out.println("源ip: "+sourceip+"\t目的ip: "+destip+"\t次数: "+alarmtimes+"\t开始时间: "+firstalarmtime+"\t结束时间: "+lastalarmtime);
}
}