如何编写脚本

edit

在Elasticsearch API中支持脚本的任何地方,语法都遵循相同的模式;您指定脚本的语言,提供脚本逻辑(或源代码),并添加传递到脚本的参数:

  "script": {
    "lang":   "...",
    "source" | "id": "...",
    "params": { ... }
  }
lang
指定脚本所使用的语言。默认为painless
source, id
The script itself, which you specify as source for an inline script or id for a stored script. Use the stored script APIs to create and manage stored scripts.
params
指定传递到脚本中的任何命名参数作为变量。使用参数而不是硬编码值以减少编译时间。

编写你的第一个脚本

edit

Painless 是 Elasticsearch 的默认脚本语言。它安全、高效,并为有少量编程经验的人提供了自然的语法。

一个Painless脚本由一个或多个语句组成,并且可以选择在开头包含一个或多个用户定义的函数。一个脚本必须始终至少有一个语句。

The Painless execute API 提供了测试脚本的能力,该脚本带有简单的用户定义参数并接收结果。让我们从一个完整的脚本开始,并回顾其组成部分。

首先,使用单个字段索引一个文档,以便我们有一些数据可以处理:

PUT my-index-000001/_doc/1
{
  "my_field": 5
}

然后,我们可以构建一个在该字段上操作的脚本,并将其作为查询的一部分运行以进行评估。以下查询使用搜索API的script_fields参数来检索脚本估值。这里发生了很多事情,但我们将分解各个组件以单独理解它们。目前,您只需要了解此脚本使用my_field并对其进行操作。

GET my-index-000001/_search
{
  "script_fields": {
    "my_doubled_field": {
      "script": { 
        "source": "doc['my_field'].value * params['multiplier']", 
        "params": {
          "multiplier": 2
        }
      }
    }
  }
}

script 对象

脚本

The script 是一个标准的 JSON 对象,用于在 Elasticsearch 的大多数 API 下定义脚本。该对象需要 source 来定义脚本本身。脚本没有指定语言,因此默认为 Painless。

在脚本中使用参数

edit

Elasticsearch第一次看到一个新的脚本时,它会编译该脚本并将编译后的版本存储在缓存中。编译可能是一个繁重的过程。与其在脚本中硬编码值,不如将它们作为命名的params传递。

例如,在前面的脚本中,我们可以直接硬编码值,并编写一个看似不那么复杂的脚本。我们可以直接获取my_field的第一个值,然后将其乘以2

"source": "return doc['my_field'].value * 2"

虽然这个解决方案有效,但它非常不灵活。我们必须修改脚本源代码来更改乘数,并且每次乘数变化时,Elasticsearch都必须重新编译脚本。

与其硬编码值,不如使用命名的 params 来使脚本更加灵活,并且还可以减少脚本运行时的编译时间。现在,您可以对 multiplier 参数进行更改,而无需 Elasticsearch 重新编译脚本。

"source": "doc['my_field'].value * params['multiplier']",
"params": {
  "multiplier": 2
}

默认情况下,您可以在5分钟内编译最多150个脚本。 对于摄取上下文,默认的脚本编译速率是无限制的。

script.context.field.max_compilations_rate=100/10m

如果你在短时间内编译了太多唯一的脚本,Elasticsearch会拒绝新的动态脚本,并返回一个circuit_breaking_exception错误。

缩短你的脚本

edit

使用Painless原生的语法能力,您可以减少脚本中的冗余,使其更简洁。以下是一个我们可以简化的简单脚本:

GET my-index-000001/_search
{
  "script_fields": {
    "my_doubled_field": {
      "script": {
        "lang":   "painless",
        "source": "doc['my_field'].value * params.get('multiplier');",
        "params": {
          "multiplier": 2
        }
      }
    }
  }
}

让我们来看一下脚本的简化版本,看看它相比之前的迭代包含了哪些改进:

GET my-index-000001/_search
{
  "script_fields": {
    "my_doubled_field": {
      "script": {
        "source": "field('my_field').get(null) * params['multiplier']",
        "params": {
          "multiplier": 2
        }
      }
    }
  }
}

此版本的脚本移除了多个组件,并显著简化了语法:

  • lang 声明。因为 Painless 是默认语言,如果你正在编写 Painless 脚本,则不需要指定语言。
  • return 关键字。Painless 自动使用脚本中的最后一条语句(在可能的情况下)在需要返回值的脚本上下文中生成返回值。
  • get 方法,它被方括号 [] 替换。Painless 为 Map 类型提供了一个快捷方式,允许我们使用方括号而不是较长的 get 方法。
  • source 语句末尾的分号。Painless 不需要块的最后一条语句使用分号。然而,在其他情况下,它确实需要分号来消除歧义。

在 Elasticsearch 支持脚本的任何地方使用这种简写语法,例如当你在创建 运行时字段时。

存储和检索脚本

edit

您可以使用存储脚本API从集群状态中存储和检索脚本。存储的脚本减少了编译时间,并使搜索更快。

与常规脚本不同,存储脚本要求您使用lang参数指定脚本语言。

要创建一个脚本,请使用创建存储脚本 API。例如,以下请求创建一个名为calculate-score的存储脚本。

POST _scripts/calculate-score
{
  "script": {
    "lang": "painless",
    "source": "Math.log(_score * 2) + params['my_modifier']"
  }
}

您可以通过使用获取存储脚本API来检索该脚本。

GET _scripts/calculate-score

要在查询中使用存储的脚本,请在script声明中包含脚本id

GET my-index-000001/_search
{
  "query": {
    "script_score": {
      "query": {
        "match": {
            "message": "some message"
        }
      },
      "script": {
        "id": "calculate-score", 
        "params": {
          "my_modifier": 2
        }
      }
    }
  }
}

id 存储脚本的标识符

要删除存储的脚本,请提交一个删除存储脚本 API请求。

DELETE _scripts/calculate-score

使用脚本更新文档

edit

您可以使用update API来更新带有指定脚本的文档。该脚本可以更新、删除或跳过修改文档。update API还支持传递部分文档,该文档将与现有文档合并。

首先,让我们索引一个简单的文档:

PUT my-index-000001/_doc/1
{
  "counter" : 1,
  "tags" : ["red"]
}

要增加计数器,您可以使用以下脚本提交更新请求:

POST my-index-000001/_update/1
{
  "script" : {
    "source": "ctx._source.counter += params.count",
    "lang": "painless",
    "params" : {
      "count" : 4
    }
  }
}

同样地,您可以使用更新脚本来向标签列表中添加一个标签。 因为这只是一个列表,即使标签已经存在,也会被添加:

POST my-index-000001/_update/1
{
  "script": {
    "source": "ctx._source.tags.add(params['tag'])",
    "lang": "painless",
    "params": {
      "tag": "blue"
    }
  }
}

您也可以从标签列表中移除一个标签。Java的List类中的remove方法在Painless中可用。它接受您想要移除的元素的索引。为了避免可能的运行时错误,您首先需要确保该标签存在。如果列表中包含该标签的重复项,此脚本只会移除一个出现项。

POST my-index-000001/_update/1
{
  "script": {
    "source": "if (ctx._source.tags.contains(params['tag'])) { ctx._source.tags.remove(ctx._source.tags.indexOf(params['tag'])) }",
    "lang": "painless",
    "params": {
      "tag": "blue"
    }
  }
}

您还可以从文档中添加和删除字段。例如,此脚本添加了字段 new_field

POST my-index-000001/_update/1
{
  "script" : "ctx._source.new_field = 'value_of_new_field'"
}

相反,此脚本移除字段 new_field

POST my-index-000001/_update/1
{
  "script" : "ctx._source.remove('new_field')"
}

除了更新文档,您还可以从脚本内部更改执行的操作。例如,如果 tags 字段包含 green,此请求将删除文档。否则,它将不执行任何操作(noop):

POST my-index-000001/_update/1
{
  "script": {
    "source": "if (ctx._source.tags.contains(params['tag'])) { ctx.op = 'delete' } else { ctx.op = 'none' }",
    "lang": "painless",
    "params": {
      "tag": "green"
    }
  }
}

脚本、缓存和搜索速度

edit

Elasticsearch 执行了许多优化措施,以使使用脚本尽可能快速。一个重要的优化是脚本缓存。编译后的脚本被放置在缓存中,以便引用该脚本的请求不会产生编译开销。

缓存大小很重要。您的脚本缓存应足够大,以容纳用户需要同时访问的所有脚本。

如果你看到大量的脚本缓存驱逐和不断增加的编译数量在节点统计信息中,你的缓存可能太小了。

所有脚本默认情况下都会被缓存,因此它们只需要在更新时重新编译。默认情况下,脚本没有基于时间的过期设置。您可以通过使用script.cache.expire设置来更改此行为。使用script.cache.max_size设置来配置缓存的大小。

脚本的大小限制为65,535字节。将script.max_size_in_bytes的值设置为增加该软限制。如果你的脚本非常大,那么考虑使用原生脚本引擎

提高搜索速度

edit

脚本非常有用,但不能使用Elasticsearch的索引结构或相关优化。这种关系有时会导致搜索速度变慢。

如果你经常使用脚本来转换索引数据,你可以通过在数据摄取过程中进行转换来使搜索更快。然而,这通常意味着索引速度会变慢。让我们看一个实际的例子来说明如何提高搜索速度。

在运行搜索时,通常会根据两个值的总和来对结果进行排序。 例如,考虑一个名为 my_test_scores 的索引,其中包含测试分数数据。该索引包括两个类型为 long 的字段:

  • math_score
  • verbal_score

您可以使用一个脚本来运行查询,该脚本将这些值相加。这种方法没有问题,但由于脚本估值是请求的一部分,查询速度会较慢。以下请求返回grad_year等于2099的文档,并按脚本估值的结果进行排序。

GET /my_test_scores/_search
{
  "query": {
    "term": {
      "grad_year": "2099"
    }
  },
  "sort": [
    {
      "_script": {
        "type": "number",
        "script": {
          "source": "doc['math_score'].value + doc['verbal_score'].value"
        },
        "order": "desc"
      }
    }
  ]
}

如果你正在搜索一个小索引,那么将脚本作为搜索查询的一部分包含在内可能是一个好的解决方案。如果你想让搜索更快,你可以在摄取期间执行此计算,并将总和索引到一个字段中。

首先,我们将在索引中添加一个名为 total_score 的新字段,该字段将包含 math_scoreverbal_score 字段值的总和。

PUT /my_test_scores/_mapping
{
  "properties": {
    "total_score": {
      "type": "long"
    }
  }
}

接下来,使用包含 ingest pipelinescript processor 来计算 math_scoreverbal_score 的总和,并将其索引到 total_score 字段中。

PUT _ingest/pipeline/my_test_scores_pipeline
{
  "description": "Calculates the total test score",
  "processors": [
    {
      "script": {
        "source": "ctx.total_score = (ctx.math_score + ctx.verbal_score)"
      }
    }
  ]
}

要更新现有数据,请使用此管道将任何文档从 my_test_scores 重新索引到一个名为 my_test_scores_2 的新索引中。

POST /_reindex
{
  "source": {
    "index": "my_test_scores"
  },
  "dest": {
    "index": "my_test_scores_2",
    "pipeline": "my_test_scores_pipeline"
  }
}

继续使用管道将任何新文档索引到 my_test_scores_2

POST /my_test_scores_2/_doc/?pipeline=my_test_scores_pipeline
{
  "student": "kimchy",
  "grad_year": "2099",
  "math_score": 1200,
  "verbal_score": 800
}

这些更改减缓了索引过程,但允许更快的搜索。您可以使用total_score字段对在my_test_scores_2上进行的搜索进行排序,而不是使用脚本。响应几乎是实时的!尽管这个过程减缓了索引时间,但它大大提高了搜索时的查询速度。

GET /my_test_scores_2/_search
{
  "query": {
    "term": {
      "grad_year": "2099"
    }
  },
  "sort": [
    {
      "total_score": {
        "order": "desc"
      }
    }
  ]
}

剖析数据

edit

Dissect 将单个文本字段与定义的模式进行匹配。Dissect 模式由您想要丢弃的字符串部分定义。特别注意字符串的每个部分有助于构建成功的 dissect 模式。

如果你不需要正则表达式的强大功能,请使用解剖模式而不是grok。解剖使用比grok更简单的语法,并且通常在整体上更快。解剖的语法是透明的:告诉解剖你想要什么,它就会返回那些结果给你。

剖析模式

edit

剖析模式由变量分隔符组成。任何由百分号和花括号%{}定义的内容都被视为变量,例如%{clientip}。您可以将变量分配给字段数据中的任何部分,然后仅返回您想要的部分。分隔符是变量之间的任何值,可以是空格、破折号或其他分隔符。

例如,假设您有包含 message 字段的日志数据,如下所示:

"message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"

您将变量分配给数据的每个部分,以构建一个成功的dissect模式。请记住,告诉dissect确切地您想要匹配的内容。

数据的第一个部分看起来像是一个IP地址,所以你可以像%{clientip}这样分配一个变量。接下来的两个字符是带有两侧空格的破折号。你可以为每个破折号分配一个变量,或者用一个变量来表示破折号和空格。接下来是一组包含时间戳的方括号。方括号是一个分隔符,所以你在分解模式中包含这些。到目前为止,数据和匹配的分解模式看起来像这样:

247.37.0.0 - - [30/Apr/2020:14:31:22 -0500]  

%{clientip} %{ident} %{auth} [%{@timestamp}] 

来自message字段的第一批数据

解析模式以匹配选定的数据块

使用相同的逻辑,您可以为剩余的数据块创建变量。双引号是分隔符,因此在分解模式中包含这些符号。该模式将GET替换为%{verb}变量,但将HTTP保留为模式的一部分。

\"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0

"%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}

结合这两种模式,结果是一个分解模式,看起来像这样:

%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}

现在你有了一个分解模式,如何测试和使用它?

使用Painless测试dissect模式

edit

您可以将dissect模式合并到Painless脚本中以提取数据。要测试您的脚本,可以使用Painless执行API的字段上下文或创建一个包含该脚本的运行时字段。运行时字段提供了更大的灵活性并接受多个文档,但如果您在测试脚本的集群上没有写入权限,Painless执行API是一个很好的选择。

例如,通过包含您的Painless脚本和一个与您的数据匹配的单个文档,使用Painless执行API测试您的dissect模式。首先将message字段索引为wildcard数据类型:

PUT my-index
{
  "mappings": {
    "properties": {
      "message": {
        "type": "wildcard"
      }
    }
  }
}

如果您想检索HTTP响应代码,请将您的分解模式添加到一个Painless脚本中,该脚本提取response值。要从字段中提取值,请使用此函数:

`.extract(doc["<field_name>"].value)?.<field_value>`

在这个例子中,message,而 response

POST /_scripts/painless/_execute
{
  "script": {
    "source": """
      String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
        if (response != null) emit(Integer.parseInt(response)); 
    """
  },
  "context": "long_field", 
  "context_setup": {
    "index": "my-index",
    "document": {          
      "message": """247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] "GET /images/hm_nbg.jpg HTTP/1.0" 304 0"""
    }
  }
}

运行时字段需要使用emit方法来返回值。

因为响应代码是一个整数,使用long_field上下文。

包含一个与您的数据匹配的示例文档。

结果包括HTTP响应代码:

{
  "result" : [
    304
  ]
}

在运行时字段中使用dissect模式和脚本

edit

如果你有一个功能性的解析模式,你可以将其添加到一个运行时字段中以操作数据。因为运行时字段不需要你索引字段,所以你可以灵活地修改你的脚本及其功能。如果你已经使用Painless执行API测试了你的解析模式,你可以在你的运行时字段中使用那个完全相同的Painless脚本。

首先,将 message 字段添加为 wildcard 类型,就像在前一节中一样,但也要将 @timestamp 添加为 date,以防你想要对该字段进行 其他使用场景 的操作:

PUT /my-index/
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "format": "strict_date_optional_time||epoch_second",
        "type": "date"
      },
      "message": {
        "type": "wildcard"
      }
    }
  }
}

如果你想使用你的剖析模式提取HTTP响应代码,你可以创建一个运行时字段,如http.response

PUT my-index/_mappings
{
  "runtime": {
    "http.response": {
      "type": "long",
      "script": """
        String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
        if (response != null) emit(Integer.parseInt(response));
      """
    }
  }
}

在映射了您想要检索的字段后,将一些日志数据记录索引到 Elasticsearch 中。以下请求使用 bulk API 将原始日志数据索引到 my-index 中:

POST /my-index/_bulk?refresh=true
{"index":{}}
{"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}

您可以定义一个简单的查询来运行搜索以获取特定的HTTP响应并返回所有相关字段。使用搜索API的fields参数来检索http.response运行时字段。

GET my-index/_search
{
  "query": {
    "match": {
      "http.response": "304"
    }
  },
  "fields" : ["http.response"]
}

或者,您可以在搜索请求的上下文中定义相同的运行时字段。运行时定义和脚本与之前在索引映射中定义的完全相同。只需将该定义复制到搜索请求的 runtime_mappings 部分,并包含一个与运行时字段匹配的查询。此查询返回的结果与之前为索引映射中的 http.response 运行时字段定义的搜索查询相同,但仅限于此特定搜索的上下文中:

GET my-index/_search
{
  "runtime_mappings": {
    "http.response": {
      "type": "long",
      "script": """
        String response=dissect('%{clientip} %{ident} %{auth} [%{@timestamp}] "%{verb} %{request} HTTP/%{httpversion}" %{response} %{size}').extract(doc["message"].value)?.response;
        if (response != null) emit(Integer.parseInt(response));
      """
    }
  },
  "query": {
    "match": {
      "http.response": "304"
    }
  },
  "fields" : ["http.response"]
}
{
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index",
        "_id" : "D47UqXkBByC8cgZrkbOm",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:31:22-05:00",
          "message" : "247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"
        },
        "fields" : {
          "http.response" : [
            304
          ]
        }
      }
    ]
  }
}

深入理解 Grok

edit

Grok 是一种支持可重用别名表达式的正则表达式方言。Grok 非常适合处理 syslog 日志、Apache 和其他网络服务器日志、mysql 日志,以及通常任何为人类而非计算机编写的日志格式。

Grok 建立在 Oniguruma 正则表达式库之上,因此任何正则表达式在 grok 中都是有效的。Grok 使用这种正则表达式语言来允许命名现有模式并将它们组合成更复杂的模式,以匹配您的字段。

Grok 模式

edit

Elastic Stack 附带了许多 预定义的 grok 模式,这些模式简化了 grok 的使用。重用 grok 模式的语法采用以下形式之一:

%{SYNTAX}

%{SYNTAX:ID}

%{SYNTAX:ID:TYPE}

SYNTAX
将匹配您的文本的模式名称。例如,NUMBERIP 都是默认模式集中提供的模式。NUMBER 模式匹配像 3.44 这样的数据,而 IP 模式匹配像 55.3.244.1 这样的数据。
ID
您为正在匹配的文本片段提供的标识符。例如,3.44 可能是某个事件的持续时间,因此您可以将其称为duration。字符串 55.3.244.1 可能标识发出请求的client
TYPE
您希望将命名字段转换成的数据类型。支持的类型包括 intlongdoublefloatboolean

例如,假设您有如下所示的消息数据:

3.44 55.3.244.1

第一个值是一个数字,后面跟着一个看起来像是IP地址的内容。你可以使用以下grok表达式来匹配这段文本:

%{NUMBER:duration} %{IP:client}

迁移到 Elastic 通用模式 (ECS)

edit

为了简化向Elastic Common Schema (ECS)的迁移,除了现有的模式外,还提供了一组新的符合ECS的模式。新的ECS模式定义捕获了符合该模式的事件字段名称。

ECS 模式集包含了旧模式集中的所有模式定义,并且是一个即插即用的替代方案。使用 ecs-compatability 设置来切换模式。

新功能和增强功能将被添加到符合ECS标准的文件中。遗留模式可能仍会收到向后兼容的错误修复。

在 Painless 脚本中使用 grok 模式

edit

您可以将预定义的grok模式合并到Painless脚本中以提取数据。要测试您的脚本,可以使用Painless执行API的字段上下文或创建一个包含该脚本的运行时字段。运行时字段提供了更大的灵活性并接受多个文档,但如果您在测试脚本的集群上没有写入权限,Painless执行API是一个很好的选择。

如果您需要帮助构建grok模式以匹配您的数据,请使用Kibana中的Grok Debugger工具。

例如,如果您正在处理Apache日志数据,您可以使用%{COMMONAPACHELOG}语法,该语法理解Apache日志的结构。一个示例文档可能如下所示:

"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - -
[30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"

要从 message 字段中提取 IP 地址,您可以编写一个 Painless 脚本,该脚本包含 %{COMMONAPACHELOG} 语法。您可以使用 Painless 执行 API 的 ip 字段上下文 来测试此脚本,但让我们改为使用运行时字段。

基于示例文档,索引 @timestampmessage 字段。为了保持灵活性,将 message 字段的类型设置为 wildcard

PUT /my-index/
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "format": "strict_date_optional_time||epoch_second",
        "type": "date"
      },
      "message": {
        "type": "wildcard"
      }
    }
  }
}

接下来,使用批量 API将一些日志数据索引到my-index中。

POST /my-index/_bulk?refresh
{"index":{}}
{"timestamp":"2020-04-30T14:30:17-05:00","message":"40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:30:53-05:00","message":"232.0.0.0 - - [30/Apr/2020:14:30:53 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:12-05:00","message":"26.1.0.0 - - [30/Apr/2020:14:31:12 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:19-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:19 -0500] \"GET /french/splash_inet.html HTTP/1.0\" 200 3781"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:22-05:00","message":"247.37.0.0 - - [30/Apr/2020:14:31:22 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:27-05:00","message":"252.0.0.0 - - [30/Apr/2020:14:31:27 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"}
{"index":{}}
{"timestamp":"2020-04-30T14:31:28-05:00","message":"not a valid apache log"}

在运行时字段中集成grok模式和脚本

edit

现在您可以在映射中定义一个运行时字段,其中包括您的 Painless 脚本和 grok 模式。如果模式匹配,脚本会发出匹配的 IP 地址的值。如果模式不匹配(clientip != null),脚本只会返回字段值而不崩溃。

PUT my-index/_mappings
{
  "runtime": {
    "http.clientip": {
      "type": "ip",
      "script": """
        String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
        if (clientip != null) emit(clientip);
      """
    }
  }
}

或者,您可以在搜索请求的上下文中定义相同的运行时字段。运行时定义和脚本与之前在索引映射中定义的完全相同。只需将该定义复制到搜索请求的 runtime_mappings 部分,并包含一个与运行时字段匹配的查询。此查询返回的结果与您在索引映射中为 http.clientip 运行时字段定义搜索查询时返回的结果相同,但仅限于此特定搜索的上下文中:

GET my-index/_search
{
  "runtime_mappings": {
    "http.clientip": {
      "type": "ip",
      "script": """
        String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
        if (clientip != null) emit(clientip);
      """
    }
  },
  "query": {
    "match": {
      "http.clientip": "40.135.0.0"
    }
  },
  "fields" : ["http.clientip"]
}

返回计算结果

edit

使用 http.clientip 运行时字段,您可以定义一个简单的查询来搜索特定的 IP 地址并返回所有相关字段。fields 参数在 _search API 上适用于所有字段,即使是那些未作为原始 _source 的一部分发送的字段:

GET my-index/_search
{
  "query": {
    "match": {
      "http.clientip": "40.135.0.0"
    }
  },
  "fields" : ["http.clientip"]
}

响应包括您搜索查询中指示的特定IP地址。 Painless脚本中的grok模式在运行时从 message 字段中提取了此值。

{
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my-index",
        "_id" : "1iN2a3kBw4xTzEDqyYE0",
        "_score" : 1.0,
        "_source" : {
          "timestamp" : "2020-04-30T14:30:17-05:00",
          "message" : "40.135.0.0 - - [30/Apr/2020:14:30:17 -0500] \"GET /images/hm_bg.jpg HTTP/1.0\" 200 24736"
        },
        "fields" : {
          "http.clientip" : [
            "40.135.0.0"
          ]
        }
      }
    ]
  }
}