初识Ingest Pipeline
ES在具备node.ingest:true
的节点上提供了pipeline
功能,可以在文档真正写入ES之前进行一些转换,插入新值,修改文档内容等功能。pipeline会拦截index和bulk请求,请求经处理后再写入ES。用户可以定义一系列的"处理器",在处理文档时,pipeline会按照processor的定义顺序执行processor。定义个格式:
PUT _ingest/pipeline/my_pipeline_id
{
"description" : "describe pipeline",
"processors" : [
{
"set" : {
"field": "foo",
"value": "new"
}
}
]
}
官方示例:在PIPELINE中使用Scripts Processor
以ES 官方文档中的这个例子进行试验:https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-context-examples.html
其提供了一份数据,里面包含了歌剧的场次信息统计。我们首先要把样例数据导入到ES中。其中一条是这样:
{ "create" : { "_index" : "seats", "_type" : "seat", "_id" : "36203" } }
{ "theatre" : "Skyline", "play" : "Auntie Jo", "actors": [ "Jo Hangum", "Jon Hittle", "Rob Kettleman", "Laura Conrad", "Simon Hower", "Nora Blue" ], "date": "2018-12-14", "time": "5:40PM", "row": 11, "number": 14, "cost": 17.5, "sold": false }
我们可以看到,其内容是有问题的,_type
类型在目前的版本中即将被废弃,只能是doc
类型。因此如果按照原文的方法导入,是会失败的。
https://www.elastic.co/guide/en/elasticsearch/reference/7.6/mapping-type-field.html
我们需要对_type进行修改。在painless
脚本中,可以使用ctx['_type']来修改插入操作的type.
ctx['_index']
The name of the index.
ctx['_type']
The type of document within an index.
增加一条ingest的处理器。
PUT _ingest/pipeline/seats
{
"description": "seats-ingest",
"processors": [
{"script": {
"lang": "painless",
"source": """
ctx['_type'] = "_doc";
"""
}}
]
}
在kibana上尝试插入一条:
POST _bulk?pipeline=seats
{ "create" : { "_index" : "seats", "_type" : "seat", "_id" : "36203" } }
{ "theatre" : "Skyline", "play" : "Auntie Jo", "actors": [ "Jo Hangum", "Jon Hittle", "Rob Kettleman", "Laura Conrad", "Simon Hower", "Nora Blue" ], "date": "2018-12-14", "time": "5:40PM", "row": 11, "number": 14, "cost": 17.5, "sold": false }
#! Deprecation: [types removal] Specifying types in bulk requests is deprecated.
{
"took" : 4960,
"ingest_took" : 373,
"errors" : false,
"items" : [
{
"create" : {
"_index" : "seats",
"_type" : "_doc",
"_id" : "36203",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
}
]
}
可以看到,是有_bulk
请求插入时指定_doc
类型的做法已经废弃了。可以看到_type
已经被改成了_doc
,而如果没有这个pipeline,结果将是_bulk
请求中指定的seats
类型。
修改后脚本改为如下:
PUT _ingest/pipeline/seats
{
"description": "seats-ingest",
"processors": [
{"script": {
"lang": "painless",
"source": """
String[] split(String str, char delimiter)
{
int count = 0;
for (char c: str.toCharArray())
{
if (c == delimiter)
{
++ count;
}
}
if (count == 0)
{
return new String[] {str};
}
String[] r = new String[count + 1];
int i0 = 0, i1 = 0, n = 0;
for (char c: str.toCharArray())
{
if (c == delimiter)
{
r[n] = str.substring(i0, i1);
++n;
i0 = i1 + 1;
}
++ i1;
}
r[count] = str.substring(i0, i1);
return r;
}
ctx['_type'] = "_doc";
String[] date_array = split(ctx.date, (char)"-");
String year = date_array[0].trim();
String month = date_array[1].trim();
if (month.length() == 1)
{
month = "0" + month;
}
String day = date_array[2].trim();
if (day.length() == 1)
{
day = "0" + day;
}
boolean is_pm = ctx.time.substring(ctx.time.length() - 2).equals("PM");
String[] time_array = split(ctx.time.substring(0, ctx.time.length()-2), (char)":");
int hour = Integer.parseInt(time_array[0].trim());
if (is_pm)
{
hour += 12;
}
String hour_str = "";
if (hour < 10)
{
hour_str += "0";
}
hour_str += hour;
int min = Integer.parseInt(time_array[1].trim());
String min_str = "";
if (min < 10)
{
min_str += "0";
}
min_str += min;
String date_time = year + "-" + month + "-" + day + "T" + hour_str + ":" + min + ":00+08:00";
ZonedDateTime dt = ZonedDateTime.parse(
date_time, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
ctx.datetime = dt.getLong(ChronoField.INSTANT_SECONDS)*1000L;
ctx["_type"] = "_doc";
"""
}}
]
}
对应的导入命令也做一下修改,改为:
curl -k -XPOST https://elastic:elastic@localhost:9200/_bulk?pipeline=seats -H "Content-Type: application/x-ndjson" --data-binary "@/home/DATA/seats-init.json"
之前试了好几次,都在指定输入文件这一步出错了,最后发现是需要在路径最前面加一个@
。
可能是我的机器不是特别好,导入时间有点长。7万多条记录花了很长时间。事实上最后导入失败了,只导入了3000多条。错误显示主分片不可访问。
对比ClickHouse,8000万条记录只花了几分钟就搞定。用来进行分析,ClickHouse各方面指标要来得更好。
对pipeline的添加,查看,修改,模拟调试API
ES 提供了一些API对ingest 的pipeline进行调试.
添加
PUT _ingest/pipeline/my-pipeline-id
{
"description" : "describe pipeline",
"processors" : [
{
"set" : {
"field": "foo",
"value": "bar"
}
}
]
}
查看
GET _ingest/pipeline/my-pipeline-id
删除
DELETE _ingest/pipeline/my-pipeline-id
模拟
POST _ingest/pipeline/_simulate
{
"pipeline" : {
// pipeline definition here
},
"docs" : [
{ "_source": {/** first document **/} },
{ "_source": {/** second document **/} },
// ...
]
}
也可以这样,在POST行中指定索引名称进行处理:
POST _ingest/pipeline/seats/_simulate
{
"docs": [
{
"_source": {
"theatre": "Skyline",
"play": "Auntie Jo",
"actors": [
"Jo Hangum",
"Jon Hittle",
"Rob Kettleman",
"Laura Conrad",
"Simon Hower",
"Nora Blue"
],
"date": "2018-12-14",
"time": "5:40PM",
"row": 11,
"number": 14,
"cost": 17.5,
"sold": false
}
}
]
}
如何在Pipeline中操作使用文档的数据
文档的"_source"数据
上文的例子中,我们在Pipeline中使用script处理器时,可以使用painless
的内置数据结构ctx对文档source进行处理。
ctx是一个mapping对象,内部存储了包括文档的meta数据和_source数据:
ingest情况下
_index: ctx._index or ctx["_index"]
_doc: ctx._doc or ctx["_doc"]
_op: ctx._op or ctx["_op"]
xxxx: ctx.xxxx or ctx["xxxx"]
Update情况下
xxxx: ctx._source.xxxx
在不使用script的情况下,同样可以对_source中的数据进行访问。
比如说使用set处理器的情况下:
PUT _ingest/pipeline/bbb
{
"description": "test",
"processors": [
{"set": {
"field": "_source.foo",
"value": "bar"
}}
]
}
PUT _ingest/pipeline/aaa
{
"description": "test",
"processors": [
{"set": {
"field": "foo",
"value": "bar"
}}
]
}
可以直接通过数据名或者_source.数据吗进行处理。
文档的metadata
文档的metadata包括_index, _type, _routing, _id,在pipeline中均可以直接访问。
Ingest的metadata
PUT _ingest/pipeline/ccc
{
"description": "test",
"processors": [
{"set": {
"field": "received",
"value": "{{_ingest.timestamp}}"
}}
]
}
不同于metadata里的成员,_ingest可以是一个_source中的合法成员,因此,访问ingest的meta数据需要使用这样的方式{{_ingest.timestamp}}。
template 中的数据与metadata
在模拟中,也可以使用pipeline,同样,访问template中的数据也需要使用{{ }}。
{
"set": {
"field": "field_c",
"value": "{{field_a}} {{field_b}}"
}
}
上面的这个set processor,将原有的 field_a 和 field_b 做了拼接之后,将其赋给了一个新字段field_c。
同时,template中也支持动态设置字段。
{
"set": {
"field": "{{service}}",
"value": "{{code}}"
}
}
要说明的是,如果想在template中使用,需要设置动态索引参数:
index.default_pipeline
The default ingest node pipeline for this index. Index requests will fail if the default pipeline is set and the pipeline does not exist. The default may be overridden using the pipeline
parameter. The special pipeline name _none
indicates no ingest pipeline should be run.
index.final_pipeline
The final ingest node pipeline for this index. Index requests will fail if the final pipeline is set and the pipeline does not exist. The final pipeline always runs after the request pipeline (if specified) and the default pipeline (if it exists). The special pipeline name _none
indicates no ingest pipeline will run.
需要注意的是: final_pipeline是7.5版本之后才支持的。之前版本没有,比如我用的7.4版本,这个参数就不支持。
使用方法,在index的setting中指定:
"settings": {
"number_of_shards": 1,
"default_pipeline": "_none"
},
default_pipeline会被命令行中指定的pipeline覆盖,而final_pipeline会在最后执行。
条件执行
Pipeline中的每个processor都支持一个if参数,满足条件的processor才会执行。
PUT _ingest/pipeline/drop_guests_network
{
"processors": [
{
"drop": {
"if": "ctx.network_name == 'Guest'"
}
}
]
}
这是官方文档中的一个例子,当network_name为Guest时,丢弃这条记录。
同时,if
参数也支持复杂的脚本。
PUT _ingest/pipeline/not_prod_dropper
{
"processors": [
{
"drop": {
"if": """
Collection tags = ctx.tags;
if(tags != null){
for (String tag : tags) {
if (tag.toLowerCase().contains('prod')) {
return false;
}
}
}
return true;
"""
}
}
]
}
编写一段painless脚本,返回true or false即可。
Processors中可以使用嵌套对象进行判断,但是为了防止产生null point exception,可以使用?.
来访问成员以防止成员不存在。
PUT _ingest/pipeline/not_prod_dropper
{
"processors": [
{
"drop": {
"if": """
Collection tags = ctx.tags;
if(tags != null){
for (String tag : tags) {
if (tag.toLowerCase().contains('prod')) {
return false;
}
}
}
return true;
"""
}
}
]
}
有一种processor名为pipeline,可以结合if
条件指定对应的pipeline名称。
curl -X PUT "localhost:9200/_ingest/pipeline/logs_pipeline?pretty" -H 'Content-Type: application/json' -d'
{
"description": "A pipeline of pipelines for log files",
"version": 1,
"processors": [
{
"pipeline": {
"if": "ctx.service?.name == \u0027apache_httpd\u0027",
"name": "httpd_pipeline"
}
},
{
"pipeline": {
"if": "ctx.service?.name == \u0027syslog\u0027",
"name": "syslog_pipeline"
}
},
{
"fail": {
"message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
}
}
]
}
'
上面的pipeline processor指定了两个参数,if
是该processor满足的条件,name是对应需要执行的pipeline的名称。
此外,pipeline的条件中还可以使用正则表达式。
PUT _ingest/pipeline/check_url
{
"processors": [
{
"set": {
"if": "ctx.href?.url =~ /^http[^s]/",
"field": "href.insecure",
"value": true
}
}
]
}
异常处理
正常情况下,pipeline会顺序执行所有的processor,并在遇到第一个异常时,中断当前文档的处理。但是有时候,用户希望自定义异常的处理,这时候就需要为processor设置on_failure
参数。这样当process遇到error的时候,会执行on_failure里的内容,然后继续执行下一个处理器。
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "bar",
"on_failure" : [
{
"set" : {
"field" : "error",
"value" : "field \"foo\" does not exist, cannot rename to \"bar\""
}
}
]
}
}
]
}
也可以进行索引级别的异常处理:
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "bar"
]
}
}
],
"on_failure" : [
{
"set" : {
"field" : "_index",
"value" : "failed-{{ _index }}"
}
}
]
}
值得注意的是,同一个异常只能被捕获一次,这个和java的异常处理机制是一致的。如果同时定义了processor和pipeline级别的异常处理模块,则异常只会被processor级别所捕获。
同时,我们也可以让processor忽略此处的错误。只要将参数ignore_failure置为true即可。
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "bar",
"ignore_failure" : true
}
}
]
}