丰富您的数据
edit丰富您的数据
edit您可以使用enrich processor在摄取期间将现有索引中的数据添加到传入文档中。
例如,您可以使用enrich处理器来:
- 根据已知的IP地址识别网络服务或供应商
- 根据产品ID将产品信息添加到零售订单中
- 根据电子邮件地址补充联系信息
- 根据用户坐标添加邮政编码
富集处理器的工作原理
edit大多数处理器是自包含的,并且只更改传入文档中的现有数据。
enrich 处理器向传入的文档添加 新 数据,并需要一些特殊组件:
- enrich policy
-
一组配置选项,用于将正确的丰富数据添加到正确的传入文档中。
一个富集策略包含:
- 一个或多个源索引,用于将丰富数据存储为文档
- 策略类型,用于确定处理器如何将丰富数据与传入文档匹配
- 源索引中的匹配字段,用于匹配传入文档
- 包含丰富数据的丰富字段,您希望将其从源索引添加到传入文档中
在使用丰富处理器之前,必须先执行丰富策略。执行后,丰富策略使用策略源索引中的丰富数据创建一个称为丰富索引的简化系统索引。处理器使用此索引来匹配和丰富传入的文档。
- source index
- 一个存储您希望添加到传入文档的丰富数据的索引。您可以像管理常规 Elasticsearch 索引一样创建和管理这些索引。您可以在一个丰富策略中使用多个源索引。您也可以在多个丰富策略中使用相同的源索引。
- enrich index
-
一个与特定富化策略绑定的特殊系统索引。
直接将传入的文档与源索引中的文档进行匹配可能会很慢且资源消耗大。为了加快速度,enrich 处理器使用了一个 enrich 索引。
Enrich 索引包含从源索引中提取的 enrich 数据,但具有一些特殊的属性,以帮助简化它们:
-
它们是系统索引,意味着它们由 Elasticsearch 内部管理,并且仅用于 enrich 处理器和 ES|QL
ENRICH命令。 -
它们总是以
.enrich-*开头。 - 它们是只读的,意味着你不能直接更改它们。
- 它们是为了快速检索而 强制合并 的。
-
它们是系统索引,意味着它们由 Elasticsearch 内部管理,并且仅用于 enrich 处理器和 ES|QL
设置一个数据丰富处理器
edit要设置一个富集处理器,请按照以下步骤操作:
一旦设置了富集处理器, 你可以 更新你的富集数据 和 更新你的富集策略。
enrich 处理器执行多项操作,可能会影响您的摄取管道的速度。
我们强烈建议在生产环境中部署之前,对您的富集处理器进行测试和基准测试。
我们不建议使用 enrich 处理器来附加实时数据。 enrich 处理器最适合处理不经常变化的参考数据。
先决条件
edit要使用丰富策略,您必须具备:
-
read对任何使用的索引的读取权限 -
enrich_user内置角色
添加丰富数据
edit首先,将文档添加到一个或多个源索引中。这些文档应包含您最终希望添加到传入数据中的丰富数据。
执行丰富策略
edit一旦创建了丰富策略,您需要使用 执行丰富策略 API 或 Kibana 中的索引管理 来创建一个 丰富索引。
该丰富索引包含来自策略源索引的文档。
丰富索引始终以.enrich-*开头,
是只读的,
并且会进行强制合并。
Enrich 索引应仅由 enrich 处理器
或 ES|QL ENRICH 命令使用。避免将 enrich 索引用于其他目的。
向摄取管道添加一个丰富处理器
edit一旦你有了源索引、丰富策略以及相关的丰富索引,你就可以设置一个包含丰富处理器的数据流管道,用于你的策略。
定义一个enrich processor,并使用创建或更新管道 API将其添加到摄取管道中。
在定义丰富处理器时,您必须包含至少以下内容:
- 使用的丰富策略。
- 用于匹配传入文档与丰富索引中文档的字段。
- 要添加到传入文档的目标字段。此目标字段包含在丰富策略中指定的匹配字段和丰富字段。
您还可以使用 max_matches 选项来设置传入文档可以匹配的丰富文档数量。如果设置为默认值 1,数据将作为 JSON 对象添加到传入文档的目标字段中。否则,数据将作为数组添加。
查看 Enrich 以获取完整的配置选项列表。
您还可以在您的摄取管道中添加其他处理器。
更新一个富集索引
edit一旦创建,您无法更新或将文档索引到扩展索引中。 相反,请更新您的源索引并执行扩展策略。 这将使用您更新的源索引创建一个新的扩展索引。 之前的扩展索引将通过延迟维护作业删除。 默认情况下,这是每15分钟执行一次。
更新一个丰富策略
edit一旦创建,您无法更新或更改一个富化策略。 相反,您可以:
- 创建并执行一个新的丰富策略。
- 用新的丰富策略替换之前使用的丰富策略, 在任何正在使用的丰富处理器或ES|QL查询中。
- 使用删除丰富策略 API 或 Kibana中的索引管理 删除之前的丰富策略。
丰富组件
edit富集协调器是一个组件,负责管理和执行在每个摄取节点上富集文档所需的搜索。它将所有管道中的所有富集处理器的搜索合并为批量多重搜索。
富集策略执行器是一个管理所有富集策略执行的组件。当执行一个富集策略时,此组件会创建一个新的富集索引并删除之前的富集索引。富集策略的执行由当选的主节点管理。这些策略的执行发生在不同的节点上。
节点设置
editThe enrich processor 具有用于 enrich coordinator 和 enrich policy executor 的节点设置。
富集协调器支持以下节点设置:
-
enrich.cache_size -
缓存用于丰富文档的搜索的最大缓存大小。
大小可以以三种单位指定:缓存的搜索的原始数量(例如
1000),以字节为单位的绝对大小(例如100Mb), 或节点最大堆空间的百分比(例如1%)。 对于绝对字节大小和堆空间百分比, Elasticsearch 不保证丰富缓存大小将完全符合该最大值, 因为 Elasticsearch 使用序列化搜索响应的字节大小, 这是堆上使用空间的良好表示,但不是精确匹配。 默认为1%。集群中所有丰富处理器共享一个缓存。 -
enrich.coordinator_proxy.max_concurrent_requests -
在丰富文档时运行的最大并发多搜索请求数量。默认为
8。 -
enrich.coordinator_proxy.max_lookups_per_request -
在丰富文档时,多搜索请求中包含的最大搜索次数。默认为
128。
富集策略执行器支持以下节点设置:
-
enrich.fetch_size -
重新索引源索引到增强索引时的最大批次大小。默认为
10000。 -
enrich.max_force_merge_attempts -
enrich 索引允许的最大 force merge 尝试次数。默认为
3。 -
enrich.cleanup_period -
Elasticsearch检查未使用的enrich索引是否可以删除的频率。默认为
15m。 -
enrich.max_concurrent_policy_executions -
最大并发执行的丰富策略数量。默认为
50。
示例:基于地理位置丰富您的数据
editgeo_match 丰富策略 根据地理位置将丰富数据匹配到传入的文档,使用geo_shape查询。
以下示例创建了一个 geo_match 丰富策略,该策略根据一组坐标为传入文档添加邮政编码。然后,它将 geo_match 丰富策略添加到摄取管道中的处理器。
使用创建索引 API来创建一个包含至少一个geo_shape字段的源索引。
PUT /postal_codes
{
"mappings": {
"properties": {
"location": {
"type": "geo_shape"
},
"postal_code": {
"type": "keyword"
}
}
}
}
使用索引 API 将富化数据索引到此源索引中。
PUT /postal_codes/_doc/1?refresh=wait_for
{
"location": {
"type": "envelope",
"coordinates": [ [ 13.0, 53.0 ], [ 14.0, 52.0 ] ]
},
"postal_code": "96598"
}
使用创建丰富策略 API来创建一个带有geo_match策略类型的丰富策略。此策略必须包括:
- 一个或多个源索引
-
一个
match_field, 源索引中的geo_shape字段,用于匹配传入的文档 - 您希望从源索引中追加到传入文档的丰富字段
PUT /_enrich/policy/postal_policy
{
"geo_match": {
"indices": "postal_codes",
"match_field": "location",
"enrich_fields": [ "location", "postal_code" ]
}
}
使用执行丰富策略 API来为策略创建一个丰富索引。
POST /_enrich/policy/postal_policy/_execute?wait_for_completion=false
使用创建或更新管道 API来创建一个摄取管道。在管道中,添加一个丰富处理器,其中包括:
- 您的丰富策略。
-
用于匹配来自丰富索引的文档的地理形状的传入文档的
field。 -
用于存储传入文档的附加丰富数据的
target_field。 此字段包含在您的丰富策略中指定的match_field和enrich_fields。 -
shape_relation,指示处理器如何将传入文档中的地理形状与来自丰富索引的文档中的地理形状进行匹配。有关有效选项和更多信息,请参阅空间关系。
PUT /_ingest/pipeline/postal_lookup
{
"processors": [
{
"enrich": {
"description": "Add 'geo_data' based on 'geo_location'",
"policy_name": "postal_policy",
"field": "geo_location",
"target_field": "geo_data",
"shape_relation": "INTERSECTS"
}
}
]
}
使用摄取管道来索引文档。传入的文档应包含在您的丰富处理器中指定的字段。
PUT /users/_doc/0?pipeline=postal_lookup
{
"first_name": "Mardy",
"last_name": "Brown",
"geo_location": "POINT (13.5 52.5)"
}
要验证富集处理器是否匹配并附加了适当的字段数据,请使用get API查看索引文档。
GET /users/_doc/0
API返回以下响应:
{
"found": true,
"_index": "users",
"_id": "0",
"_version": 1,
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"geo_data": {
"location": {
"type": "envelope",
"coordinates": [[13.0, 53.0], [14.0, 52.0]]
},
"postal_code": "96598"
},
"first_name": "Mardy",
"last_name": "Brown",
"geo_location": "POINT (13.5 52.5)"
}
}
示例:基于精确值丰富您的数据
editmatch 丰富策略 根据精确值(如电子邮件地址或ID)将丰富数据匹配到传入文档,使用term查询。
以下示例创建了一个匹配富化策略,该策略根据电子邮件地址向传入文档添加用户名和联系信息。然后,它将匹配富化策略添加到摄取管道中的处理器。
以下索引API请求创建一个源索引,并将新文档索引到该索引中。
PUT /users/_doc/1?refresh=wait_for
{
"email": "mardy.brown@asciidocsmith.com",
"first_name": "Mardy",
"last_name": "Brown",
"city": "New Orleans",
"county": "Orleans",
"state": "LA",
"zip": 70116,
"web": "mardy.asciidocsmith.com"
}
使用创建丰富策略 API 来创建一个带有 match 策略类型的丰富策略。此策略必须包括:
- 一个或多个源索引
-
一个
match_field, 用于匹配传入文档的源索引中的字段 - 您希望从源索引中追加到传入文档的丰富字段
PUT /_enrich/policy/users-policy
{
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
}
}
使用 execute enrich policy API 为策略创建一个丰富索引。
POST /_enrich/policy/users-policy/_execute?wait_for_completion=false
使用创建或更新管道 API来创建一个摄取管道。在管道中,添加一个丰富处理器,其中包括:
- 您的丰富策略。
-
用于匹配来自丰富索引的文档的传入文档的
field。 -
用于存储传入文档的附加丰富数据的
target_field。 此字段包含在您的丰富策略中指定的match_field和enrich_fields。
PUT /_ingest/pipeline/user_lookup
{
"processors" : [
{
"enrich" : {
"description": "Add 'user' data based on 'email'",
"policy_name": "users-policy",
"field" : "email",
"target_field": "user",
"max_matches": "1"
}
}
]
}
使用摄取管道来索引文档。传入的文档应包含在您的丰富处理器中指定的字段。
PUT /my-index-000001/_doc/my_id?pipeline=user_lookup
{
"email": "mardy.brown@asciidocsmith.com"
}
要验证富集处理器是否匹配并附加了适当的字段数据,请使用get API查看索引文档。
GET /my-index-000001/_doc/my_id
API返回以下响应:
{
"found": true,
"_index": "my-index-000001",
"_id": "my_id",
"_version": 1,
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"user": {
"email": "mardy.brown@asciidocsmith.com",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"city": "New Orleans",
"state": "LA"
},
"email": "mardy.brown@asciidocsmith.com"
}
}
示例:通过将值与范围匹配来丰富您的数据
edit一个 range 丰富策略 使用一个 term
查询 来匹配传入文档中的数字、日期或IP地址与丰富索引中相同类型的范围。不支持将范围与范围进行匹配。
以下示例创建了一个 range 丰富策略,该策略根据 IP 地址为传入文档添加描述性网络名称和负责部门。然后,它将该丰富策略添加到摄取管道中的处理器中。
使用带有适当映射的创建索引 API来创建源索引。
PUT /networks
{
"mappings": {
"properties": {
"range": { "type": "ip_range" },
"name": { "type": "keyword" },
"department": { "type": "keyword" }
}
}
}
以下索引API请求将一个新文档索引到该索引中。
PUT /networks/_doc/1?refresh=wait_for
{
"range": "10.100.0.0/16",
"name": "production",
"department": "OPS"
}
使用创建丰富策略 API 来创建一个带有 range 策略类型的丰富策略。此策略必须包括:
- 一个或多个源索引
-
一个
match_field, 用于匹配传入文档的源索引中的字段 - 您希望从源索引中追加到传入文档的丰富字段
由于我们计划基于IP地址丰富文档,策略的match_field必须是ip_range字段。
PUT /_enrich/policy/networks-policy
{
"range": {
"indices": "networks",
"match_field": "range",
"enrich_fields": ["name", "department"]
}
}
使用执行丰富策略 API来为策略创建一个丰富索引。
POST /_enrich/policy/networks-policy/_execute?wait_for_completion=false
使用创建或更新管道 API来创建一个摄取管道。在管道中,添加一个丰富处理器,其中包括:
- 您的丰富策略。
-
用于匹配来自丰富索引的文档的传入文档的
field。 -
用于存储传入文档的附加丰富数据的
target_field。 此字段包含在您的丰富策略中指定的match_field和enrich_fields。
PUT /_ingest/pipeline/networks_lookup
{
"processors" : [
{
"enrich" : {
"description": "Add 'network' data based on 'ip'",
"policy_name": "networks-policy",
"field" : "ip",
"target_field": "network",
"max_matches": "10"
}
}
]
}
使用摄取管道来索引文档。传入的文档应包含在您的丰富处理器中指定的字段。
PUT /my-index-000001/_doc/my_id?pipeline=networks_lookup
{
"ip": "10.100.34.1"
}
要验证富集处理器是否匹配并附加了适当的字段数据,请使用get API查看索引文档。
GET /my-index-000001/_doc/my_id
API返回以下响应:
{
"_index" : "my-index-000001",
"_id" : "my_id",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"ip" : "10.100.34.1",
"network" : [
{
"name" : "production",
"range" : "10.100.0.0/16",
"department" : "OPS"
}
]
}
}