1. 数据示例(文档结构如下图):
![数据示例](image.png)
2. 第一种方法:先查询出匹配的文档,在 Java 中修改后逐条保存
/**
 * Method 1: renames a category, tag or type value by loading every matching document through
 * Spring Data Elasticsearch, changing it in Java and saving it back one document at a time.
 *
 * @param type  1 = category, 2 = tags, 3 = types; any other value is ignored
 * @param old   the value to replace
 * @param reset the replacement value
 */
private void batchUpdateTask(int type, String old, String reset) {
    if (old == null || reset == null || old.equals(reset)) {
        // Nothing to rename; a null value would also break the query below.
        return;
    }
    log.info("-----开始更新ES 类型:" + type + "-----" + old);
    QueryBuilder query;
    switch (type) {
        case 1:
            // The phrase query only narrows the candidates; applyRename() requires an exact match.
            query = QueryBuilders.matchPhraseQuery("category", old);
            break;
        case 2:
            query = QueryBuilders.matchQuery("tags.keyword", old);
            break;
        case 3:
            query = QueryBuilders.matchQuery("types.keyword", old);
            break;
        default:
            return;
    }
    NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(query).build();
    int updated = 0;
    // search() returns only the first page of hits (10 by default), so larger result sets were
    // silently truncated. searchForStream() scrolls through every match; the iterator holds a
    // scroll context and is therefore closed by try-with-resources.
    try (org.springframework.data.elasticsearch.core.SearchHitsIterator<EsDataInfo> hits =
            elasticsearchTemplate.searchForStream(searchQuery, EsDataInfo.class)) {
        while (hits.hasNext()) {
            EsDataInfo info = hits.next().getContent();
            // Only documents that actually changed are written back and counted.
            if (applyRename(info, type, old, reset)) {
                esDataInfoRepository.save(info);
                updated++;
            }
        }
    }
    log.info("更新了:" + updated + "条");
    log.info("-----结束更新ES 类型:" + type + "-----" + old);
}

/** Applies the rename selected by {@code type} to one entity; returns true if it changed. */
private static boolean applyRename(EsDataInfo info, int type, String old, String reset) {
    switch (type) {
        case 1:
            // Exact match (same rule as batchUpdateTask2's script): String.replace would also
            // rewrite categories that merely contain `old` as a substring.
            if (old.equals(info.getCategory())) {
                info.setCategory(reset);
                return true;
            }
            return false;
        case 2: {
            List<String> tags = info.getTags();
            if (!renameInList(tags, old, reset)) {
                return false;
            }
            info.setTags(tags);
            return true;
        }
        case 3: {
            List<String> types = info.getTypes();
            if (!renameInList(types, old, reset)) {
                return false;
            }
            info.setTypes(types);
            return true;
        }
        default:
            return false;
    }
}

/**
 * Removes every {@code old} from {@code values} and appends {@code reset} unless it is already
 * present (so no duplicate is created). Returns false when {@code old} was not in the list.
 */
private static boolean renameInList(List<String> values, String old, String reset) {
    if (values == null || !values.removeIf(old::equals)) {
        return false;
    }
    if (!values.contains(reset)) {
        values.add(reset);
    }
    return true;
}
3. 第二种方法:使用 update_by_query 配合 painless 脚本在服务端批量更新
/** Injected client wrapper; updateByQuery obtains its RestHighLevelClient via getRHLClient(). */
@Resource
private ElasticsearchRestClient elasticsearchRestClient;
/**
 * Painless script renaming the single-valued {@code category} field. The match must be exact;
 * documents that do not match are marked {@code noop} so update_by_query does not rewrite them.
 * Values arrive via {@code params.old} / {@code params.reset}.
 */
private static final String RENAME_CATEGORY_SCRIPT =
        "if (ctx._source.category == params.old) {"
        + "  ctx._source.category = params.reset;"
        + "} else {"
        + "  ctx.op = 'noop';"
        + "}";

/**
 * Painless script renaming a value inside the list field named by {@code params.field}. The first
 * occurrence of {@code params.old} is replaced in place by {@code params.reset}. If reset is
 * already present, old is only removed. Further occurrences of old are always dropped, so reset
 * never appears twice. The list is rebuilt instead of removed from while iterating. Documents
 * without the value are marked {@code noop}.
 */
private static final String RENAME_IN_LIST_SCRIPT =
        "def values = ctx._source[params.field];"
        + "if (values instanceof List && values.contains(params.old)) {"
        + "  List renamed = new ArrayList();"
        + "  boolean hasReset = values.contains(params.reset);"
        + "  for (def v : values) {"
        + "    if (v == params.old) {"
        + "      if (!hasReset) { renamed.add(params.reset); hasReset = true; }"
        + "    } else {"
        + "      renamed.add(v);"
        + "    }"
        + "  }"
        + "  ctx._source[params.field] = renamed;"
        + "} else {"
        + "  ctx.op = 'noop';"
        + "}";

/**
 * Method 2: renames a category, tag or type value server-side with update_by_query and a painless
 * script.
 *
 * @param type  1 = category, 2 = tags, 3 = types; any other value is ignored
 * @param old   the value to replace
 * @param reset the replacement value
 */
private void batchUpdateTask2(int type, String old, String reset) {
    if (old == null || reset == null || old.equals(reset)) {
        log.warn("batchUpdateTask2 skipped: old/reset is null or unchanged, type=" + type);
        return;
    }
    log.info("-----batchUpdateTask2开始更新ES 类型:" + type + "-----" + old);
    // The values are passed as script params instead of being spliced into the script source.
    // This prevents script injection (e.g. a value containing a quote) and lets Elasticsearch
    // reuse one compiled script instead of compiling a new one for every value.
    java.util.Map<String, Object> params = new java.util.HashMap<>();
    params.put("old", old);
    params.put("reset", reset);
    QueryBuilder query;
    String source;
    switch (type) {
        case 1:
            query = new MatchPhraseQueryBuilder("category", old);
            source = RENAME_CATEGORY_SCRIPT;
            break;
        case 2:
            query = new MatchPhraseQueryBuilder("tags.keyword", old);
            source = RENAME_IN_LIST_SCRIPT;
            params.put("field", "tags");
            break;
        case 3:
            query = new MatchPhraseQueryBuilder("types.keyword", old);
            source = RENAME_IN_LIST_SCRIPT;
            params.put("field", "types");
            break;
        default:
            return;
    }
    updateByQuery(query, new Script(ScriptType.INLINE, "painless", source, params));
    log.info("-----batchUpdateTask2结束更新ES 类型:" + type + "-----" + old);
}
/** Index targeted by updateByQuery. NOTE(review): a "_dev" index is hard-coded — confirm per environment. */
private static final String DATA_INFO_INDEX = "datainfo_dev";

/** Upper bound on the number of documents one updateByQuery call processes. */
private static final int UPDATE_MAX_DOCS = 9000;

/**
 * Runs an update_by_query on {@link #DATA_INFO_INDEX}. The script is applied to every document
 * matching the query, up to {@link #UPDATE_MAX_DOCS}. The outcome is logged; I/O errors are
 * logged and not rethrown.
 *
 * @param queryBuilder selects the documents to update
 * @param script       script applied to each selected document
 */
private void updateByQuery(QueryBuilder queryBuilder, Script script) {
    UpdateByQueryRequest request = new UpdateByQueryRequest(DATA_INFO_INDEX);
    // Skip (rather than abort on) documents modified concurrently; they are reported below.
    request.setConflicts("proceed");
    request.setQuery(queryBuilder);
    request.setMaxDocs(UPDATE_MAX_DOCS);
    request.setBatchSize(1000);
    request.setScript(script);
    // Run the update as 2 parallel slices.
    request.setSlices(2);
    // How long each scroll search context is kept alive.
    request.setScroll(TimeValue.timeValueMinutes(10));
    request.setTimeout(TimeValue.timeValueMinutes(2));
    // Refresh the index afterwards so the changes are immediately searchable.
    request.setRefresh(true);
    RestHighLevelClient client = elasticsearchRestClient.getRHLClient();
    try {
        BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
        // getTotal() also counts noops, conflicts and failures; report the real update count.
        log.info("更新了:" + response.getUpdated() + "条");
        // With conflicts=proceed, partial failures do not throw, so surface them explicitly.
        if (response.getVersionConflicts() > 0
                || !response.getBulkFailures().isEmpty()
                || !response.getSearchFailures().isEmpty()
                || response.isTimedOut()) {
            log.warn("updateByQuery incomplete: total=" + response.getTotal()
                    + ", updated=" + response.getUpdated()
                    + ", noops=" + response.getNoops()
                    + ", versionConflicts=" + response.getVersionConflicts()
                    + ", bulkFailures=" + response.getBulkFailures()
                    + ", searchFailures=" + response.getSearchFailures()
                    + ", timedOut=" + response.isTimedOut());
        }
        if (response.getTotal() >= UPDATE_MAX_DOCS) {
            log.warn("updateByQuery reached maxDocs=" + UPDATE_MAX_DOCS
                    + "; further matching documents may not have been processed");
        }
    } catch (IOException e) {
        log.error("updateByQuery error!", e);
    }
}
4. 调用示例
/**
 * Asynchronously renames a category, tag or type value in the Elasticsearch documents.
 *
 * <p>The work is submitted to the executor from {@code SingleThreadPool.getExecutorService()}. It
 * waits 3 seconds and then runs {@code batchUpdateTask2}. This method returns without waiting for
 * the update.
 *
 * @param type  1 = category, 2 = tag, 3 = type (other values are ignored by the update)
 * @param old   the old value (旧数据)
 * @param reset the new value (新数据)
 */
public void batchUpdateName(int type, String old, String reset) {
    ExecutorService executorService = SingleThreadPool.getExecutorService();
    executorService.execute(() -> {
        try {
            // NOTE(review): presumably gives a preceding write time to land before the index is
            // updated — confirm with the caller.
            Thread.sleep(3 * 1000);
            batchUpdateTask2(type, old, reset);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool thread can still observe a shutdown request.
            Thread.currentThread().interrupt();
            log.error("batchUpdateName error.", e);
        } catch (Exception e) {
            log.error("batchUpdateName error.", e);
        }
    });
}