一、概要说明
本篇记载了从安装elastic search 数据库 (以下简称ES),到使用python操作使用ES(主要是批量新增数据),
使用java spring boot 从ES中检索数据,及ES数据库客户端工具kibana的简单使用。
感觉ES版本更新频率还是挺高了,最难的是不同版本差异点。
大概从2021年1月开始正式接触使用 ES,这个时候ES版本是:7.10.2 版本。
下载安装的版本,我选择前一个版本:7.10.1
二、安装方面
安装的系统在ubuntu上
1、安装es:
1、解压目录后,进入目录后,调整配置
tar -xzvf elasticsearch-7.10.1-linux-x86_64.tar.gz
cd elasticsearch-7.10.1/config/
2、vim elasticsearch.yml 调整配置
1) 启用配置行:
找到:network.host: 这一行,去掉注释,并改为 0.0.0.0
network.host: 0.0.0.0 ## 注意ip前有一个空格
找到:http.port: 9200 这一行,去掉注释
找到:cluster.initial_master_nodes 去掉注释,并根据实际情况改为一 个node
cluster.initial_master_nodes: ["node-1"]
找到:node.name 去掉注释 ,对应名字为以上的:node-1
node.name: node-1
2)添加配置 (启用密码)
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
3、修改完启动
到目录bin下 执行:
./elasticsearch
检查端口,及启动状态
4、设置初始密码, 退出config目录,进入目录bin目录,执行程序
./elasticsearch-setup-passwords interactive
后会显示交互界面,输入多次密码
Initiating the setup of passwords for reserved users elastic,kibana,logstash_system,beats_system.
You will be prompted to enter passwords as the process progresses.
Please confirm that you would like to continue [y/N]y
Enter password for [elastic]:
Reenter password for [elastic]:
Enter password for [kibana]:
Reenter password for [kibana]:
Enter password for [logstash_system]:
Reenter password for [logstash_system]:
Enter password for [beats_system]:
Reenter password for [beats_system]:
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [elastic]
4、修改/etc/sysctl.conf文件,增加配置:vm.max_map_count=262144
vi /etc/sysctl.conf
sysctl -p
5、接下来执行启动
//直接启动ElasticSearch
./elasticsearch
//或者使用后台方式进行启动
./elasticsearch -d
访问 http://192.168.XX.XX:9200 检查安装情况
2、安装kibana
1、解压压缩包
tar -zxvf kibana-7.10.1-linux-x86_64
cd kibana-7.10.1-linux-x86_64/
2、进入config目录,vi 编辑 kibana.yml配置文件,主要配置如下:
server.host: "0.0.0.0" # 允许外网访问
elasticsearch.username: "elastic" #es用户名
elasticsearch.password: "passoword123" #es的密码
3、后台方式启动,退出config目录,到bin 目录
./kibana & 后台启动
访问http:// 192.168.xx.XX:5061 输入用户密码检查安装情况
三、python 操作ES
通过python把数据处理保存进ES数据库
需pip 安装相应包:
pip install elasticsearch==7.5.1
建立数据库访问操控类:
import json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class ElasticDB():
def __init__(self,hosts=["192.168.XX.XX"],username="elastic",password="password123"):
try:
# 创建普通类型的ES客户端
self.es = Elasticsearch(hosts, http_auth=(username, password))
if not self.es.ping():
raise Exception("连接es数据库失败,请检查IP或端口!")
except Exception as e:
raise Exception(f"连接es数据库出现异常{e}")
def __del__(self):
if self.es is not None:
self.es=None
# 批量插入数据内容
def insertBatchData(self, dataList):
# res = self.es.bulk(index=indexName,doc_type=docType, body=dataList, raise_on_error=True)
res = helpers.bulk(self.es, dataList)
return res
....
逻辑处理从mongdb中批量取数据处出来,批量插入es数据库(代码片段):
outList=self.mongoDB.getBatchDataByIdList(idLists) # 从mongodb中批量取出数据
esList=[]
errorList=[]
for doc in outList:
esDo=self._mongoToEsParser(doc) # 逐个文档转换成 es 的新dict格式的数据
if esDo is not None:
# 拼接插入数据结构
action = {
"_index":self.indexName, #设置es索引名
# "_type": self.docType, #不设置es type 使用默认的
"_id":doc['_id'], # 手动设置es 的id字段值,
"_source":esDo # 设置文档主要数据体内容
}
esList.append(action)
else:
errorList.append(doc['_id'])
end1 = time.time()
self.log.info(f'转换成es数据: {len(esList)}/{curSize} ---》耗时 {int(end1 - start)} 秒')
out=self.esDB.insertBatchData(dataList=esList) # es 执行数据的批量插入
# print(out)
end2 = time.time()
self.log.info(f'执行es: {len(esList)}条数据的插入, 结果:{out}---》耗时 {int(end2 - end1)} 秒')
四、java - spring boot 操作检索ES 数据库
1、版本需对应好:
2、pom.xml 配置,及连接配置
版本:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
</parent>
主要依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
application.properties 的配置:
spring.elasticsearch.rest.uris=192.168.1.XXX:9200
spring.elasticsearch.rest.username=elastic
spring.elasticsearch.rest.password=password123
spring.elasticsearch.rest.connection-timeout=10s
spring.elasticsearch.rest.read-timeout=30s
3、 关键代码篇段
controller 层关键代码:
@RestController
@RequestMapping("v100/es")
@Validated
@Slf4j
public class EsSearchPatentBiControl {
@Autowired
private RestHighLevelClient client; //官方建议使用的方式,以前9300端口操作es底层将不支持了
/**
* 组合检索 数据信息
* 多字段匹配组合
* @return
*/
// @LoginRequired
@PostMapping("/multi_query")
public ResultVO multiQueryAbsData(EsDTO esDTO) {
long start=System.currentTimeMillis();
if (esDTO==null) {
log.error("检索的信息参数不能为空!");
return ResultVO.getFailResult("检索的信息参数不能为空!");
}
try {
SearchRequest request = new SearchRequest("index_name_*");//设置索引值,多个索引可以用*通配
QueryBuilder query = QueryBuilders.matchAllQuery();
List<QueryBuilder> listMustQuery=esDTO.getListMust();// 获取组合的must查询条件
List<QueryBuilder> listFilterQuery=esDTO.getListFilter();// 获取组合的filter查询条件
BoolQueryBuilder queryBool=null;
if(listMustQuery.size()>0)
{
queryBool = QueryBuilders.boolQuery();
for(QueryBuilder subQuery:listMustQuery){
queryBool=queryBool.must(subQuery);
}
}
if(listFilterQuery.size()>0){
if(queryBool==null){
queryBool=QueryBuilders.boolQuery();
}
for(QueryBuilder subQuery:listFilterQuery){
queryBool=queryBool.filter(subQuery);
}
}
if (queryBool!=null){
query=queryBool;
}
log.info("查询语句:{}",query.toString());
EsPageVO esPageVO = ElasticsearchUtil.searchDataPage(client, request, esDTO.getCurrentPage()-1, esDTO.getPageSize(), query, "", "");
return ResultVO.getSuccessResult(esPageVO);
}catch (IOException ioe){
log.error("获取文档IO异常: ", ioe);
return ResultVO.getFailResult("获取文档IO异常: "+ ioe);
}finally {
long end=System.currentTimeMillis();
Long useTime=(end-start)/1000;
log.info("执行组合查询:总共耗时[{}]秒",useTime);
}
}
}
Es工具类代码:
@Slf4j
public class ElasticsearchUtil {
// private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtil.class);
/**
* 使用分词查询,并分页
* @param client 查询客户端
* @param searchRequest 查询request
* @param startPage 当前页
* @param pageSize 每页显示条数
* @param query 查询条件
* @param sortField 排序字段
* @param highlightField 高亮字段
* @return
*/
public static EsPageVO searchDataPage(RestHighLevelClient client,SearchRequest searchRequest,
int startPage,int pageSize, QueryBuilder query, String sortField, String highlightField)throws IOException {
SearchSourceBuilder sourceBuilder=new SearchSourceBuilder();
//排序字段
if (Strings.isNotEmpty(sortField)) {
sourceBuilder.sort(sortField, SortOrder.DESC);
}
// 高亮设置一个字段
if (Strings.isNotEmpty(highlightField)) {
HighlightBuilder highlightBuilder =new HighlightBuilder();
highlightBuilder.preTags("<span style='color:red' >");//设置前缀
highlightBuilder.postTags("</span>");//设置后缀
// 设置高亮字段
highlightBuilder.field(highlightField);
sourceBuilder.highlighter(highlightBuilder);
}
//searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
sourceBuilder.query(query);
//分页设置
sourceBuilder.from(startPage);
sourceBuilder.size(pageSize);
// 设置是否按查询匹配度排序
sourceBuilder.explain(true);
sourceBuilder.trackTotalHits(true);
sourceBuilder.fetchSource(null,new String[]{"create_time","update_count","source_id"});
searchRequest.source(sourceBuilder);
// searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
// 执行搜索,返回搜索响应信息
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
long totalHits = searchResponse.getHits().getTotalHits().value;
long length = searchResponse.getHits().getHits().length;
log.info("共查询到[{}]条数据,处理的分页条数[{}]", totalHits, length);
if (searchResponse.status().getStatus() ==200) {
// 解析对象
List> sourceList =setSearchResponse(searchResponse, highlightField);
return new EsPageVO(startPage, pageSize, (int) totalHits, sourceList);
}
return null;
}
/**
* 高亮结果集 特殊处理
*
* @param searchResponse
* @param highlightField
*/
private static List> setSearchResponse(SearchResponse searchResponse, String highlightField) {
List> sourceList =new ArrayList>();
StringBuffer stringBuffer =new StringBuffer();
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
searchHit.getSourceAsMap().put("id", searchHit.getId());
if (Strings.isNotEmpty(highlightField)) {
// System.out.println("遍历 高亮结果集,覆盖 正常结果集" + searchHit.getSourceAsMap());
Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments();
if (text !=null) {
for (Text str : text) {
stringBuffer.append(str.string());
}
//遍历 高亮结果集,覆盖 正常结果集
searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
}
}
sourceList.add(searchHit.getSourceAsMap());
}
return sourceList;
}
}
输出的分页类代码:
@Data
@ToString
public class EsPageVO {
/**
* 当前页
*/
private int currentPage;
/**
* 每页显示多少条
*/
private int pageSize;
/**
* 总记录数
*/
private int recordCount;
/**
* 本页的数据列表
*/
private List>recordList;
/**
* 总页数
*/
private int pageCount;
/**
* 只接受前4个必要的属性,会自动的计算出其他3个属性的值
*
* @param currentPage
* @param pageSize
* @param recordCount
* @param recordList
*/
public EsPageVO(int currentPage,int pageSize,int recordCount, List> recordList) {
this.currentPage = currentPage+1;
this.pageSize = pageSize;
this.recordCount = recordCount;
this.recordList = recordList;
// 计算总页码
pageCount = (recordCount + pageSize -1) / pageSize;
}
}
检索的DTO类代码:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class EsDTO {
@ApiModelProperty("编号号")
private String code;
@ApiModelProperty("国家(值为:CN,JP,US..)")
private String country;
@ApiModelProperty("发行日期-起始时间")
private String publicDateStart;
@ApiModelProperty("公开日期-结束时间")
private String publicDateEnd;
@ApiModelProperty("种类编码")
private String kindCode;
@ApiModelProperty("标题")
private String title;
@ApiModelProperty("摘要")
private String abs;
/**
* 当前页
*/
@ApiModelProperty(value = "当前页", required=true)
private int currentPage=1;
/**
* 每页显示多少条
*/
@ApiModelProperty(value = "分页条数",required = true)
private int pageSize=20;
@ApiModelProperty(hidden = true)
public List<QueryBuilder> getListMust(){
List<QueryBuilder> listQuery=new ArrayList<>();
if (Strings.isNotBlank(title)){
listQuery.add(QueryBuilders.matchPhraseQuery("title",title));
}
if (Strings.isNotBlank(abs)) {
listQuery.add(QueryBuilders.matchPhraseQuery("abstract",abs));
}
return listQuery;
}
public List<QueryBuilder> getListFilter(){
List<QueryBuilder> listQuery=new ArrayList<>();
if (Strings.isNotBlank(code)){
listQuery.add(QueryBuilders.matchPhraseQuery("p_code",code));
}
if (Strings.isNotBlank(country)){
listQuery.add(QueryBuilders.matchPhraseQuery("p_country",country));
}
if (Strings.isNotBlank(publicDateStart)) {
listQuery.add(QueryBuilders.rangeQuery("p_date").gte(publicDateStart));
}
if (Strings.isNotBlank(publicDateEnd)) {
listQuery.add(QueryBuilders.rangeQuery("p_date").lte(publicDateEnd));
}
if (Strings.isNotBlank(kindCode)){
listQuery.add(QueryBuilders.matchPhraseQuery("kind_code",kindCode));
}
return listQuery;
}
}
五、kibana的使用
作为经常跟传统数据库打交道的开发人员,最希望还是有一个称手的 数据库客户端工具。
对于ES来说,就是kibana . 这个工具给我感觉,还是不那么熟悉,(版本也一样差异大),
我相信它的定位,还不是简单只是数据库客户端工具。。。
入门者的我,先简单使用吧。
在主菜单下:Home -》 ManageMent -->Dev Tools .
点开,左右格局的查询与结果展示。
以下贴一两个常用命令(更多请,细细品读官网吧):
1、简单查询单个匹配的:
GET /index_name_xx/_search
{
"query": {
"match": { "public_number": "3821464B"}
}
}
2、查询总数据量
GET /index_name_xx/_search
{
"track_total_hits": true,
"query": {
"match_all": {}
}
}
3、查询重复的数据
GET /index_name_xx/_search
{
"from": 0,
"size": 0,
"aggs": {
"my":{
"terms":{
"field": "public_number.keyword"
}
}
}
}
4、组合查询
GET index_name_XX_*/_search
{
"query": {
"bool": {
"must": [
{"match_phrase": { "in_name":"张三" }},
{"match_phrase": { "co_name": "XX公司"}}
],
"filter":[
{"match_phrase":{"p_country":"CN"}},
{"range":{"create_date":{"gte":"20181024"}}}
]
}
}
}