Ingest Pipeline
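An ingest pipeline preprocesses documents before they are indexed. A pipeline is an ordered chain of processors, and each processor transforms the document in turn. The processing flow looks like this: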
[Figure: ingest pipeline processing flow (pipeline.png)]
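The idea that a pipeline is an ordered chain of processors can be modelled with plain functions. The following Python sketch is only a local mental model, not how Elasticsearch is implemented. `run_pipeline`, `trim_message`, and `lowercase_message` are hypothetical names, and real processors also handle failures, conditions, and ingest metadata that are ignored here.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Processor = Callable[[Doc], Doc]


def run_pipeline(doc: Doc, processors: List[Processor]) -> Doc:
    """Apply processors in order; each one receives the previous one's output."""
    result = dict(doc)  # shallow copy so the caller's document is not mutated
    for processor in processors:
        result = processor(result)
    return result


def trim_message(doc: Doc) -> Doc:
    # Hypothetical toy processor: strip surrounding whitespace from "message".
    return {**doc, "message": doc["message"].strip()}


def lowercase_message(doc: Doc) -> Doc:
    # Hypothetical toy processor: lowercase "message".
    return {**doc, "message": doc["message"].lower()}


if __name__ == "__main__":
    source = {"message": "  Hello World  "}
    print(run_pipeline(source, [trim_message, lowercase_message]))
    # {'message': 'hello world'}
```

Swapping the order of the two processors gives the same result here, but in general the order matters because each processor only sees what the previous one produced.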
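Commonly used processors:
- Trim: removes leading and trailing whitespace. If the field is an array of strings, every string in the array is trimmed.
- Split: splits a string into an array using the specified separator. It only works on string fields.
- Rename: renames a field.
- Foreach: applies the same processor to every element of an array field.
- Lowercase/Uppercase: converts a field to lowercase or uppercase.
- Remove: removes a field.
- Set: sets the value of a field.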
Trim
PUT _ingest/pipeline/trim_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "trim": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}
POST _ingest/pipeline/trim_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 ",
          " auto2222 "
        ]
      }
    }
  ]
}
# Response:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222",
            "auto2222"
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:19:13.542743Z"
        }
      }
    }
  ]
}
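To reason about what foreach + trim does to an array field before running `_simulate`, here is a small Python sketch. It assumes the field holds either a string or a list of strings. `trim_field` is a hypothetical helper, and Python's `str.strip()` may not treat every Unicode whitespace character exactly the way Elasticsearch's trim processor does.

```python
from typing import Any, Dict


def trim_field(doc: Dict[str, Any], field: str) -> Dict[str, Any]:
    """Strip leading/trailing whitespace from a string or from every string in a list."""
    value = doc[field]
    if isinstance(value, list):
        # Per-element trimming, like foreach + trim on _ingest._value.
        trimmed = [item.strip() for item in value]
    else:
        trimmed = value.strip()
    return {**doc, field: trimmed}


if __name__ == "__main__":
    print(trim_field({"message": ["car222 ", " auto2222 "]}, "message"))
    # {'message': ['car222', 'auto2222']}
```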
Split / Foreach
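Splits a string into an array using the specified separator; it only works on string fields. Because message here is an array of strings, split is wrapped in a foreach processor, which runs it on each element (referenced as _ingest._value).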
PUT _ingest/pipeline/split_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "split": {
            "field": "_ingest._value",
            "separator": " "
          }
        }
      }
    }
  ]
}
# Test
POST _ingest/pipeline/split_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 aaa",
          " auto2222 aaaa bbb"
        ]
      }
    }
  ]
}
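# Response: each element of message has been split on spaces into an array of strings. The leading space in " auto2222 aaaa bbb" produces an empty string as the first element.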
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            [
              "car222",
              "aaa"
            ],
            [
              "",
              "auto2222",
              "aaaa",
              "bbb"
            ]
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:28:20.762312Z"
        }
      }
    }
  ]
}
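The same per-element behaviour can be sketched in Python for foreach + split. In Elasticsearch the separator is treated as a regular expression, so this sketch uses `re.split`; it also assumes, as with Java's `String.split`, that leading empty strings are kept and trailing ones are dropped. `split_value` and `foreach_split` are hypothetical helpers, and edge cases (for example a string made only of separators) may not match Elasticsearch exactly.

```python
import re
from typing import Any, Dict, List


def split_value(value: str, separator: str) -> List[str]:
    """Split on a regex separator; keep leading empty strings, drop trailing ones."""
    parts = re.split(separator, value)
    while len(parts) > 1 and parts[-1] == "":
        parts.pop()  # approximate Java's String.split, which discards trailing empties
    return parts


def foreach_split(doc: Dict[str, Any], field: str, separator: str) -> Dict[str, Any]:
    """Apply split to every element of a list field, like foreach + split."""
    return {**doc, field: [split_value(item, separator) for item in doc[field]]}


if __name__ == "__main__":
    doc = {"message": ["car222 aaa", " auto2222 aaaa bbb"]}
    print(foreach_split(doc, "message", " "))
    # {'message': [['car222', 'aaa'], ['', 'auto2222', 'aaaa', 'bbb']]}
```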
Rename
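Renames a field. rename is often combined with _reindex: define a pipeline that renames message to message_new, then reindex the existing data through it.
PUT _ingest/pipeline/rename_pipeline
{
  "processors": [
    {
      "rename": {
        "field": "message",
        "target_field": "message_new"
      }
    }
  ]
}
# Index sample data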
POST goods_info_comment_message/_bulk
{"index":{"_id":1}}
{"message":"美 国苹果 "}
{"index":{"_id":2}}
{"message":"山东 苹果 "}
POST _reindex
{
  "source": {
    "index": "goods_info_comment_message"
  },
  "dest": {
    "index": "goods_info_comment_message_new",
    "pipeline": "rename_pipeline"
  }
}
# Check the mapping of the new index
GET goods_info_comment_message_new/_mapping
# Response: the field is now named message_new
{
  "goods_info_comment_message_new" : {
    "mappings" : {
      "properties" : {
        "message_new" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}
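A rough Python model of the rename step that is applied to each document during the reindex above. `rename_field` is a hypothetical helper; its `ignore_missing` flag mirrors the rename processor's option of the same name, which by default makes a missing source field an error.

```python
from typing import Any, Dict


def rename_field(
    doc: Dict[str, Any], field: str, target_field: str, ignore_missing: bool = False
) -> Dict[str, Any]:
    """Move the value of `field` to `target_field`, returning a new dict."""
    if field not in doc:
        if ignore_missing:
            return dict(doc)  # nothing to rename; leave the document unchanged
        raise KeyError(f"field [{field}] doesn't exist")
    result = dict(doc)
    result[target_field] = result.pop(field)
    return result


if __name__ == "__main__":
    print(rename_field({"message": "sample text"}, "message", "message_new"))
    # {'message_new': 'sample text'}
```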
Lowercase/Uppercase
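Converts a string to lowercase (lowercase) or uppercase (uppercase). When the field is an array of strings, every element is converted, as the example below shows.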
PUT _ingest/pipeline/lowercase_pipeline
{
  "description": "lowercase processor",
  "processors": [
    {
      "lowercase": {
        "field": "message"
      }
    }
  ]
}
# Test with input containing some uppercase characters
POST _ingest/pipeline/lowercase_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}
# Response: all strings are now lowercase
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222 aaa",
            " auto2222 aaaa bbb"
          ]
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:12:10.041308Z"
        }
      }
    }
  ]
}
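A Python sketch of the lowercase/uppercase behaviour on a string or an array of strings. `change_case` is a hypothetical helper; Python's `str.lower()` and `str.upper()` follow Unicode rules and may differ from Elasticsearch for a few special characters.

```python
from typing import Any, Dict


def change_case(doc: Dict[str, Any], field: str, upper: bool = False) -> Dict[str, Any]:
    """Lowercase (default) or uppercase a string field, or every string in a list field."""
    convert = str.upper if upper else str.lower
    value = doc[field]
    if isinstance(value, list):
        converted = [convert(item) for item in value]
    else:
        converted = convert(value)
    return {**doc, field: converted}


if __name__ == "__main__":
    doc = {"message": ["CAr222 aaa", " auto2222 aaaa Bbb"]}
    print(change_case(doc, "message"))
    # {'message': ['car222 aaa', ' auto2222 aaaa bbb']}
```

Note that case conversion does not touch whitespace: the leading space in " auto2222 aaaa bbb" survives, just as in the Elasticsearch response.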
Remove
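Removes an existing field. If the field does not exist, the processor fails unless ignore_missing is set to true.
# Define the remove pipeline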
PUT _ingest/pipeline/remove_pipeline
{
  "description": "remove processor",
  "processors": [
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}
# Test
POST _ingest/pipeline/remove_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}
# Response: the message field has been removed
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : { },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:15:27.811516Z"
        }
      }
    }
  ]
}
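A Python sketch of the remove behaviour, assuming `field` may be a single name or a list of names (the remove processor accepts both) and that a missing field raises an error unless `ignore_missing` is true. `remove_fields` is a hypothetical helper.

```python
from typing import Any, Dict, List, Union


def remove_fields(
    doc: Dict[str, Any], fields: Union[str, List[str]], ignore_missing: bool = False
) -> Dict[str, Any]:
    """Return a copy of doc without the given field(s)."""
    names = [fields] if isinstance(fields, str) else list(fields)
    result = dict(doc)  # work on a copy so the input is untouched on error
    for name in names:
        if name in result:
            del result[name]
        elif not ignore_missing:
            raise KeyError(f"field [{name}] not present")
    return result


if __name__ == "__main__":
    doc = {"message": ["CAr222 aaa", " auto2222 aaaa Bbb"]}
    print(remove_fields(doc, "message"))
    # {}
```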
Set
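Sets the value of a field. If the field already exists, its value is overwritten (unless override is set to false); if it does not exist, it is created.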
PUT _ingest/pipeline/set_pipeline
{
  "description": "set processor",
  "processors": [
    {
      "set": {
        "field": "message",
        "value": "this is a new message"
      }
    }
  ]
}
POST _ingest/pipeline/set_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this"
      }
    }
  ]
}
# Response: message has been overwritten with the new value
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : "this is a new message"
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:21:28.928512Z"
        }
      }
    }
  ]
}
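A Python sketch of set, including the `override` option: when it is false, an existing non-null value is left untouched. `set_field` is a hypothetical helper, and Elasticsearch features such as template snippets in `value` are not modelled.

```python
from typing import Any, Dict


def set_field(
    doc: Dict[str, Any], field: str, value: Any, override: bool = True
) -> Dict[str, Any]:
    """Set doc[field] = value; with override=False keep an existing non-null value."""
    result = dict(doc)
    if override or result.get(field) is None:
        result[field] = value
    return result


if __name__ == "__main__":
    print(set_field({"message": "this"}, "message", "this is a new message"))
    # {'message': 'this is a new message'}
```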