依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
接口
/**
* Repository contract that adds bulk submission operations for entities of type {@code T}.
*
* @param <T> type of the entities being submitted
* @param <ID> identifier type; not referenced by the methods declared here
*/
public interface BaseElasticsearchRepository<T, ID extends Serializable> {
/**
* Submits the given entities as a bulk operation asynchronously.
*
* @param indexName name of the target index
* @param entities entity objects to operate on
*/
void bulkAsync(String indexName, List<T> entities);
/**
* Submits the given entities as a bulk operation synchronously.
*
* @param indexName name of the target index
* @param entities entity objects to operate on
*/
void bulkSync(String indexName, List<T> entities);
}
实现
@Slf4j
public class BaseElasticsearchRepositoryImpl<T, ID extends Serializable> implements BaseElasticsearchRepository<T, ID> {
@Resource
private RestHighLevelClient restHighLevelClient;
private ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
log.info("Bulk indexing is success, cost time ms is : [{}],result={}", bulkItemResponses.getTook().duration(),bulkItemResponses.buildFailureMessage());
}
@Override
public void onFailure(Exception e) {
log.error("Bulk indexing is failure", e);
e.printStackTrace();
}
};
@Override
public void bulkAsync(String indexName, List<T> entities) {
BulkRequest bulkRequest = new BulkRequest();
for (T entity : entities) {
bulkRequest.add(new IndexRequest(indexName)
.source(JSON.toJSONString(entity), XContentType.JSON));
}
restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}
@Override
public void bulkSync(String indexName, List<T> entities) {
BulkRequest bulkRequest = new BulkRequest();
for (T entity : entities) {
bulkRequest.add(new IndexRequest(indexName)
.source(JSON.toJSONString(entity), XContentType.JSON));
}
try {
restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("Bulk Sync indexing is failure", e);
}
}
}
使用
/**
* Repository for {@code TestInfoDO} entities with {@code String} identifiers.
* Combines the operations of {@code ElasticsearchRepository} with the bulk operations
* declared by {@code BaseElasticsearchRepository}; it declares no methods of its own.
*/
@Repository
public interface TestInfoRepository extends ElasticsearchRepository<TestInfoDO, String>, BaseElasticsearchRepository<TestInfoDO, String> {
}