常见的脚本使用场景
edit常见的脚本使用案例
edit你可以编写脚本来完成几乎任何事情,但有时,这也会带来麻烦。了解脚本的可能性是具有挑战性的,因此以下示例解决了脚本真正有帮助的常见用例。
字段提取
edit字段提取的目标很简单;您的数据中包含大量信息的字段,但您只想提取其中的部分内容。
您有两种选择:
让我们从一个简单的例子开始,将 @timestamp 和 message 字段添加到 my-index 映射中作为索引字段。为了保持灵活性,使用 wildcard 作为 message 字段的类型:
PUT /my-index/
{
"mappings": {
"properties": {
"@timestamp": {
"format": "strict_date_optional_time||epoch_second",
"type": "date"
},
"message": {
"type": "wildcard"
}
}
}
}
在映射了您想要检索的字段后,将一些日志数据记录索引到 Elasticsearch 中。以下请求使用 bulk API 将原始日志数据索引到 my-index 中。您不需要索引所有的日志数据,可以使用一个小样本来实验运行时字段。
POST /my-index/_bulk?refresh
{"index":{}}
{"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}
从日志消息中提取IP地址 (Grok)
edit如果你想检索包含clientip的结果,可以在映射中将该字段添加为运行时字段。以下运行时脚本定义了一个grok模式,用于从message字段中提取结构化字段。
该脚本匹配 %{COMMONAPACHELOG} 日志模式,该模式理解 Apache 日志的结构。如果模式匹配(clientip != null),脚本会发出匹配的 IP 地址的值。如果模式不匹配,脚本只会返回字段值而不崩溃。
PUT my-index/_mappings
{
"runtime": {
"http.clientip": {
"type": "ip",
"script": """
String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
if (clientip != null) emit(clientip);
"""
}
}
}
您可以定义一个简单的查询来运行针对特定IP地址的搜索,并返回所有相关字段。使用搜索API的fields参数来检索http.clientip运行时字段。
GET my-index/_search
{
"query": {
"match": {
"http.clientip": "40.135.0.0"
}
},
"fields" : ["http.clientip"]
}
响应包括文档,其中http.clientip的值匹配40.135.0.0。
{
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index",
"_id" : "Rq-ex3gBA_A0V6dYGLQ7",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-04-30T14:30:17-05:00",
"message" : "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
},
"fields" : {
"http.clientip" : [
"40.135.0.0"
]
}
}
]
}
}
解析字符串以提取字段的一部分(Dissect)
edit与在上一个示例中匹配日志模式不同,您可以定义一个拆分模式,以包含您想要丢弃的字符串部分。
例如,本节开头的日志数据包含一个message字段。该字段包含多个数据片段:
"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
您可以在运行时字段中定义一个dissect模式,以提取HTTP响应代码,在前面的示例中,该代码为304。
PUT my-index/_mappings
{
"runtime": {
"http.response": {
"type": "long",
"script": """
String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
if (response != null) emit(Integer.parseInt(response));
"""
}
}
}
然后,您可以运行查询以使用http.response运行时字段检索特定的HTTP响应:
GET my-index/_search
{
"query": {
"match": {
"http.response": "304"
}
},
"fields" : ["http.response"]
}
响应包括一个文档,其中HTTP响应为304:
{
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index",
"_id" : "Sq-ex3gBA_A0V6dYGLQ7",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-04-30T14:31:22-05:00",
"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
},
"fields" : {
"http.response" : [
304
]
}
}
]
}
}
按分隔符拆分字段中的值(Dissect)
edit假设你想像前面的例子一样提取字段的一部分,但你想根据特定值进行拆分。你可以使用解剖模式来提取你想要的信息,并以特定格式返回该数据。
例如,假设您有一堆来自Elasticsearch的垃圾收集(gc)日志数据,格式如下:
[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K
您只想提取used、capacity和committed数据,以及相关的值。让我们索引一些包含日志数据的文档作为示例:
POST /my-index/_bulk?refresh
{"index":{}}
{"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] class space used 15255K, capacity 16726K, committed 16844K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-03-24T20:27:24.184+0000][90239][gc,heap,exit] Metaspace used 115409K, capacity 119541K, committed 120248K, reserved 1153024K"}
{"index":{}}
{"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] class space used 14503K, capacity 15894K, committed 15948K, reserved 1048576K"}
{"index":{}}
{"gc": "[2021-04-19T15:03:21.735+0000][84408][gc,heap,exit] Metaspace used 107719K, capacity 111775K, committed 112724K, reserved 1146880K"}
{"index":{}}
{"gc": "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 367K, committed 384K, reserved 1048576K"}
再次查看数据,有一个时间戳,一些你不感兴趣的其他数据,然后是used、capacity和committed数据:
[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K
您可以将变量分配给gc字段中数据的每个部分,然后仅返回您想要的部分。任何在大括号{}中的内容都被视为变量。例如,变量[%{@timestamp}][%{code}][%{desc}]将匹配数据的前三个块,所有这些块都位于方括号[]中。
[%{@timestamp}][%{code}][%{desc}] %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}
您的分解模式可以包含术语 used、capacity 和 committed,而不是使用变量,因为您希望准确返回这些术语。您还可以将变量分配给您希望返回的值,例如 %{usize}、%{csize} 和 %{comsize}。日志数据中的分隔符是逗号,因此您的分解模式也需要使用该分隔符。
现在你有了一个解析模式,你可以将其包含在Painless脚本中,作为运行时字段的一部分。该脚本使用你的解析模式来拆分gc字段,然后返回由emit方法定义的所需信息。由于解析使用简单的语法,你只需要明确告诉它你想要什么。
以下模式告诉 dissect 返回术语 used,一个空格,来自 gc.usize 的值,以及一个逗号。此模式会重复用于您想要检索的其他数据。虽然此模式在生产环境中可能不是那么有用,但它提供了很大的灵活性来试验和操作您的数据。在生产环境中,您可能只想使用 emit(gc.usize) 然后对该值进行聚合或在计算中使用它。
emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize)
将所有内容整合在一起,您可以在搜索请求中创建一个名为 gc_size 的运行时字段。使用 fields 选项,您可以检索 gc_size 运行时字段的所有值。此查询还包括一个桶聚合,用于对您的数据进行分组。
GET my-index/_search
{
"runtime_mappings": {
"gc_size": {
"type": "keyword",
"script": """
Map gc=dissect('[%{@timestamp}][%{code}][%{desc}] %{ident} used %{usize}, capacity %{csize}, committed %{comsize}, reserved %{rsize}').extract(doc["gc.keyword"].value);
if (gc != null) emit("used" + ' ' + gc.usize + ', ' + "capacity" + ' ' + gc.csize + ', ' + "committed" + ' ' + gc.comsize);
"""
}
},
"size": 1,
"aggs": {
"sizes": {
"terms": {
"field": "gc_size",
"size": 10
}
}
},
"fields" : ["gc_size"]
}
响应包括来自 gc_size 字段的数据,格式与您在剖析模式中定义的完全一致!
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 6,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "my-index",
"_id" : "GXx3H3kBKGE42WRNlddJ",
"_score" : 1.0,
"_source" : {
"gc" : "[2021-04-27T16:16:34.699+0000][82460][gc,heap,exit] class space used 266K, capacity 384K, committed 384K, reserved 1048576K"
},
"fields" : {
"gc_size" : [
"used 266K, capacity 384K, committed 384K"
]
}
}
]
},
"aggregations" : {
"sizes" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "used 107719K, capacity 111775K, committed 112724K",
"doc_count" : 1
},
{
"key" : "used 115409K, capacity 119541K, committed 120248K",
"doc_count" : 1
},
{
"key" : "used 14503K, capacity 15894K, committed 15948K",
"doc_count" : 1
},
{
"key" : "used 15255K, capacity 16726K, committed 16844K",
"doc_count" : 1
},
{
"key" : "used 266K, capacity 367K, committed 384K",
"doc_count" : 1
},
{
"key" : "used 266K, capacity 384K, committed 384K",
"doc_count" : 1
}
]
}
}
}