Document APIs之 Index API

文档API Document APIs

这个部分主要描述了以下的CRUD API

一 Single document APIs

1 Index API

index API 允许我们添加某种类型的JSON文档到特定的index ,并使之可搜索.

生成JSON文档

生成JSON文档的方式如下:

  • 手动使用native byte[] or as a String
  • 使用一个可以自动转换为对应JSON 的Map
  • 使用第三方库如 Jackson序列化 beans
  • 使用内置的 XContentFactory.jsonBuilder()

实际上, 每种方法都是转换成byte[] (so a String is converted to a byte[]). 所以如果一个对象已经是byte[] 就可直接使用. jsonBuilder是一个高度优化的可以直接创建一个byte[]的JSON 生成器.

各种生成JSON文档方法的具体说明

Manually

注意要根据日期格式对日期进行编码

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

Using Map

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");

序列化Beans

可以使用 Jackson 进行序列化.
要先添加Jackson Databind 到应用项目.然后使用ObjectMapper进行序列化

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);

Use Elasticsearchhelpers

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意 : 你也可以用方法startArray(String)endArray()添加数组. field 方法可以接受很多类型的对象. 你可以直接传入 数据, 日期 甚至其他XContentBuilder 对象.

如果你想查看生产的JSON内容,你可以使用方法string()

String json = builder.string();

应用实例

以下演示 在index为twitter ;类型为 tweet; id为1 加入一个JSON文档

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

你可以以一个json string的形式 加入json文档,此时你不需要给出id

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();

IndexResponse 对象会反馈一个报告

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();

更多有关index的操作,请查看REST index 文档.

操作线程

当操作是在同一个节点执行时,index API 允许我们设置操作的执行方式为线程模式.

另一个方式是可以在不同的线程执行这次操作,或者在调用线程执行, 默认情况下,
operationThreaded设置为true 表示操作是在不同的线程执行的.

2 Get API

get API 允许我们从index下根据id获取某个类型的JSON 文档. 以下实例演示了index为 twitter, type 为 tweet, id为1 获取数据:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

更多 get操作,请查看 REST get 文档.

操作线程

当操作是在同一个节点执行时,get API 允许我们设置操作的执行方式为线程模式.

另一个方式是可以在不同的线程执行这次操作,或者在调用线程执行, 默认情况下,
operationThreaded设置为true 表示操作是在不同的线程执行的. 以下是设置为false的例子

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

3 Delete API

Delete API 允许我们从index下根据id删除某个类型的JSON 文档. 以下实例演示了index为 twitter, type 为 tweet, id为1 删除数据:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

更多 get操作,请查看 RESTdelete API文档.

Delete By Query API

删除一个查询出来的数据

BulkIndexByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male"))  //   query
.source("persons")  //  index
.get(); //  execute the operation
long deleted = response.getDeleted(); //  number of deleted documents

也可以持续操作, 如果你想异步执行这些操作,你可以调用execute 而非get 兵器提供如下监听

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //query
 .source("persons") //index
.execute(new ActionListener<BulkIndexByScrollResponse>() { //listener
 @Override
        public void onResponse(BulkIndexByScrollResponse response) {
             long deleted = response.getDeleted(); //number of deleted documents
       }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

Update API

创建UpdateRequest 并发送到 client

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

或者 使用prepareUpdate()方法

client.prepareUpdate("ttl", "doc", "1") 
    .setScript(new Script("ctx._source.gender = \"male\"" //  1, ScriptService.ScriptType.INLINE, null, null))
    .get();
client.prepareUpdate("ttl", "doc", "1")
    .setDoc(jsonBuilder()   //2
         .startObject() 
                .field("gender", "male")
         .endObject()) 
   .get();

说明1:你自己的script. 也可以是本地存储的script文件名称. 这种情况下你需要使用, you’ll ScriptService.ScriptType.FILE

说明2:被增加的文档

注意: 你不能同时提供scriptdoc

根据script 修改

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

运行传入部分会被添加进已知文档的文档

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

支持upsert如果文档不存在 ,会使用upsert元素去增加新文档

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())

         .upsert(indexRequest); //1

client.update(updateRequest).get();

说明1:
//如果文档不存在, the one in indexRequest will be added. 文档内容如下:

{
    "name" : "Joe Smith",
    "gender": "male"
}

//如果index/type/1 文档已存在, 该操作后文档内容如下:

{ "name" : "Joe Dalton",
 "gender": "male" 
}

二 Multi-document APIs

multi get API可以根据index, type 和 id获取一系列的文档数据

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")  // get by a single id   
    .add("twitter", "tweet", "2", "3", "4")//or by a list of ids for the same index / type
    .add("another", "type", "foo") //you can also get from another index
    .get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
//iterate over the result set
GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {   // you can check if the document exists        
           String json = response.getSourceAsString(); //access to the _source field
    }
}

更多有关multi get 操作,请参照REST multi get 文档.

1 Bulk API

使用bulk API可以在一个请求中索引和删除几个文档 ,使用实例如下:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

Using Bulk Processor

BulkProcessor提供一个基于请求数量和大小或者某个特定时间之后的自动刷新批处理操作接口.

使用如下:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client   1,new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                       BulkRequest request) { ... } //2
          @Override
            public void afterBulk(long executionId,
                        BulkRequest request,
                        BulkResponse response) { ... } //3
         @Override
            public void afterBulk(long executionId,
                         BulkRequest request,
                         Throwable failure) { ... }//4

    })
        .setBulkActions(10000)//5
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//6
        .setFlushInterval(TimeValue.timeValueSeconds(5))//7
        .setConcurrentRequests(1) //8
        .setBackoffPolicy(  
           BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//9
        .build();

说明1:Add your elasticsearch client
说明2:bulk执行前会调用这个方法. 例如:你可以通过这个方法使用request.numberOfActions()查看 numberOfActions
说明3:bulk执行后会调用这个方法.例如你可以通过这个方法结合response.hasFailures() 查看失败的请求
说明4:当bulk执行失败或产生异常的时候会去调用这个方法
说明5:表示每个bulk要执行1000条请求
说明6:表示每到5mb大小的时候执行bulk
说明7:不论请求的数量如何,我们每5s刷新bulk
说明8:设置当前并发请求数量,0表示运行执行单个请求,1表示当在累计一个新bulk请求时,一个并发请求允许被执行(不知道在讲什么)
Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
说明9:Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retries up to three times. A retry is attempted whenever one or more bulk item requests have failed with an EsRejectedExecutionException which indicates that there were too little compute resources available for processing the request. To disable backoff, pass BackoffPolicy.noBackoff().

默认·BulkProcessor·参数设置:

sets bulkActions to 1000
sets bulkSize to 5mb
does not set flushInterval
sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.

Add requests

创建了BulkProcessor后可以往里面添加请求

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

Closing the Bulk Processor

当所有文档都被加载入了BulkProcessor可以使用awaitClose或close方法将它关闭

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

or

bulkProcessor.close();

两个方法都 flush任何剩余的文档,并且使所有通过flushInterval规定的flushes失效,如果concurrent requests启动了,awaitClose 方法会等待特定时间直至所有bulk请求完成,然后返回true,如果在这些请求执行完成前,设定的时间已到,那么则返回false.
close不等待剩余请求执行完毕,立马退出.

Using Bulk Processor in tests

使用elasticsearch做测试并且使用BulkProcessor
往dataset添加数据时, 你最好把concurrent requests设置为0
于是bulk的flush operation将会用异步的方式执行:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,825评论 25 707
  • 一、位运算符 位运算符就是把数值的二进制里面的位上面的0和1来比较或运算。0为false,1为true。 &(与)...
    凯哥学堂阅读 237评论 0 2
  • 《好好说话》 说话也是一门学问,需要我们用心琢磨。 01.在对方时间很紧张的情况下,一定要学会在短时间内把话说清楚...
    环盈阅读 136评论 0 0
  • 20161206童磊感恩日记 【原来人的性格是可以改变的】 今天早上我一下火车就直接地铁前往通州,参加12月卓越父...
    童磊_心理咨询师阅读 475评论 0 0