使用 Elasticsearch 进行时间序列数据处理

edit

使用 Elasticsearch 进行时间序列数据处理

edit

Elasticsearch 提供了一些功能来帮助您存储、管理和搜索时间序列数据,例如日志和指标。一旦数据进入 Elasticsearch,您可以使用 Kibana 和其他 Elastic Stack 功能来分析和可视化您的数据。

设置数据层级

edit

Elasticsearch的ILM功能使用数据层来自动将较旧的数据移动到硬件成本较低的节点上,随着数据的老化。这有助于提高性能并降低存储成本。

热层和内容层是必需的。温层、冷层和冻结层是可选的。

在热层和温层中使用高性能节点,以便更快地索引和搜索您最新的数据。在冷层和冻结层中使用速度较慢、成本较低的节点以降低成本。

内容层级通常不用于时间序列数据。然而,创建系统索引和其他不属于数据流的索引时需要它。

设置数据层的步骤因部署类型而异:

  1. 登录到Elasticsearch Service控制台
  2. 从Elasticsearch Service主页或部署页面添加或选择您的部署。
  3. 从您的部署菜单中,选择编辑部署
  4. 要启用数据层,请点击添加容量

启用自动缩放

自动扩展 会自动调整您的部署容量以满足您的存储需求。要启用自动扩展,请在编辑部署页面上选择自动扩展此部署。自动扩展仅适用于Elasticsearch服务。

注册快照仓库

edit

冷层和冻结层可以使用可搜索快照来降低本地存储成本。

要使用可搜索的快照,您必须注册一个受支持的快照仓库。注册此仓库的步骤因您的部署类型和存储提供商而异:

当您创建一个集群时,Elasticsearch Service 会自动注册一个默认的 found-snapshots 仓库。此仓库支持可搜索的快照。

The found-snapshots 仓库是特定于您的集群的。要使用另一个集群的默认仓库,请参阅 Cloud 快照和恢复 文档。

您还可以将以下任何自定义存储库类型与可搜索的快照一起使用:

创建或编辑索引生命周期策略

edit

一个数据流将您的数据存储在多个后备索引中。ILM使用一个索引生命周期策略来自动将这些索引在您的数据层之间移动。

如果您使用Fleet或Elastic Agent,请编辑Elasticsearch的内置生命周期策略之一。 如果您使用自定义应用程序,请创建自己的策略。无论哪种情况, 请确保您的策略:

  • 包括您已配置的每个数据层的阶段。
  • 计算从滚动更新过渡到下一阶段的阈值,或min_age
  • 如果需要,在冷阶段和冻结阶段使用可搜索的快照。
  • 如果需要,包括一个删除阶段。

Fleet 和 Elastic Agent 使用以下内置的生命周期策略:

  • 日志
  • 指标
  • 合成监控

您可以根据性能、弹性和保留要求自定义这些策略。

要在 Kibana 中编辑策略,请打开主菜单并转到 堆栈管理 > 索引生命周期策略。点击您要编辑的策略。

您还可以使用更新生命周期策略 API

PUT _ilm/policy/logs
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb"
          }
        }
      },
      "warm": {
        "min_age": "30d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "cold": {
        "min_age": "60d",
        "actions": {
          "searchable_snapshot": {
            "snapshot_repository": "found-snapshots"
          }
        }
      },
      "frozen": {
        "min_age": "90d",
        "actions": {
          "searchable_snapshot": {
            "snapshot_repository": "found-snapshots"
          }
        }
      },
      "delete": {
        "min_age": "735d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

创建组件模板

edit

如果您使用 Fleet 或 Elastic Agent,请跳至 搜索和可视化您的数据。 Fleet 和 Elastic Agent 使用内置模板为您创建数据流。

如果您使用自定义应用程序,您需要设置自己的数据流。 数据流需要一个匹配的索引模板。在大多数情况下,您使用一个或多个组件模板来组成这个索引模板。您通常为映射和索引设置使用单独的组件模板。这使您可以在多个索引模板中重用组件模板。

在创建组件模板时,请包含:

  • 一个用于@timestamp字段的datedate_nanos映射。如果你不指定映射,Elasticsearch会将@timestamp映射为一个具有默认选项的date字段。
  • index.lifecycle.name索引设置中的生命周期策略。

在映射您的字段时,请使用Elastic Common Schema (ECS)。ECS 字段默认与多个 Elastic Stack 功能集成。

如果你不确定如何映射你的字段,可以使用运行时字段在搜索时从非结构化内容中提取字段。例如,你可以将日志消息索引到一个通配符字段,并在搜索期间从这个字段中提取IP地址和其他数据。

要在 Kibana 中创建组件模板,请打开主菜单并转到 堆栈管理 > 索引管理。在 索引模板 视图中,点击 创建组件模板

您还可以使用创建组件模板 API

# Creates a component template for mappings
PUT _component_template/my-mappings
{
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "date_optional_time||epoch_millis"
        },
        "message": {
          "type": "wildcard"
        }
      }
    }
  },
  "_meta": {
    "description": "Mappings for @timestamp and message fields",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

# Creates a component template for index settings
PUT _component_template/my-settings
{
  "template": {
    "settings": {
      "index.lifecycle.name": "my-lifecycle-policy"
    }
  },
  "_meta": {
    "description": "Settings for ILM",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

创建索引模板

edit

使用您的组件模板创建一个索引模板。指定:

  • 一个或多个与数据流名称匹配的索引模式。我们建议使用我们的数据流命名方案
  • 模板已启用数据流。
  • 包含映射和索引设置的任何组件模板。
  • 优先级高于200,以避免与内置模板发生冲突。请参阅避免索引模式冲突

要在 Kibana 中创建索引模板,请打开主菜单并转到 堆栈管理 > 索引管理。在 索引模板 视图中,点击 创建模板

您还可以使用创建索引模板 API。 包含 data_stream 对象以启用数据流。

PUT _index_template/my-index-template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": { },
  "composed_of": [ "my-mappings", "my-settings" ],
  "priority": 500,
  "_meta": {
    "description": "Template for my time series data",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

向数据流添加数据

edit

索引请求 将文档添加到数据流中。这些请求必须使用 op_typecreate。文档必须包含一个 @timestamp 字段。

要自动创建您的数据流,请提交一个针对流名称的索引请求。此名称必须与您的索引模板中的一个索引模式匹配。

PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }

POST my-data-stream/_doc
{
  "@timestamp": "2099-05-06T16:21:15.000Z",
  "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
}

搜索并可视化您的数据

edit

要在 Kibana 中探索和搜索您的数据,请打开主菜单并选择 发现。请参阅 Kibana 的 发现文档

使用 Kibana 的 仪表板 功能以图表、表格、地图等形式可视化您的数据。请参阅 Kibana 的 仪表板文档

您还可以使用搜索API来搜索和聚合您的数据。使用运行时字段grok模式在搜索时动态地从日志消息和其他非结构化内容中提取数据。

GET my-data-stream/_search
{
  "runtime_mappings": {
    "source.ip": {
      "type": "ip",
      "script": """
        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "message" ].value)?.sourceip;
        if (sourceip != null) emit(sourceip);
      """
    }
  },
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-1d/d",
              "lt": "now/d"
            }
          }
        },
        {
          "range": {
            "source.ip": {
              "gte": "192.0.2.0",
              "lte": "192.0.2.255"
            }
          }
        }
      ]
    }
  },
  "fields": [
    "*"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    },
    {
      "source.ip": "desc"
    }
  ]
}

Elasticsearch 搜索默认是同步的。跨冻结数据、长时间范围或大数据集的搜索可能需要更长时间。使用 异步搜索 API 在后台运行搜索。有关更多搜索选项,请参阅 搜索 API

POST my-data-stream/_async_search
{
  "runtime_mappings": {
    "source.ip": {
      "type": "ip",
      "script": """
        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "message" ].value)?.sourceip;
        if (sourceip != null) emit(sourceip);
      """
    }
  },
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-2y/d",
              "lt": "now/d"
            }
          }
        },
        {
          "range": {
            "source.ip": {
              "gte": "192.0.2.0",
              "lte": "192.0.2.255"
            }
          }
        }
      ]
    }
  },
  "fields": [
    "*"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    },
    {
      "source.ip": "desc"
    }
  ]
}