内容简介
1.spring整合es
2.普通查询的使用方法
3.聚合查询的使用方法
4.普通查询与聚合查询的使用区别
spring整合es
1.service层的接口
/**
 * Service-layer access point to Elasticsearch.
 */
public interface EsClient {

    /**
     * Returns the transport client used to talk to the cluster.
     *
     * @return the transport client held by the implementation
     */
    TransportClient getClient();

    /**
     * Builds a query condition for one field.
     *
     * @param name  field name
     * @param value value to query for
     * @return the query builder for the given field and value
     */
    QueryBuilder getQueryCondition(String name, String value);
}
2.serviceimpl层的实现类
@Service("esClient")
public class EsClientImpl implements EsClient{
@Value("${es.clustername}")
private String clusterName;
@Value("${es.port}")
private String port;
@Value("${es.hosts}")
private String hosts;
private TransportClient transportClient = null;
/**
* 条件分割器,多条件使用逗号分割
*
* @param name 属性名
* @param value 查询值
* @return
*/
public QueryBuilder getQueryCondition(final String name, final String value) {
if (value.contains(",")) {
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
for (final String v : value.split(",")) {
boolQuery.should(QueryBuilders.termQuery(name, v));
}
return boolQuery;
} else {
return QueryBuilders.termQuery(name, value);
}
}
@PostConstruct
public void init() throws Exception {
connectEs();
}
@PreDestroy
public void destory() {
disConnectEs();
}
private void disConnectEs() {
if (null != transportClient) {
transportClient.close();
}
}
private void connectEs() throws Exception {
Settings settings = Settings.builder().put("cluster.name", clusterName).build();
transportClient = new PreBuiltTransportClient(settings);
if (StringUtils.isNotBlank(hosts)) {
String[] hostArray = hosts.split(",");
for (String host : hostArray) {
InetSocketTransportAddress ist = new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port));
transportClient.addTransportAddress(ist);
}
}
}
public TransportClient getClient() {
return this.transportClient;
}
}
3.属性注入(多种属性文件,这里只是一种pom文件存放属性的方式)
<!-- Elasticsearch settings -->
<!-- NOTE(review): EsClientImpl reads ${es.clustername}, ${es.port} and ${es.hosts};
     confirm how these sink.es.* properties are mapped onto those keys. -->
<!-- Cluster name -->
<sink.es.clustername>dolphin</sink.es.clustername>
<!-- Port -->
<sink.es.port>9300</sink.es.port>
<!-- Comma-separated host list -->
<sink.es.hosts>172.20.78.56,172.20.78.57,172.20.78.58</sink.es.hosts>
<!-- Index name -->
<sink.es.index>sinkreport</sink.es.index>
<!-- Type name -->
<sink.es.type>mg_sink_user_d</sink.es.type>
普通查询
1.关于es模块提供的查询
/**
 * Runs a plain (non-aggregated) search restricted to {@code content_id = "c0001"}.
 *
 * <p>Province and city are added as term filters when present; a {@code dayid}
 * range is added when both start and end are present. At most 10000 hits are
 * requested, sorted by {@code dayid} ascending.
 *
 * @param commonQueryParam carrier of the province, city, start and end filters
 * @return the search response
 */
public SearchResponse queryFlowInspireActiveChart(CommonQueryParam commonQueryParam) {
    BoolQueryBuilder filter = QueryBuilders.boolQuery();

    // Optional location filters.
    if (StringUtils.isNotEmpty(commonQueryParam.getProvince())) {
        filter.must(QueryBuilders.termQuery("province", commonQueryParam.getProvince()));
    }
    if (StringUtils.isNotEmpty(commonQueryParam.getCity())) {
        filter.must(QueryBuilders.termQuery("city", commonQueryParam.getCity()));
    }

    // Fixed content filter.
    filter.must(QueryBuilders.termQuery("content_id", "c0001"));

    // Optional day range; only applied when both bounds are given.
    boolean hasRange = StringUtils.isNotEmpty(commonQueryParam.getStart())
            && StringUtils.isNotEmpty(commonQueryParam.getEnd());
    if (hasRange) {
        RangeQueryBuilder dayRange = QueryBuilders.rangeQuery("dayid");
        dayRange.from(commonQueryParam.getStart());
        dayRange.to(commonQueryParam.getEnd());
        filter.must(dayRange);
    }

    SearchRequestBuilder request = esClient.getClient().prepareSearch(index);
    request.setTypes(typeName);
    request.setSize(10000);
    request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
    request.setQuery(filter);
    // Plain query results are sorted by dayid ascending.
    request.addSort("dayid", SortOrder.ASC);

    // Log the generated ES request.
    LOGGER.debug(request.toString());
    return request.execute().actionGet();
}
2.业务层得到es层返回的数据进行业务开发
// Walk every hit returned by the plain (non-aggregated) query and read the
// fields from its source map.
SearchHits hits = response.getHits();
for (SearchHit hit : hits) {
    Map<String, Object> map = hit.getSource();
    // Day id
    String dayid = map.get("dayid").toString();
    // Daily new mobile users
    String userNewActive = map.get("user_new_active").toString();
    // Daily existing mobile users = daily active mobile users minus daily new mobile users
    int resultNumber = Integer.parseInt(map.get("user_active").toString()) - Integer.parseInt(userNewActive);
}
3.小总结
普通查询很简单,对于普通查询的各种条件,比如限定日期,限定省市,直接在boolQueryBuilder作用must即可。
如果需要限定返回的数据条数(size),直接在esClient.getClient().prepareSearch(index)
.setTypes(typeName)之后调用.setSize(10000)即可,这里就是设置最多返回10000条数据。
聚合查询
es的聚合是聚合,普通查询是普通查询,取数据的方式不同,所以不能两个同时使用,所以对数据进行限定的时候,应该放到聚合语句里面限定,关闭掉普通的size。
1.es聚合查询
/**
 * Runs an aggregation query that groups documents by {@code content_name} and
 * sums the flow fields per group, ordered by the summed {@code flow} descending.
 *
 * <p>No hits are requested ({@code size(0)}); the result is read from the
 * {@code contentName} terms aggregation and its sub-aggregations
 * {@code flow}, {@code flowMobile}, {@code flowWifi} and {@code avgFlowMobile}.
 *
 * @param commonQueryParam carrier of the province, city, start and end filters
 * @return the search response holding the aggregation result
 */
public SearchResponse queryFlowInspireTopTenDataByProvince(
        CommonQueryParam commonQueryParam) {
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    if (StringUtils.isNotEmpty(commonQueryParam.getProvince())) {
        boolQueryBuilder.must(QueryBuilders.termQuery("province", commonQueryParam.getProvince()));
    }
    // Province-level data: when the requested city is "0001", exclude city=0001 documents.
    if ("0001".equals(commonQueryParam.getCity())) {
        boolQueryBuilder.mustNot(QueryBuilders.termQuery("city", "0001"));
    }
    boolQueryBuilder.mustNot(QueryBuilders.termQuery("content_id", "c0001"));
    boolQueryBuilder.mustNot(QueryBuilders.termQuery("content_name", "ignore"));
    boolQueryBuilder.mustNot(QueryBuilders.termQuery("level1_name", "ignore"));
    // Exclude the abnormal "-2" marker value from every summed field.
    for (String flowField : new String[] {"flow", "flow_mobile", "flow_wifi", "avg_flow_mobile"}) {
        boolQueryBuilder.mustNot(QueryBuilders.termQuery(flowField, "-2"));
    }
    if (StringUtils.isNotEmpty(commonQueryParam.getStart()) && StringUtils.isNotEmpty(commonQueryParam.getEnd())) {
        RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("dayid");
        // The lower bound is the start day (it previously used the end day for both bounds).
        rangeQuery.from(commonQueryParam.getStart());
        rangeQuery.to(commonQueryParam.getEnd());
        boolQueryBuilder.must(rangeQuery);
    }
    SearchRequestBuilder searchRequestBuilder = esClient.getClient().prepareSearch(index)
            .setTypes(typeName).setSize(0)
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(boolQueryBuilder);
    // Order the "contentName" buckets by their summed "flow" sub-aggregation, descending.
    // To order by the bucket key only (i.e. by content name), pass a term order to order() instead.
    searchRequestBuilder.addAggregation(
            AggregationBuilders.terms("contentName").field("content_name").size(Integer.MAX_VALUE)
                    .order(Terms.Order.aggregation("flow", false))
                    .subAggregation(AggregationBuilders.sum("flow").field("flow"))
                    .subAggregation(AggregationBuilders.sum("flowMobile").field("flow_mobile"))
                    .subAggregation(AggregationBuilders.sum("flowWifi").field("flow_wifi"))
                    .subAggregation(AggregationBuilders.sum("avgFlowMobile").field("avg_flow_mobile"))
    );
    LOGGER.debug(searchRequestBuilder.toString());
    SearchResponse response = searchRequestBuilder.execute().actionGet();
    return response;
}
以上因为使用了聚合,所以关闭了普通查询的数据size,将其设置为0;相反地,聚合所需的数据size要在聚合语句(terms的size)里设置。
注意:一定要看清楚aggregation的层级,在了解到es桶特性之后,其实想要排序只能对桶进行排序,一般情况下是对单个桶里面的数据进行排序,多个桶也能排序,只需要将求和,平均这些函数方法放到最后一个桶即可,但是es支持不友好很有可能多个桶返回的数据并没有按照预想的排序。
2.业务层获取es数据进行业务开发
// Top-level aggregations of the response; nothing to convert when absent.
Aggregations firstAggs = response.getAggregations();
if (firstAggs == null) {
    return null;
}

// One bucket per content name.
Terms contentNameTerms = firstAggs.get("contentName");
for (Terms.Bucket bucket : contentNameTerms.getBuckets()) {
    // Stop once the counter reaches 11
    // (presumably number starts at 0, giving ten rows — confirm in the enclosing method).
    number++;
    if (number >= 11) {
        break;
    }

    String bucketKey = bucket.getKeyAsString();
    TableData row = new TableData();
    row.setRank(String.valueOf(number));
    row.setName(bucketKey);
    row.setKind(this.queryLevel1NameByContentName(commonQueryParam, bucketKey));

    // Summed values computed inside this bucket.
    Sum totalFlow = bucket.getAggregations().get("flow");
    row.setTotalFlow(String.format("%.0f", totalFlow.getValue()));

    Sum mobileFlow = bucket.getAggregations().get("flowMobile");
    row.setMobileFlow(String.format("%.0f", mobileFlow.getValue()));

    Sum wifiFlow = bucket.getAggregations().get("flowWifi");
    row.setWifiFlow(String.format("%.0f", wifiFlow.getValue()));

    // The average mobile flow is scaled by 1024 and shown with two decimals.
    Sum avgMobileFlow = bucket.getAggregations().get("avgFlowMobile");
    row.setAvgFlow(String.format("%.2f", avgMobileFlow.getValue() * 1024));

    tableDataList.add(row);
}
return tableDataList;
}
要清楚聚合返回数据的结构,需要先了解es桶的原理。简单来说就是:有多少层terms就要嵌套多少层for循环,每层terms循环出来的就是桶(bucket);一个桶可以理解为group by之后的一组数据,通过这个桶可以拿到group by字段的值(比如contentName),然后可以再从这个桶里取出聚合好的数据,比如contentNameBucket.getAggregations().get("flow");
普通查询与聚合查询的使用区别
1.普通查询在javaApi里面只能用普通查询的方式获取值
2.聚合查询在javaApi里面只能用聚合查询方式获取值
3.由于两个的不相关性,他们的size要分别设置,如果要求聚合,那么在普通查询的size设置将失去意义,应该设置普通查询size参数为0,而在聚合查询的size参数尽量设置大一些,比如.size(2147483647),聚合的size越大越能保证桶内聚合的时候,sum(value)的数据尽量准确。
补充一个多字段聚合的例子说明多字段的层级关系
注意多少个group by就需要多少个terms,最后一个terms里面才放sum等函数聚合,而不是sum聚合函数放到terms的任意层级!
/**
 * Adds the member-operation metrics aggregation to the request.
 *
 * <p>Three nested terms levels are used: {@code dayid}, then
 * {@code provinceName}, then {@code cityName} (ordered by term ascending).
 * The sum aggregations are attached to the innermost level only.
 *
 * @param searchRequestBuilder request to add the aggregation to
 */
private void setMemberServiceAgg(SearchRequestBuilder searchRequestBuilder) {
    // Bucket limit applied to every terms level.
    final int maxBuckets = Integer.MAX_VALUE;

    searchRequestBuilder.addAggregation(
            AggregationBuilders.terms("dayid").field("dayid").size(maxBuckets).subAggregation(
                    AggregationBuilders.terms("provinceName").field("province_name").size(maxBuckets).subAggregation(
                            AggregationBuilders.terms("cityName").field("city_name").size(maxBuckets)
                                    .order(Terms.Order.term(true))
                                    .subAggregation(AggregationBuilders.sum("sumNewPayMember").field("new_pay_member"))
                                    .subAggregation(AggregationBuilders.sum("sumCancelMember").field("cancel_member"))
                                    .subAggregation(AggregationBuilders.sum("sumMembers").field("members"))
                                    .subAggregation(AggregationBuilders.sum("sumMemberActive").field("member_active")))));
}