对应7.16官方文档路径: Ingest pipelines » Enrich your data » Example: Enrich your data xxx
官方地址如下:
https://www.elastic.co/guide/en/elasticsearch/reference/7.16/geo-match-enrich-policy-type.html
在创建 enrich policy 时需要指定enrich policy type
,用来明确采用何种方式将 enrich data 匹配到传入的文档中,分为三类:
- geo_match:根据 geo_shape 查询匹配
- match:根据 term 查询匹配精准值
- range:根据 term 查询匹配范围值
示例1:基于地理位置丰富你的数据
geo_match
类型的 enrich policy 使用geo_shape
查询基于地理位置将 enrich data 匹配到传入的文档中。
以下示例创建了一个geo_match
类型的 enrich policy ,该策略基于一组地理坐标向传入的文档中添加邮政编码。然后将geo_match
丰富策略添加到 ingest pipeline processor。
创建包含geo_shape
类型字段的source index
:
PUT /postal_codes
{
"mappings": {
"properties": {
"location": {
"type": "geo_shape"
},
"postal_code": {
"type": "keyword"
}
}
}
}
向source index
中加载数据:地理位置为一个矩形以及其对应的邮编
PUT /postal_codes/_doc/1?refresh=wait_for
{
"location": {
"type": "envelope",
"coordinates": [ [ 13.0, 53.0 ], [ 14.0, 52.0 ] ]
},
"postal_code": "96598"
}
创建geo_match
类型的 enrich policy,必须具备以下前提:
- 一个或者多个 source indices
- 一个
match_field
,来自 source indices 并用其匹配传入的文档 - source indices 中用以加入传入文档的 enrich fields
PUT /_enrich/policy/postal_policy
{
"geo_match": {
"indices": "postal_codes",
"match_field": "location",
"enrich_fields": [ "location", "postal_code" ]
}
}
执行该策略创建 enrich index
:
POST /_enrich/policy/postal_policy/_execute
创建带有刚配置完的 enrich processor 的 ingest pipeline,其必须包含以下部分:
- 刚配置的 enrich policy
- 传入的文档中用来和 enrich index 中 match_field 进行匹配的字段
field
- 用来存储来自 enrich index 的 enrich data 的字段
target_field
- 配置两个字段空间匹配逻辑
shape_relation
PUT /_ingest/pipeline/postal_lookup
{
"processors": [
{
"enrich": {
"description": "Add 'geo_data' based on 'geo_location'",
"policy_name": "postal_policy",
"field": "geo_location",
"target_field": "geo_data",
"shape_relation": "INTERSECTS"
}
}
]
}
使用配置好的 pipeline 加载数据,传入的数据必须包含上述设置的field
字段:
PUT /users/_doc/0?pipeline=postal_lookup
{
"first_name": "Mardy",
"last_name": "Brown",
"geo_location": "POINT (13.5 52.5)"
}
读取刚写入的文档,发现已被丰富了geo_data
字段:
{
"found": true,
"_index": "users",
"_type": "_doc",
"_id": "0",
"_version": 1,
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"geo_data": {
"location": {
"type": "envelope",
"coordinates": [[13.0, 53.0], [14.0, 52.0]]
},
"postal_code": "96598"
},
"first_name": "Mardy",
"last_name": "Brown",
"geo_location": "POINT (13.5 52.5)"
}
}
示例2:基于精确值丰富你的数据`
match
类型的 enrich policy 使用term
查询基于类似邮件地址、ID等精确数据将 enrich data 匹配到传入的文档中。
以下示例创建了一个match
类型的 enrich policy,该策略基于邮件地址向传入的文档中添加用户姓名和其他用户信息。然后将match
丰富策略添加到 ingest pipeline processor。
创建包好用到上述字段的source index
并写入数据:
PUT /users/_doc/1?refresh=wait_for
{
"email": "mardy.brown@asciidocsmith.com",
"first_name": "Mardy",
"last_name": "Brown",
"city": "New Orleans",
"county": "Orleans",
"state": "LA",
"zip": 70116,
"web": "mardy.asciidocsmith.com"
}
创建match
类型的 enrich policy,必须具备一下前提:
- 一个或者多个 source indices
- 一个
match_field
,来自 source indices 并用其匹配传入的文档 - source indices 中用以加入传入文档的 enrich fields
PUT /_enrich/policy/users-policy
{
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
}
}
执行该策略创建enrich index
:
POST /_enrich/policy/users-policy/_execute
创建带有刚配置完的 enrich processor 的 ingest pipeline,其必须包含以下部分:
- 刚配置的 enrich policy
- 传入的文档中用来和 enrich index 中 match_field 进行匹配的字段
field
- 用来存储来自 enrich index 的 enrich data 的字段
target_field
PUT /_ingest/pipeline/user_lookup
{
"processors" : [
{
"enrich" : {
"description": "Add 'user' data based on 'email'",
"policy_name": "users-policy",
"field" : "email",
"target_field": "user",
"max_matches": "1"
}
}
]
}
使用配置好的 pipeline 加载数据,传入的数据必须包含上述设置的field字段:
PUT /my-index-000001/_doc/my_id?pipeline=user_lookup
{
"email": "mardy.brown@asciidocsmith.com"
}
读取刚写入的文档,发现已被丰富了user
字段:
{
"found": true,
"_index": "my-index-000001",
"_type": "_doc",
"_id": "my_id",
"_version": 1,
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"user": {
"email": "mardy.brown@asciidocsmith.com",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"city": "New Orleans",
"state": "LA"
},
"email": "mardy.brown@asciidocsmith.com"
}
}
示例3:基于值的范围丰富你的数据
range
类型的 enrich policy 使用term
查询匹配传入文档中的数字、日期或者IP类型字段与 enrich index 范围一致的文档,不支持范围-范围形式的匹配。
以下示例创建了一个range
类型的 enrich policy,该策略基于 IP 地址将而外信息添加到传入文档中。然后将range
丰富策略添加 ingest pipeline processor。
创建source index
:
PUT /networks
{
"mappings": {
"properties": {
"range": { "type": "ip_range" },
"name": { "type": "keyword" },
"department": { "type": "keyword" }
}
}
}
向source index
中加载数据:
PUT /networks/_doc/1?refresh=wait_for
{
"range": "10.100.0.0/16",
"name": "production",
"department": "OPS"
}
创建range
类型的 enrich policy,必须具备以下前提:
- 一个或者多个 source indices
- 一个
match_field
,来自 source indices 并用其匹配传入的文档 - source indices 中用以加入传入文档的 enrich fields
由于我们计划基于 IP 地址来丰富文档,因此策略的match_field
必须是一个ip_range
字段:
PUT /_enrich/policy/networks-policy
{
"range": {
"indices": "networks",
"match_field": "range",
"enrich_fields": ["name", "department"]
}
}
执行该策略创建 enrich index:
POST /_enrich/policy/networks-policy/_execute
创建带有刚配置完的 enrich processor 的 ingest pipeline,其必须包含以下部分:
- 刚配置的 enrich policy
- 传入的文档中用来和 enrich index 中 match_field 进行匹配的字段
field
- 用来存储来自 enrich index 的 enrich data 的字段
target_field
PUT /_ingest/pipeline/networks_lookup
{
"processors" : [
{
"enrich" : {
"description": "Add 'network' data based on 'ip'",
"policy_name": "networks-policy",
"field" : "ip",
"target_field": "network",
"max_matches": "10"
}
}
]
}
使用配置好的 pipeline 加载数据,传入的数据必须包含上述设置的field
字段:
PUT /my-index-000001/_doc/my_id?pipeline=networks_lookup
{
"ip": "10.100.34.1"
}
读取刚写入的文档,发现已被丰富了network
字段:
{
"_index" : "my-index-000001",
"_type" : "_doc",
"_id" : "my_id",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"ip" : "10.100.34.1",
"network" : [
{
"name" : "production",
"range" : "10.100.0.0/16",
"department" : "OPS"
}
]
}
}
总结
经过官方提供的三个示例,我们不难发现,enrich 的使用方式就是将传入的文档与 ES 内已存在的数据进行匹配,如果匹配上了就将匹配到的 ES 内已存在的数据的部分内容补充进传入的文档中,用以丰富数据。