按条件批量更新elasticsearch中的某个字段的值

1.附一个数据示例:


image.png

2.第一种方法

private void batchUpdateTask(int type, String old, String reset) {
        log.info("-----开始更新ES 类型:" + type + "-----" + old);
        //查询
        NativeSearchQueryBuilder searchQueryBuilder = new NativeSearchQueryBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        switch (type) {
            case 1:
                boolQueryBuilder.must(QueryBuilders.matchQuery("category", old));
                break;
            case 2:
                boolQueryBuilder.must(QueryBuilders.matchQuery("tags.keyword", old));
                break;
            case 3:
                boolQueryBuilder.must(QueryBuilders.matchQuery("types.keyword", old));
                break;
            default:
                return;
        }
        searchQueryBuilder.withQuery(boolQueryBuilder);
        NativeSearchQuery searchQuery = searchQueryBuilder.build();

        SearchHits<EsDataInfo> search = elasticsearchTemplate.search(searchQuery, EsDataInfo.class);
        if (!search.getSearchHits().isEmpty()) {
            int i = 0;
            for (SearchHit<EsDataInfo> searchHit : search.getSearchHits()) {
                EsDataInfo info = searchHit.getContent();
                switch (type) {
                    case 1:
                        info.setCategory(info.getCategory().replace(old, reset));
                        break;
                    case 2:
                        List<String> infoTags = info.getTags();
                        infoTags.remove(old);
                        infoTags.add(reset);
                        info.setTags(infoTags);
                        break;
                    case 3:
                        List<String> infoTypes = info.getTypes();
                        infoTypes.remove(old);
                        infoTypes.add(reset);
                        info.setTypes(infoTypes);
                        break;
                    default:
                }
                i++;
                esDataInfoRepository.save(info);
            }
            log.info("更新了:" + i + "条");
        }
        log.info("-----结束更新ES 类型:" + type + "-----" + old);
    }

3.第二种方法

@Resource
    private ElasticsearchRestClient elasticsearchRestClient;

    private void batchUpdateTask2(int type, String old, String reset) {
        log.info("-----batchUpdateTask2开始更新ES 类型:" + type + "-----" + old);

        // 设置查询条件,第一个参数是字段名,第二个参数是字段的值
        switch (type) {
            case 1:
                MatchPhraseQueryBuilder category = new MatchPhraseQueryBuilder("category", old);
                Script script1 = new Script(ScriptType.INLINE, "painless",
                        "if (ctx._source.category == '" + old + "') {ctx._source.category = '" + reset + "';}",
                        Collections.emptyMap());
                updateByQuery(category, script1);
                break;
            case 2:
                MatchPhraseQueryBuilder tags = new MatchPhraseQueryBuilder("tags.keyword", old);
                // 修改时,新值如果已存在则只删除旧值
                Script script2 = new Script(ScriptType.INLINE, "painless",
                        "if(ctx._source.tags!=null && ctx._source.tags.length!=0 && ctx._source.tags.contains" +
                                "('"+old+"')){ \n" +
                                "       for (int i = 0; i < ctx._source.tags.length; ++i) { \n" +
                                "           if (ctx._source.tags[i] == '" + old + "'){\n" +
                                "               if (ctx._source.tags.contains('" + reset + "')) { \n" +
                                "                   ctx._source.tags.remove(ctx._source.tags.indexOf" +
                                "('" + old + "')) \n" +
                                "               } else{" +
                                "                   ctx._source.tags[i]=('" + reset + "')" +
                                "               }" +
                                "           } " +
                                "       }" +
                                "}",
                        Collections.emptyMap());
                updateByQuery(tags, script2);
                break;
            case 3:
                MatchPhraseQueryBuilder types = new MatchPhraseQueryBuilder("types.keyword", old);
                Script script3 = new Script(ScriptType.INLINE, "painless",
                        "if(ctx._source.types!=null && ctx._source.types.length!=0 && ctx._source.types.contains" +
                                "('"+old+"')){ \n" +
                                "for (int i = 0; i < ctx._source.types.length; ++i) { \n" +
                                "if (ctx._source.types[i] == '" + old + "'){\n" +
                                "ctx._source.types[i]=('" + reset + "')} " +
                                "}" +
                                "}",
                        Collections.emptyMap());
                updateByQuery(types, script3);
                break;
            default:
                return;
        }
        log.info("-----batchUpdateTask2结束更新ES 类型:" + type + "-----" + old);
    }

    private void updateByQuery(QueryBuilder queryBuilder, Script script) {
        //参数为索引名,可以不指定,可以一个,可以多个
        UpdateByQueryRequest request = new UpdateByQueryRequest("datainfo_dev");
        // 更新时版本冲突
        request.setConflicts("proceed");
        // 设置查询条件
        request.setQuery(queryBuilder);
        // 更新最大文档数
        request.setMaxDocs(9000);
        // 批次大小
        request.setBatchSize(1000);
        // 传入 script 对象(执行脚本)
        request.setScript(script);

        // 并行
        request.setSlices(2);
        // 使用滚动参数来控制“搜索上下文”存活的时间
        request.setScroll(TimeValue.timeValueMinutes(10));
        // 如果提供路由,则将路由复制到滚动查询,将流程限制为匹配该路由值的切分
        //      request.setRouting("=cat");

        // 可选参数
        // 超时
        request.setTimeout(TimeValue.timeValueMinutes(2));
        // 刷新索引
        request.setRefresh(true);

        RestHighLevelClient client = elasticsearchRestClient.getRHLClient();
        try {
            BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
            log.info("更新了:" + response.getTotal() + "条");
        } catch (IOException e) {
            log.error("updateByQuery error!", e);
        }
    }

4.调用示例

/**
     * 批量更新es中分类、标签、类型数据
     *
     * @param type  1.category 2.tag 3.type
     * @param old   旧数据
     * @param reset 新数据
     */
    public void batchUpdateName(int type, String old, String reset) {
        ExecutorService executorService = SingleThreadPool.getExecutorService();
        // 开启线程
        executorService.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                batchUpdateTask2(type, old, reset);
            } catch (Exception e) {
                log.error("batchUpdateName error.", e);
            }
        });
    }

参考链接:Elasticsearch:Painless scripting 编程实践 (136.la)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容