需求:修复与增强写入的数据
- Tags字段中,逗号分隔的文本应该是数组,而不是一个字符串
- 需求:后期需要对Tags进行Aggregation统计
Ingest Node
- Elasticsearch 5.0后,引入的一种新的节点类型。默认配置下,每个节点都是Ingest Node
- 具有预处理数据的能力,可拦截Index或Bulk API的请求
- 对数据进行转换,并重新返回给Index或Bulk API
- 无需Logstash,就可以进行数据的预处理,例如
- 为某个字段设置默认值;重命名某个字段的字段名;对字段值进行Split操作
- 支持设置Painles脚本,对数据进行更加复杂的加工
Pipeline & Processor
- Pipeline:管道会对通过的数据(文档),按照顺序进行加工
- Processor:Elasticsearch对一些加工的行为进行了抽象包装
- Elasticsearch有很多内置的processors,也支持通过插件的方式,实现自己的Processor
使用pileline切分字符串
其他的processor
pileline的创建、查看及使用
#为ES添加一个pipeline
PUT _ingest/pipeline/blog_pipeline
{
"description": "a blog pipeline",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
},
"set": {
"field": "views",
"value": 0
}
}
]
}
#查看pipeline
GET _ingest/pipeline/blog_pipeline
#测试pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"title": "Introducing big data",
"tags": "hadoop,elasticsearch,spark",
"content": "you known, for big data"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"title": "Introducing cloud computering",
"tags": "openstack,k8s",
"content": "you known, for cloud"
}
}
]
}
使用update by query
PUT tech_blog/_doc/1
{
"title": "Introducing big data",
"tags": "hadoop,elasticsearch,spark",
"content": "you known, for big data"
}
#使用pipeline更新数据
PUT tech_blog/_doc/2?pipeline=blog_pipeline
{
"title": "Introducing cloud computering",
"tags": "openstack,k8s",
"content": "you known, for cloud"
}
POST tech_blog/_update_by_query?pipeline=blog_pipeline
#查看两条数据,一条被处理,一条未被处理
POST tech_blog/_search
{}
#update by query,会导致错误
POST tech_blog/_update_by_query?pipeline=blog_pipeline
#增加upate_by_query的条件
POST tech_blog/_update_by_query?pipeline=blog_pipeline
{
"query": {
"bool": {
"must_not": [
{"exists": {
"field": "views"
}}
]
}
}
}
一些内置的processors
-
https://www.elastic.co/guide/en/elasticsearch/reference/7.1/ingest-processors.html
- Split Processor(例:将给定字段值分成一个数组)
- Remove / Rename Processor(例:移除一个,重命名一个)
- Append(例:为商品新增标签)
- Convert(例:将商品价格,从字符串转成数字)
- Date / JSON(例:日期格式转换,字符串转成JSON对象)
- Date Index Name Processor(例:将通过该处理器的文档,分配到指定时间格式的索引中)
...
Ingest Node vs Logstash
Logstash | Ingest Node | |
---|---|---|
数据输入与输出 | 支持从不同的数据源读取,并写入不同的数据源 | 支持从ES REST API获取数据,并且写入Elasticsearch |
数据缓冲 | 实现了简单的数据队列,支持重写 | 不支持缓冲 |
数据处理 | 支持大量的插件,也支持定制开发 | 内置的插件,可以开发Plugin进行扩展(Plugin更新需重启) |
配置和使用 | 增加了一定的架构复杂度 | 无需额外部署 |
https://www.elastic.co/cn/blog/should-use-logstash-or-elasticsearch-ingest-nodes
Painless简介
- 自Elasticsearch 5.x后引入,专门为Elasticsearch设计,扩展了Java的语法
- 6.0开始,ES只支持Painless。Groovy,Javascript和Python都不再支持
- painless支持所有java的数据类型及java api自己
- painless script具备以下特性
- 高性能 / 安全
- 支持显示类型或者动态定义类型
Plainless的用途
- 可以对文档字段进行加工处理
- 更新或删除字段,处理数据聚合操作
- Script Field:对返回的字段提前进行计算
- Function Score:对文档的算分进行处理
- 在Ingest Pipeline中执行脚本
- 在Reindex API, Update by query时,对数据进行处理
通过Painless脚本访问字段
上下文 | 语法 |
---|---|
Ingestion | ctx.field_name |
Update | ctx._source.field_name |
Search & Agg | doc["field_name"] |
其他用法
PUT tech_blogs/_doc/1
{
"title": "Introducing big data",
"tags": "hadoop,elasticsearch,spark",
"content": "you known, for big data",
"views": 0
}
POST tech_blogs/_update/1
{
"script": {
"source": "ctx._source.views += params.new_views",
"params": {
"new_views": 100
}
}
}
- 保存脚本及使用
#保存script在cluster中
POST _scripts/update_views
{
"script": {
"lang": "painless",
"source": "ctx._source.views += params.new_views"
}
}
POST tech_blogs/_update/1
{
"script": {
"id": "update_views",
"params": {
"new_views": 20
}
}
}
#查询时新增随机数
GET tech_blogs/_search
{
"script_fields": {
"rnd_views": {
"script": {
"lang": "painless",
"source": """
java.util.Random rnd = new Random();
doc['views'].value+rnd.nextInt(100);
"""
}
}
}
}
脚本缓存
- 编译的开销较大
- Elasticsearch会将脚本编译后缓存在Cache中
- Inline Script和Stored Scripts都会被缓存
- 默认缓存100个脚本
参数 | 说明 |
---|---|
script.cache.max_size | 设置最大缓存数 |
script.cache.expire | 设置缓存超时 |
script.max_compilations_rate | 默认5分钟最多75次编译(75/5m) |