转换数据
edit转换数据
edit转换功能使您能够将现有的 Elasticsearch 索引转换为汇总索引,从而提供新的洞察和分析机会。例如,您可以使用转换功能将数据透视为以实体为中心的索引,这些索引汇总了用户、会话或其他实体在数据中的行为。或者,您可以使用转换功能在具有某个唯一键的所有文档中找到最新的文档。
转换概述
edit您可以选择以下任一方法来转换您的数据: pivot 或 latest。
- 所有转换操作都会保留您的源索引。它们会创建一个新的索引,专门用于存储转换后的数据。
- 转换操作可能会有更多的配置选项,这些选项由API提供,而不仅仅是Kibana中可用的选项。有关所有转换配置选项,请参阅API文档。
转换是持久性任务;它们存储在集群状态中,这使得它们能够抵御节点故障。请参阅转换检查点的工作原理和错误处理以了解更多关于转换背后的机制。
透视变换
edit您可以使用转换将您的数据透视到一个新的以实体为中心的索引中。通过转换和汇总您的数据,可以以替代和有趣的方式对其进行可视化和分析。
许多Elasticsearch索引被组织为事件流:每个事件都是一个单独的文档,例如单个商品购买。转换功能使您能够汇总这些数据,将其整理成更有利于分析的格式。例如,您可以汇总单个客户的全部购买记录。
转换功能使您能够定义一个枢轴,这是一组将索引转换为不同、更易消化的格式的特征。枢轴转换结果会在新索引中生成数据的摘要。
要定义一个数据透视表,首先您需要选择一个或多个字段,这些字段将用于对数据进行分组。您可以选择分类字段(术语)和数值字段进行分组。如果使用数值字段,字段值将使用您指定的间隔进行分桶。
第二步是决定如何聚合分组后的数据。在使用聚合时,您实际上是在对索引提出问题。有不同类型的聚合,每种聚合都有其自己的用途和输出。要了解更多关于支持的聚合和分组字段的信息,请参阅创建转换API。
作为一个可选步骤,您还可以添加一个查询以进一步限制聚合的范围。
转换执行一个复合聚合,通过源索引查询定义的所有数据进行分页。聚合的输出存储在目标索引中。每次转换查询源索引时,它都会创建一个检查点。您可以决定是否希望转换运行一次或连续运行。批量转换是一次性操作,只有一个检查点。连续转换会随着新源数据的摄取不断递增并处理检查点。
想象一下,你经营一家销售服装的网店。每个订单都会创建一个包含唯一订单ID、订购产品的名称和类别、价格、订购数量、订单的确切日期以及一些客户信息(姓名、性别、位置等)的文档。你的数据集包含了去年所有的交易记录。
如果您想查看上一财年中不同类别的销售额,请定义一个按产品类别(女鞋、男装等)和订单日期对数据进行分组的转换。将订单日期的区间设置为上一年。然后在订购数量上添加一个求和聚合。结果是一个以实体为中心的索引,显示了上一财年中每个产品类别的销售数量。
最新转换
edit您可以使用 latest 类型的转换将最新的文档复制到一个新的索引中。您必须指定一个或多个字段作为数据的唯一键进行分组,以及一个按时间顺序排序的日期字段。例如,您可以使用这种类型的转换来跟踪每个客户的最新购买记录或每个主机的最新事件。
与枢轴的情况一样,最新的转换可以运行一次或连续运行。它对源索引中的数据执行复合聚合,并将输出存储在目标索引中。如果转换连续运行,新唯一键值将自动添加到目标索引中,并且每个检查点都会自动更新现有键值的最新文档。
性能考虑
edit转换操作对源索引进行搜索聚合,然后将结果索引到目标索引中。因此,转换操作所需的时间和资源永远不会少于聚合和索引过程。
如果你的转换必须处理大量历史数据,它在初始阶段会消耗大量资源——特别是在第一次检查点期间。
为了获得更好的性能,请确保您的搜索聚合和查询已优化,并且您的转换仅处理必要的数据。考虑是否可以在转换中应用源查询以缩小其处理数据的范围。还要考虑集群是否具备足够的资源来支持复合聚合搜索及其结果的索引。
如果您希望分散对集群的影响(以较慢的转换为代价),可以限制其执行搜索和索引请求的速率。在创建或更新转换时,设置docs_per_second限制。如果您想计算当前速率,请使用以下来自获取转换统计信息API的信息:
documents_processed / search_time_in_ms * 1000
设置转换
edit需求概述
edit要使用转换,您必须具备:
- 至少一个转换节点,
- 在Kibana空间中可见的管理功能,以及
-
安全权限,满足以下条件:
- 授予使用转换的权限,并且
- 授予访问源索引和目标索引的权限
安全权限
edit分配安全权限会影响用户访问转换的方式。考虑以下两个主要类别:
- Elasticsearch API 用户: 使用 Elasticsearch 客户端、cURL 或 Kibana 开发工具 通过 Elasticsearch API 访问转换。此场景需要 Elasticsearch 安全权限。
- Kibana 用户: 在 Kibana 中使用转换。此 场景需要 Kibana 功能权限 和 Elasticsearch 安全权限。
Elasticsearch API 用户
edit要管理转换,您必须满足以下所有要求:
-
transform_admin内置角色或manage_transform集群权限, -
read和view_index_metadata对源索引的索引权限,以及 -
create_index、index、manage和read对目标索引的索引权限。如果配置了retention_policy,则还需要对目标索引的delete索引权限。
要仅查看转换的配置和状态,您必须具备:
-
transform_user内置角色或monitor_transform集群权限
有关Elasticsearch角色和权限的更多信息,请参阅 内置角色 和 安全权限。
Kibana 用户
edit在 Kibana 空间中,要获得对转换的完全访问权限,您必须满足以下所有要求:
-
在Kibana空间中可见的管理功能,包括
数据视图管理和堆栈监控, -
monitoring_user内置角色, -
transform_admin内置角色或manage_transform集群权限, -
kibana_admin内置角色或具有读取或全部Kibana 权限的自定义角色,用于数据视图管理功能(取决于您的目标索引是否已经存在数据视图), - 源索引的数据视图,
-
源索引上的
读取和查看索引元数据索引权限,以及 -
目标索引上的
创建索引、索引、管理和读取索引权限。此外,当使用保留策略时,目标索引上需要删除索引权限。 -
读取管道集群权限,如果转换使用摄取管道
在 Kibana 空间中,要获得对转换的只读访问权限,您必须满足以下所有要求:
-
在Kibana空间中可见的管理功能,包括
Stack Monitoring, -
monitoring_user内置角色, -
transform_user内置角色或monitor_transform集群权限, -
kibana_admin内置角色或具有readKibana权限的自定义角色,至少在空间中使用一个功能, - 源索引和目标索引的数据视图,以及
-
源索引和目标索引上的
read和view_index_metadata索引权限
有关更多信息和Kibana安全功能,请参阅 Kibana角色管理和 Kibana权限。
Kibana 空间
editSpaces 使您能够在Kibana中组织您的源和目标索引以及其他已保存的对象,并且只能看到属于您空间的对象。然而,转换是一个长时间运行的任务,它在集群级别进行管理,因此不受限于特定的空间。可以在Stack Management > Kibana下为数据视图实现空间感知,这允许对转换目标索引的权限。
要在 Kibana 中成功创建转换,您必须登录到一个空间,在该空间中可以看到源索引,并且可以看到 数据视图管理 和 堆栈监控 功能。
何时使用转换
editElasticsearch 聚合是一个强大且灵活的功能,使您能够总结和检索关于数据的复杂见解。您可以总结复杂的事物,例如繁忙网站上每天的网页请求数量,按地理位置和浏览器类型细分。然而,如果您使用相同的数据集尝试计算像访问者网页会话平均持续时间这样简单的单一数字,您可能会很快耗尽内存。
为什么会发生这种情况?网络会话持续时间是行为属性的一个例子,它并不存在于任何一个日志记录中;它必须通过在我们的网络日志中找到每个会话的第一个和最后一个记录来推导出来。这种推导需要一些复杂的查询表达式和大量的内存来连接所有的数据点。如果你有一个持续的后台进程,将一个索引中的相关事件融合到另一个索引中的实体中心汇总中,你会得到一个更有用、更连贯的画面。这个新索引有时被称为数据框架。
您可能希望在使用以下情况时考虑使用转换而不是聚合:
-
你需要一个完整的特征索引,而不是一个前N个项目的集合。
在机器学习中,您通常需要一整套行为特征,而不仅仅是前N个。例如,如果您正在预测客户流失,您可能会查看诸如上周网站访问次数、销售总数或发送的电子邮件数量等特征。Elastic Stack 机器学习功能基于这种多维特征空间创建模型,因此它们受益于由转换创建的完整特征索引。
当你尝试在聚合或多个聚合的结果中进行搜索时,这种情况也同样适用。聚合结果可以进行排序或过滤,但在排序方面有 限制 并且通过 桶选择器进行过滤 受到返回的最大桶数的限制。如果你想搜索所有聚合结果,你需要创建完整的数据框。如果你需要根据多个字段对聚合结果进行排序或过滤,转换功能特别有用。
-
您需要通过管道聚合对聚合结果进行排序。
管道聚合不能用于排序。从技术上讲,这是因为管道聚合是在所有其他聚合完成后,在reduce阶段运行的。如果你创建一个转换,你可以有效地对数据进行多次遍历。
-
您希望创建汇总表以优化查询。
例如,如果你有一个高层次的仪表板,它被大量用户访问,并且它在一个大数据集上使用复杂的聚合,那么创建一个转换来缓存结果可能会更有效率。因此,每个用户不需要运行聚合查询。
为转换生成警报
editKibana 告警功能包括支持转换健康规则,这些规则会根据特定条件检查连续转换的健康状况。如果规则的条件满足,则会创建告警并运行相关操作。例如,您可以创建一个规则来检查连续转换是否已启动,并在未启动时通过电子邮件通知您。要了解更多关于 Kibana 告警功能的信息,请参阅 告警。
创建规则
edit您可以在堆栈管理 > 规则下创建转换规则。
- 点击创建规则并选择转换健康规则类型。
- 为规则命名并可选地提供标签。
-
选择要包含的转换或转换。您还可以使用特殊字符(
*)将规则应用于所有转换。在规则之后创建的转换将自动包含。
-
以下健康检查是可用的并且默认启用:
- 转换未启动
- 当转换未启动或未索引任何数据时获取警报。通知消息建议解决错误所需的必要操作。
- 不健康的转换
- 当转换处于不健康状态时获取警报。通知消息包含状态详细信息和相关问题。
- 设置检查间隔,该间隔定义了评估规则条件的频率。
- 在高级选项中,您可以更改在发生警报之前必须满足规则条件的连续运行次数。默认值为1。
作为规则创建过程的最后一步,定义其操作。
定义操作
edit您可以为规则添加一个或多个操作,以在满足条件时以及不再满足条件时生成通知。特别是,此规则类型支持:
- 警报摘要
- 检测到问题时运行的操作
- 当规则条件不再满足时运行的恢复操作
对于每个操作,您必须选择一个连接器,该连接器提供Kibana服务或第三方集成的连接信息。有关所有支持的连接器的更多信息,请访问连接器。
选择连接器后,您必须设置操作频率。您可以选择在每次检查间隔或自定义间隔上创建警报摘要。例如,发送通知以汇总新的、持续的和已恢复的警报:
如果你选择自定义操作间隔,它不能短于规则的检查间隔。
或者,您可以设置操作频率,使得每个警报都运行操作。选择操作运行的频率(在每次检查间隔时、仅在警报状态变化时,或自定义操作间隔)。您还必须选择一个操作组,该操作组指示操作是在检测到问题时运行还是在问题恢复时运行。
您可以通过指定操作仅在匹配KQL查询或在特定时间范围内发生警报时运行,来进一步细化操作运行的条件。
有一组变量可以用于自定义每个操作的通知消息。点击消息文本框上方的图标以获取变量列表,或参考操作变量。
保存配置后,规则会出现在规则列表中,您可以在其中查看其状态并查看其配置信息的概览。
警报的名称始终与触发它的关联转换的转换ID相同。您可以在列出各个警报的规则页面中,为特定转换静音通知。您可以通过选择规则名称,从规则中打开它。
操作变量
edit以下变量是特定于转换健康规则类型的。 您还可以指定适用于所有规则的通用变量。
-
context.message -
规则的预构建消息。例如:
Transform test-1 未启动。 -
context.results -
最新的结果,您可以通过使用 Mustache 模板数组语法进行迭代。例如,电子邮件连接器操作中的消息可能包含:
[{{rule.name}}] 转换健康检查结果: {{context.message}} {{#context.results}} 转换ID: {{transform_id}} {{#description}}转换描述: {{description}} {{/description}}{{#transform_state}}转换状态: {{transform_state}} {{/transform_state}}{{#health_status}}转换健康状态: {{health_status}} {{/health_status}}{{#issues}}问题: {{issue}} 问题计数: {{count}} {{#details}}问题详情: {{details}} {{/details}}{{#first_occurrence}}首次出现: {{first_occurrence}} {{/first_occurrence}} {{/issues}}{{#failure_reason}}失败原因: {{failure_reason}} {{/failure_reason}}{{#notification_message}}通知消息: {{notification_message}} {{/notification_message}}{{#node_name}}节点名称: {{node_name}} {{/node_name}}{{#timestamp}}时间戳: {{timestamp}} {{/timestamp}} {{/context.results}}
更多示例,请参阅 规则操作变量。
大规模使用变换
edit转换将现有的 Elasticsearch 索引转换为汇总索引,从而提供了新的洞察和分析机会。转换执行的搜索和索引操作使用标准的 Elasticsearch 功能,因此在使用 Elasticsearch 进行大规模操作时,通常适用于转换的注意事项也类似。如果您遇到性能问题,首先需要识别瓶颈区域(搜索、索引、处理或存储),然后查看本指南中的相关注意事项以提高性能。了解转换的工作原理也有助于,因为不同的注意事项适用于您的转换是否在连续模式或批处理模式下运行。
在本指南中,您将学习如何:
- 了解配置选项对转换性能的影响。
前提条件:
这些指南假设您有一个想要调整的转换,并且您已经熟悉:
以下考虑事项并非按顺序排列——数字有助于在列表项之间导航;您可以按任意顺序对一个或多个事项采取行动。大多数建议适用于连续和批量转换。如果某个列表项仅适用于一种转换类型,则会在描述中突出显示此例外情况。
每个建议标题末尾括号中的关键词表示可能通过遵循该建议来改进的瓶颈区域。
测量转换性能
edit为了优化转换性能,首先需要识别工作量最大的区域。Kibana中转换页面的统计信息接口包含了涵盖三个主要领域的信息:索引、搜索和处理时间(或者,您可以使用转换统计信息 API)。例如,如果结果显示大部分时间都花在搜索上,那么应优先优化转换的搜索查询。转换还支持Rally,这使得在需要时可以对转换配置进行性能检查。如果您已经优化了关键因素,但仍然遇到性能问题,您可能还需要考虑改进您的硬件。
1. 优化 frequency(索引)
edit在连续转换中,frequency 配置选项设置检查源索引中更改的时间间隔。如果检测到更改,则搜索源数据并将更改应用到目标索引。根据您的使用场景,您可能希望减少应用更改的频率。通过将 frequency 设置为更高的值(最大为一小时),可以在时间上分散工作负载,但代价是数据更新频率较低。
2. 增加目标索引(index)的分片数量
edit根据目标索引的大小,您可能需要考虑增加其分片数量。转换在创建目标索引时默认使用一个分片。要覆盖索引设置,请在启动转换之前创建目标索引。有关分片数量如何影响可扩展性和弹性的更多信息,请参阅Get ready for production
使用预览转换来检查转换将用于创建目标索引的设置。您可以复制并调整这些设置,以便在启动转换之前创建目标索引。
3. 分析和优化您的搜索查询 (search)
edit如果你已经定义了一个转换源索引 query,请确保它尽可能高效。在 Kibana 的 Dev Tools 下使用 Search Profiler 来获取搜索请求中各个组件执行的详细时间信息。或者,你可以使用 Profile。结果将为你提供关于搜索请求在低级别执行的洞察,以便你可以理解为什么某些请求较慢,并采取措施来改进它们。
转换执行标准的 Elasticsearch 搜索请求。编写 Elasticsearch 查询有多种方式,其中一些比其他方式更高效。请参阅 调整搜索速度 以了解更多关于 Elasticsearch 性能调优的信息。
4. 限制源查询(搜索)的范围
edit假设您的连续转换配置为按 IP 分组并计算 bytes_sent 的总和。对于每个检查点,连续转换会检测自上一个检查点以来源数据中的变化,识别已摄取新数据的 IP。然后,它执行第二次搜索,过滤这组 IP,以计算总的 bytes_sent。如果第二次搜索匹配了许多分片,那么这可能是资源密集型的。考虑限制源索引模式和查询将匹配的范围。
要限制访问哪些历史索引,排除某些层级(例如 "must_not": { "terms": { "_tier": [ "data_frozen", "data_cold" ] } }
和/或使用绝对时间值作为源查询中的日期范围过滤器(例如,大于 2024-01-01T00:00:00)。如果您使用相对时间值(例如,gte now-30d/d),请确保应用日期舍入以利用查询缓存,并确保相对时间远大于 frequency 或 time.sync.delay 或日期直方图桶的最大值,否则可能会遗漏数据。不要使用小于日期值的日期过滤器(例如,lt:小于 或 lte:小于或等于),因为这会与每次检查点执行时应用的逻辑冲突,并且可能会遗漏数据。
考虑在索引名称中使用日期数学来减少查询中需要解析的索引数量。在索引名称中添加一个日期模式 - 例如,yyyy-MM-dd - 并使用它来将查询限制在特定日期。下面的示例仅查询昨天和今天的索引:
"source": {
"index": [
"<mydata-{now/d-1d{yyyy-MM-dd}}*>",
"<mydata-{now/d{yyyy-MM-dd}}*>"
]
},
5. 优化源索引的分片策略(搜索)
edit没有一种放之四海而皆准的分片策略。在一个环境中有效的策略在另一个环境中可能无法扩展。一个好的分片策略必须考虑你的基础设施、使用场景和性能预期。
分片过少可能意味着无法实现负载分布的好处;然而,分片过多可能会影响集群的健康状况。要了解更多关于调整分片大小的信息,请阅读此指南。
6. 调整 max_page_search_size(搜索)
edit转换配置选项 max_page_search_size 定义了每个搜索请求返回的桶数。默认值是 500。如果你增加这个值,你可以在更高的延迟和内存使用成本下获得更好的吞吐量。
此参数的理想值在很大程度上取决于您的使用场景。如果您的转换执行内存密集型聚合(例如,基数或百分位数),那么增加max_page_search_size需要更多的可用内存。如果超出内存限制,将发生断路器异常。
7. 在源索引中使用索引字段(搜索)
edit运行时字段和脚本化字段不是索引字段;它们的值仅在搜索时提取或计算。虽然这些字段提供了访问数据的灵活性,但它们会增加搜索时的性能成本。如果使用运行时字段或脚本化字段进行转换的性能是一个问题,您可能希望考虑改用索引字段。出于性能原因,我们不建议将运行时字段用作同步连续转换的时间字段。
8. 使用索引排序(搜索、处理)
edit索引排序使您能够按照特定顺序将文档存储在磁盘上,这可以提高查询效率。理想的排序逻辑取决于您的使用场景,但一般规则可能是从基于时间的字段开始,按降序(从高到低基数)对字段进行排序。索引排序只能在索引创建时定义一次。如果您要使用的索引上尚未设置索引排序,请考虑将其重新索引到新的已排序索引中。
9. 禁用目标索引(存储)上的_source字段
editThe _source field 包含在索引时传递的原始 JSON 文档主体。_source 字段本身未被索引(因此不可搜索),但它仍存储在索引中并产生存储开销。如果你有一个大的目标索引,考虑禁用 _source 以节省存储空间。禁用 _source 仅在索引创建期间可能。
当_source字段被禁用时,许多功能将不再支持。请参阅禁用_source字段以了解禁用前的后果。
进一步阅读
edit转换检查点的工作原理
edit每次转换检查源索引并创建或更新目标索引时,它都会生成一个检查点。
如果你的转换只运行一次,那么逻辑上只有一个检查点。然而,如果你的转换是连续运行的,它会在摄取和转换新的源数据时创建检查点。转换配置中的sync属性通过指定一个时间字段来配置检查点。
要创建一个检查点,连续变换:
-
检查源索引的更改。
使用一个简单的周期性定时器,转换检查源索引的变化。此检查是基于转换的
频率属性中定义的间隔进行的。如果源索引保持不变或检查点已经在进行中,则等待下一个计时器。
如果发现更改,则会创建一个检查点。
-
标识哪些实体和/或时间桶已更改。
该转换搜索查看哪些实体或时间桶在最后一个和新检查点之间发生了变化。转换使用这些值来同步源索引和目标索引,其操作次数少于完全重新运行。
-
更新目标索引(数据框)以反映更改。
转换将新实体或时间桶相关的更改应用到目标索引。更改集可以分页。转换执行类似于批量转换操作的复合聚合,但它还会根据上一步注入查询过滤器,以减少工作量。在所有更改应用完毕后,检查点完成。
这个检查点过程涉及集群上的搜索和索引活动。在开发转换时,我们尝试优先考虑控制而非性能。我们决定让转换花费更长时间完成,而不是快速完成并在资源消耗上占据优先地位。尽管如此,集群仍然需要足够的资源来支持复合聚合搜索及其结果的索引。
如果集群由于转换导致性能下降,请停止转换并参考性能考虑。
使用摄取时间戳进行转换同步
edit在大多数情况下,强烈建议使用源索引的摄取时间戳来同步转换。这是转换能够识别新更改的最优方式。如果您的数据源遵循ECS标准,您可能已经有一个event.ingested字段。在这种情况下,使用event.ingested作为转换的sync.time.field属性。
如果你没有 event.ingested 字段或它没有被填充,你可以通过使用一个摄取管道来设置它。创建一个摄取管道,可以使用 ingest pipeline API(如下例所示)或通过 Kibana 在 Stack Management > Ingest Pipelines 下创建。使用一个 set 处理器 来设置字段并将其与摄取时间戳的值关联起来。
PUT _ingest/pipeline/set_ingest_time
{
"description": "Set ingest timestamp.",
"processors": [
{
"set": {
"field": "event.ingested",
"value": "{{{_ingest.timestamp}}}"
}
}
]
}
在创建了摄取管道后,将其应用于转换的源索引。该管道会在每个文档中添加字段 event.ingested,其值为摄取时间戳。配置转换的 sync.time.field 属性,以使用该字段,通过使用 创建转换 API 用于新转换或 更新转换 API 用于现有转换。event.ingested 字段用于同步转换。
请参阅 向索引请求添加管道 和 摄取管道 以了解更多关于如何使用摄取管道的信息。
变更检测启发式算法
edit当转换在连续模式下运行时,它会随着新数据的到来更新目标索引中的文档。转换使用一组称为变更检测的启发式方法,以较少的操作更新目标索引。
在这个例子中,数据按主机名分组。变更检测检测哪些主机名发生了变化,例如,主机A、C和G,并且只更新这些主机的文档,但不更新存储有关主机B、D或任何其他未更改主机的信息的文档。
当使用 date_histogram 按时间桶分组时,可以应用另一种启发式方法来处理时间桶。变化检测会检测哪些时间桶发生了变化,并且只更新那些发生变化的时间桶。
错误处理
edit转换中的失败通常与搜索或索引有关。 为了提高转换的弹性,聚合搜索和变更实体搜索的光标位置会在内存中跟踪并定期持久化。
检查点失败可以分为以下几类:
- 临时失败:检查点会重试。如果连续失败10次,转换将处于失败状态。例如,当存在分片失败且查询仅返回部分结果时,可能会发生这种情况。
- 不可恢复的失败:转换立即失败。例如,当找不到源索引时,会发生这种情况。
-
调整失败:转换会使用调整后的设置重试。例如,如果在复合聚合期间发生父级断路器内存错误,转换将接收部分结果。聚合搜索会以较少的桶数重试。此重试在转换的
frequency属性定义的时间间隔内执行。如果搜索重试到达到最小桶数,则会发生不可恢复的失败。
如果运行转换的节点失败,转换将从最近持久化的游标位置重新启动。此恢复过程可能会重复转换已经完成的一些工作,但它确保了数据的一致性。
教程:转换电子商务示例数据
edit转换使您能够从Elasticsearch索引中检索信息,对其进行转换,并存储在另一个索引中。让我们使用Kibana示例数据来演示如何使用转换对数据进行透视和汇总。
- 验证您的环境是否已正确设置以使用转换。如果启用了Elasticsearch安全功能,要完成本教程,您需要一个有权限预览和创建转换的用户。您还必须对源索引和目标索引具有特定的索引权限。请参阅设置。
-
选择您的源索引。
在这个示例中,我们将使用电子商务订单样本数据。如果你还不熟悉
kibana_sample_data_ecommerce索引,请使用Kibana中的收入仪表板来探索数据。考虑一下你可能想从这些电子商务数据中得出什么见解。 -
选择变换的枢轴类型,并尝试各种分组和聚合数据的选项。
有两种类型的转换,但首先我们将尝试对你的数据进行透视,这涉及到使用至少一个字段对其进行分组,并应用至少一个聚合。你可以预览转换后的数据会是什么样子,所以大胆去尝试吧!你还可以启用直方图图表,以便更好地理解数据中值的分布情况。
例如,您可能希望按产品ID对数据进行分组,并计算每个产品的销售总数及其平均价格。或者,您可能希望查看单个客户的行为,并计算每个客户总共花费了多少以及他们购买了多少不同类别的产品。或者,您可能希望考虑货币或地理位置。您能以哪些最有趣的方式来转换和解释这些数据?
转到 管理 > 堆栈管理 > 数据 > 转换 在 Kibana 中并使用向导创建转换:
按客户ID对数据进行分组,并添加一个或多个聚合以了解更多关于每个客户的订单信息。例如,让我们计算他们购买的产品总数、他们的购买总价、他们在单个订单中购买的最大产品数量以及他们的订单总数。我们将通过在
total_quantity和taxless_total_price字段上使用sum聚合,在total_quantity字段上使用max聚合,以及在order_id字段上使用cardinality聚合来实现这一点:
如果你愿意,你可以使用 预览转换API。
API 示例
POST _transform/_preview { "source": { "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": {"currency": "EUR"} } } } }, "pivot": { "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } } } -
当您对预览中的内容感到满意时,创建转换。
- 提供一个转换ID、目标索引的名称,并可选地提供一个描述。如果目标索引不存在,它将在您启动转换时自动创建。
-
决定您是否希望转换只运行一次或连续运行。由于此样本数据索引是不可变的,让我们使用默认行为并只运行转换一次。但是,如果您想尝试一下,请继续点击连续模式。您必须选择一个字段,转换可以使用该字段来检查哪些实体已更改。通常,使用摄取时间戳字段是一个好主意。在这个例子中,您可以使用
order_date字段。 - 可选地,您可以配置一个适用于您的转换的保留策略。选择一个日期字段,该字段用于标识目标索引中的旧文档,并提供一个最大年龄。超过配置值的文档将从目标索引中删除。
在 Kibana 中,在完成创建转换之前,您可以将预览转换 API 请求复制到剪贴板。这些信息在您决定是否要手动创建目标索引时非常有用。
如果你愿意,你可以使用 创建转换API。
API 示例
PUT _transform/ecommerce-customer-transform { "source": { "index": [ "kibana_sample_data_ecommerce" ], "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, "pivot": { "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } }, "dest": { "index": "ecommerce-customers" }, "retention_policy": { "time": { "field": "order_date", "max_age": "60d" } } } -
可选:创建目标索引。
如果目标索引不存在,它会在您首次启动转换时创建。透视转换会从源索引和转换聚合中推断出目标索引的映射。如果目标索引中有从脚本派生的字段(例如,如果您使用
scripted_metrics或bucket_scripts聚合),它们将使用动态映射创建。您可以使用预览转换API来预览它将用于目标索引的映射。在Kibana中,如果您已将API请求复制到剪贴板,请将其粘贴到控制台中,然后参考API响应中的generated_dest_index对象。API 示例
{ "preview" : [ { "total_quantity" : { "max" : 2, "sum" : 118.0 }, "taxless_total_price" : { "sum" : 3946.9765625 }, "customer_id" : "10", "order_id" : { "cardinality" : 59 } }, ... ], "generated_dest_index" : { "mappings" : { "_meta" : { "_transform" : { "transform" : "transform-preview", "version" : { "created" : "8.0.0" }, "creation_date_in_millis" : 1621991264061 }, "created_by" : "transform" }, "properties" : { "total_quantity.sum" : { "type" : "double" }, "total_quantity" : { "type" : "object" }, "taxless_total_price" : { "type" : "object" }, "taxless_total_price.sum" : { "type" : "double" }, "order_id.cardinality" : { "type" : "long" }, "customer_id" : { "type" : "keyword" }, "total_quantity.max" : { "type" : "integer" }, "order_id" : { "type" : "object" } } }, "settings" : { "index" : { "number_of_shards" : "1", "auto_expand_replicas" : "0-1" } }, "aliases" : { } } }在某些情况下,推导出的映射可能与实际数据不兼容。例如,可能会发生数值溢出,或者动态映射的字段可能同时包含数字和字符串。为了避免这个问题,请在开始转换之前创建目标索引。有关更多信息,请参阅创建索引 API。
API 示例
您可以使用转换预览中的信息来创建目标索引。例如:
PUT /ecommerce-customers { "mappings": { "properties": { "total_quantity.sum" : { "type" : "double" }, "total_quantity" : { "type" : "object" }, "taxless_total_price" : { "type" : "object" }, "taxless_total_price.sum" : { "type" : "double" }, "order_id.cardinality" : { "type" : "long" }, "customer_id" : { "type" : "keyword" }, "total_quantity.max" : { "type" : "integer" }, "order_id" : { "type" : "object" } } } } -
开始转换。
尽管资源利用率会根据集群负载自动调整,但在运行时,转换会增加集群的搜索和索引负载。然而,如果您正在经历过高的负载,您可以停止它。
您可以在 Kibana 中启动、停止、重置和管理转换:
<img alt="在 Kibana 中管理转换" src="images/manage-transform
转换示例
edit这些示例展示了如何使用转换从您的数据中获取有用的见解。所有示例都使用其中一个 Kibana 示例数据集。有关更详细的逐步示例,请参阅 教程:转换电子商务示例数据。
寻找你的最佳客户
edit此示例使用电子商务订单样本数据集来查找在假设的网店中花费最多的客户。让我们使用pivot类型的转换,使得目标索引包含每个客户的订单数量、订单的总价格、唯一产品的数量、每个订单的平均价格以及订购产品的总数。
API示例
POST _transform/_preview
{
"source": {
"index": "kibana_sample_data_ecommerce"
},
"dest" : {
"index" : "sample_ecommerce_orders_by_customer"
},
"pivot": {
"group_by": {
"user": { "terms": { "field": "user" }},
"customer_id": { "terms": { "field": "customer_id" }}
},
"aggregations": {
"order_count": { "value_count": { "field": "order_id" }},
"total_order_amt": { "sum": { "field": "taxful_total_price" }},
"avg_amt_per_order": { "avg": { "field": "taxful_total_price" }},
"avg_unique_products_per_order": { "avg": { "field": "total_unique_products" }},
"total_unique_products": { "cardinality": { "field": "products.product_id" }}
}
}
}
|
转换的目标索引。它被 |
|
|
选择了两个 |
在上面的示例中,使用了压缩的JSON格式,以便更容易阅读透视对象。
预览转换API使您能够提前查看转换的布局,并填充一些示例值。例如:
{
"preview" : [
{
"total_order_amt" : 3946.9765625,
"order_count" : 59.0,
"total_unique_products" : 116.0,
"avg_unique_products_per_order" : 2.0,
"customer_id" : "10",
"user" : "recip",
"avg_amt_per_order" : 66.89790783898304
},
...
]
}
此转换使得回答以下问题变得更加容易:
- 哪些客户花费最多?
- 哪些客户每单花费最多?
- 哪些客户下单最频繁?
- 哪些客户订购的不同产品数量最少?
仅使用聚合就可以回答这些问题,然而转换允许我们将这些数据持久化为以客户为中心的索引。这使我们能够大规模分析数据,并提供更多灵活性,以便从客户为中心的角度探索和导航数据。在某些情况下,它甚至可以使创建可视化变得更加简单。
寻找延误最多的航空公司
edit此示例使用Flights样本数据集来找出哪个航空公司延误最多。首先,通过使用查询过滤器过滤源数据,排除所有已取消的航班。然后,转换数据以包含每个航空公司的航班数量、延误分钟数的总和以及航班分钟数的总和。最后,使用bucket_script来确定实际延误时间占航班时间的百分比。
POST _transform/_preview
{
"source": {
"index": "kibana_sample_data_flights",
"query": {
"bool": {
"filter": [
{ "term": { "Cancelled": false } }
]
}
}
},
"dest" : {
"index" : "sample_flight_delays_by_carrier"
},
"pivot": {
"group_by": {
"carrier": { "terms": { "field": "Carrier" }}
},
"aggregations": {
"flights_count": { "value_count": { "field": "FlightNum" }},
"delay_mins_total": { "sum": { "field": "FlightDelayMin" }},
"flight_mins_total": { "sum": { "field": "FlightTimeMin" }},
"delay_time_percentage": {
"bucket_script": {
"buckets_path": {
"delay_time": "delay_mins_total.value",
"flight_time": "flight_mins_total.value"
},
"script": "(params.delay_time / params.flight_time) * 100"
}
}
}
}
}
|
过滤源数据,仅选择未取消的航班。 |
|
|
转换的目标索引。它被 |
|
|
数据按包含航空公司名称的 |
|
|
这个 |
预览显示,新索引将为每个运营商包含如下数据:
{
"preview" : [
{
"carrier" : "ES-Air",
"flights_count" : 2802.0,
"flight_mins_total" : 1436927.5130677223,
"delay_time_percentage" : 9.335543983955839,
"delay_mins_total" : 134145.0
},
...
]
}
这个转换使得回答以下问题变得更加容易:
- 哪家航空公司的延误时间占飞行时间的百分比最高?
这些数据是虚构的,并不反映任何所列目的地或出发机场的实际延误或航班统计数据。
查找可疑的客户端IP
edit此示例使用Web日志样本数据集来识别可疑的客户端IP。它转换数据,使得新索引包含每个客户端IP的字节总和、不同URL的数量、代理、按位置的传入请求以及地理目的地。它还使用过滤聚合来计算每个客户端IP接收到的特定类型的HTTP响应。最终,下面的示例将Web日志数据转换为一个以实体为中心的索引,其中实体是clientip。
PUT _transform/suspicious_client_ips
{
"source": {
"index": "kibana_sample_data_logs"
},
"dest" : {
"index" : "sample_weblogs_by_clientip"
},
"sync" : {
"time": {
"field": "timestamp",
"delay": "60s"
}
},
"pivot": {
"group_by": {
"clientip": { "terms": { "field": "clientip" } }
},
"aggregations": {
"url_dc": { "cardinality": { "field": "url.keyword" }},
"bytes_sum": { "sum": { "field": "bytes" }},
"geo.src_dc": { "cardinality": { "field": "geo.src" }},
"agent_dc": { "cardinality": { "field": "agent.keyword" }},
"geo.dest_dc": { "cardinality": { "field": "geo.dest" }},
"responses.total": { "value_count": { "field": "timestamp" }},
"success" : {
"filter": {
"term": { "response" : "200"}}
},
"error404" : {
"filter": {
"term": { "response" : "404"}}
},
"error5xx" : {
"filter": {
"range": { "response" : { "gte": 500, "lt": 600}}}
},
"timestamp.min": { "min": { "field": "timestamp" }},
"timestamp.max": { "max": { "field": "timestamp" }},
"timestamp.duration_ms": {
"bucket_script": {
"buckets_path": {
"min_time": "timestamp.min.value",
"max_time": "timestamp.max.value"
},
"script": "(params.max_time - params.min_time)"
}
}
}
}
}
|
转换的目标索引。 |
|
|
配置转换以持续运行。它使用 |
|
|
数据按 |
|
|
过滤聚合,用于计算 |
|
|
这个 |
创建转换后,您必须启动它:
POST _transform/suspicious_client_ips/_start
不久之后,第一个结果应该可以在目标索引中找到:
GET sample_weblogs_by_clientip/_search
搜索结果显示每个客户端IP的数据如下:
"hits" : [
{
"_index" : "sample_weblogs_by_clientip",
"_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
"_score" : 1.0,
"_source" : {
"geo" : {
"src_dc" : 2.0,
"dest_dc" : 2.0
},
"success" : 2,
"error404" : 0,
"error503" : 0,
"clientip" : "0.72.176.46",
"agent_dc" : 2.0,
"bytes_sum" : 4422.0,
"responses" : {
"total" : 2.0
},
"url_dc" : 2.0,
"timestamp" : {
"duration_ms" : 5.2191698E8,
"min" : "2020-03-16T07:51:57.333Z",
"max" : "2020-03-22T08:50:34.313Z"
}
}
}
]
与其他Kibana样本数据集一样,Web日志样本数据集包含相对于您安装它时的时间戳,包括未来的时间戳。连续转换将在数据点成为过去时拾取它们。如果您在一段时间前安装了Web日志样本数据集,您可以卸载并重新安装它,时间戳将会改变。
此转换使得回答以下问题变得更加容易:
- 哪些客户端IP传输的数据量最大?
- 哪些客户端IP与大量不同的URL进行交互?
- 哪些客户端IP的错误率较高?
- 哪些客户端IP与大量不同的目的地国家进行交互?
查找每个IP地址的最后日志事件
edit此示例使用Web日志样本数据集来查找来自IP地址的最后一条日志。让我们在连续模式下使用latest类型的转换。它将每个唯一键的最新文档从源索引复制到目标索引,并在新数据进入源索引时更新目标索引。
选择 clientip 字段作为唯一键;数据按此字段分组。
选择 timestamp 作为按时间顺序排序数据的日期字段。对于
连续模式,指定一个用于标识新文档的日期字段,
以及用于检查源索引中更改的间隔。
让我们假设我们只对保留日志中最近出现的IP地址的文档感兴趣。您可以定义一个保留策略,并指定一个用于计算文档年龄的日期字段。此示例使用与用于排序数据的相同日期字段。然后设置文档的最大年龄;超过您设置的值的文档将从目标索引中删除。
此转换创建包含每个客户端IP最新登录日期的目标索引。由于转换在连续模式下运行,目标索引将随着新数据进入源索引而更新。最后,由于应用的保留策略,超过30天的文档将从目标索引中删除。
API示例
PUT _transform/last-log-from-clientip
{
"source": {
"index": [
"kibana_sample_data_logs"
]
},
"latest": {
"unique_key": [
"clientip"
],
"sort": "timestamp"
},
"frequency": "1m",
"dest": {
"index": "last-log-from-clientip"
},
"sync": {
"time": {
"field": "timestamp",
"delay": "60s"
}
},
"retention_policy": {
"time": {
"field": "timestamp",
"max_age": "30d"
}
},
"settings": {
"max_page_search_size": 500
}
}
|
指定用于分组数据的字段。 |
|
|
指定用于对数据进行排序的日期字段。 |
|
|
设置转换检查源索引中更改的时间间隔。 |
|
|
包含用于同步源索引和目标索引的时间字段和延迟设置。 |
|
|
指定转换的保留策略。超过配置值的文档将从目标索引中删除。 |
创建转换后,启动它:
POST _transform/last-log-from-clientip/_start
在转换过程处理数据后,搜索目标索引:
GET last-log-from-clientip/_search
搜索结果显示每个客户端IP的数据如下:
{
"_index" : "last-log-from-clientip",
"_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
"_score" : 1.0,
"_source" : {
"referer" : "http://twitter.com/error/don-lind",
"request" : "/elasticsearch",
"agent" : "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)",
"extension" : "",
"memory" : null,
"ip" : "0.72.176.46",
"index" : "kibana_sample_data_logs",
"message" : "0.72.176.46 - - [2018-09-18T06:31:00.572Z] \"GET /elasticsearch HTTP/1.1\" 200 7065 \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)\"",
"url" : "https://www.elastic.co/downloads/elasticsearch",
"tags" : [
"success",
"info"
],
"geo" : {
"srcdest" : "IN:PH",
"src" : "IN",
"coordinates" : {
"lon" : -124.1127917,
"lat" : 40.80338889
},
"dest" : "PH"
},
"utc_time" : "2021-05-04T06:31:00.572Z",
"bytes" : 7065,
"machine" : {
"os" : "ios",
"ram" : 12884901888
},
"response" : 200,
"clientip" : "0.72.176.46",
"host" : "www.elastic.co",
"event" : {
"dataset" : "sample_web_logs"
},
"phpmemory" : null,
"timestamp" : "2021-05-04T06:31:00.572Z"
}
}
这个转换使得回答以下问题变得更加容易:
- 与特定IP地址相关的最新日志事件是什么?
查找向服务器发送最多字节的客户端IP
edit此示例使用Web日志样本数据集,查找每小时内向服务器发送最多字节数的客户端IP。该示例使用了一个pivot转换和一个top_metrics聚合。
按时间字段上的日期直方图对数据进行分组,间隔为1小时。在bytes字段上使用最大值聚合以获取发送到服务器的最大数据量。如果没有max聚合,API调用仍然返回发送最多字节数的客户端IP,但是,发送的字节数不会返回。在top_metrics属性中,指定clientip和geo.src,然后按bytes字段降序排序。转换返回发送最多数据量的客户端IP和相应位置的2字母ISO代码。
POST _transform/_preview
{
"source": {
"index": "kibana_sample_data_logs"
},
"pivot": {
"group_by": {
"timestamp": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "1h"
}
}
},
"aggregations": {
"bytes.max": {
"max": {
"field": "bytes"
}
},
"top": {
"top_metrics": {
"metrics": [
{
"field": "clientip"
},
{
"field": "geo.src"
}
],
"sort": {
"bytes": "desc"
}
}
}
}
}
}
上述API调用返回的响应类似于以下内容:
{
"preview" : [
{
"top" : {
"clientip" : "223.87.60.27",
"geo.src" : "IN"
},
"bytes" : {
"max" : 6219
},
"timestamp" : "2021-04-25T00:00:00.000Z"
},
{
"top" : {
"clientip" : "99.74.118.237",
"geo.src" : "LK"
},
"bytes" : {
"max" : 14113
},
"timestamp" : "2021-04-25T03:00:00.000Z"
},
{
"top" : {
"clientip" : "218.148.135.12",
"geo.src" : "BR"
},
"bytes" : {
"max" : 4531
},
"timestamp" : "2021-04-25T04:00:00.000Z"
},
...
]
}
通过客户ID获取客户姓名和电子邮件地址
edit此示例使用电子商务样本数据集基于客户ID创建以实体为中心的索引,并通过使用top_metrics聚合来获取客户姓名和电子邮件地址。
按 customer_id 分组数据,然后添加一个 top_metrics 聚合,其中 metrics 是 email、customer_first_name.keyword 和 customer_last_name.keyword 字段。按 order_date 降序排序 top_metrics。API 调用如下所示:
POST _transform/_preview
{
"source": {
"index": "kibana_sample_data_ecommerce"
},
"pivot": {
"group_by": {
"customer_id": {
"terms": {
"field": "customer_id"
}
}
},
"aggregations": {
"last": {
"top_metrics": {
"metrics": [
{
"field": "email"
},
{
"field": "customer_first_name.keyword"
},
{
"field": "customer_last_name.keyword"
}
],
"sort": {
"order_date": "desc"
}
}
}
}
}
}
API返回的响应与此类似:
{
"preview" : [
{
"last" : {
"customer_last_name.keyword" : "Long",
"customer_first_name.keyword" : "Recip",
"email" : "recip@long-family.zzz"
},
"customer_id" : "10"
},
{
"last" : {
"customer_last_name.keyword" : "Jackson",
"customer_first_name.keyword" : "Fitzgerald",
"email" : "fitzgerald@jackson-family.zzz"
},
"customer_id" : "11"
},
{
"last" : {
"customer_last_name.keyword" : "Cross",
"customer_first_name.keyword" : "Brigitte",
"email" : "brigitte@cross-family.zzz"
},
"customer_id" : "12"
},
...
]
}
转换的无痛示例
edit使用 scripted_metric 聚合的示例在 Elasticsearch Serverless 上不受支持。
这些示例展示了如何在转换中使用Painless。你可以在Painless指南中了解更多关于Painless脚本语言的信息。
- 虽然以下示例的上下文是转换用例,但下面片段中的Painless脚本也可以用于其他Elasticsearch搜索聚合中。
- 以下所有示例都使用脚本,当字段由脚本创建时,转换无法推断输出字段的映射。转换不会为目标索引中的这些字段创建任何映射,这意味着它们会被动态映射。如果您希望显式映射,请在启动转换之前创建目标索引。
使用脚本化度量聚合获取顶部命中
edit此代码片段展示了如何找到最新的文档,换句话说,就是具有最新时间戳的文档。从技术角度来看,它通过在转换中使用脚本化度量聚合来实现Top hits的功能,从而提供度量输出。
此示例使用了scripted_metric聚合,该聚合在Elasticsearch Serverless上不受支持。
"aggregations": {
"latest_doc": {
"scripted_metric": {
"init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
"map_script": """
def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
if (current_date > state.timestamp_latest)
{state.timestamp_latest = current_date;
state.last_doc = new HashMap(params['_source']);}
""",
"combine_script": "return state",
"reduce_script": """
def last_doc = '';
def timestamp_latest = 0L;
for (s in states) {if (s.timestamp_latest > (timestamp_latest))
{timestamp_latest = s.timestamp_latest; last_doc = s.last_doc;}}
return last_doc
"""
}
}
}
|
The |
|
|
The |
|
|
The |
|
|
The |
查看脚本的作用域以获取关于各自脚本的详细解释。
你可以用类似的方式获取最后一个值:
"aggregations": {
"latest_value": {
"scripted_metric": {
"init_script": "state.timestamp_latest = 0L; state.last_value = ''",
"map_script": """
def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli();
if (current_date > state.timestamp_latest)
{state.timestamp_latest = current_date;
state.last_value = params['_source']['value'];}
""",
"combine_script": "return state",
"reduce_script": """
def last_value = '';
def timestamp_latest = 0L;
for (s in states) {if (s.timestamp_latest > (timestamp_latest))
{timestamp_latest = s.timestamp_latest; last_value = s.last_value;}}
return last_value
"""
}
}
}
使用存储脚本获取最高点击量
edit您还可以利用 存储脚本来获取最新值。 存储脚本减少了编译时间,使搜索更快,并且是可更新的。
-
创建存储的脚本:
POST _scripts/last-value-map-init { "script": { "lang": "painless", "source": """ state.timestamp_latest = 0L; state.last_value = '' """ } } POST _scripts/last-value-map { "script": { "lang": "painless", "source": """ def current_date = doc['@timestamp'].getValue().toInstant().toEpochMilli(); if (current_date > state.timestamp_latest) {state.timestamp_latest = current_date; state.last_value = doc[params['key']].value;} """ } } POST _scripts/last-value-combine { "script": { "lang": "painless", "source": """ return state """ } } POST _scripts/last-value-reduce { "script": { "lang": "painless", "source": """ def last_value = ''; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value """ } } -
在脚本化度量聚合中使用存储的脚本。
通过使用聚合获取时间特征
edit此代码片段展示了如何使用 Painless 在转换中提取基于时间的特征。该代码片段使用了一个索引,其中 @timestamp 被定义为 date 类型字段。
"aggregations": {
"avg_hour_of_day": {
"avg":{
"script": {
"source": """
ZonedDateTime date = doc['@timestamp'].value;
return date.getHour();
"""
}
}
},
"avg_month_of_year": {
"avg":{
"script": {
"source": """
ZonedDateTime date = doc['@timestamp'].value;
return date.getMonthValue();
"""
}
}
},
...
}
使用bucket script获取持续时间
edit此示例展示了如何通过使用bucket script从数据日志中按客户端IP获取会话持续时间。该示例使用了Kibana示例Web日志数据集。
PUT _transform/data_log
{
"source": {
"index": "kibana_sample_data_logs"
},
"dest": {
"index": "data-logs-by-client"
},
"pivot": {
"group_by": {
"machine.os": {"terms": {"field": "machine.os.keyword"}},
"machine.ip": {"terms": {"field": "clientip"}}
},
"aggregations": {
"time_frame.lte": {
"max": {
"field": "timestamp"
}
},
"time_frame.gte": {
"min": {
"field": "timestamp"
}
},
"time_length": {
"bucket_script": {
"buckets_path": {
"min": "time_frame.gte.value",
"max": "time_frame.lte.value"
},
"script": "params.max - params.min"
}
}
}
}
}
使用脚本化度量聚合统计HTTP响应
edit您可以通过使用脚本化度量聚合作为转换的一部分来计算Web日志数据集中不同的HTTP响应类型。您可以使用过滤器聚合实现类似的功能,详情请查看查找可疑客户端IP示例。
下面的示例假设HTTP响应代码作为关键字存储在文档的response字段中。
此示例使用了scripted_metric聚合,该聚合在Elasticsearch Serverless上不受支持。
"aggregations": {
"responses.counts": {
"scripted_metric": {
"init_script": "state.responses = ['error':0L,'success':0L,'other':0L]",
"map_script": """
def code = doc['response.keyword'].value;
if (code.startsWith('5') || code.startsWith('4')) {
state.responses.error += 1 ;
} else if(code.startsWith('2')) {
state.responses.success += 1;
} else {
state.responses.other += 1;
}
""",
"combine_script": "state.responses",
"reduce_script": """
def counts = ['error': 0L, 'success': 0L, 'other': 0L];
for (responses in states) {
counts.error += responses['error'];
counts.success += responses['success'];
counts.other += responses['other'];
}
return counts;
"""
}
},
...
}
|
转换中包含所有聚合的 |
|
|
脚本化度量聚合的对象。 |
|
|
这个 |
|
|
The |
|
|
The |
|
|
The |
|
|
The |
通过使用脚本化度量聚合比较索引
edit此示例展示了如何通过使用脚本化指标聚合的转换来比较两个索引的内容。
此示例使用了scripted_metric聚合,该聚合在Elasticsearch Serverless上不受支持。
POST _transform/_preview
{
"id" : "index_compare",
"source" : {
"index" : [
"index1",
"index2"
],
"query" : {
"match_all" : { }
}
},
"dest" : {
"index" : "compare"
},
"pivot" : {
"group_by" : {
"unique-id" : {
"terms" : {
"field" : "<unique-id-field>"
}
}
},
"aggregations" : {
"compare" : {
"scripted_metric" : {
"map_script" : "state.doc = new HashMap(params['_source'])",
"combine_script" : "return state",
"reduce_script" : """
if (states.size() != 2) {
return "count_mismatch"
}
if (states.get(0).equals(states.get(1))) {
return "match"
} else {
return "mismatch"
}
"""
}
}
}
}
}
|
在 |
|
|
索引 |
|
|
The |
|
|
脚本化度量聚合的对象。 |
|
|
The |
|
|
The |
|
|
The |
通过使用脚本化度量聚合获取Web会话详细信息
edit此示例展示了如何从一个交易中派生出多个特征。 让我们来看一下数据中的示例源文档:
源文档
{
"_index":"apache-sessions",
"_type":"_doc",
"_id":"KvzSeGoB4bgw0KGbE3wP",
"_score":1.0,
"_source":{
"@timestamp":1484053499256,
"apache":{
"access":{
"sessionid":"571604f2b2b0c7b346dc685eeb0e2306774a63c2",
"url":"http://www.leroymerlin.fr/v3/search/search.do?keyword=Carrelage%20salle%20de%20bain",
"path":"/v3/search/search.do",
"query":"keyword=Carrelage%20salle%20de%20bain",
"referrer":"http://www.leroymerlin.fr/v3/p/produits/carrelage-parquet-sol-souple/carrelage-sol-et-mur/decor-listel-et-accessoires-carrelage-mural-l1308217717?resultOffset=0&resultLimit=51&resultListShape=MOSAIC&priceStyle=SALEUNIT_PRICE",
"user_agent":{
"original":"Mobile Safari 10.0 Mac OS X (iPad) Apple Inc.",
"os_name":"Mac OS X (iPad)"
},
"remote_ip":"0337b1fa-5ed4-af81-9ef4-0ec53be0f45d",
"geoip":{
"country_iso_code":"FR",
"location":{
"lat":48.86,
"lon":2.35
}
},
"response_code":200,
"method":"GET"
}
}
}
}
...
通过使用 sessionid 作为分组字段,您能够枚举会话中的事件,并通过使用脚本化度量聚合获取更多会话的详细信息。
此示例使用了scripted_metric聚合,该聚合在Elasticsearch Serverless上不受支持。
POST _transform/_preview
{
"source": {
"index": "apache-sessions"
},
"pivot": {
"group_by": {
"sessionid": {
"terms": {
"field": "apache.access.sessionid"
}
}
},
"aggregations": {
"distinct_paths": {
"cardinality": {
"field": "apache.access.path"
}
},
"num_pages_viewed": {
"value_count": {
"field": "apache.access.url"
}
},
"session_details": {
"scripted_metric": {
"init_script": "state.docs = []",
"map_script": """
Map span = [
'@timestamp':doc['@timestamp'].value,
'url':doc['apache.access.url'].value,
'referrer':doc['apache.access.referrer'].value
];
state.docs.add(span)
""",
"combine_script": "return state.docs;",
"reduce_script": """
def all_docs = [];
for (s in states) {
for (span in s) {
all_docs.add(span);
}
}
all_docs.sort((HashMap o1, HashMap o2)->o1['@timestamp'].toEpochMilli().compareTo(o2['@timestamp'].toEpochMilli()));
def size = all_docs.size();
def min_time = all_docs[0]['@timestamp'];
def max_time = all_docs[size-1]['@timestamp'];
def duration = max_time.toEpochMilli() - min_time.toEpochMilli();
def entry_page = all_docs[0]['url'];
def exit_path = all_docs[size-1]['url'];
def first_referrer = all_docs[0]['referrer'];
def ret = new HashMap();
ret['first_time'] = min_time;
ret['last_time'] = max_time;
ret['duration'] = duration;
ret['entry_page'] = entry_page;
ret['exit_path'] = exit_path;
ret['first_referrer'] = first_referrer;
return ret;
"""
}
}
}
}
}
|
数据按 |
|
|
聚合计算路径的数量并枚举会话期间查看的页面。 |
|
|
The |
|
|
The |
|
|
The |
|
|
The |
API调用结果返回类似的响应:
{
"num_pages_viewed" : 2.0,
"session_details" : {
"duration" : 100300001,
"first_referrer" : "https://www.bing.com/",
"entry_page" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463",
"first_time" : "2017-01-10T21:22:52.982Z",
"last_time" : "2017-01-10T21:25:04.356Z",
"exit_path" : "http://www.leroymerlin.fr/v3/p/produits/materiaux-menuiserie/porte-coulissante-porte-interieure-escalier-et-rambarde/barriere-de-securite-l1308218463?__result-wrapper?pageTemplate=Famille%2FMat%C3%A9riaux+et+menuiserie&resultOffset=0&resultLimit=50&resultListShape=PLAIN&nomenclatureId=17942&priceStyle=SALEUNIT_PRICE&fcr=1&*4294718806=4294718806&*14072=14072&*4294718593=4294718593&*17942=17942"
},
"distinct_paths" : 1.0,
"sessionid" : "000046f8154a80fd89849369c984b8cc9d795814"
},
{
"num_pages_viewed" : 10.0,
"session_details" : {
"duration" : 343100405,
"first_referrer" : "https://www.google.fr/",
"entry_page" : "http://www.leroymerlin.fr/",
"first_time" : "2017-01-10T16:57:39.937Z",
"last_time" : "2017-01-10T17:03:23.049Z",
"exit_path" : "http://www.leroymerlin.fr/v3/p/produits/porte-de-douche-coulissante-adena-e168578"
},
"distinct_paths" : 8.0,
"sessionid" : "000087e825da1d87a332b8f15fa76116c7467da6"
}
...
转换限制
edit以下限制和已知问题适用于Elastic转换功能的9.0.0-beta1版本。这些限制分为以下几类:
- 配置限制适用于转换的配置过程。
- 操作限制影响正在运行的转换的行为。
- Kibana中的限制仅适用于通过用户界面管理的转换。
配置限制
edit以下划线开头的字段名称在最新的转换中被省略
edit如果你使用 latest 类型的转换,并且源索引的字段名称以下划线 (_) 开头,它们将被视为内部字段。这些字段将从目标索引的文档中省略。
如果远程集群配置正确,转换支持跨集群搜索
edit如果您使用跨集群搜索,远程集群必须支持您在转换中使用的搜索和聚合。转换会验证其配置;如果您使用跨集群搜索并且验证失败,请确保远程集群支持您使用的查询和聚合。
在转换中使用脚本
edit在聚合支持脚本的每种情况下,转换也支持脚本。 然而,在使用转换中的脚本时,您可能需要考虑某些因素:
- 当字段由脚本创建时,转换无法推断输出字段的索引映射。在这种情况下,您可能希望在创建转换之前自行创建目标索引的映射。
- 脚本字段可能会增加转换的运行时间。
-
当您在
group_by中定义的所有分组都使用脚本时,转换无法优化查询,您在使用脚本时会收到警告消息。
转换中Painless脚本的弃用警告
edit如果一个转换包含使用已弃用语法的Painless脚本,在预览或启动转换时会显示弃用警告。然而,由于运行所需的查询可能是一个资源密集型过程,因此无法作为批量操作检查所有转换的弃用警告。因此,由于已弃用的Painless语法导致的任何弃用警告在升级助手中不可用。
转换在索引字段上表现更好
edit根据用户定义的时间字段对转换数据进行排序,该字段经常被访问。如果时间字段是一个运行时字段,在查询时计算字段值的性能影响可能会显著减慢转换速度。在使用转换时,请使用索引字段作为时间字段。
连续转换调度限制
edit连续转换会定期检查源数据的变化。调度器的功能目前仅限于一个基本的周期性定时器,其频率范围可以从1秒到1小时。默认值为1分钟。这是为了频繁地进行小规模操作。在为这个定时器选择频率时,请考虑您的摄取速率以及转换搜索/索引操作对集群中其他用户的影响。还要注意,重试操作会在频率间隔内进行。
操作限制
edit聚合响应可能与目标索引映射不兼容
edit当首次启动数据透视转换时,它将推断出目标索引所需的映射。此过程基于源索引的字段类型和使用的聚合。如果字段源自
scripted_metrics
或 bucket_scripts,
将使用动态映射。在某些情况下,推断的映射可能与实际数据不兼容。例如,可能会发生数值溢出,或者动态映射的字段可能同时包含数字和字符串。如果您认为可能发生了这种情况,请检查 Elasticsearch 日志。
您可以通过使用预览转换API来查看推导出的映射。请参阅API响应中的generated_dest_index对象。
如果需要,您可以在启动转换之前通过使用创建索引 API创建自定义目标索引来定义自定义映射。由于推导出的映射不能被索引模板覆盖,因此请使用创建索引 API 来定义自定义映射。索引模板仅适用于使用动态映射的脚本派生的字段。
批量转换可能不会考虑已更改的文档
edit批量转换使用了一个 复合聚合 ,它允许通过所有桶进行高效的分页。复合聚合尚不支持搜索上下文,因此如果在批量数据框进行过程中源数据发生了变化(删除、更新、添加),那么结果可能不会包含这些变化。
连续转换一致性不考虑已删除或更新的文档
edit虽然转换过程允许在新数据被摄取时不断重新计算转换,但它也有一些局限性。
只有在时间字段也被更新并且落在检查更改的操作范围内时,才会识别更改的实体。这主要是为以下用例设计的,并且适合该用例:新数据被赋予摄取时间的时戳。
如果删除了落在源索引模式范围内的索引,例如在删除历史时间基准索引时,那么在连续的检查点处理中执行的复合聚合将搜索不同的源数据,并且仅存在于已删除索引中的实体将不会从数据框目标索引中移除。
根据您的使用场景,您可能希望在删除后完全重新创建转换。或者,如果您的使用场景对历史归档有容忍度,您可能希望在聚合中包含一个最大摄取时间戳。这将允许您在查看目标索引时排除未最近更新的结果。
删除转换不会删除目标索引或 Kibana 索引模式
edit当使用 DELETE _transform/index 删除一个转换时,目标索引和Kibana索引模式(如果已创建)都不会被删除。这些对象必须单独删除。
处理聚合页面大小的动态调整
edit在开发转换功能时,控制优先于性能。 在设计考虑中,更倾向于让转换在后台安静地完成,即使需要更长的时间,而不是快速完成并优先占用资源。
复合聚合非常适合高基数数据,能够通过结果进行分页。如果在执行复合聚合搜索时发生断路器内存异常,我们会尝试减少请求的桶数并再次执行。此断路器是基于集群内的所有活动计算的,而不仅仅是来自转换的活动,因此可能只是暂时的资源可用性问题。
对于批量转换,请求的桶数只会向下调整。值的降低可能会导致转换检查点完成的时间变长。对于连续转换,请求的桶数在每个检查点开始时重置为其默认值,并且在Elasticsearch日志中可能会反复出现断路器异常。
转换以批量方式检索数据,这意味着它一次计算多个桶。默认情况下,每个搜索/索引操作是500个桶。可以使用max_page_search_size更改默认值,最小值为10。如果将请求的桶数量减少到最小值后仍然发生故障,则转换将被设置为失败状态。
处理多词的动态调整
edit对于每个检查点,会识别自上次检查以来发生变化的实体。这个变更实体列表作为terms query提供给转换复合聚合,一次一页。然后,对每个实体页面应用更新到目标索引。
页面大小由max_page_search_size定义,该参数也用于定义复合聚合搜索返回的桶的数量。默认值为500,最小值为10。
索引设置 index.max_terms_count 定义了在 terms 查询中可以使用的最大术语数量。默认值为 65536。如果 max_page_search_size 超过 index.max_terms_count,转换将失败。
使用较小的 max_page_search_size 值可能会导致转换检查点完成的时间更长。
处理失败的转换
edit失败的转换任务会作为持久任务保留,应适当处理,可以通过删除它或解决失败的根源并重新启动来处理。
当使用API删除失败的转换时,首先使用_stop?force=true停止它,然后删除它。
如果文档尚未可用于搜索,连续转换可能会给出不正确的结果
edit文档被索引后,会有一个非常小的延迟,直到它可供搜索。
连续转换定期检查自上次检查以来与now减去sync.time.delay之间发生变化的实体。这个时间窗口不会重叠地移动。如果最近索引的文档的时间戳落在此时间窗口内,但该文档尚未可用于搜索,则此实体将不会被更新。
如果使用表示数据摄取时间的 sync.time.field,并且使用零秒或非常小的 sync.time.delay,那么更可能出现此问题。
支持日期纳秒数据类型
edit如果你的数据使用的是日期纳秒数据类型,聚合仍然是在毫秒分辨率上进行的。这一限制也会影响你的转换中的聚合。
不支持将数据流作为目标索引
edit转换目标索引中的更新数据,这需要写入目标索引。数据流设计为仅追加,这意味着您不能直接向数据流发送更新或删除请求。因此,数据流不支持作为转换的目标索引。
将ILM作为目标索引可能导致文档重复
editILM 不建议用作转换的目标索引。转换会更新当前目标中的文档,并且无法删除之前由 ILM 使用的索引中的文档。这可能会在使用转换与 ILM 结合时导致文档重复,尤其是在发生翻转的情况下。
如果你使用ILM来管理基于时间的索引,请考虑使用日期索引名称处理器。如果你的转换包含基于date_histogram的group_by,该处理器可以在不产生重复文档的情况下工作。
Kibana 中的限制
edit转换在所有 Kibana 空间中可见
editSpaces 使您能够在Kibana中组织您的源和目标索引以及其他已保存的对象,并且只能看到属于您空间的对象。然而,转换是一个长时间运行的任务,它在集群级别进行管理,因此不受限于特定的空间。可以在Stack Management > Kibana下为数据视图实现空间感知,这允许对转换目标索引的权限。
在 Kibana 中最多列出 1,000 个转换
editKibana 中的转换管理页面列出了最多 1000 个转换。
Kibana 可能不支持每个转换配置选项
edit通过转换API可能有一些配置选项在Kibana中不受支持。如需查看所有配置选项的完整列表,请参阅文档。