1:引入依赖
<!-- Elasticsearch Java High Level REST Client: provides RestHighLevelClient used by ElasticSearchConfig. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.4.2</version>
</dependency>
<!-- Elasticsearch core artifact (org.elasticsearch.* query/search builder classes used by the service code).
     Keep this version identical to the client version above.
     NOTE(review): presumably declared explicitly so a managed version (e.g. from a parent BOM)
     cannot override the client's transitive one - confirm. -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.4.2</version>
</dependency>
2:Elasticsearch 连接配置(properties)
# Elasticsearch connection settings, injected into ElasticSearchConfig via @Value("${es.*}").
es.userName=superuser
# Never commit the real password: it is resolved from the ES_PASSWORD environment variable
# (or JVM system property) at startup, and startup fails fast if it is missing.
es.password=${ES_PASSWORD}
es.host=10.157.24.122
es.port=8200
3:ES客户端配置类(创建 RestHighLevelClient)
@Configuration
public class ElasticSearchConfig {

    @Value("${es.userName}")
    private String userName;

    @Value("${es.password}")
    private String password;

    @Value("${es.host}")
    private String esHost;

    @Value("${es.port}")
    private Integer esPort;

    /**
     * Creates the {@link RestHighLevelClient} Spring bean for the node at {@code es.host}:{@code es.port}.
     * Every request made through the client carries the {@code es.userName}/{@code es.password}
     * credentials, registered for any auth scope.
     *
     * @return the configured high-level REST client
     */
    @Bean
    public RestHighLevelClient esRestClient() {
        CredentialsProvider basicAuth = new BasicCredentialsProvider();
        basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost(esHost, esPort))
                        .setHttpClientConfigCallback(
                                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(basicAuth)));
    }
}
4:ES服务的接口
package com.dxm.insur.bi.biz.es;
import java.util.List;
import java.util.Map;
/**
 * Minimal Elasticsearch access layer: stores JSON documents and runs filtered searches.
 *
 * Created by huxiaona on 2020-10-12
 **/
public interface ElasticSearchService {

    /**
     * Stores a document given as a JSON string.
     *
     * @param index         target index name
     * @param docJsonString document body, as JSON
     * @param id            document id
     */
    void saveDoc(String index, String docJsonString, String id);

    /**
     * Searches an index.
     *
     * <p>All entries of {@code where} are combined with AND. Each entry is interpreted according to
     * the runtime type of its value:
     * <ul>
     *   <li>{@link Map} containing the keys {@code "start"} and {@code "end"}: inclusive range query on the field;</li>
     *   <li>{@link java.util.Set}: OR of wildcard (fuzzy) matches, one per element;</li>
     *   <li>{@link List}: "in" query, where the field must equal one of the elements;</li>
     *   <li>anything else: a single exact (term) match on {@code value.toString()}.</li>
     * </ul>
     * Values must not be null.
     *
     * @param index           index to search
     * @param where           field name to condition, as described above; may be null or empty for no filtering
     * @param sortFieldsToAsc field name to sort direction ({@code true} = ascending, {@code false} = descending).
     *                        Sorts are added in the map's iteration order, so pass an insertion-ordered map
     *                        (e.g. {@code LinkedHashMap}) when sorting on more than one field. May be null.
     * @param includeFields   {@code _source} fields to return; may be null
     * @param excludeFields   {@code _source} fields to omit; may be null
     * @param timeOut         search timeout (interpreted as seconds by {@code ElasticSearchServiceImpl})
     * @param collapseFields  field to collapse (deduplicate) hits on; null or blank for no collapsing
     * @return the {@code _source} map of each hit; {@code ElasticSearchServiceImpl} fetches at most 10000 hits
     */
    List<Map<String, Object>> search(String index, Map<String, Object> where, Map<String, Boolean> sortFieldsToAsc, String[] includeFields, String[] excludeFields, int timeOut, String collapseFields);
}
5:ES服务的具体实现
package com.dxm.insur.bi.biz.es.impl;
import com.dxm.insur.bi.base.exception.InsureEtlException;
import com.dxm.insur.bi.base.exception.InsureEtlResponseCode;
import com.dxm.insur.bi.base.util.StringUtil;
import com.dxm.insur.bi.biz.es.ElasticSearchService;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * {@link ElasticSearchService} implementation backed by the {@link RestHighLevelClient} bean.
 *
 * Created by huxiaona on 2020-10-12
 **/
@Service
public class ElasticSearchServiceImpl implements ElasticSearchService {

    /** Logger whose category is this implementation class. */
    public static final Logger logger = LoggerFactory.getLogger(ElasticSearchServiceImpl.class);

    /**
     * Maximum number of hits fetched by one search. 10000 equals Elasticsearch's default
     * {@code index.max_result_window}; results beyond it are not returned.
     */
    private static final int MAX_RESULT_SIZE = 10000;

    @Autowired
    private RestHighLevelClient highLevelClient;

    /**
     * Indexes {@code doc} (a JSON string) into {@code index} under {@code id}.
     *
     * @throws InsureEtlException with {@code ES_SAVE_DOC_ERROR} if the request fails
     */
    @Override
    public void saveDoc(String index, String doc, String id) {
        try {
            IndexRequest indexRequest = new IndexRequest(index).id(id).source(doc, XContentType.JSON);
            IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            logger.info("[ElasticSearchService] save doc of index:{} success, response:{}!", index, indexResponse);
        } catch (Exception e) {
            logger.error("[ElasticSearchService] save doc:{} of index:{} error", doc, index, e);
            // NOTE(review): the cause is logged but not chained; confirm whether InsureEtlException
            // has a constructor accepting a cause.
            throw new InsureEtlException(InsureEtlResponseCode.ES_SAVE_DOC_ERROR);
        }
    }

    /**
     * Runs a search built from the given conditions; see {@link ElasticSearchService#search} for the
     * meaning of each {@code where} value type.
     *
     * @throws InsureEtlException with {@code ES_QUERY_ERROR} if building or executing the query fails,
     *                            or if the response status is not OK
     */
    @Override
    public List<Map<String, Object>> search(String index, Map<String, Object> where, Map<String, Boolean> sortFieldsToAsc,
                                            String[] includeFields, String[] excludeFields, int timeOut, String collapseField) {
        logger.info("[ElasticSearchService]search of index:{} begin", index);
        SearchResponse searchResponse;
        try {
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            if (!CollectionUtils.isEmpty(where)) {
                sourceBuilder.query(buildQuery(where));
            }
            sourceBuilder.timeout(new TimeValue(timeOut, TimeUnit.SECONDS));
            applySort(sourceBuilder, sortFieldsToAsc);
            // Collapse (deduplicate) hits on the given field.
            if (!StringUtil.isBlank(collapseField)) {
                sourceBuilder.collapse(new CollapseBuilder(collapseField));
            }
            sourceBuilder.size(MAX_RESULT_SIZE);
            sourceBuilder.fetchSource(includeFields, excludeFields);
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(index);
            searchRequest.source(sourceBuilder);
            // Passing the builder lets the logger render it only when INFO is enabled.
            logger.info("[ElasticSearchService] search source:{}", sourceBuilder);
            searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            // The full response may hold up to MAX_RESULT_SIZE hits; keep it out of INFO logs.
            logger.debug("[ElasticSearchService] search response:{}", searchResponse);
        } catch (Exception e) {
            logger.error("[ElasticSearchService] search error!", e);
            throw new InsureEtlException(InsureEtlResponseCode.ES_QUERY_ERROR);
        }
        return parseHits(searchResponse);
    }

    /** Translates the {@code where} map into a bool query whose clauses are all {@code must} (AND). */
    private static BoolQueryBuilder buildQuery(Map<String, Object> where) {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        where.forEach((field, value) -> addCondition(boolQueryBuilder, field, value));
        return boolQueryBuilder;
    }

    /** Adds the clause for one where-entry; the clause kind is chosen by the runtime type of {@code value}. */
    private static void addCondition(BoolQueryBuilder boolQueryBuilder, String field, Object value) {
        if (value == null) {
            // Fail with a message naming the field rather than an anonymous NullPointerException;
            // search() still reports it as ES_QUERY_ERROR.
            throw new IllegalArgumentException("null value in where condition for field: " + field);
        }
        if (value instanceof Map) {
            // Range query with inclusive bounds; requires both "start" and "end".
            Map<?, ?> range = (Map<?, ?>) value;
            if (range.containsKey("start") && range.containsKey("end")) {
                boolQueryBuilder.must(QueryBuilders.rangeQuery(field)
                        .gte(range.get("start"))
                        .lte(range.get("end")));
            } else {
                logger.warn("[ElasticSearchService] range condition on field:{} ignored, it needs both start and end", field);
            }
        } else if (value instanceof Set) {
            // OR query: any one of the wildcard patterns may match.
            BoolQueryBuilder orQueryBuilder = QueryBuilders.boolQuery();
            for (Object pattern : (Set<?>) value) {
                orQueryBuilder.should(QueryBuilders.wildcardQuery(field, pattern.toString()));
            }
            boolQueryBuilder.must(orQueryBuilder);
        } else if (value instanceof List) {
            // IN query: the field must equal one of the listed values.
            boolQueryBuilder.must(QueryBuilders.termsQuery(field, (List<?>) value));
        } else {
            // Exact (term) match on the value's string form.
            boolQueryBuilder.must(QueryBuilders.termQuery(field, value.toString()));
        }
    }

    /**
     * Adds one field sort per entry, in the map's iteration order (earlier entries take priority).
     * {@code true} sorts ascending, {@code false} descending.
     */
    private static void applySort(SearchSourceBuilder sourceBuilder, Map<String, Boolean> sortFieldsToAsc) {
        if (sortFieldsToAsc == null) {
            return;
        }
        sortFieldsToAsc.forEach((field, asc) ->
                sourceBuilder.sort(new FieldSortBuilder(field).order(asc ? SortOrder.ASC : SortOrder.DESC)));
    }

    /**
     * Extracts the {@code _source} map of every hit.
     *
     * @throws InsureEtlException with {@code ES_QUERY_ERROR} if the response status is not OK
     */
    private static List<Map<String, Object>> parseHits(SearchResponse searchResponse) {
        if (searchResponse.status() != RestStatus.OK) {
            logger.error("[ElasticSearchService] search error!");
            throw new InsureEtlException(InsureEtlResponseCode.ES_QUERY_ERROR);
        }
        // Use the returned hits rather than getTotalHits(), which may be null when total hits are not tracked.
        int hitCount = searchResponse.getHits().getHits().length;
        if (hitCount == 0) {
            logger.info("[ElasticSearchService] search res is empty, return!");
            return Collections.emptyList();
        }
        if (hitCount >= MAX_RESULT_SIZE) {
            logger.warn("[ElasticSearchService] search returned {} hits, the size limit; results may be truncated", hitCount);
        }
        return Arrays.stream(searchResponse.getHits().getHits())
                .map(hit -> hit.getSourceAsMap())
                .collect(Collectors.toList());
    }
}
7:实际查询应用
// Build the query conditions; ElasticSearchService.search ANDs all entries together.
Map<String, Object> where = new HashMap<>();
where.put("eventDate", request.getDate());
if(!StringUtils.isEmpty(request.getItemId())) {
    where.put("itemId", request.getItemId());
}
where.put("passId", request.getUserInfos());
String[] includeFields = {"uaId"};
// Assumes org.apache.commons.lang3.time.StopWatch (it has getTime()).
StopWatch clock = new StopWatch();
clock.start();
// Collapse on uaId so that each uaId is returned at most once.
List<Map<String, Object>> searchRes = elasticSearchService.search(CommonConstants.ES_H5_LOG_INDEX, where, null, includeFields, null, CommonConstants.ES_SEARCH_LIMIT_TIME, "uaId");
clock.stop();
long handlingTime1 = clock.getTime();
logger.info("--------------精确查找, 耗时: " + handlingTime1 + "ms");
// Second query: filter by uaId instead of passId.
// NOTE(review): uaIds is not defined in this snippet - presumably collected from searchRes; confirm.
where.remove("passId");
where.put("uaId", new ArrayList<String>(uaIds));
List<String> spms = Splitter.on(",").splitToList(AppConfigUtil.getProperty(AppConfigConstants.USER_TRACK_SPM));
// A Set value makes search() OR together wildcard matches on spm.
where.put("spm", new HashSet<>(spms));
// Sort priority follows the map's iteration order, so use an insertion-ordered map:
// sessionId first, then requesttime within each session.
Map<String, Boolean> sortFieldsToAsc = new java.util.LinkedHashMap<>();
sortFieldsToAsc.put("sessionId", true);
sortFieldsToAsc.put("requesttime", true);
includeFields = new String[]{"sessionId", "requesttime", "spm"};
// A stopped StopWatch throws IllegalStateException on start() unless it is reset first;
// resetting also makes handlingTime2 measure only the second query.
clock.reset();
clock.start();
searchRes = elasticSearchService.search(CommonConstants.ES_H5_LOG_INDEX, where, sortFieldsToAsc, includeFields, null, CommonConstants.ES_SEARCH_LIMIT_TIME, null);
clock.stop();
long handlingTime2 = clock.getTime();
logger.info("--------------模糊查找, 耗时: " + handlingTime2 + "ms");
8:存储的bean
package com.dxm.insur.bi.biz.log.bean;
import lombok.Data;
import java.util.Map;
/**
 * H5 log event record; Lombok {@code @Data} generates getters, setters, equals/hashCode and toString.
 * NOTE(review): presumably serialized to JSON and stored via ElasticSearchService.saveDoc - confirm.
 * If so, the field names are the ES document field names that queries use (e.g. "requesttime",
 * "sessionId", "uaId", "spm"), so do not rename them without reindexing.
 *
 * Created by huxiaona on 2020-10-08
 **/
@Data
public class InsurH5Log {
/** Event date; used as a where-condition key in the query example. */
private String eventDate;
/** Request time; used as a sort key in the query example (lower-case name kept as-is, see class note). */
private String requesttime;
/** Session id; used as a sort key in the query example. */
private String sessionId;
/** Used as the collapse field of the first example query and as an "in" filter of the second. */
private String uaId;
/** Filtered on in the first example query. */
private String passId;
private String channelId;
private String customerId;
private String userId;
/** Optional filter in the query example (only added when the request supplies one). */
private Long itemId;
/** Filtered with a set of wildcard patterns (OR) in the second example query. */
private String spm;
private String prespm;
private String eventTag;
private String preeventTag;
private Integer from;
private String refer;
private String activityId;
/** Free-form string key/value extension parameters. */
private Map<String, String> extendParam;
private String ua;
/** Free-form string key/value API parameters. */
private Map<String, String> apiParams;
}