1.maven依赖:
<!-- 只有5.6.12以上的版本支持 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.6.12</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>rest</artifactId>
<version>5.5.3</version>
</dependency>
<!-- 只有5.6.12以上的版本支持 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>5.6.12</version>
</dependency>
2.接入rest-higl-level-client
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
//ES地址
@Value("${spring.data.elasticsearch.host}")
private String host;
//ES端口
@Value("${spring.data.elasticsearch.port}")
private int port;
//ES用户名
@Value("${spring.data.elasticsearch.username}")
private String username;
//ES密码
@Value("${spring.data.elasticsearch.password}")
private String password;
//Java Low Level REST Client (要想使用高版本client必须依赖低版本的client)
private RestClient client;
//Java High Level REST Client (高版本client)
private RestHighLevelClient restHighLevelClient;
//销毁方法
@Override
public void destroy() throws Exception {
try {
LOGGER.info("Closing elasticSearch client");
if (client != null) {
client.close();
}
} catch (final Exception e) {
LOGGER.error("Error closing ElasticSearch client: ", e);
}
}
@Override
public RestHighLevelClient getObject() throws Exception {
return restHighLevelClient;
}
@Override
public Class<RestHighLevelClient> getObjectType() {
return RestHighLevelClient.class;
}
@Override
public boolean isSingleton() {
return false;
}
@Override
public void afterPropertiesSet() throws Exception {
buildClient();
}
//初始化client
protected void buildClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
client = RestClient.builder(new HttpHost(host, port))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
})
.build();
restHighLevelClient = new RestHighLevelClient(client);
}
}
3.index api
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "laimailai");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonMap);
IndexResponse indexResponse = client.index(request);
4.get api
GetRequest getRequest = new GetRequest( "index","type","id");
GetResponse getResponse = client.get(request);
5.update api
UpdateRequest request = new UpdateRequest("index","type","id");
UpdateResponse updateResponse = client.update(request);
6.delete api
DeleteRequest request = new DeleteRequest("index","type", "1");
7.bulk api
//1.bulk
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("index", "type", "1")
.source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("index", "type", "2")
.source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("index", "type", "3")
.source(XContentType.JSON, "field", "baz"));
//同步
BulkResponse bulkResponse = client.bulk(request);
//异步
client.bulkAsync(request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
8.bulkprocessor (根据请求的数量或大小,或在给定的时间段后自动执行批量操作)
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchUtil.class);
@Autowired
private RestHighLevelClient restHighLevelClient;
private BulkProcessor bulkProcessor;
@PostConstruct
public void init() {
Settings settings = Settings.builder().put("node.name", "").build();
ThreadPool threadPool = new ThreadPool(settings);
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
//bulk提交之前
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
LOGGER.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
//bulk提交以后
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
LOGGER.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());
} else {
LOGGER.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
}
}
//bulk提交以后并且返回异常
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
LOGGER.error("Failed to execute bulk", failure);
}
};
BulkProcessor bulkProcessor = new BulkProcessor.Builder(restHighLevelClient::bulkAsync, listener, threadPool)
// 2000条数据请求执行一次bulk
.setBulkActions(2000)
// 5mb的数据刷新一次bulk
.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
// 并发请求数量, 0不并发, 1并发允许执行
.setConcurrentRequests(0)
// 固定5s必须刷新一次
.setFlushInterval(TimeValue.timeValueSeconds(5L))
// 重试3次,间隔1s
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
.build();
this.bulkProcessor = bulkProcessor;
}
@PreDestroy
public void destroy() {
try {
//执行关闭方法会把bulk剩余的数据都写入ES再执行关闭
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.error("Failed to close bulkProcessor", e);
}
LOGGER.info("bulkProcessor closed!");
}
//容器设置好以后,每次只要调用对应的insert/update方法放入processor即可。
//bulkProcessor 会根据策略自动处理
/**
* 修改
*
* @param request
* @throws IOException
*/
public void update(UpdateRequest request) {
this.bulkProcessor.add(request);
}
/**
* 新增
*
* @param request
*/
public void insert(IndexRequest request) {
this.bulkProcessor.add(request);
}
9.upsert api
update --当id不存在时将会抛出异常
UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap);
UpdateResponse response = restHighLevelClient.update(request);
upsert--id不存在时就插入
UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap).upsert(jsonMap);
UpdateResponse response = restHighLevelClient.update(request);
10.search api
//全量搜索
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
SearchRequest searchRequest = new SearchRequest("index");
//根据多个条件搜索
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
for (String id: ids) {
TermQueryBuilder termQueryBuilder = new TermQueryBuilder("id", id);
boolQueryBuilder.should(termQueryBuilder);
}
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse response = null;
response = restHighLevelClient.search(searchRequest);
return response;
11.search scroll api
//scroll 分页搜索
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchRequest searchRequest = new SearchRequest("posts");
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(matchQuery("title", "Elasticsearch"));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
while (searchHits != null && searchHits.length > 0) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
searchResponse = client.searchScroll(scrollRequest);
scrollId = searchResponse.getScrollId();
searchHits = searchResponse.getHits().getHits();
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
boolean succeeded = clearScrollResponse.isSucceeded();