个人专题目录](https://www.jianshu.com/p/140e2a59db2c)
1. elasticsearch文档及索引管理初级
1.1 项目及索引创建
java api 文档 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.5.1/java-rest-overview.html
low : 偏向底层。
high:高级封装。足够。<font color=red>通过API操作与kibana查看操作结果。</font>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.5.1</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.5.1</version>
</dependency>
语法:put /index
- title:商品标题
- price:商品价格
- createTime:创建时间
- categoryName:分类名称。如:家电,手机
- brandName:品牌名称。如:华为,小米
- spec: 商品规格。如: spec:{"屏幕尺寸","5寸","内存大小","128G"}
- saleNum:销量
- stock:库存量
PUT book-index
{
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "ik_smart"
},
"price": {
"type": "double"
},
"createTime": {
"type": "date",
"format" : "yyyy-MM-dd HH:mm:ss"
},
"categoryName": {
"type": "keyword"
},
"brandName": {
"type": "keyword"
},
"spec": {
"type": "object"
},
"saleNum": {
"type": "integer"
},
"stock": {
"type": "integer"
}
}
}
}
PUT /book-index
@Service
@Log4j2
public class IndexServiceImpl implements IndexService {
@Autowired
private RestHighLevelClient restHighLevelClient;
@Override
public void createIndex(String index, CreateIndexRequest request, boolean async) throws Exception {
log.info("source:{}", request.toString());
//操作索引的客户端
IndicesClient indices = restHighLevelClient.indices();
CreateIndexResponse response = null;
//要创建索引,首先我们得先判断索引是不是不存在,如果存在就不创建
if (!existsIndex(index)) {
if (async) {
//异步新增索引
//监听方法
ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
log.info("!!!!!!!!创建索引成功" + createIndexResponse.toString());
}
@Override
public void onFailure(Exception e) {
log.error("!!!!!!!!创建索引失败", e);
}
};
//执行创建索引库
indices.createAsync(request, RequestOptions.DEFAULT, listener);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//专门用于 index 相关的操作
response = indices.create(request, RequestOptions.DEFAULT);
//得到响应(全部)
boolean acknowledged = response.isAcknowledged();
//得到响应 指示是否在超时前为索引中的每个分片启动了所需数量的碎片副本
boolean shardsAcknowledged = response.isShardsAcknowledged();
log.info("创建索引{}的结果是{}", index, response.isAcknowledged());
}
} else {
log.info("索引已经存在{}", index);
}
}
@Override
public void deleteIndex(String index, boolean async) throws Exception {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
IndicesClient indices = restHighLevelClient.indices();
if (existsIndex(index)) {
if (async) {
//异步删除索引库
//监听方法
ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse deleteIndexResponse) {
log.info("!!!!!!!!删除索引成功 {}", deleteIndexResponse.toString());
}
@Override
public void onFailure(Exception e) {
log.error("!!!!!!!!删除索引失败", e);
}
};
//执行删除索引
indices.deleteAsync(deleteIndexRequest, RequestOptions.DEFAULT, listener);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果index 存在就删除
//创建用于删除索引的请求
AcknowledgedResponse response = indices.delete(deleteIndexRequest, RequestOptions.DEFAULT);
log.info("删除索引{}的结果是{}", index, response.isAcknowledged());
}
} else {
log.info("索引不存在{},无法删除", index);
}
}
@Override
public boolean existsIndex(String index) throws Exception {
//设置要查询的索引
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
IndicesClient indices = restHighLevelClient.indices();
//从主节点返回本地信息或检索状态
getIndexRequest.local(false);
//以适合人类的格式返回结果
getIndexRequest.humanReadable(true);
//是否返回每个索引的所有默认设置
getIndexRequest.includeDefaults(false);
boolean exists = indices.exists(getIndexRequest, RequestOptions.DEFAULT);
log.info("索引{}存在的状态是{}", index, exists);
return exists;
}
@Override
public void openIndex(String indexName) throws IOException {
OpenIndexRequest request = new OpenIndexRequest(indexName);
IndicesClient indices = restHighLevelClient.indices();
OpenIndexResponse openIndexResponse = indices.open(request, RequestOptions.DEFAULT);
boolean acknowledged = openIndexResponse.isAcknowledged();
log.info("!!!!!!!!!" + acknowledged);
}
@Override
public void closeIndex(String indexName) throws IOException {
CloseIndexRequest request = new CloseIndexRequest(indexName);
CloseIndexResponse closeIndexResponse = restHighLevelClient.indices().close(request, RequestOptions.DEFAULT);
boolean acknowledged = closeIndexResponse.isAcknowledged();
log.info("!!!!!!!!!" + acknowledged);
}
}
1.2 索引操作示例
@SpringBootTest(classes = SearchServiceApplication.class)
@WebAppConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class IndexServiceTest {
@Autowired
private IndexService indexService;
@Test
public void testCreateIndex() throws Exception {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(Constants.INDEX_NAME);
//我们创建index 和 type 的 时候需要指定分配和 mapping
buildingSetting(createIndexRequest);
buildingMapping(createIndexRequest);
//设置别名
//createIndexRequest.alias(new Alias("alias_index_name"));
// 额外参数
//设置超时时间
createIndexRequest.setTimeout(TimeValue.timeValueMinutes(2));
//设置主节点超时时间
createIndexRequest.setMasterTimeout(TimeValue.timeValueMinutes(1));
//在创建索引API返回响应之前等待的活动分片副本的数量,以int形式表示
createIndexRequest.waitForActiveShards(ActiveShardCount.from(2));
createIndexRequest.waitForActiveShards(ActiveShardCount.DEFAULT);
indexService.createIndex(Constants.INDEX_NAME, createIndexRequest, false);
}
/**
* - title:商品标题
* - price:商品价格
* - createTime:创建时间
* - categoryName:分类名称。如:家电,手机
* - brandName:品牌名称。如:华为,小米
* - spec: 商品规格。如: spec:{"屏幕尺寸","5寸","内存大小","128G"}
* - saleNum:销量
* - stock:库存量
*/
private void buildingMapping(CreateIndexRequest createIndexRequest) throws IOException {
XContentBuilder xContentBuilder = JsonXContent.contentBuilder()
.startObject()
.startObject("properties")
.startObject("title")
.field("type", "text")
.field("analyzer", "ik_smart")
.endObject()
.startObject("price")
.field("type", "double")
.endObject()
.startObject("createTime")
.field("type", "date")
.field("format", "yyyy-MM-dd HH:mm:ss")
.endObject()
.startObject("categoryName")
.field("type", "keyword")
.endObject()
.startObject("brandName")
.field("type", "keyword")
.endObject()
.startObject("spec")
.field("type", "object")
.endObject()
.startObject("saleNum")
.field("type", "integer")
.endObject()
.startObject("stock")
.field("type", "integer")
.endObject()
.endObject()
.endObject();
createIndexRequest.mapping(xContentBuilder);
}
/**
* 设置 index 的分片规则
*
* @param createIndexRequest
*/
private void buildingSetting(CreateIndexRequest createIndexRequest) {
createIndexRequest.settings(Settings.builder()
// 设置主分片为 3
.put("number_of_shards", 3)
//设置从分片为 2
.put("number_of_replicas", 2));
}
@Test
public void testDeleteIndex() throws Exception {
indexService.deleteIndex(Constants.INDEX_NAME, false);
}
@Test
public void testExistsIndex() throws Exception {
indexService.existsIndex(Constants.INDEX_NAME);
}
@Test
public void testOpenIndex() throws IOException {
indexService.openIndex(Constants.INDEX_NAME);
}
@Test
public void testCloseIndex() throws IOException {
indexService.closeIndex(Constants.INDEX_NAME);
}
}
1.3 查询文档
语法:GET /index/type/id
查看:GET /book-index/ 就可看到json形式的文档。方便程序解析。
_mget批量查询
批量查询可以提高查询效率。推荐使用(相对于单数据查询来说)。
@Override
public void get(String index, String id, boolean async) throws Exception {
//构建请求
GetRequest getRequest = new GetRequest(index, id);
//可选参数
//为特定字段配置_source_include
String[] includes = new String[]{"id", "price"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext);
//设置路由
//getRequest.routing("id");
if (async) {
// 执行 查询 同步查询
GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
// 获取结果
if (getResponse.isExists()) {
long version = getResponse.getVersion();
//检索文档(String形式)
String sourceAsString = getResponse.getSourceAsString();
log.info(sourceAsString);
//以字节接受
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
log.info(sourceAsMap);
}
} else {
//异步查询
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
//查询成功时的立马执行的方法
@Override
public void onResponse(GetResponse getResponse) {
long version = getResponse.getVersion();
//检索文档(String形式)
String sourceAsString = getResponse.getSourceAsString();
log.info(sourceAsString);
}
//查询失败时的立马执行的方法
@Override
public void onFailure(Exception e) {
e.printStackTrace();
}
};
//执行异步请求
restHighLevelClient.getAsync(getRequest, RequestOptions.DEFAULT, listener);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void mGet(List<BulkBean> beans) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (BulkBean bean : beans) {
multiGetRequest.add(bean.getIndex(), bean.getId());
}
MultiGetResponse multiGetResponse = restHighLevelClient.mget(multiGetRequest, RequestOptions.DEFAULT);
//获取响应
MultiGetItemResponse[] responses = multiGetResponse.getResponses();
for (MultiGetItemResponse response : responses) {
//将数据以 map 格式展示
Map<String, Object> map = response.getResponse().getSource();
//将数据以 json 格式展示
String source = response.getResponse().getSourceAsString();
log.info("每一条数据是{}" + source);
}
log.info(multiGetResponse);
}
1.3 新建文档
语法:PUT /index/_doc/id
为防止覆盖原有数据,我们在新增时,设置为强制创建,不会覆盖原有文档。
语法:PUT /index/ _doc/id/_create
使用强制新增语法时,如果Document的id在Elasticsearch中已存在,则会报错。(version conflict, document already exists)
# 此操作为Elasticsearch自动生成id的新增Document方式。
POST book-index/_doc/1
{
"title":"小米手机",
"price":1000,
"createTime":"2019-12-01",
"categoryName":"手机",
"brandName":"小米",
"saleNum":3000,
"stock":10000,
"spec":{
"网络制式":"移动4G",
"屏幕尺寸":"4.5"
}
}
#指定id创建文档,需要 PUT 请求
PUT /book-index/1
{
}
public void addDoc(String index, String json, String docId) throws IOException {
IndexRequest indexRequest = new IndexRequest(index);
//设置我们要传递的数据
indexRequest.source(json, XContentType.JSON);
if (docId != null) {
indexRequest.id(docId);
}
//可选参数
//设置超时时间
indexRequest.timeout(TimeValue.timeValueSeconds(1));
indexRequest.timeout("1s");
//自己维护版本号
//indexRequest.version(2);
//indexRequest.versionType(VersionType.EXTERNAL);
IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
log.info("添加数据indexResponse {}" + objectMapper.writeValueAsString(indexResponse));
//构建方法2
// XContentBuilder builder = XContentFactory.jsonBuilder();
// builder.startObject();
// {
// builder.field("user", "tomas");
// builder.timeField("postDate", new Date());
// builder.field("message", "trying out es2");
// }
// builder.endObject();
// indexRequest.source(builder);
// //异步
// ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
// @Override
// public void onResponse(IndexResponse indexResponse) {
//
// }
//
// @Override
// public void onFailure(Exception e) {
//
// }
// };
// client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
String indexName = indexResponse.getIndex();
String id = indexResponse.getId();
//获取插入的类型
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
DocWriteResponse.Result result = indexResponse.getResult();
log.info("CREATED:" + result);
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
DocWriteResponse.Result result = indexResponse.getResult();
log.info("UPDATED:" + result);
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
log.info("处理成功的分片数少于总分片!");
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
//处理潜在的失败原因
String reason = failure.reason();
log.info(reason);
}
}
}
1.4 更新文档
PUT /index/type/1 替换操作是整体覆盖,要带上所有信息。
执行两次,返回结果中版本号(_version)在不断上升。此过程为全量替换。
实质:旧文档的内容不会立即删除,只是标记为deleted。适当的时机,集群会将这些文档删除。
局部替换 partial update
POST方式更新单个内容
语法:POST /{index}/type /{id}/_update
或者POST /{index}/_update/{id}
partial update局部替换则只修改变动字段。
内部与全量替换是一样的,旧文档标记为删除,新建一个文档。
优点:
- 大大减少网络传输次数和流量,提升性能
- 减少并发冲突发生的概率。
POST /book-index/1/_update
{
"doc":{
"ipAddr":"10.126.2.9"
}
}
public UpdateResponse update(String index, String type, Map<String, Object> values, String id) throws IOException {
//创建更新请求,并指定 index,type 和 id 局部更新部分数据
UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(values);
//可选参数
//超时时间
updateRequest.timeout("1s");
//重试次数
updateRequest.retryOnConflict(3);
//设置在继续更新之前,必须激活的分片数
updateRequest.waitForActiveShards(2);
//所有分片都是active状态,才更新
updateRequest.waitForActiveShards(ActiveShardCount.ALL);
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
updateResponse.getId();
updateResponse.getIndex();
//判断结果
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
DocWriteResponse.Result result = updateResponse.getResult();
System.out.println("CREATED:" + result);
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
DocWriteResponse.Result result = updateResponse.getResult();
System.out.println("UPDATED:" + result);
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
DocWriteResponse.Result result = updateResponse.getResult();
System.out.println("DELETED:" + result);
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
//没有操作
DocWriteResponse.Result result = updateResponse.getResult();
System.out.println("NOOP:" + result);
}
return updateResponse;
}
1.5 删除文档
Elasticsearch中执行删除操作时,Elasticsearch先标记Document为deleted状态,而不是直接物理删除。当Elasticsearch存储空间不足或工作空闲时,才会执行物理删除操作。标记为deleted状态的数据不会被查询搜索到。
DELETE /book-index/1
public void deleteDocById(String index, String id) throws Exception {
DeleteRequest deleteRequest = new DeleteRequest(index, id);
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
deleteResponse.getId();
deleteResponse.getIndex();
DocWriteResponse.Result result = deleteResponse.getResult();
log.info(result);
log.info("删除状态是{}", deleteResponse.status().getStatus());
}
1.6 批量操作bulk
注意:bulk语法中要求一个完整的json串不能有换行。不同的json串必须使用换行分隔。多个操作中,如果有错误情况,不会影响到其他的操作,只会在批量操作返回结果中标记失败。bulk语法批量操作时,bulk request会一次性加载到内存中,如果请求数据量太大,性能反而下降(内存压力过高),需要反复尝试一个最佳的bulk request size。一般从1000~5000条数据开始尝试,逐渐增加。如果查看bulk request size的话,一般是5~15MB之间为好。
bulk语法要求json格式是为了对内存的方便管理,和尽可能降低内存的压力。如果json格式没有特殊的限制,Elasticsearch在解释bulk请求时,需要对任意格式的json进行解释处理,需要对bulk请求数据做json对象会json array对象的转化,那么内存的占用量至少翻倍,当请求量过大的时候,对内存的压力会直线上升,且需要jvm gc进程对垃圾数据做频繁回收,影响Elasticsearch效率。
生产环境中,bulk api常用。都是使用java代码实现循环操作。一般一次bulk请求,执行一种操作。如:批量新增10000条数据等。
POST /_bulk
{"action": {"metadata"}}
{"data"}
#bulk 批量添加,批量的时候第一行为id 列,第二行为数据列,中间不能出现换行
POST /book-index/_doc/_bulk
{"index":{"_id":1}}
{"corpName":"途虎养车",...}
{"index":{"_id":2}}
{"corpName":"盒马鲜生"...}
#可以删除不同 index 下的数据,下面案例不演示了,注意会在倒数第二行报错,因为第一条就是删除的它,即便在地址中指定了库,可以去删除其他index 的数据,在参数中不指定 index 的情况下就是按照地址中的来,指定了 index 的情况下就是按照具体指定的来
POST /lib2/books/_bulk
{"delete":{"_index":"lib2","_type":"books","_id":4}}
{"create":{"_index":"tt","_type":"ttt","_id":"100"}}
{"name":"lisi"}
{"index":{"_index":"tt","_type":"ttt"}}
{"name":"zhaosi"}
{"update":{"_index":"lib2","_type":"books","_id":"4"}}
{"doc":{"price":58}}
#可以指定不同的 index 和 type
GET /_mget
{
"docs":[
{
"_index": "book-index", #索引
"_type": "_doc", #数据类型
"_id": 1 #要查询的主键
},
{
"_index": "book-index",
"_type": "_doc",
"_id": 2
}
]
}
action:(行为)
create:文档不存在时创建
update:更新文档
index:创建新文档或替换已有文档
delete:删除一个文档
metadata:_index,_type,_id
create 和index的区别
如果数据存在,使用create操作失败,会提示文档已经存在,使用index则可以成功执行。
public void bulkOption(List<BulkBeanWithOption> beanWithOptionList) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
for (BulkBeanWithOption bean : beanWithOptionList) {
switch (bean.getBulkOption()) {
//根据我们的操作类型来决定做什么
case INDEX:
IndexRequest indexRequest = new IndexRequest(bean.getIndex());
indexRequest.id(bean.getId());
indexRequest.source(bean.getJson(), XContentType.JSON);
bulkRequest.add(indexRequest);
break;
case CREATE:
break;
case DELETE:
DeleteRequest deleteRequest = new DeleteRequest(bean.getIndex());
deleteRequest.id(bean.getId());
bulkRequest.add(deleteRequest);
break;
case UPDATE:
log.info("update");
break;
default:
throw new IllegalStateException("Unexpected value: " + bean.getBulkOption());
}
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
for (BulkItemResponse itemResponse : bulkResponse) {
DocWriteResponse itemResponseResponse = itemResponse.getResponse();
switch (itemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponseResponse;
indexResponse.getId();
log.info(indexResponse.getResult());
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponseResponse;
updateResponse.getIndex();
log.info(updateResponse.getResult());
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponseResponse;
log.info(deleteResponse.getResult());
break;
default:
throw new IllegalStateException("Unexpected value: " + itemResponse.getOpType());
}
}
}
1.7 索引refresh
一个理想的搜索解决方案中,新索引的数据应该能立即搜索到。ElasticSearch给人的第一印象仿佛就是如此工作的,即使是在多服务器环境下,然而事实并非如此(至少不是任何场景都能保证新索引的数据能被实时检索到)。
elasticsearch是基于lucene的,lucene是可以做到实时的,就是创建索引之后,立即能查询到。但是这样,要么是牺牲索引的效率,每次都索引之后都刷新,要么就是牺牲查询的效率每次查询之前都进行刷新。
无论哪一种,都会让你的性能下降10倍以上,所以只能采取一种折中的方案,每隔n秒自动刷新,这样你创建索引之后,最多在ns之内肯定能查到。这就是所谓的准实时(near real-time)查询。
elasticsearch默认刷新时间是1s。
刷新索引方法:
public void refreshIndex(String indexName) throws IOException {
RefreshRequest refreshRequest = new RefreshRequest(indexName);
IndicesClient indices = restHighLevelClient.indices();
RefreshResponse refresh = indices.refresh(refreshRequest, RequestOptions.DEFAULT);
log.info(refresh.toString());
}