常见的脚本使用场景

edit

常见的脚本使用案例

edit

你可以编写脚本来完成几乎任何事情,但有时,这也会带来麻烦。了解脚本的可能性是具有挑战性的,因此以下示例解决了脚本真正有帮助的常见用例。

字段提取

edit

字段提取的目标很简单;您的数据中包含大量信息的字段,但您只想提取其中的部分内容。

您有两种选择:

  • Grok 是一种支持可重用别名表达式的正则表达式方言。因为 Grok 建立在正则表达式(regex)之上,所以任何正则表达式在 grok 中也是有效的。
  • Dissect 使用分隔符从文本中提取结构化字段,以定义匹配模式。与 grok 不同,dissect 不使用正则表达式。

让我们从一个简单的例子开始,将 @timestampmessage 字段添加到 my-index 映射中作为索引字段。为了保持灵活性,使用 wildcard 作为 message 字段的类型:

PUT /my-index/
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "format": "strict_date_optional_time||epoch_second",
        "type": "date"
      },
      "message": {
        "type": "wildcard"
      }
    }
  }
}

在映射了您想要检索的字段后,将一些日志数据记录索引到 Elasticsearch 中。以下请求使用 bulk API 将原始日志数据索引到 my-index 中。您不需要索引所有的日志数据,可以使用一个小样本来实验运行时字段。

POST /my-index/_bulk?refresh
{"index":{}}
{"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}

从日志消息中提取IP地址 (Grok)

edit

如果你想检索包含clientip的结果,可以在映射中将该字段添加为运行时字段。以下运行时脚本定义了一个grok模式,用于从message字段中提取结构化字段。

该脚本匹配 %{COMMONAPACHELOG} 日志模式,该模式理解 Apache 日志的结构。如果模式匹配(clientip != null),脚本会发出匹配的 IP 地址的值。如果模式不匹配,脚本只会返回字段值而不崩溃。

PUT my-index/_mappings
{
  "runtime": {
    "http.clientip": {
      "type": "ip",
      "script": """
        String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
        if (clientip != null) emit(clientip); 
      """
    }
  }
}

此条件确保即使消息的模式不匹配,脚本也不会发出任何内容。

您可以定义一个简单的查询来运行针对特定IP地址的搜索,并返回所有相关字段。使用搜索API的fields参数来检索http.clientip运行时字段。

GET my-index/_search
{
  "query": {
    "match": {
      "http.clientip": "40.135.0.0"
    }
  },
  "fields" : ["http.clientip"]
}

响应包括文档,其中http.clientip的值匹配40.135.0.0

{
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index",
        "_id" : "Rq-ex3gBA_A0V6dYGLQ7",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:30:17-05:00",
          "message" : "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        },
        "fields" : {
          "http.clientip" : [
            "40.135.0.0"
          ]
        }
      }
    ]
  }
}

解析字符串以提取字段的一部分(Dissect)

edit

与在上一个示例中匹配日志模式不同,您可以定义一个拆分模式,以包含您想要丢弃的字符串部分。

例如,本节开头的日志数据包含一个message字段。该字段包含多个数据片段:

"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"

您可以在运行时字段中定义一个dissect模式,以提取HTTP响应代码,在前面的示例中,该代码为304

PUT my-index/_mappings
{
  "runtime": {
    "http.response": {
      "type": "long",
      "script": """
        String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
        if (response != null) emit(Integer.parseInt(response));
      """
    }
  }
}

然后,您可以运行查询以使用http.response运行时字段检索特定的HTTP响应:

GET my-index/_search
{
  "query": {
    "match": {
      "http.response": "304"
    }
  },
  "fields" : ["http.response"]
}

响应包括一个文档,其中HTTP响应为304

{
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index",
        "_id" : "Sq-ex3gBA_A0V6dYGLQ7",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:31:22-05:00",
          "message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
        },
        "fields" : {
          "http.response" : [
            304
          ]
        }
      }
    ]
  }
}

按分隔符拆分字段中的值(Dissect)

edit

假设你想像前面的例子一样提取字段的一部分,但你想根据特定值进行拆分。你可以使用解剖模式来提取你想要的信息,并以特定格式返回该数据。

例如,假设您有一堆来自Elasticsearch的垃圾收集(gc)日志数据,格式如下:

[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K

您只想提取usedcapacitycommitted数据,以及相关的值。让我们索引一些包含日志数据的文档作为示例:

POST /my-index/_bulk?refresh
{"index":{}}
{"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit]   class space    used 15255K, capacity 16726K, committed 16844K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit]  Metaspace       used 115409K, capacity 119541K, committed 120248K, reserved 1153024K"}
{"index":{}}
{"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit]   class space    used 14503K, capacity 15894K, committed 15948K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit]  Metaspace       used 107719K, capacity 111775K, committed 112724K, reserved 1146880K"}
{"index":{}}
{"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]  class space  used 266K, capacity 367K, committed 384K, reserved 1048576K"}

再次查看数据,有一个时间戳,一些你不感兴趣的其他数据,然后是usedcapacitycommitted数据:

[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K

您可以将变量分配给gc字段中数据的每个部分,然后仅返回您想要的部分。任何在大括号{}中的内容都被视为变量。例如,变量[%{@timestamp}][%{code}][%{desc}]将匹配数据的前三个块,所有这些块都位于方括号[]中。

[%{@timestamp}][%{code}][%{desc}]  %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}

您的分解模式可以包含术语 usedcapacitycommitted,而不是使用变量,因为您希望准确返回这些术语。您还可以将变量分配给您希望返回的值,例如 %{usize}%{csize}%{comsize}。日志数据中的分隔符是逗号,因此您的分解模式也需要使用该分隔符。

现在你有了一个解析模式,你可以将其包含在Painless脚本中,作为运行时字段的一部分。该脚本使用你的解析模式来拆分gc字段,然后返回由emit方法定义的所需信息。由于解析使用简单的语法,你只需要明确告诉它你想要什么。

以下模式告诉 dissect 返回术语 used,一个空格,来自 gc.usize 的值,以及一个逗号。此模式会重复用于您想要检索的其他数据。虽然此模式在生产环境中可能不是那么有用,但它提供了很大的灵活性来试验和操作您的数据。在生产环境中,您可能只想使用 emit(gc.usize) 然后对该值进行聚合或在计算中使用它。

emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize)

将所有内容整合在一起,您可以在搜索请求中创建一个名为 gc_size 的运行时字段。使用 fields 选项,您可以检索 gc_size 运行时字段的所有值。此查询还包括一个桶聚合,用于对您的数据进行分组。

GET my-index/_search
{
  "runtime_mappings": {
    "gc_size": {
      "type": "keyword",
      "script": """
        Map gc=dissect('[%{@timestamp}][%{code}][%{desc}]  %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}').extract(doc["gc.keyword"].value);
        if (gc != null) emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize);
      """
    }
  },
  "size": 1,
  "aggs": {
    "sizes": {
      "terms": {
        "field": "gc_size",
        "size": 10
      }
    }
  },
  "fields" : ["gc_size"]
}

响应包括来自 gc_size 字段的数据,格式与您在剖析模式中定义的完全一致!

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index",
        "_id" : "GXx3H3kBKGE42WRNlddJ",
        "_score" : 1.0,
        "_source" : {
          "gc" : "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit]   class space    used 266K, capacity 384K, committed 384K, reserved 1048576K"
        },
        "fields" : {
          "gc_size" : [
            "used 266K, capacity 384K, committed 384K"
          ]
        }
      }
    ]
  },
  "aggregations" : {
    "sizes" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "used 107719K, capacity 111775K, committed 112724K",
          "doc_count" : 1
        },
        {
          "key" : "used 115409K, capacity 119541K, committed 120248K",
          "doc_count" : 1
        },
        {
          "key" : "used 14503K, capacity 15894K, committed 15948K",
          "doc_count" : 1
        },
        {
          "key" : "used 15255K, capacity 16726K, committed 16844K",
          "doc_count" : 1
        },
        {
          "key" : "used 266K, capacity 367K, committed 384K",
          "doc_count" : 1
        },
        {
          "key" : "used 266K, capacity 384K, committed 384K",
          "doc_count" : 1
        }
      ]
    }
  }
}