1.使用时调用该类中的方法:
package cn.com.cewell.search.esBase;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.http.HttpHost;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.min.InternalMin;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import java.lang.reflect.Field;
import java.util.*;
public class EsBaseUtils {
private static int TIME_OUT = 5 * 60 * 1000;
private static RestHighLevelClient restHighLevelClient = null;
private static String host = "127.0.0.1";
private static int port = 9200;
/**
 * Returns the shared {@link RestHighLevelClient}, building it on first use.
 *
 * <p>The method is {@code synchronized} so that concurrent first calls cannot each
 * build a client and leave all but the last-assigned one unreferenced and unclosed.
 *
 * @return the client stored in {@code restHighLevelClient}
 */
private static synchronized RestHighLevelClient getClient() {
    if (restHighLevelClient == null) {
        buildClient();
    }
    return restHighLevelClient;
}
/**
 * Creates the REST high-level client for the configured {@code host} and {@code port}
 * and stores it in {@code restHighLevelClient}.
 *
 * <p>A fresh {@link BasicCredentialsProvider} is installed as the default credentials
 * provider; no credentials are registered on it here.
 */
private static void buildClient() {
    final CredentialsProvider credentials = new BasicCredentialsProvider();
    RestClientBuilder.HttpClientConfigCallback httpConfig = new RestClientBuilder.HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder.setDefaultCredentialsProvider(credentials);
        }
    };
    RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(host, port));
    clientBuilder.setMaxRetryTimeoutMillis(TIME_OUT);
    clientBuilder.setHttpClientConfigCallback(httpConfig);
    restHighLevelClient = new RestHighLevelClient(clientBuilder);
    System.out.println(">>>>>>buildClient>>>host:" + host + " port:" + port);
}
/**
 * Checks whether an index exists.
 *
 * @param index name of the index to look up
 * @return the server's answer; {@code false} when the lookup throws
 */
public static boolean indexExists(String index) {
    GetIndexRequest existsRequest = new GetIndexRequest();
    existsRequest.indices(index);
    boolean exists = false;
    try {
        exists = getClient().indices().exists(existsRequest, RequestOptions.DEFAULT);
    } catch (Exception e) {
        // A failed lookup is reported on the console and treated as "does not exist".
        e.printStackTrace();
        System.out.println(">>>>>>indexExists>>>index isExists fail! index=" + index);
    }
    return exists;
}
/**
 * Deletes an index. Failures are printed to the console and not propagated.
 *
 * @param index name of the index to delete
 */
public static void deleteIndex(String index) {
    DeleteIndexRequest dropRequest = new DeleteIndexRequest(index);
    dropRequest.indices(index);
    String outcome;
    try {
        getClient().indices().delete(dropRequest, RequestOptions.DEFAULT);
        outcome = ">>>>>>deleteIndex>>>delete index success! index=" + index;
    } catch (Exception e) {
        e.printStackTrace();
        outcome = ">>>>>>deleteIndex>>>delete index fail! index=" + index;
    }
    System.out.println(outcome);
}
/**
 * Recreates an index and installs a mapping derived from the fields of {@code clazz}.
 *
 * <p>If the index already exists it is deleted first, so existing data is discarded.
 *
 * @param index index name
 * @param type  mapping type name
 * @param clazz class whose declared fields define the mapping (see {@code createMapping})
 * @return {@code true} when both the index creation and the mapping were acknowledged;
 *         {@code false} when either was not acknowledged or an exception occurred
 */
public static boolean createIndexAndMapping(String index, String type, Class<?> clazz) {
    // An existing index is dropped so that it is always rebuilt from scratch.
    if (indexExists(index)) {
        deleteIndex(index);
    }
    boolean success = false;
    try {
        CreateIndexResponse created =
                getClient().indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
        if (!created.isAcknowledged()) {
            System.out.println(">>>>>>createIndexAndMapping>>>create index error! index=" + index);
            return false;
        }
        PutMappingRequest mappingRequest = Requests.putMappingRequest(index);
        mappingRequest.type(type);
        mappingRequest.source(createMapping(clazz));
        AcknowledgedResponse mapped = getClient().indices().putMapping(mappingRequest, RequestOptions.DEFAULT);
        success = mapped.isAcknowledged();
        if (success) {
            System.out.println(">>>>>>createIndexAndMapping>>>create mapping success! index=" + index+" type="+type);
        } else {
            System.out.println(">>>>>>createIndexAndMapping>>>create mapping error! index=" + index+" type="+type);
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(">>>>>>createIndexAndMapping>>>create error! index=" + index+" type="+type);
        return false;
    }
    return success;
}
/**
 * Runs the query described by {@code rule} and returns the source of every hit.
 *
 * @param index index to search
 * @param type  mapping type to search
 * @param rule  criteria, paging and ordering
 * @return one element per hit (its source map); empty when the query could not be built
 *         or the search failed
 */
public static JSONArray queryList(String index, String type, QueryRule rule) {
    JSONArray rows = new JSONArray();
    try {
        // Build the query body from the rule.
        SearchSourceBuilder source = search(rule);
        if (source == null) {
            return rows;
        }
        SearchRequest request = new SearchRequest(index);
        request.types(type);
        request.searchType(SearchType.QUERY_THEN_FETCH);
        request.source(source);
        SearchResponse response = getClient().search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits().getHits()) {
            rows.add(hit.getSourceAsMap());
        }
        System.out.println(">>>>>>queryList>>>size:" + rows.size() + " jsonArray:" + rows);
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(">>>>>>queryList>>>size:" + rows.size() + " 失败:" + e.getMessage());
    }
    return rows;
}
/**
 * Runs the query described by {@code rule} grouped by {@code groupFields} and returns
 * the parsed aggregation rows.
 *
 * <p>The group fields become nested terms aggregations (first field outermost), each
 * limited to 10000 buckets.
 *
 * @param index       index to search
 * @param type        mapping type to search
 * @param rule        criteria, paging and ordering
 * @param groupFields fields to group by, outermost first
 * @return the rows produced by {@code results}; empty when {@code groupFields} is null
 *         or empty, the query could not be built, or the search failed
 */
public static JSONArray queryListAgg(String index, String type, QueryRule rule,List<String> groupFields){
    JSONArray jsonArray = new JSONArray();
    // Without at least one group field there is no aggregation to build.
    if (groupFields == null || groupFields.isEmpty()) {
        return jsonArray;
    }
    try {
        // Plain query conditions.
        SearchSourceBuilder sourceBuilder = search(rule);
        if (null == sourceBuilder) {
            return jsonArray;
        }
        // Nested terms aggregations, one level per group field.
        TermsAggregationBuilder termsBuilder = AggregationBuilders.terms(groupFields.get(0)).field(groupFields.get(0)).size(10000);
        searchAgg(termsBuilder,groupFields,0);
        sourceBuilder.aggregation(termsBuilder);
        SearchRequest searchRequest = new SearchRequest(index).types(type)
                .searchType(SearchType.QUERY_THEN_FETCH)
                .source(sourceBuilder);
        SearchResponse response = getClient().search(searchRequest, RequestOptions.DEFAULT);
        // Parse the aggregation tree and hand the rows back to the caller; previously
        // they were only printed and the returned array was always empty.
        ArrayList<Object> results = results(response);
        System.out.println(results);
        for (Object row : results) {
            jsonArray.add(row);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return jsonArray;
}
/**
 * Flattens every top-level aggregation of a search response into rows.
 *
 * @param agg search response whose aggregations are parsed
 * @return the rows collected by {@code parseAggs} across all top-level aggregations
 */
private static ArrayList<Object> results(SearchResponse agg){
    ArrayList<Object> rows = new ArrayList<>();
    Map<String, Aggregation> topLevel = agg.getAggregations().asMap();
    for (String name : topLevel.keySet()) {
        // Each top-level aggregation starts with its own empty group map.
        parseAggs(topLevel.get(name), new HashMap<String, Object>(), name, rows);
    }
    return rows;
}
/**
 * Walks a (possibly nested) terms aggregation and appends one row per innermost bucket.
 *
 * <p>A row contains the values of the bucket's sub-aggregations (via {@code package2map})
 * plus the bucket key of every terms level on the path to it, keyed by the name under
 * which that level was reached.
 *
 * <p>The earlier version only emitted a row while iterating a bucket's sub-aggregations,
 * so buckets without any sub-aggregation produced nothing and the result stayed empty.
 * A row is now emitted for every bucket that has no nested terms aggregation.
 *
 * @param agg     aggregation to walk; anything that is not a {@link Terms} is ignored
 * @param group   bucket keys collected on the path so far; not modified
 * @param field   key under which this level's bucket key is stored
 * @param objects accumulator that receives the rows
 * @return {@code objects}
 */
private static ArrayList<Object> parseAggs(Aggregation agg, HashMap<String, Object> group,String field,ArrayList<Object> objects){
    if (!(agg instanceof Terms)) {
        return objects;
    }
    for (Terms.Bucket bucket : ((Terms) agg).getBuckets()) {
        // Copy per bucket so sibling buckets never see each other's keys.
        HashMap<String, Object> path = new HashMap<>(group);
        path.put(field, bucket.getKeyAsString());
        boolean hasNestedTerms = false;
        for (Map.Entry<String, Aggregation> entry : bucket.getAggregations().asMap().entrySet()) {
            if (entry.getValue() instanceof Terms) {
                hasNestedTerms = true;
                parseAggs(entry.getValue(), path, entry.getKey(), objects);
            }
        }
        if (!hasNestedTerms) {
            // Innermost grouping level: emit the row for this key combination.
            LinkedHashMap<String, Object> row = package2map(bucket);
            row.putAll(path);
            objects.add(row);
        }
    }
    return objects;
}
/**
 * Converts the sub-aggregations of a bucket into a name-to-value map.
 *
 * @param bucket terms bucket whose sub-aggregations are read
 * @return map from sub-aggregation name to the string produced by {@code getValue}
 */
private static LinkedHashMap<String, Object> package2map(Terms.Bucket bucket){
    Map<String, Aggregation> subAggregations = bucket.getAggregations().asMap();
    LinkedHashMap<String, Object> values = new LinkedHashMap<>();
    for (String name : subAggregations.keySet()) {
        values.put(name, getValue(subAggregations.get(name)));
    }
    return values;
}
/**
 * Extracts the value of a metric aggregation as a string.
 *
 * @param agg aggregation to read
 * @return the metric value for the types avg, sum, value_count, min, max and
 *         cardinality; for any other type the aggregation's own string form
 */
public static String getValue(Aggregation agg) {
    // Dispatch on the type name reported by the aggregation itself.
    switch (agg.getType()) {
        case "avg":
            return String.valueOf(((InternalAvg) agg).getValue());
        case "sum":
            return String.valueOf(((Sum) agg).getValue());
        case "value_count":
            return String.valueOf(((InternalValueCount) agg).getValue());
        case "min":
            return String.valueOf(((InternalMin) agg).getValue());
        case "max":
            return String.valueOf(((InternalMax) agg).getValue());
        case "cardinality":
            return String.valueOf(((InternalCardinality) agg).getValue());
        default:
            return String.valueOf(agg);
    }
}
/**
 * Bulk-indexes the given documents.
 *
 * <p>Unlike the earlier version, the returned number counts only the documents the
 * server accepted, and success is reported only when every document was stored.
 *
 * @param index  target index
 * @param type   target mapping type
 * @param result documents to index, one map per document; may be null or empty
 * @return number of documents indexed successfully; 0 when there was nothing to index
 *         or the bulk call itself failed
 */
public static int save(String index, String type, List<Map<String, Object>> result) {
    // A bulk request without actions is rejected, so skip the round trip entirely.
    if (result == null || result.isEmpty()) {
        System.out.println(">>>>>>save>>>nothing to save, count=0");
        return 0;
    }
    BulkRequest bulkRequest = new BulkRequest();
    for (Map<String, Object> map : result) {
        bulkRequest.add(new IndexRequest(index, type).source(map));
    }
    int total = bulkRequest.numberOfActions();
    int saved = 0;
    try {
        BulkResponse bulkResponse = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        for (BulkItemResponse item : bulkResponse.getItems()) {
            if (item.isFailed()) {
                System.out.println(">>>>>>save>>>error1... msg=" + item.getFailureMessage());
            } else {
                saved++;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(">>>>>>save>>>error2... msg=" + e.getMessage());
        return 0;
    }
    if (saved == total) {
        System.out.println(">>>>>>save>>>save success, count=" + saved);
    } else {
        System.out.println(">>>>>>save>>>save finished with failures, saved=" + saved + " of " + total);
    }
    return saved;
}
/**
 * Updates the single document matched by {@code rule}.
 *
 * <p>Nothing is written unless the rule matches exactly one document id.
 *
 * @param index  index to search and update
 * @param type   mapping type
 * @param rule   criteria identifying the document
 * @param result field values to apply
 */
public static void update(String index, String type, QueryRule rule, Map<String, Object> result) {
    List<String> matches = getESId(index, type, rule);
    if (matches.size() != 1) {
        System.out.println(">>>>>>update>>>getESId number is not one!");
        return;
    }
    upsertEs(index, type, matches.get(0), result);
}
/**
 * Deletes every document whose id is returned by {@code getESId} for {@code rule}.
 *
 * <p>Deletion stops at the first failure, which is printed to the console.
 *
 * @param index index to search and delete from
 * @param type  mapping type of the documents
 * @param rule  criteria identifying the documents
 */
public static void delete(String index, String type, QueryRule rule) {
    List<String> esId = getESId(index, type, rule);
    try {
        for (String id : esId) {
            // The second argument is the mapping type; the index name was passed here
            // before, so the request addressed the wrong type.
            DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
            getClient().delete(deleteRequest, RequestOptions.DEFAULT);
        }
        System.out.println(">>>>>>delete>>>delete success ids=" + esId);
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(">>>>>>delete>>>delete fail id=" + esId);
    }
}
/**
 * Translates a {@link QueryRule} into a search source: every criterion becomes a filter
 * clause of one bool query, followed by optional paging and sorting.
 *
 * @param rule criteria, paging and ordering
 * @return the assembled search source, or {@code null} when assembling it threw
 */
private static SearchSourceBuilder search(QueryRule rule) {
    SearchSourceBuilder assembled = null;
    try {
        // All criteria are combined as filter clauses.
        BoolQueryBuilder filters = QueryBuilders.boolQuery();
        for (QueryCriterion criterion : rule.getCriterions()) {
            filters.filter(criterion.getBuilder());
        }
        SearchSourceBuilder draft = new SearchSourceBuilder();
        draft.query(filters).explain(true);
        // Paging is applied only for a non-negative offset and a positive page size.
        int offset = rule.getFirstResult();
        int limit = rule.getMaxResults();
        if (offset > -1 && limit > 0) {
            draft.from(offset).size(limit);
        }
        // Every sort field uses the rule's single sort direction.
        List<String> sortFields = rule.getOrderPropertyNames();
        if (sortFields != null) {
            for (String sortField : sortFields) {
                draft.sort(new FieldSortBuilder(sortField).order(rule.getSortMode()));
            }
        }
        assembled = draft;
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(">>>>>>queryList>>>search fail:" + e.getMessage());
    }
    return assembled;
}
/**
 * Returns the document ids of the hits produced by {@code rule}.
 *
 * @param index index to search
 * @param type  mapping type to search
 * @param rule  criteria, paging and ordering
 * @return ids of the returned hits; empty when the query could not be built or the
 *         search failed
 */
private static List<String> getESId(String index, String type, QueryRule rule) {
    List<String> hitIds = new ArrayList<>();
    SearchSourceBuilder source = search(rule);
    if (source == null) {
        return hitIds;
    }
    try {
        SearchRequest request = new SearchRequest(index);
        request.types(type);
        request.searchType(SearchType.QUERY_THEN_FETCH);
        request.source(source);
        SearchResponse response = getClient().search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits().getHits()) {
            hitIds.add(hit.getId());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return hitIds;
}
/**
 * Upserts one document through a bulk request built by {@code getRequest}.
 *
 * <p>Unlike the earlier version, success is reported and counted only when the server
 * actually accepted the update.
 *
 * @param index  target index
 * @param type   target mapping type
 * @param id     document id
 * @param params field values, serialised to JSON with {@code JSONObject.fromObject}
 * @return number of actions that succeeded (0 or 1)
 */
public static Integer upsertEs(String index, String type, String id, Map params) {
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.add(getRequest(index, type, id, JSONObject.fromObject(params).toString()));
    int updated = 0;
    try {
        BulkResponse bulkResponse = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        for (BulkItemResponse item : bulkResponse.getItems()) {
            if (item.isFailed()) {
                System.out.println(">>>>>>upsertEs>>>error... msg=" + item.getFailureMessage());
            } else {
                updated++;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(">>>>>>upsertEs>>>update fail, msg=" + e.getMessage());
        return 0;
    }
    if (updated == bulkRequest.numberOfActions()) {
        System.out.println(">>>>>>upsertEs>>>update success, count=" + updated);
    } else {
        System.out.println(">>>>>>upsertEs>>>update fail, count=" + updated);
    }
    return updated;
}
/**
 * Builds the update request used for upserts.
 *
 * @param index target index
 * @param type  target mapping type
 * @param id    document id
 * @param json  document body as JSON, used both as partial doc and as upsert document
 * @return the configured update request
 */
public static UpdateRequest getRequest(String index, String type, String id, String json) {
    return new UpdateRequest(index, type, id)
            // Retry up to three times on a version conflict.
            .retryOnConflict(3)
            // Use the partial document as the upsert document when none exists yet.
            .docAsUpsert(true)
            // Disable noop detection.
            .detectNoop(false)
            // Ask for _source in the response.
            .fetchSource(true)
            .upsert(json, XContentType.JSON)
            .doc(json, XContentType.JSON);
}
/**
 * Builds a mapping with one property per declared instance field of {@code clazz}.
 *
 * <p>Static and synthetic fields (for example {@code serialVersionUID}) are skipped
 * because they are not part of a document. Primitive {@code long} and {@code double}
 * are now mapped like their wrapper types, matching the existing {@code int} handling.
 *
 * @param clazz class whose declared fields define the properties
 * @return the mapping builder; {@code null} or a partially written builder when
 *         building it threw
 */
private static XContentBuilder createMapping(Class<?> clazz) {
    XContentBuilder mapping = null;
    try {
        mapping = XContentFactory.jsonBuilder().startObject()
                .startObject("properties");
        for (Field field : clazz.getDeclaredFields()) {
            if (field.isSynthetic() || java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            mapping.startObject(field.getName()).field("type", esTypeOf(field.getType())).endObject();
        }
        mapping.endObject().endObject();
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(">>>>>>createMapping>>>create mapping error:" + e.getMessage());
    }
    return mapping;
}

/** Maps a Java field type to a mapping type name; anything unrecognised is "keyword". */
private static String esTypeOf(Class<?> javaType) {
    if (javaType == Long.class || javaType == long.class) {
        return "long";
    }
    if (javaType == Integer.class || javaType == int.class) {
        return "integer";
    }
    if (javaType == Double.class || javaType == double.class) {
        return "double";
    }
    return "keyword";
}
/**
 * Nests one terms aggregation per remaining group field below {@code termsBuilder}.
 *
 * @param termsBuilder aggregation for the field at position {@code count}
 * @param groupFields  all group fields, outermost first
 * @param count        position of the field {@code termsBuilder} belongs to
 */
private static void searchAgg(TermsAggregationBuilder termsBuilder, List<String> groupFields,int count) {
    TermsAggregationBuilder parent = termsBuilder;
    // Each following field becomes a child of the aggregation created just before it.
    for (int level = count + 1; level < groupFields.size(); level++) {
        String fieldName = groupFields.get(level);
        TermsAggregationBuilder child = AggregationBuilders.terms(fieldName).field(fieldName).size(10000);
        parent.subAggregation(child);
        parent = child;
    }
}
}
2.使用方法(在 Controller 中调用 EsBaseUtils 的示例)
/**
 * Sample endpoint: filters on {@code file_id} and a {@code start_time} range, orders by
 * {@code start_time} descending and runs the query against index "test", type "type".
 * The result of the query is not used here.
 */
@RequestMapping("/testQuery")
public void list(HttpServletRequest request, HttpServletResponse response){
    System.out.println("测试查询ES数据");
    QueryRule rule = new QueryRule();
    rule.add(Restrictions.like("file_id", "123"));
    rule.add(Restrictions.ge("start_time","2021-04-01 00:00:24"));
    rule.add(Restrictions.le("start_time","2021-04-01 23:00:24"));
    rule.addOrder("start_time");
    rule.orderDesc();
    EsBaseUtils.queryList("test", "type", rule);
}
3.查询前置条件拼装
package cn.com.cewell.search.esBase;
import org.elasticsearch.search.sort.SortOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Mutable description of a search: filter criteria, paging window and sort order.
 * Mutators return {@code this} so calls can be chained.
 */
public class QueryRule {
    /** Filter criteria in the order they were added. */
    List<QueryCriterion> criterions = new ArrayList<>();
    /** Names of the properties to sort by. */
    private List<String> orderPropertyNames = new ArrayList<>();
    /** Direction applied to the sort properties; ascending unless changed. */
    private SortOrder sortMode = SortOrder.ASC;
    /** Offset of the first result. */
    private int firstResult;
    /** Maximum number of results. */
    private int maxResults;

    /** Appends a criterion. */
    public QueryRule add(QueryCriterion criterion) {
        this.criterions.add(criterion);
        return this;
    }

    /**
     * Removes all criteria and resets paging to offset -1 and size 0.
     * The argument is not used; sort settings are left as they are.
     */
    public QueryRule clear(QueryCriterion criterion) {
        this.criterions.clear();
        this.firstResult = -1;
        this.maxResults = 0;
        return this;
    }

    /** Appends the given property names to the sort list. */
    public QueryRule addOrder(String ...name) {
        for (String propertyName : name) {
            this.orderPropertyNames.add(propertyName);
        }
        return this;
    }

    /** Switches the sort direction to descending. */
    public QueryRule orderDesc() {
        this.sortMode = SortOrder.DESC;
        return this;
    }

    /** Sets the offset of the first result. */
    public QueryRule setFirstResult(int firstResult) {
        this.firstResult = firstResult;
        return this;
    }

    /** Sets the maximum number of results. */
    public QueryRule setMaxResults(int maxResults) {
        this.maxResults = maxResults;
        return this;
    }

    public int getFirstResult() {
        return this.firstResult;
    }

    public int getMaxResults() {
        return this.maxResults;
    }

    public List<QueryCriterion> getCriterions() {
        return this.criterions;
    }

    public List<String> getOrderPropertyNames() {
        return this.orderPropertyNames;
    }

    public SortOrder getSortMode() {
        return this.sortMode;
    }
}
4.封装不同查询条件
package cn.com.cewell.search.esBase;
import java.util.Collection;
/**
 * Static factories for the {@link QueryCriterion} implementations.
 */
public class Restrictions {
    /** Equality on a single value, evaluated by {@link SimpleExpression}. */
    public static QueryCriterion eq(String propertyName, Object value) {
        return new SimpleExpression(propertyName, value, QueryCriterion.eq);
    }

    /** Matches any of the given values, evaluated by {@link ValuesExpression}. */
    public static QueryCriterion eq(String propertyName, Object ...value) {
        return new ValuesExpression(propertyName, value);
    }

    /** "like" comparison, evaluated by {@link SimpleExpression}. */
    public static QueryCriterion like(String propertyName, Object value) {
        return new SimpleExpression(propertyName, value, QueryCriterion.like);
    }

    /** Inequality, evaluated by {@link SimpleExpression}. */
    public static QueryCriterion ne(String propertyName, Object value) {
        return new SimpleExpression(propertyName, value, QueryCriterion.ne);
    }

    /** Strictly greater than. */
    public static QueryCriterion gt(String propertyName, Object value) {
        return new SimpleExpression(propertyName, value, QueryCriterion.gt);
    }

    /** Strictly less than. */
    public static QueryCriterion lt(String propertyName, Object value) {
        return new SimpleExpression(propertyName, value, QueryCriterion.lt);
    }

    /** Greater than or equal. */
    public static QueryCriterion ge(String propertyName, Object value) {
        return new SimpleExpression(propertyName, value, QueryCriterion.ge);
    }

    /** Less than or equal. */
    public static QueryCriterion le(String propertyName, Object value) {
        return new SimpleExpression(propertyName, value, QueryCriterion.le);
    }

    /** Match query on one property. */
    public static QueryCriterion matchQuery(String propertyName, Object value) {
        return new SimpleExpression(propertyName, value, QueryCriterion.matchQuery);
    }

    /** Matches any value of the collection, evaluated by {@link InExpression}. */
    public static QueryCriterion in(String propertyName, Collection<?> values) {
        return new InExpression(propertyName, values);
    }

    /** Matches one value against several properties, evaluated by {@link MultipleFieldExpression}. */
    public static QueryCriterion multipleFields(String value, String ...propertyNames) {
        return new MultipleFieldExpression(value, propertyNames);
    }

    /**
     * Misspelled predecessor of {@link #multipleFields(String, String...)}, kept so
     * existing callers continue to compile.
     *
     * @deprecated use {@link #multipleFields(String, String...)}
     */
    @Deprecated
    public static QueryCriterion multipleFileds(String value, String ...propertyNames) {
        return multipleFields(value, propertyNames);
    }
}
5.查询条件抽象基类及操作符常量
package cn.com.cewell.search.esBase;
import org.elasticsearch.index.query.QueryBuilder;
/**
 * Base type of every query condition. It also holds the operator tokens that
 * {@code SimpleExpression} switches on.
 */
public abstract class QueryCriterion {
    /** Operator token for equality. */
    public static final String eq = "=";
    /** Operator token for inequality. */
    public static final String ne = "<>";
    /** Operator token for "like". */
    public static final String like = "like";
    /** Operator token for strictly greater than. */
    public static final String gt = ">";
    /** Operator token for greater than or equal. */
    public static final String ge = ">=";
    /** Operator token for strictly less than. */
    public static final String lt = "<";
    /** Operator token for less than or equal. */
    public static final String le = "<=";
    /** Operator token for a match query. */
    public static final String matchQuery = "matchQuery";

    /**
     * Produces the query builder that represents this condition.
     *
     * @return the query for this condition
     */
    public abstract QueryBuilder getBuilder();
}
6.查询条件拼接
package cn.com.cewell.search.esBase;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
/**
 * A condition of the form "property operator value", where the operator is one of the
 * tokens declared on {@link QueryCriterion}.
 */
public class SimpleExpression extends QueryCriterion {
    private final String propertyName;
    private final Object value;
    private final String op;

    /**
     * @param propertyName field the condition applies to
     * @param value        comparison value; for {@code eq}, {@code ne} and {@code like}
     *                     its string form may hold several comma-separated alternatives
     * @param op           operator token from {@link QueryCriterion}
     */
    public SimpleExpression(String propertyName, Object value, String op) {
        this.propertyName = propertyName;
        this.value = value;
        this.op = op;
    }

    public String getPropertyName() {
        return propertyName;
    }

    public Object getValue() {
        return value;
    }

    public String getOp() {
        return op;
    }

    /**
     * Builds the query for this condition.
     *
     * <p>{@code eq}, {@code ne} and {@code like} are expressed as regexp queries and the
     * value is inserted into the pattern unescaped, so regular-expression
     * metacharacters in it are interpreted as such.
     *
     * @return the query; {@code null} for an unknown operator or when a {@code like}
     *         value contains no alternatives
     */
    @Override
    public QueryBuilder getBuilder() {
        QueryBuilder qb = null;
        switch (op) {
            case eq:
                qb = equalsQuery();
                break;
            case like:
                // The value may hold several alternatives; each one becomes ".*alt.*".
                StringBuilder pattern = new StringBuilder();
                for (String alternative : value.toString().split(",")) {
                    if (pattern.length() > 0) {
                        pattern.append('|');
                    }
                    pattern.append(".*").append(alternative).append(".*");
                }
                if (pattern.length() > 0) {
                    qb = QueryBuilders.regexpQuery(propertyName, pattern.toString());
                }
                break;
            case ne:
                // Previously no query was produced here, so callers received null.
                // "Not equal" is the negation of the equality query.
                qb = QueryBuilders.boolQuery().mustNot(equalsQuery());
                break;
            case gt:
                qb = QueryBuilders.rangeQuery(propertyName).gt(value);
                break;
            case lt:
                qb = QueryBuilders.rangeQuery(propertyName).lt(value);
                break;
            case ge:
                qb = QueryBuilders.rangeQuery(propertyName).gte(value);
                break;
            case le:
                qb = QueryBuilders.rangeQuery(propertyName).lte(value);
                break;
            case matchQuery:
                qb = QueryBuilders.matchQuery(propertyName, value);
                break;
            default:
                break;
        }
        return qb;
    }

    /**
     * Equality as a regexp query: commas in the value separate alternatives and are
     * turned into "|". The replacement may need adjusting to the actual data format.
     */
    private QueryBuilder equalsQuery() {
        return QueryBuilders.regexpQuery(propertyName, value.toString().replaceAll(",", "|"));
    }
}
7.不同查询条件拼接---一次匹配多个值
package cn.com.cewell.search.esBase;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
/**
 * Condition that matches a property against any of several values using a terms query.
 */
public class ValuesExpression extends QueryCriterion {
    /** Field the condition applies to. */
    private String propertyName;
    /** Accepted values. */
    private Object []values;

    /**
     * @param propertyName field the condition applies to
     * @param values       accepted values
     */
    public ValuesExpression(String propertyName, Object ...values) {
        this.values = values;
        this.propertyName = propertyName;
    }

    /** Replaces the field name. */
    public void setPropertyName(String propertyName) {
        this.propertyName = propertyName;
    }

    /** Replaces the accepted values. */
    public void setValues(Object[] values) {
        this.values = values;
    }

    /** @return a terms query on the property with the current values */
    @Override
    public QueryBuilder getBuilder() {
        QueryBuilder termsQuery = QueryBuilders.termsQuery(this.propertyName, this.values);
        return termsQuery;
    }
}
8.不同查询条件拼接---in 集合匹配(多个值以集合形式传入)
package cn.com.cewell.search.esBase;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import java.util.Collection;
/**
 * Condition that matches a property against any value of a collection using a terms query.
 */
public class InExpression extends QueryCriterion {
    /** Field the condition applies to. */
    private String propertyName;
    /** Accepted values. */
    private Collection<?> values;

    /**
     * @param propertyName field the condition applies to
     * @param values       accepted values
     */
    public InExpression(String propertyName, Collection<?> values) {
        this.values = values;
        this.propertyName = propertyName;
    }

    public String getPropertyName() {
        return this.propertyName;
    }

    public Collection<?> getValues() {
        return this.values;
    }

    /** @return a terms query on the property with the stored collection */
    @Override
    public QueryBuilder getBuilder() {
        QueryBuilder termsQuery = QueryBuilders.termsQuery(this.propertyName, this.values);
        return termsQuery;
    }
}
9.不同查询条件拼接---多字段匹配
package cn.com.cewell.search.esBase;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
/**
 * Condition that matches one value against several properties using a multi-match query.
 */
public class MultipleFieldExpression extends QueryCriterion {
    /** Fields the value is matched against. */
    private String []propertyName;
    /** Text to match. */
    private String value;

    /**
     * @param value        text to match
     * @param propertyName fields the text is matched against
     */
    public MultipleFieldExpression(String value, String ...propertyName) {
        this.value = value;
        this.propertyName = propertyName;
    }

    public String[] getPropertyName() {
        return this.propertyName;
    }

    public String getValue() {
        return this.value;
    }

    /** @return a multi-match query of the value over the stored fields */
    @Override
    public QueryBuilder getBuilder() {
        QueryBuilder multiMatch = QueryBuilders.multiMatchQuery(this.value, this.propertyName);
        return multiMatch;
    }
}