摄取管道
edit摄取管道
edit摄取管道允许您在索引之前对数据执行常见的转换。例如,您可以使用管道来删除字段、从文本中提取值以及丰富您的数据。
一个管道由一系列称为处理器的可配置任务组成。每个处理器按顺序运行,对传入的文档进行特定的更改。在处理器运行完毕后,Elasticsearch会将转换后的文档添加到您的数据流或索引中。
您可以使用 Kibana 的 Ingest Pipelines 功能或 ingest APIs 来创建和管理摄取管道。Elasticsearch 将管道存储在 集群状态中。
先决条件
edit创建和管理管道
edit在 Kibana 中,打开主菜单并点击 堆栈管理 > 摄取管道。从列表视图中,您可以:
- 查看您的管道列表并深入了解详细信息
- 编辑或克隆现有管道
- 删除管道
要创建一个管道,点击 创建管道 > 新建管道。有关示例教程,请参阅 示例:解析日志。
“从CSV创建新管道”选项允许您使用CSV创建一个摄取管道,将自定义数据映射到Elastic通用架构(ECS)。将您的自定义数据映射到ECS可以使数据更易于搜索,并允许您重用其他数据集的可视化。要开始使用,请查看将自定义数据映射到ECS。
您还可以使用摄取API来创建和管理管道。
以下创建管道API请求创建
一个包含两个set处理器,然后是一个
lowercase处理器。处理器按指定的顺序依次运行。
PUT _ingest/pipeline/my-pipeline
{
"description": "My optional pipeline description",
"processors": [
{
"set": {
"description": "My optional processor description",
"field": "my-long-field",
"value": 10
}
},
{
"set": {
"description": "Set 'my-boolean-field' to true",
"field": "my-boolean-field",
"value": true
}
},
{
"lowercase": {
"field": "my-keyword-field"
}
}
]
}
管理管道版本
edit当您创建或更新管道时,可以指定一个可选的version整数。您可以将此版本号与if_version参数一起使用,以有条件地更新管道。当指定了if_version参数时,成功的更新会增加管道的版本。
PUT _ingest/pipeline/my-pipeline-id
{
"version": 1,
"processors": [ ... ]
}
要使用API取消设置version号,请替换或更新管道而不指定version参数。
测试一个管道
edit在生产环境中使用管道之前,我们建议您使用示例文档进行测试。在 Kibana 中创建或编辑管道时,点击 添加文档。在 文档 标签页中,提供示例文档并点击 运行管道。
您还可以使用模拟管道API来测试管道。您可以在请求路径中指定一个已配置的管道。例如,以下请求测试my-pipeline。
POST _ingest/pipeline/my-pipeline/_simulate
{
"docs": [
{
"_source": {
"my-keyword-field": "FOO"
}
},
{
"_source": {
"my-keyword-field": "BAR"
}
}
]
}
或者,您可以在请求体中指定一个管道及其处理器。
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"lowercase": {
"field": "my-keyword-field"
}
}
]
},
"docs": [
{
"_source": {
"my-keyword-field": "FOO"
}
},
{
"_source": {
"my-keyword-field": "BAR"
}
}
]
}
API返回转换后的文档:
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
"my-keyword-field": "foo"
},
"_ingest": {
"timestamp": "2099-03-07T11:04:03.000Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
"my-keyword-field": "bar"
},
"_ingest": {
"timestamp": "2099-03-07T11:04:04.000Z"
}
}
}
]
}
将管道添加到索引请求中
edit使用 pipeline 查询参数将管道应用于单个或批量索引请求中的文档。
POST my-data-stream/_doc?pipeline=my-pipeline
{
"@timestamp": "2099-03-07T11:04:05.000Z",
"my-keyword-field": "foo"
}
PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
您还可以将 pipeline 参数与 update by query 或 reindex API 一起使用。
POST my-data-stream/_update_by_query?pipeline=my-pipeline
POST _reindex
{
"source": {
"index": "my-data-stream"
},
"dest": {
"index": "my-new-data-stream",
"op_type": "create",
"pipeline": "my-pipeline"
}
}
设置默认管道
edit使用 index.default_pipeline 索引设置来设置默认管道。如果未指定 pipeline 参数,Elasticsearch 会将此管道应用于索引请求。
设置最终管道
edit使用 index.final_pipeline 索引设置来设置一个最终管道。即使未指定请求或默认管道,Elasticsearch 也会在它们之后应用此管道。
Beats 的管道
edit要将摄取管道添加到Elastic Beat,请在中的output.elasticsearch下指定pipeline参数。例如,对于Filebeat,您将在filebeat.yml中指定pipeline。
output.elasticsearch: hosts: ["localhost:9200"] pipeline: my-pipeline
Fleet 和 Elastic Agent 的管道
editElastic Agent 集成附带了默认的摄取管道,这些管道在索引之前对数据进行预处理和丰富。 Fleet 使用包含 索引模板 来应用这些管道,这些模板包括 管道索引设置。Elasticsearch 根据 数据流的命名方案 将这些模板与您的 Fleet 数据流进行匹配。
每个默认的集成管道都会调用一个不存在的、未版本化的 *@custom 摄取管道。如果未修改,此管道调用不会对您的数据产生影响。但是,您可以修改此调用,为集成创建自定义管道,这些管道在升级过程中仍然有效。请参阅 教程:使用自定义摄取管道转换数据 以了解更多信息。
Fleet 不为 自定义日志 集成提供默认的摄取管道, 但您可以使用 索引模板 或 自定义配置 为此集成指定一个管道。
-
创建并测试您的 摄取管道。将您的管道命名为
logs-。这使得 为您的集成跟踪管道更容易。-default 例如,以下请求为
my-app数据集创建一个管道。 管道的名称为logs-my_app-default。PUT _ingest/pipeline/logs-my_app-default { "description": "Pipeline for `my_app` dataset", "processors": [ ... ] } -
创建一个包含您的管道的索引模板, 在
index.default_pipeline或index.final_pipeline索引设置中。确保模板是数据流启用的。 模板的索引模式应匹配logs-。-* 您可以使用 Kibana 的 索引管理 功能或 创建索引模板 API 来创建此模板。
例如,以下请求创建一个匹配
logs-my_app-*的模板。 该模板使用包含index.default_pipeline索引设置的组件模板。# Creates a component template for index settings PUT _component_template/logs-my_app-settings { "template": { "settings": { "index.default_pipeline": "logs-my_app-default", "index.lifecycle.name": "logs" } } } # Creates an index template matching `logs-my_app-*` PUT _index_template/logs-my_app-template { "index_patterns": ["logs-my_app-*"], "data_stream": { }, "priority": 500, "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"] } - 当在 Fleet 中添加或编辑您的 自定义日志 集成时, 点击 配置集成 > 自定义日志文件 > 高级选项。
-
在数据集名称中,指定您的数据集名称。Fleet 会将新数据添加到生成的
logs-数据流中。-default 例如,如果你的数据集名称是
my_app,Fleet 会将新数据添加到logs-my_app-default数据流中。
-
使用rollover API来滚动你的数据流。 这确保了Elasticsearch将索引模板及其管道设置应用于集成的新数据。
POST logs-my_app-default/_rollover/
-
创建并测试您的 摄取管道。将您的管道命名为
logs-。这使得 为您的集成跟踪管道更容易。-default 例如,以下请求为
my-app数据集创建一个管道。 管道的名称为logs-my_app-default。PUT _ingest/pipeline/logs-my_app-default { "description": "Pipeline for `my_app` dataset", "processors": [ ... ] } - 当在 Fleet 中添加或编辑您的 自定义日志 集成时, 点击 配置集成 > 自定义日志文件 > 高级选项。
-
在数据集名称中,指定您的数据集名称。Fleet 会将新数据添加到生成的
logs-数据流中。-default 例如,如果你的数据集名称是
my_app,Fleet 会将新数据添加到logs-my_app-default数据流中。 -
在自定义配置中,在
pipeline策略设置中指定您的管道。
Elastic Agent 独立模式
如果你以独立模式运行 Elastic Agent,你可以使用包含 索引模板 的
index.default_pipeline 或
index.final_pipeline 索引设置来应用管道。或者,
你可以在你的 elastic-agent.yml 配置中指定 pipeline 策略设置。请参阅 安装独立 Elastic Agents。
搜索索引的管道
edit当您为搜索用例创建Elasticsearch索引时,例如,使用网络爬虫或连接器,这些索引会自动设置特定的摄取管道。 这些处理器有助于优化您的内容以进行搜索。 有关更多信息,请参阅搜索中的摄取管道。
在处理器中访问源字段
edit处理器对传入文档的源字段具有读取和写入权限。
要访问处理器中的字段键,请使用其字段名称。以下 set
处理器访问 my-long-field。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"field": "my-long-field",
"value": 10
}
}
]
}
您也可以在前面加上 _source 前缀。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"field": "_source.my-long-field",
"value": 10
}
}
]
}
使用点表示法访问对象字段。
如果你的文档包含扁平化的对象,请使用
dot_expander 处理器先将其展开。其他
ingest处理器无法访问扁平化的对象。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"dot_expander": {
"description": "Expand 'my-object-field.my-property'",
"field": "my-object-field.my-property"
}
},
{
"set": {
"description": "Set 'my-object-field.my-property' to 10",
"field": "my-object-field.my-property",
"value": 10
}
}
]
}
多个处理器参数支持Mustache模板片段。要在模板片段中访问字段值,请将字段名称用三重花括号括起来:{{{field-name}}}。您可以使用模板片段来动态设置字段名称。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Set dynamic '<service>' field to 'code' value",
"field": "{{{service}}}",
"value": "{{{code}}}"
}
}
]
}
在处理器中访问元数据字段
edit处理器可以通过名称访问以下元数据字段:
-
_index -
_id -
_routing -
_dynamic_templates
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Set '_routing' to 'geoip.country_iso_code' value",
"field": "_routing",
"value": "{{{geoip.country_iso_code}}}"
}
}
]
}
使用 Mustache 模板片段来访问元数据字段值。例如,
{{{_routing}}} 检索文档的路由值。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Use geo_point dynamic template for address field",
"field": "_dynamic_templates",
"value": {
"address": "geo_point"
}
}
}
]
}
上述的处理器告诉ES,如果字段address在索引映射中尚未定义,则使用名为geo_point的动态模板。如果字段address已经在批量请求中定义,此处理器将覆盖该字段的动态模板,但对批量请求中定义的其他动态模板没有影响。
如果你自动生成
文档ID,你不能在处理器中使用{{{_id}}}。Elasticsearch在摄取后分配自动生成的_id值。
在处理器中访问摄取元数据
edit摄取处理器可以使用 _ingest 键添加和访问摄取元数据。
与源和元数据字段不同,Elasticsearch 默认情况下不会索引摄取元数据字段。Elasticsearch 还允许以 _ingest 键开头的源字段。如果您的数据包含此类源字段,请使用 _source._ingest 来访问它们。
管道默认仅创建_ingest.timestamp摄取元数据字段。
此字段包含Elasticsearch接收到文档索引请求的时间戳。要索引_ingest.timestamp或其他摄取元数据字段,请使用set处理器。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Index the ingest timestamp as 'event.ingested'",
"field": "event.ingested",
"value": "{{{_ingest.timestamp}}}"
}
}
]
}
处理管道失败
edit管道的处理器按顺序运行。默认情况下,当其中一个处理器失败或遇到错误时,管道处理会停止。
要忽略处理器失败并运行管道的其余处理器,请将ignore_failure设置为true。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"ignore_failure": true
}
}
]
}
使用 on_failure 参数来指定在处理器失败后立即运行的处理器列表。如果指定了 on_failure,Elasticsearch 之后会运行管道的剩余处理器,即使 on_failure 配置为空。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "error.message",
"value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
"override": false
}
}
]
}
}
]
}
嵌套一组 on_failure 处理器以进行嵌套错误处理。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "error.message",
"value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
"override": false,
"on_failure": [
{
"set": {
"description": "Set 'error.message.multi'",
"field": "error.message.multi",
"value": "Document encountered multiple ingest errors",
"override": true
}
}
]
}
}
]
}
}
]
}
您还可以为管道指定on_failure。如果一个没有on_failure值的处理器失败,Elasticsearch会使用这个管道级别的参数作为回退。Elasticsearch将不会尝试运行管道的剩余处理器。
PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Index document to 'failed-<index>'",
"field": "_index",
"value": "failed-{{{ _index }}}"
}
}
]
}
有关管道失败的更多信息可能可以在文档元数据字段 on_failure_message、on_failure_processor_type、
on_failure_processor_tag 和 on_failure_pipeline 中找到。这些字段仅在 on_failure 块内可访问。
以下示例使用元数据字段在文档中包含有关管道失败的信息。
PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Record error information",
"field": "error_information",
"value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
}
}
]
}
有条件地运行处理器
edit每个处理器都支持一个可选的if条件,以Painless脚本的形式编写。如果提供了该条件,处理器仅在if条件为true时运行。
if 条件脚本在 Painless 的
ingest processor context 中运行。在
if 条件中,ctx 值是只读的。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents with 'network.name' of 'Guest'",
"if": "ctx?.network?.name == 'Guest'"
}
}
]
}
如果启用了 script.painless.regex.enabled 集群设置,您可以在 if 条件脚本中使用正则表达式。有关支持的语法,请参阅 Painless 正则表达式。
如果可能,避免使用正则表达式。昂贵的正则表达式可能会减慢索引速度。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
"if": "ctx.url?.scheme =~ /^http[^s]/",
"field": "url.insecure",
"value": true
}
}
]
}
您必须在单行上指定有效的 JSON 格式的 if 条件。但是,您可以使用 Kibana 控制台的三重引号语法来编写和调试更大的脚本。
如果可能,避免使用复杂或昂贵的 if 条件脚本。
昂贵的条件脚本会降低索引速度。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents that don't contain 'prod' tag",
"if": """
Collection tags = ctx.tags;
if(tags != null){
for (String tag : tags) {
if (tag.toLowerCase().contains('prod')) {
return false;
}
}
}
return true;
"""
}
}
]
}
您还可以将 存储脚本 指定为 if 条件。
PUT _scripts/my-prod-tag-script
{
"script": {
"lang": "painless",
"source": """
Collection tags = ctx.tags;
if(tags != null){
for (String tag : tags) {
if (tag.toLowerCase().contains('prod')) {
return false;
}
}
}
return true;
"""
}
}
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents that don't contain 'prod' tag",
"if": { "id": "my-prod-tag-script" }
}
}
]
}
传入的文档通常包含对象字段。如果处理器脚本尝试访问一个父对象不存在的字段,Elasticsearch 会返回一个 NullPointerException。为了避免这些异常,请使用 空安全操作符,例如 ?.,并编写您的脚本以确保空安全。
例如,ctx.network?.name.equalsIgnoreCase('Guest') 不是空安全的。
ctx.network?.name 可能返回 null。将脚本重写为
'Guest'.equalsIgnoreCase(ctx.network?.name),这是空安全的,因为
Guest 始终是非空的。
如果你无法将脚本重写为空安全,请包含一个显式的空检查。
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"drop": {
"description": "Drop documents that contain 'network.name' of 'Guest'",
"if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
}
}
]
}
有条件地应用管道
edit将 if 条件与 pipeline 处理器结合使用,以根据您的标准将其他管道应用于文档。您可以将此管道用作 默认管道,在用于配置多个数据流或索引的 索引模板中使用。
PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
"processors": [
{
"pipeline": {
"description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
"if": "ctx.service?.name == 'apache_httpd'",
"name": "httpd_pipeline"
}
},
{
"pipeline": {
"description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
"if": "ctx.service?.name == 'syslog'",
"name": "syslog_pipeline"
}
},
{
"fail": {
"description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
"if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
"message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
}
}
]
}
获取管道使用统计信息
edit使用节点统计 API 获取全局和每个管道的摄取统计信息。使用这些统计信息来确定哪些管道运行最频繁或花费最多时间处理。
GET _nodes/stats/ingest?filter_path=nodes.*.ingest