Elasticsearch Ingest Pipeline

Ingest Pipeline

用于预处理数据,pipeline是一系列处理管道,一系列的processors,先来看下pipeline的处理过程:


pipeline.png

常用pipeline如下:

  • Trim
    去除空格,如果是字符串类型的数组,数组中所有字符串都会被替换空格处理
  • Split
    切分字符串,使用指定切分符,切分字符串为数组结构,只作用与字符串类型
  • Rename
    重命名字段
  • Foreach
    对一组数据进行相同的预处理,可以使用Foreach
  • Lowercase/Uppercase
    对字段进行大小写转换
  • Remove
    移除字段
  • Set
    设置字段值
Trim
PUT _ingest/pipeline/trim_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "trim": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}

POST _ingest/pipeline/trim_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 ",
          " auto2222 "
        ]
      }
    }
  ]
}

#返回:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222",
            "auto2222"
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:19:13.542743Z"
        }
      }
    }
  ]
}
Split / Foreach

切分字符串,使用指定切分符,切分字符串为数组结构,只作用与字符串类型

PUT _ingest/pipeline/split_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "split": {
            "field": "_ingest._value",
            "separator": " "
          }
        }
      }
    }
  ]
}

#测试
POST _ingest/pipeline/split_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 aaa",
          " auto2222 aaaa bbb"
        ]
      }
    }
  ]
}
#返回,可以看到 message 按照空格切分为了多个字符串数组
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            [
              "car222",
              "aaa"
            ],
            [
              "",
              "auto2222",
              "aaaa",
              "bbb"
            ]
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:28:20.762312Z"
        }
      }
    }
  ]
}

Rename

重命名一个字段,rename往往和reindex 结合使用

POST goods_info_comment_message/_bulk
{"index":{"_id":1}}
{"message":"美 国苹果 "}
{"index":{"_id":2}}
{"message":"山东 苹果 "}


POST _reindex
{
  "source": {
    "index": "goods_info_comment_message"
  },
  "dest": {
    "index": "goods_info_comment_message_new",
    "pipeline": "rename_pipeline"
  }
}

#查询mapping
GET goods_info_comment_message_new/_mapping

#返回
{
  "goods_info_comment_message_new" : {
    "mappings" : {
      "properties" : {
        "message_new" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}
Lowercase/Uppercase

将字符串修改为大写或者小写

PUT _ingest/pipeline/lowercase_pipeline
{
  "description": "lowercase processor",
  "processors": [
    {
      "lowercase": {
        "field": "message"
      }
    }
  ]
}

#测试,部分字符大写
POST _ingest/pipeline/lowercase_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}

#结果,全部输出为小写
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222 aaa",
            " auto2222 aaaa bbb"
          ]
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:12:10.041308Z"
        }
      }
    }
  ]
}
Remove

移除已经存在的字段

#定义remove pipelint
PUT _ingest/pipeline/remove_pipeline
{
  "description": "remove processor",
  "processors": [
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}

#测试
POST _ingest/pipeline/remove_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}

#返回,可以看到message字段已经被移除
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : { },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:15:27.811516Z"
        }
      }
    }
  ]
}
Set

给已有字段进行赋值

PUT _ingest/pipeline/set_pipeline
{
  "description": "set processor",
  "processors": [
    {
      "set": {
        "field": "message",
        "value": "this is a new message"
      }
    }
  ]
}


POST _ingest/pipeline/set_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this"
      }
    }
  ]
}

#返回
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : "this is a new message"
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:21:28.928512Z"
        }
      }
    }
  ]
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容