摄取管道

edit

摄取管道允许您在索引之前对数据执行常见的转换。例如,您可以使用管道来删除字段、从文本中提取值以及丰富您的数据。

一个管道由一系列称为处理器的可配置任务组成。每个处理器按顺序运行,对传入的文档进行特定的更改。在处理器运行完毕后,Elasticsearch会将转换后的文档添加到您的数据流或索引中。

Ingest pipeline diagram

您可以使用 Kibana 的 Ingest Pipelines 功能或 ingest APIs 来创建和管理摄取管道。Elasticsearch 将管道存储在 集群状态中。

先决条件

edit
  • 具有ingest节点角色的节点处理管道处理。要使用摄取管道,您的集群必须至少有一个具有ingest角色的节点。对于大量的摄取负载,我们建议创建专用的摄取节点
  • 如果启用了Elasticsearch安全功能,您必须具有manage_pipeline 集群权限来管理摄取管道。要使用Kibana的Ingest Pipelines功能,您还需要cluster:monitor/nodes/info集群权限。
  • 包括enrich处理器的管道需要额外的设置。请参阅丰富您的数据

创建和管理管道

edit

在 Kibana 中,打开主菜单并点击 堆栈管理 > 摄取管道。从列表视图中,您可以:

  • 查看您的管道列表并深入了解详细信息
  • 编辑或克隆现有管道
  • 删除管道
Kibana’s Ingest Pipelines list view

要创建一个管道,点击 创建管道 > 新建管道。有关示例教程,请参阅 示例:解析日志

从CSV创建新管道”选项允许您使用CSV创建一个摄取管道,将自定义数据映射到Elastic通用架构(ECS)。将您的自定义数据映射到ECS可以使数据更易于搜索,并允许您重用其他数据集的可视化。要开始使用,请查看将自定义数据映射到ECS

您还可以使用摄取API来创建和管理管道。 以下创建管道API请求创建 一个包含两个set处理器,然后是一个 lowercase处理器。处理器按指定的顺序依次运行。

PUT _ingest/pipeline/my-pipeline
{
  "description": "My optional pipeline description",
  "processors": [
    {
      "set": {
        "description": "My optional processor description",
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "description": "Set 'my-boolean-field' to true",
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}

管理管道版本

edit

当您创建或更新管道时,可以指定一个可选的version整数。您可以将此版本号与if_version参数一起使用,以有条件地更新管道。当指定了if_version参数时,成功的更新会增加管道的版本。

PUT _ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "processors": [ ... ]
}

要使用API取消设置version号,请替换或更新管道而不指定version参数。

测试一个管道

edit

在生产环境中使用管道之前,我们建议您使用示例文档进行测试。在 Kibana 中创建或编辑管道时,点击 添加文档。在 文档 标签页中,提供示例文档并点击 运行管道

Test a pipeline in Kibana

您还可以使用模拟管道API来测试管道。您可以在请求路径中指定一个已配置的管道。例如,以下请求测试my-pipeline

POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}

或者,您可以在请求体中指定一个管道及其处理器。

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "my-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}

API返回转换后的文档:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "my-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:03.000Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "my-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:04.000Z"
        }
      }
    }
  ]
}

将管道添加到索引请求中

edit

使用 pipeline 查询参数将管道应用于单个批量索引请求中的文档。

POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

您还可以将 pipeline 参数与 update by queryreindex API 一起使用。

POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}

设置默认管道

edit

使用 index.default_pipeline 索引设置来设置默认管道。如果未指定 pipeline 参数,Elasticsearch 会将此管道应用于索引请求。

设置最终管道

edit

使用 index.final_pipeline 索引设置来设置一个最终管道。即使未指定请求或默认管道,Elasticsearch 也会在它们之后应用此管道。

Beats 的管道

edit

要将摄取管道添加到Elastic Beat,请在.yml中的output.elasticsearch下指定pipeline参数。例如,对于Filebeat,您将在filebeat.yml中指定pipeline

output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: my-pipeline

Fleet 和 Elastic Agent 的管道

edit

Elastic Agent 集成附带了默认的摄取管道,这些管道在索引之前对数据进行预处理和丰富。 Fleet 使用包含 索引模板 来应用这些管道,这些模板包括 管道索引设置。Elasticsearch 根据 数据流的命名方案 将这些模板与您的 Fleet 数据流进行匹配。

每个默认的集成管道都会调用一个不存在的、未版本化的 *@custom 摄取管道。如果未修改,此管道调用不会对您的数据产生影响。但是,您可以修改此调用,为集成创建自定义管道,这些管道在升级过程中仍然有效。请参阅 教程:使用自定义摄取管道转换数据 以了解更多信息。

Fleet 不为 自定义日志 集成提供默认的摄取管道, 但您可以使用 索引模板自定义配置 为此集成指定一个管道。

选项1:索引模板

  1. 创建测试您的 摄取管道。将您的管道命名为logs--default。这使得 为您的集成跟踪管道更容易。

    例如,以下请求为my-app数据集创建一个管道。 管道的名称为logs-my_app-default

    PUT _ingest/pipeline/logs-my_app-default
    {
      "description": "Pipeline for `my_app` dataset",
      "processors": [ ... ]
    }
  2. 创建一个包含您的管道的索引模板, 在index.default_pipelineindex.final_pipeline索引设置中。确保模板是数据流启用的。 模板的索引模式应匹配logs--*

    您可以使用 Kibana 的 索引管理 功能或 创建索引模板 API 来创建此模板。

    例如,以下请求创建一个匹配 logs-my_app-* 的模板。 该模板使用包含 index.default_pipeline 索引设置的组件模板。

    # Creates a component template for index settings
    PUT _component_template/logs-my_app-settings
    {
      "template": {
        "settings": {
          "index.default_pipeline": "logs-my_app-default",
          "index.lifecycle.name": "logs"
        }
      }
    }
    
    # Creates an index template matching `logs-my_app-*`
    PUT _index_template/logs-my_app-template
    {
      "index_patterns": ["logs-my_app-*"],
      "data_stream": { },
      "priority": 500,
      "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"]
    }
  3. 当在 Fleet 中添加或编辑您的 自定义日志 集成时, 点击 配置集成 > 自定义日志文件 > 高级选项
  4. 数据集名称中,指定您的数据集名称。Fleet 会将新数据添加到生成的 logs--default 数据流中。

    例如,如果你的数据集名称是 my_app,Fleet 会将新数据添加到 logs-my_app-default 数据流中。

    在 Fleet 中设置自定义日志集成
  5. 使用rollover API来滚动你的数据流。 这确保了Elasticsearch将索引模板及其管道设置应用于集成的新数据。

    POST logs-my_app-default/_rollover/

选项 2:自定义配置

  1. 创建测试您的 摄取管道。将您的管道命名为logs--default。这使得 为您的集成跟踪管道更容易。

    例如,以下请求为 my-app 数据集创建一个管道。 管道的名称为 logs-my_app-default

    PUT _ingest/pipeline/logs-my_app-default
    {
      "description": "Pipeline for `my_app` dataset",
      "processors": [ ... ]
    }
  2. 当在 Fleet 中添加或编辑您的 自定义日志 集成时, 点击 配置集成 > 自定义日志文件 > 高级选项
  3. 数据集名称中,指定您的数据集名称。Fleet 会将新数据添加到生成的 logs--default 数据流中。

    例如,如果你的数据集名称是 my_app,Fleet 会将新数据添加到 logs-my_app-default 数据流中。

  4. 自定义配置中,在pipeline策略设置中指定您的管道。

    自定义日志集成的自定义管道配置

Elastic Agent 独立模式

如果你以独立模式运行 Elastic Agent,你可以使用包含 索引模板index.default_pipelineindex.final_pipeline 索引设置来应用管道。或者, 你可以在你的 elastic-agent.yml 配置中指定 pipeline 策略设置。请参阅 安装独立 Elastic Agents

搜索索引的管道

edit

当您为搜索用例创建Elasticsearch索引时,例如,使用网络爬虫连接器,这些索引会自动设置特定的摄取管道。 这些处理器有助于优化您的内容以进行搜索。 有关更多信息,请参阅搜索中的摄取管道

在处理器中访问源字段

edit

处理器对传入文档的源字段具有读取和写入权限。 要访问处理器中的字段键,请使用其字段名称。以下 set 处理器访问 my-long-field

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    }
  ]
}

您也可以在前面加上 _source 前缀。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}

使用点表示法访问对象字段。

如果你的文档包含扁平化的对象,请使用 dot_expander 处理器先将其展开。其他 ingest处理器无法访问扁平化的对象。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "description": "Expand 'my-object-field.my-property'",
        "field": "my-object-field.my-property"
      }
    },
    {
      "set": {
        "description": "Set 'my-object-field.my-property' to 10",
        "field": "my-object-field.my-property",
        "value": 10
      }
    }
  ]
}

多个处理器参数支持Mustache模板片段。要在模板片段中访问字段值,请将字段名称用三重花括号括起来:{{{field-name}}}。您可以使用模板片段来动态设置字段名称。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set dynamic '<service>' field to 'code' value",
        "field": "{{{service}}}",
        "value": "{{{code}}}"
      }
    }
  ]
}

在处理器中访问元数据字段

edit

处理器可以通过名称访问以下元数据字段:

  • _index
  • _id
  • _routing
  • _dynamic_templates
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set '_routing' to 'geoip.country_iso_code' value",
        "field": "_routing",
        "value": "{{{geoip.country_iso_code}}}"
      }
    }
  ]
}

使用 Mustache 模板片段来访问元数据字段值。例如, {{{_routing}}} 检索文档的路由值。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Use geo_point dynamic template for address field",
        "field": "_dynamic_templates",
        "value": {
          "address": "geo_point"
        }
      }
    }
  ]
}

上述的处理器告诉ES,如果字段address在索引映射中尚未定义,则使用名为geo_point的动态模板。如果字段address已经在批量请求中定义,此处理器将覆盖该字段的动态模板,但对批量请求中定义的其他动态模板没有影响。

如果你自动生成 文档ID,你不能在处理器中使用{{{_id}}}。Elasticsearch在摄取后分配自动生成的_id值。

在处理器中访问摄取元数据

edit

摄取处理器可以使用 _ingest 键添加和访问摄取元数据。

与源和元数据字段不同,Elasticsearch 默认情况下不会索引摄取元数据字段。Elasticsearch 还允许以 _ingest 键开头的源字段。如果您的数据包含此类源字段,请使用 _source._ingest 来访问它们。

管道默认仅创建_ingest.timestamp摄取元数据字段。 此字段包含Elasticsearch接收到文档索引请求的时间戳。要索引_ingest.timestamp或其他摄取元数据字段,请使用set处理器。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

处理管道失败

edit

管道的处理器按顺序运行。默认情况下,当其中一个处理器失败或遇到错误时,管道处理会停止。

要忽略处理器失败并运行管道的其余处理器,请将ignore_failure设置为true

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}

使用 on_failure 参数来指定在处理器失败后立即运行的处理器列表。如果指定了 on_failure,Elasticsearch 之后会运行管道的剩余处理器,即使 on_failure 配置为空。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}

嵌套一组 on_failure 处理器以进行嵌套错误处理。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}

您还可以为管道指定on_failure。如果一个没有on_failure值的处理器失败,Elasticsearch会使用这个管道级别的参数作为回退。Elasticsearch将不会尝试运行管道的剩余处理器。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}

有关管道失败的更多信息可能可以在文档元数据字段 on_failure_messageon_failure_processor_typeon_failure_processor_tagon_failure_pipeline 中找到。这些字段仅在 on_failure 块内可访问。

以下示例使用元数据字段在文档中包含有关管道失败的信息。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

有条件地运行处理器

edit

每个处理器都支持一个可选的if条件,以Painless脚本的形式编写。如果提供了该条件,处理器仅在if条件为true时运行。

if 条件脚本在 Painless 的 ingest processor context 中运行。在 if 条件中,ctx 值是只读的。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      }
    }
  ]
}

如果启用了 script.painless.regex.enabled 集群设置,您可以在 if 条件脚本中使用正则表达式。有关支持的语法,请参阅 Painless 正则表达式

如果可能,避免使用正则表达式。昂贵的正则表达式可能会减慢索引速度。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
        "if": "ctx.url?.scheme =~ /^http[^s]/",
        "field": "url.insecure",
        "value": true
      }
    }
  ]
}

您必须在单行上指定有效的 JSON 格式的 if 条件。但是,您可以使用 Kibana 控制台的三重引号语法来编写和调试更大的脚本。

如果可能,避免使用复杂或昂贵的 if 条件脚本。 昂贵的条件脚本会降低索引速度。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}

您还可以将 存储脚本 指定为 if 条件。

PUT _scripts/my-prod-tag-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null){
        for (String tag : tags) {
          if (tag.toLowerCase().contains('prod')) {
            return false;
          }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": { "id": "my-prod-tag-script" }
      }
    }
  ]
}

传入的文档通常包含对象字段。如果处理器脚本尝试访问一个父对象不存在的字段,Elasticsearch 会返回一个 NullPointerException。为了避免这些异常,请使用 空安全操作符,例如 ?.,并编写您的脚本以确保空安全。

例如,ctx.network?.name.equalsIgnoreCase('Guest') 不是空安全的。 ctx.network?.name 可能返回 null。将脚本重写为 'Guest'.equalsIgnoreCase(ctx.network?.name),这是空安全的,因为 Guest 始终是非空的。

如果你无法将脚本重写为空安全,请包含一个显式的空检查。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that contain 'network.name' of 'Guest'",
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}

有条件地应用管道

edit

if 条件与 pipeline 处理器结合使用,以根据您的标准将其他管道应用于文档。您可以将此管道用作 默认管道,在用于配置多个数据流或索引的 索引模板中使用。

PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}

获取管道使用统计信息

edit

使用节点统计 API 获取全局和每个管道的摄取统计信息。使用这些统计信息来确定哪些管道运行最频繁或花费最多时间处理。

GET _nodes/stats/ingest?filter_path=nodes.*.ingest