elasticsearch Java High Level REST Client API 使用 - Update

1.maven依赖:

<!-- 只有5.6.12以上的版本支持 -->

<dependency>

    <groupId>org.elasticsearch</groupId>

    <artifactId>elasticsearch</artifactId>

    <version>5.6.12</version>

</dependency>

<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>rest</artifactId>

    <version>5.5.3</version>

</dependency>

<!-- 只有5.6.12以上的版本支持 -->

<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>elasticsearch-rest-high-level-client</artifactId>

    <version>5.6.12</version>

</dependency>

2.接入rest-higl-level-client

import org.apache.http.HttpHost;

import org.apache.http.auth.AuthScope;

import org.apache.http.auth.UsernamePasswordCredentials;

import org.apache.http.client.CredentialsProvider;

import org.apache.http.impl.client.BasicCredentialsProvider;

import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestClientBuilder;

import org.elasticsearch.client.RestHighLevelClient;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.beans.factory.FactoryBean;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);

    //ES地址

    @Value("${spring.data.elasticsearch.host}")

    private String host;

    //ES端口

    @Value("${spring.data.elasticsearch.port}")

    private int port;

    //ES用户名

    @Value("${spring.data.elasticsearch.username}")

    private String username;

    //ES密码

    @Value("${spring.data.elasticsearch.password}")

    private String password;

    //Java Low Level REST Client (要想使用高版本client必须依赖低版本的client)

    private RestClient client;

    //Java High Level REST Client (高版本client)

    private RestHighLevelClient restHighLevelClient;

    //销毁方法

    @Override

    public void destroy() throws Exception {

        try {

            LOGGER.info("Closing elasticSearch client");

            if (client != null) {

                client.close();

            }

        } catch (final Exception e) {

            LOGGER.error("Error closing ElasticSearch client: ", e);

        }

    }

    @Override

    public RestHighLevelClient getObject() throws Exception {

        return restHighLevelClient;

    }

    @Override

    public Class<RestHighLevelClient> getObjectType() {

        return RestHighLevelClient.class;

    }

    @Override

    public boolean isSingleton() {

        return false;

    }

    @Override

    public void afterPropertiesSet() throws Exception {

        buildClient();

    }

    //初始化client

    protected void buildClient() {

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

        credentialsProvider.setCredentials(AuthScope.ANY,

                new UsernamePasswordCredentials(username, password));

        client = RestClient.builder(new HttpHost(host, port))

                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {

                    @Override

                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

                    }

                })

                .build();

        restHighLevelClient = new RestHighLevelClient(client);

    }

}

3.index api

Map<String, Object> jsonMap = new HashMap<>();

jsonMap.put("user", "laimailai");

jsonMap.put("postDate", new Date());

jsonMap.put("message", "trying out Elasticsearch");

IndexRequest indexRequest = new IndexRequest("index", "type", "1")

        .source(jsonMap);

IndexResponse indexResponse = client.index(request);

4.get api

GetRequest getRequest = new GetRequest( "index","type","id");

GetResponse getResponse = client.get(request);

5.update api

UpdateRequest request = new UpdateRequest("index","type","id");

UpdateResponse updateResponse = client.update(request);

6.delete api

DeleteRequest request = new DeleteRequest("index","type", "1");

7.bulk api

//1.bulk

BulkRequest request = new BulkRequest();

request.add(new IndexRequest("index", "type", "1")

        .source(XContentType.JSON, "field", "foo"));

request.add(new IndexRequest("index", "type", "2")

        .source(XContentType.JSON, "field", "bar"));

request.add(new IndexRequest("index", "type", "3")

        .source(XContentType.JSON, "field", "baz"));

//同步

BulkResponse bulkResponse = client.bulk(request);

//异步

client.bulkAsync(request, new ActionListener<BulkResponse>() {

    @Override

    public void onResponse(BulkResponse bulkResponse) {

    }

    @Override

    public void onFailure(Exception e) {

    }

});

8.bulkprocessor (根据请求的数量或大小,或在给定的时间段后自动执行批量操作)

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchUtil.class);

@Autowired

private RestHighLevelClient restHighLevelClient;

private BulkProcessor bulkProcessor;

@PostConstruct

public void init() {

    Settings settings = Settings.builder().put("node.name", "").build();

    ThreadPool threadPool = new ThreadPool(settings);

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {

        //bulk提交之前

        @Override

        public void beforeBulk(long executionId, BulkRequest request) {

            int numberOfActions = request.numberOfActions();

            LOGGER.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);

        }

        //bulk提交以后

        @Override

        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {

            if (response.hasFailures()) {

                LOGGER.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());

            } else {

                LOGGER.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());

            }

        }

        //bulk提交以后并且返回异常

        @Override

        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {

            LOGGER.error("Failed to execute bulk", failure);

        }

    };

    BulkProcessor bulkProcessor = new BulkProcessor.Builder(restHighLevelClient::bulkAsync, listener, threadPool)

            // 2000条数据请求执行一次bulk

            .setBulkActions(2000)

            // 5mb的数据刷新一次bulk

            .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))

            // 并发请求数量, 0不并发, 1并发允许执行

            .setConcurrentRequests(0)

            // 固定5s必须刷新一次

            .setFlushInterval(TimeValue.timeValueSeconds(5L))

            // 重试3次,间隔1s

            .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))

            .build();

    this.bulkProcessor = bulkProcessor;

}

@PreDestroy

public void destroy() {

    try {

        //执行关闭方法会把bulk剩余的数据都写入ES再执行关闭

        bulkProcessor.awaitClose(30, TimeUnit.SECONDS);

    } catch (InterruptedException e) {

        LOGGER.error("Failed to close bulkProcessor", e);

    }

    LOGGER.info("bulkProcessor closed!");

}

//容器设置好以后,每次只要调用对应的insert/update方法放入processor即可。

//bulkProcessor 会根据策略自动处理

/**

* 修改

*

* @param request

* @throws IOException

*/

public void update(UpdateRequest request) {

    this.bulkProcessor.add(request);

}

/**

* 新增

*

* @param request

*/

public void insert(IndexRequest request) {

    this.bulkProcessor.add(request);

}

9.upsert api

update --当id不存在时将会抛出异常

UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap);

UpdateResponse response = restHighLevelClient.update(request);

upsert--id不存在时就插入

UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap).upsert(jsonMap);

UpdateResponse response = restHighLevelClient.update(request);

10.search api

//全量搜索

SearchRequest searchRequest = new SearchRequest();

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

searchSourceBuilder.query(QueryBuilders.matchAllQuery());

searchRequest.source(searchSourceBuilder);

SearchRequest searchRequest = new SearchRequest("index");

//根据多个条件搜索

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

for (String id: ids) {

    TermQueryBuilder termQueryBuilder = new TermQueryBuilder("id", id);

    boolQueryBuilder.should(termQueryBuilder);

}

SearchRequest searchRequest = new SearchRequest(index);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

searchSourceBuilder.query(boolQueryBuilder);

searchRequest.source(searchSourceBuilder);

SearchResponse response = null;

    response = restHighLevelClient.search(searchRequest);

return response;

11.search scroll api

//scroll 分页搜索

final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));

SearchRequest searchRequest = new SearchRequest("posts");

searchRequest.scroll(scroll);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

searchSourceBuilder.query(matchQuery("title", "Elasticsearch"));

searchRequest.source(searchSourceBuilder);

SearchResponse searchResponse = client.search(searchRequest);

String scrollId = searchResponse.getScrollId();

SearchHit[] searchHits = searchResponse.getHits().getHits();

while (searchHits != null && searchHits.length > 0) {

    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);

    scrollRequest.scroll(scroll);

    searchResponse = client.searchScroll(scrollRequest);

    scrollId = searchResponse.getScrollId();

    searchHits = searchResponse.getHits().getHits();

}

ClearScrollRequest clearScrollRequest = new ClearScrollRequest();

clearScrollRequest.addScrollId(scrollId);

ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);

boolean succeeded = clearScrollResponse.isSucceeded();

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容