Elasticsearch 7.x 深入【12】Pipeline

1. 借鉴

极客时间 阮一鸣老师的Elasticsearch核心技术与实战
如何在Elasticsearch中使用pipeline API来对事件进行处理
(译) Ingest Node (预处理节点)
Elasticsearch Pipeline 详解

2. 开始

数据准备:<Elasticsearch 7.x 深入 数据准备>

默认情况下,每个节点都是Ingest Node,可以通过node.ingest=true|false来设置

Ingest Node 具有以下功能

  1. 预处理,可拦截Index或者Bulk API的请求
  2. 对数据进行转化和加工,并重新返回给Index或者Bulk API

Ingest Node的组件

  • Pipeline 对通过的数据按照顺序进行加工
  • Processor 对一些加工的行为进行了封装

处理流程图

处理流程

Pipeline的组成

  • description 描述该pipeline
  • processors 定义了一系列的processors

测试未加入节点的Pipeline

在将pipeline加入到节点之前,我么可以使用simulate API测试编写的Pipeline,模板如下:

POST _ingest/pipeline/_simulate
{
  "pipeline" : { // 这里指定pipeline
    , "description": "" // 这里是pipeline的描述
    , "processors": [ // 这里指定一个或者多个processors
      {}
    ]
  },
  "docs" : [ // 这里指定一个或者多个文档
    { "_source": {} }
  ]
}

接下来,我们测试一下:
这里我用两个酒店,以及他们没有处理的标签进行测试,其中split是pipeline中的一个processor,等会我们介绍一下其他的processor类型,目前这个是按照","进行切分

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "split pipeline测试",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "name": "好哇热油酒店",
        "tags": "高,大,上"
      }
    },
    {
      "_source": {
        "name": "IMfine酒店",
        "tags": "舒适,情趣,惬意"
      }
    }]
}

我们看下结果:
可以看到tag已经按照","切分了

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "name" : "好哇热油酒店",
          "tags" : [
            "高",
            "大",
            "上"
          ]
        },
        "_ingest" : {
          "timestamp" : "2020-06-01T09:28:34.386Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "name" : "IMfine酒店",
          "tags" : [
            "舒适",
            "情趣",
            "惬意"
          ]
        },
        "_ingest" : {
          "timestamp" : "2020-06-01T09:28:34.386Z"
        }
      }
    }
  ]
}

添加Pipeline

如果上面我们测试的pipeline,觉得已经很完美了,我想把它加入到节点中,那我们就来做这个事情。
模板如下:

PUT _ingest/pipeline/pipeline名称
{
  "description": "" // 这里是pipeline的描述
    , "processors": [ // 这里指定一个或者多个processors
      {}
    ]
}

我们来试下,将上面的那个加入到节点中

PUT _ingest/pipeline/hotel_pipeline
{
  "description": "酒店索引的pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    }
  ]
}

测试已加入到节点中的Pipeline

如果pipeline已经加入到了节点中,我们如何测试呢?
模板

POST _ingest/pipeline/pipeline的名称/_simulate
{
  "docs": [ // 指定多个文档
    {
      "_source": {
      }
    }]
}

我们测试一下:

POST _ingest/pipeline/hotel_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "name": "好哇热油酒店",
        "tags": "高,大,上"
      }
    },
    {
      "_source": {
        "name": "IMfine酒店",
        "tags": "舒适,情趣,惬意"
      }
    }]
}

我们看下结果

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "name" : "好哇热油酒店",
          "tags" : [
            "高",
            "大",
            "上"
          ]
        },
        "_ingest" : {
          "timestamp" : "2020-06-01T09:43:26.398Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "name" : "IMfine酒店",
          "tags" : [
            "舒适",
            "情趣",
            "惬意"
          ]
        },
        "_ingest" : {
          "timestamp" : "2020-06-01T09:43:26.398Z"
        }
      }
    }
  ]
}

使用Pipeline

pipeline已经被加入到节点中了,那我们如何在索引文档时使用上呢?
模板

PUT /index的名称/_doc/1?pipeline=pipeline的名称
{
  // 数据
}

我们来试一下
我往pipeline_hotel这个索引里面加入2篇文档,第一篇成功了,第二篇失败了,那我们如何处理异常情况呢?

PUT /pipeline_hotel/_doc/1?pipeline=hotel_pipeline
{
  "name": "莫德凯撒酒店",
  "tags": "舒适,极端舒适,舒适极了"
}

PUT /pipeline_hotel/_doc/1?pipeline=hotel_pipeline
{
  "name": "莫德凯撒酒店"
}

这里我只粘贴出第二篇的错误信息,第一个成功就不粘贴了

{
  "error": {
    "root_cause": [
      {
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [tags] not present as part of path [tags]",
        "header": {
          "processor_type": "split"
        }
      }
    ],
    "type": "exception",
    "reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [tags] not present as part of path [tags]",
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "java.lang.IllegalArgumentException: field [tags] not present as part of path [tags]",
      "caused_by": {
        "type": "illegal_argument_exception",
        "reason": "field [tags] not present as part of path [tags]"
      }
    },
    "header": {
      "processor_type": "split"
    }
  },
  "status": 500
}

处理异常情况

在上面我们看到了异常信息,那我们如何处理呢?
对于我们上面的这个例子,错误忽略即可,那我们可以重新定义pipeline,并做以下配置:

PUT _ingest/pipeline/hotel_pipeline2
{
  "description": "酒店索引的pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ",",
        "ignore_failure": true // 这里指定发生错误时忽略
      }
    }
  ]
}

当然这里只是简单的略过了,有时候我们必须做一些额外的事情该如何搞呢?

processor级别错误处理

此级别可以有两种处理:

  • 忽略:配置ignore_failure=true
  • 处理:配置on_failure,里面可以定义多个处理错误的processor

忽略上面我们用到了,我们来处理一下,如果出现错误,我们就新增一个字段(error),并拼接上新的错误信息

PUT _ingest/pipeline/hotel_pipeline3
{
  "description": "酒店索引的pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ",",
        "on_failure": [
          {
            "set": {
                "field": "error",
                "value": "{{ctx.error}};{{_ingest.on_failure_message}}"
              }
          }]
      }
    }
  ]
}

看到了,在on_failure中,我们能通过_ingest能获取并仅能获取到三个属性:on_failure_message(错误信息), on_failure_processor_type(处理器类型), and on_failure_processor_tag(处理器的tag)

pipeline级别错误处理

此级别只有一种处理:

  • 处理:配置on_failure

这里我们配置的是:如果出错了,将这个文档添加到索引“failed-pipeline_hotel”中。当然这里的“pipeline_hotel”是一个变量,对哪个索引运用了pipeline,那这个变量就是哪个索引的名字
注:其实这是官网的例子,这里我就用这个为例了,我感觉官网这个例子极好

PUT _ingest/pipeline/hotel_pipeline4
{
  "description": "酒店索引的pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    }
  ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{_index}}"
      }
    }
  ]
}

我们试一下:
我们索引以下文档,以下文档不包含tags

PUT /pipeline_hotel/_doc/1?pipeline=hotel_pipeline4
{
  "name": "莫德凯撒酒店"
}

我们看看es的返回信息

{
  "_index" : "failed-pipeline_hotel",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

可以看到这个文档被索引到了failed-pipeline_hotel里面

查看Pipeline

GET _ingest/pipeline/pipeline的名称
如:
GET _ingest/pipeline/hotel_pipeline

删除Pipeline

DELETE _ingest/pipeline/pipeline的名称
如:
DELETE _ingest/pipeline/hotel_pipeline

Processor类型

processors

注:本图来自借鉴的文章,多谢。基础使用大家也可以看借鉴的文章,以后有其他用法再整理在这里。// TODO

3. 大功告成

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容