监控与仪器

监控Spark应用程序有几种方法:网页用户界面、指标和外部工具。

网页接口

每个 SparkContext 启动一个 Web UI ,默认在 4040 端口上,显示关于应用程序的有用信息。包括:

您可以通过在网页浏览器中简单地打开 http:// :4040 来访问此接口。 如果多个 SparkContexts 在同一主机上运行,它们将依次绑定到从 4040 开始的端口 依次是 4041、4042 等。

请注意,这些信息仅在应用程序运行期间默认可用。 要在之后查看Web UI,请在启动应用程序之前将 spark.eventLog.enabled 设置为true。 这将配置Spark记录编码在UI中显示的信息的Spark事件,以便持续存储。

事后查看

只要应用程序的事件日志存在,通过Spark的历史服务器构建应用程序的用户界面仍然是可能的。 您可以通过执行以下命令来启动历史服务器:

./sbin/start-history-server.sh

这默认创建了一个网络界面在 http:// :18080 ,列出了未完成和已完成的应用程序及尝试。

在使用文件系统提供者类(请参见下面的 spark.history.provider )时,必须在 spark.history.fs.logDirectory 配置选项中提供基础日志目录,并且应包含每个代表应用程序事件日志的子目录。

Spark作业本身必须配置为记录事件,并将其记录到相同的共享、可写目录。例如,如果服务器配置的日志目录为 hdfs://namenode/shared/spark-logs ,那么客户端的选项将是:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs

历史服务器可以按如下方式配置:

环境变量

环境变量 含义
SPARK_DAEMON_MEMORY 分配给历史服务器的内存(默认:1g)。
SPARK_DAEMON_JAVA_OPTS 历史服务器的JVM选项(默认:无)。
SPARK_DAEMON_CLASSPATH 历史服务器的类路径(默认:无)。
SPARK_PUBLIC_DNS 历史服务器的公共地址。如果没有设置,这可能导致应用程序历史的链接使用服务器的内部地址,从而导致链接断开(默认:无)。
SPARK_HISTORY_OPTS spark.history.* 配置选项用于历史服务器(默认:无)。

对滚动事件日志文件应用压缩

一个长期运行的应用程序(例如流媒体)可能会产生一个巨大的单一事件日志文件,这可能需要花费大量的维护成本,并且在每次更新时需要消耗大量资源来在Spark历史服务器中进行重放。

启用 spark.eventLog.rolling.enabled spark.eventLog.rolling.maxFileSize 将 让你拥有循环事件日志文件,而不是单个巨大事件日志文件,这在某些情况下可能会有所帮助, 但是它仍然无法帮助你减少日志的整体大小。

Spark 历史服务器可以对滚动的事件日志文件进行压缩,以减少日志的总体大小,方法是设置配置 spark.history.fs.eventLog.rolling.maxFilesToRetain 在 Spark 历史服务器上。

详情将在下面描述,但请提前注意,压缩是有损的操作。 压缩将丢弃一些事件,这些事件将不会再在用户界面上出现 - 您可能想在启用该选项之前检查哪些事件将被丢弃。

当压缩发生时,历史服务器会列出应用程序所有可用的事件日志文件,并考虑到其索引低于将作为压缩目标的最小索引文件的事件日志文件。例如,如果应用程序 A 有 5 个事件日志文件,并且 spark.history.fs.eventLog.rolling.maxFilesToRetain 设置为 2,那么前 3 个日志文件将被选中进行压缩。

一旦选择了目标,它会分析这些目标,以确定可以排除哪些事件,并将它们重写为一个紧凑的文件,同时丢弃决定排除的事件。

压缩尝试排除指向过时数据的事件。目前,以下是要排除的事件候选者的描述:

一旦重写完成,原始日志文件将以尽力而为的方式被删除。历史服务器可能无法删除原始日志文件,但这不会影响历史服务器的操作。

请注意,如果Spark History Server发现压缩时不会减少很多空间,则可能不会压缩旧的事件日志文件。对于流式查询,我们通常期望压缩会在每个微批次触发一个或多个作业时运行,这些作业会很快完成,但在许多情况下,批处理查询不会运行压缩。

请注意,这是在 Spark 3.0 中引入的新功能,可能尚未完全稳定。在某些情况下,压缩可能会排除比您预期更多的事件,从而导致应用程序在历史服务器上的一些 UI 问题。请谨慎使用。

Spark历史服务器配置选项

Spark历史服务器的安全选项在 安全 页面中有更详细的说明。

属性名称 默认值 含义 自版本起
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider 实现应用历史后端的类的名称。目前只有一个实现,由Spark提供,它查找存储在文件系统中的应用日志。 1.1.0
spark.history.fs.logDirectory file:/tmp/spark-events 对于文件系统历史提供者,用于加载的包含应用事件日志的目录的URL。这可以是本地 file:// 路径, HDFS路径 hdfs://namenode/shared/spark-logs 或者Hadoop API支持的其他文件系统的路径。 1.1.0
spark.history.fs.update.interval 10s 文件系统历史提供者检查日志目录中的新日志或更新日志的周期。较短的间隔能更快检测到新应用, 代价是更多的服务器负载要重新读取更新的应用。 一旦更新完成,已完成和未完成应用的列表将反映这些更改。 1.4.0
spark.history.retainedApplications 50 在缓存中保留UI数据的应用程序数。如果超过此上限,则最旧的应用将从缓存中删除。如果应用不在缓存中, 则在从UI访问时必须从磁盘加载它。 1.0.0
spark.history.ui.maxApplications Int.MaxValue 在历史摘要页面上显示的应用程序数量。应用程序UI仍然可以通过直接访问其URL获取,即使它们未在历史摘要页面上显示。 2.0.1
spark.history.ui.port 18080 历史服务器的Web接口绑定的端口。 1.0.0
spark.history.kerberos.enabled false 指示历史服务器是否应使用kerberos进行登录。如果历史服务器正在访问安全Hadoop集群上的HDFS文件,则这是必需的。 1.0.1
spark.history.kerberos.principal (none) spark.history.kerberos.enabled=true 时,指定历史服务器的kerberos主体名称。 1.0.1
spark.history.kerberos.keytab (none) spark.history.kerberos.enabled=true 时,指定历史服务器的kerberos keytab文件的位置。 1.0.1
spark.history.fs.cleaner.enabled false 指定历史服务器是否应定期清理存储中的事件日志。 1.4.0
spark.history.fs.cleaner.interval 1d spark.history.fs.cleaner.enabled=true 时,指定文件系统作业历史清理器检查要删除的文件的频率。 如果至少满足两个条件中的一个,文件将被删除。 首先,如果文件的年龄超过 spark.history.fs.cleaner.maxAge ,则会被删除。 如果文件数量超过 spark.history.fs.cleaner.maxNum ,Spark会尝试根据其最旧尝试时间的顺序清理应用程序的已完成尝试。 1.4.0
spark.history.fs.cleaner.maxAge 7d spark.history.fs.cleaner.enabled=true 时,历史作业文件的最大年龄,超过该年龄的将会被历史文件清理器删除。 1.4.0
spark.history.fs.cleaner.maxNum Int.MaxValue spark.history.fs.cleaner.enabled=true 时,指定事件日志目录中保留的最大文件数量。 Spark会尝试清理已完成的尝试日志,以保持日志目录在该限制之下。 这应该小于HDFS中的潜在文件系统限制,例如 `dfs.namenode.fs-limits.max-directory-items`。 3.0.0
spark.history.fs.endEventReparseChunkSize 1m 解析日志文件末尾字节数以查找结束事件。 这用于加快应用程序列表的生成,跳过不必要的事件日志文件部分。通过将此配置设置为0可以禁用此功能。 2.4.0
spark.history.fs.inProgressOptimization.enabled true 启用对进行中的日志的优化处理。此选项可能会将未能重命名其事件日志的已完成应用程序列为进行中。 2.4.0
spark.history.fs.driverlog.cleaner.enabled spark.history.fs.cleaner.enabled 指定历史服务器是否应定期清理存储中的驱动程序日志。 3.0.0
spark.history.fs.driverlog.cleaner.interval spark.history.fs.cleaner.interval spark.history.fs.driverlog.cleaner.enabled=true 时,指定文件系统驱动程序日志清理器检查要删除的文件的频率。 只有当它们的年龄超过 spark.history.fs.driverlog.cleaner.maxAge 时,文件才会被删除。 3.0.0
spark.history.fs.driverlog.cleaner.maxAge spark.history.fs.cleaner.maxAge spark.history.fs.driverlog.cleaner.enabled=true 时,驱动程序日志文件在驱动程序日志清理器运行时将被删除。 3.0.0
spark.history.fs.numReplayThreads 可用核心的25% 历史服务器将用于处理事件日志的线程数。 2.0.0
spark.history.store.maxDiskUsage 10g 缓存应用程序历史信息存储的本地目录的最大磁盘使用量。 2.3.0
spark.history.store.path (none) 缓存应用程序历史数据的本地目录。如果设置,历史服务器将把应用程序数据存储在磁盘上,而不是保持在内存中。写入磁盘的数据将在历史服务器重启后重用。 2.3.0
spark.history.store.serializer JSON 用于在内存UI对象与基于磁盘的KV存储之间进行读写的序列化工具;JSON或PROTOBUF。 在Spark 3.4.0之前,JSON序列化器是唯一的选择,因此是默认值。 相较于JSON序列化器,PROTOBUF序列化器速度快且紧凑。 3.4.0
spark.history.custom.executor.log.url (none) 指定支持外部日志服务的自定义Spark执行器日志URL,而不是使用集群管理器的应用日志URL在历史服务器中。Spark将通过模式支持一些路径变量,这些变量可能会根据集群管理器而有所不同。请查看您集群管理器的文档以了解支持哪些模式(如果有的话)。此配置对正在运行的应用程序没有影响,仅影响历史服务器。

目前,仅YARN模式支持此配置。
3.0.0
spark.history.custom.executor.log.url.applyIncompleteApplication true 指定是否也对不完整的应用程序应用自定义Spark执行器日志URL。 如果应该提供正在运行的应用程序的执行器日志的原始日志URL,请将其设置为`false`。 请注意,不完整的应用程序可能包括未正常关闭的应用程序。 即使设置为`true`,此配置对正在运行的应用程序没有影响,仅影响历史服务器。 3.0.0
spark.history.fs.eventLog.rolling.maxFilesToRetain Int.MaxValue 最大保留的非压缩事件日志文件数量。默认情况下,所有事件日志文件将被保留。最低值为1出于技术原因。
请阅读“应用旧事件日志文件的压缩”部分以获取更多详细信息。
3.0.0
spark.history.store.hybridStore.enabled false 是否使用HybridStore作为解析事件日志时的存储。HybridStore将首先将数据写入内存存储,并在将数据写入内存存储完成后拥有一个后台线程,该线程将数据转储到磁盘存储。 3.1.0
spark.history.store.hybridStore.maxMemoryUsage 2g 可用于创建HybridStore的最大内存空间。HybridStore共用堆内存,因此如果启用HybridStore,则应通过SHS的内存选项增加堆内存。 3.1.0
spark.history.store.hybridStore.diskBackend ROCKSDB 指定用于混合存储的磁盘基础存储;LEVELDB或ROCKSDB。 3.3.0
spark.history.fs.update.batchSize Int.MaxValue 指定更新新事件日志文件的批处理大小。 这将控制每个扫描过程在合理时间内完成,从而防止在大型环境中初始扫描运行过长并阻止及时扫描新事件日志文件。 3.4.0

请注意,在所有这些用户界面中,表格可以通过点击其标题进行排序,这使得识别慢任务、数据倾斜等变得容易。

注意

  1. 历史服务器显示完成和未完成的Spark作业。如果一个应用在失败后进行了多次尝试,那么失败的尝试将会被显示,以及任何正在进行的未完成尝试或最终成功的尝试。

  2. 未完成的应用仅会间歇性更新。更新之间的时间由检查更改文件的间隔定义( spark.history.fs.update.interval )。在较大的集群上,更新间隔可能设置为较大的值。查看正在运行的应用程序的方式实际上是查看其自身的Web UI。

  3. 未能注册为完成的应用程序将被列为未完成——即使它们不再运行。这可能发生在应用程序崩溃的情况下。

  4. 指示Spark作业完成的一种方法是显式停止Spark上下文( sc.stop() ),或者在Python中使用 with SparkContext() as sc: 构造来处理Spark上下文的设置和拆除。

REST API

除了在用户界面中查看指标外,它们还可以作为 JSON 提供。这为开发人员提供了一种简单的方法来为 Spark 创建新的可视化和监控工具。JSON 可用于正在运行的应用程序和历史服务器。这些端点挂载在 /api/v1 。例如,对于历史服务器,它们通常可以在 http:// :18080/api/v1 访问,对于正在运行的应用程序,则可以在 http://localhost:4040/api/v1 访问。

在API中,应用程序通过其应用程序ID引用, [app-id] 。 在YARN上运行时,每个应用程序可能有多个尝试,但是只有在集群模式下的应用程序才有尝试ID,而在客户端模式下的应用程序则没有。在YARN集群模式中,可以通过 [attempt-id] 识别应用程序。在下面列出的API中,当在YARN集群模式下运行时, [app-id] 实际上将是 [base-app-id]/[attempt-id] ,其中 [base-app-id] 是YARN应用程序ID。

接口 含义
/applications 所有应用程序的列表。
?status=[completed|running] 仅列出处于所选状态的应用程序。
?minDate=[date] 列出最早的开始日期/时间。
?maxDate=[date] 列出最新的开始日期/时间。
?minEndDate=[date] 列出最早的结束日期/时间。
?maxEndDate=[date] 列出最新的结束日期/时间。
?limit=[limit] 限制列出的应用程序数量。
示例:
?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10
/applications/[app-id]/jobs 给定应用程序的所有作业列表。
?status=[running|succeeded|failed|unknown] 仅列出处于特定状态的作业。
/applications/[app-id]/jobs/[job-id] 给定作业的详细信息。
/applications/[app-id]/stages 给定应用程序的所有阶段列表。
?status=[active|complete|pending|failed] 仅列出处于给定状态的阶段。
?details=true 列出所有阶段及其任务数据。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这还支持多个 taskStatus ,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED ,这将返回符合任何指定任务状态的所有任务。
?withSummaries=true 列出带有任务指标分布和执行器指标分布的阶段。
?quantiles=0.0,0.25,0.5,0.75,1.0 用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0
/applications/[app-id]/stages/[stage-id] 给定阶段的所有尝试列表。
?details=true 列出给定阶段的所有尝试及其任务数据。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这还支持多个 taskStatus ,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED ,这将返回符合任何指定任务状态的所有任务。
?withSummaries=true 列出每次尝试的任务指标分布和执行器指标分布。
?quantiles=0.0,0.25,0.5,0.75,1.0 用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0
示例:
?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] 给定阶段尝试的详细信息。
?details=true 列出给定阶段尝试的所有任务数据。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这还支持多个 taskStatus ,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED ,这将返回符合任何指定任务状态的所有任务。
?withSummaries=true 列出给定阶段尝试的任务指标分布和执行器指标分布。
?quantiles=0.0,0.25,0.5,0.75,1.0 用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0
示例:
?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 给定阶段尝试中所有任务的摘要指标。
?quantiles 用给定的分位数汇总指标。
示例: ?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList 给定阶段尝试的所有任务列表。
?offset=[offset]&length=[len] 列出给定范围内的任务。
?sortBy=[runtime|-runtime] 排序任务。
?status=[running|success|killed|failed|unknown] 仅列出特定状态的任务。
示例: ?offset=10&length=50&sortBy=runtime&status=running
/applications/[app-id]/executors 给定应用程序的所有活动执行器的列表。
/applications/[app-id]/executors/[executor-id]/threads 给定活动执行器中所有线程的堆栈跟踪。 通过历史服务器无法获得。
/applications/[app-id]/allexecutors 给定应用程序的所有(活动和非活动)执行器的列表。
/applications/[app-id]/storage/rdd 给定应用程序存储的 RDD 列表。
/applications/[app-id]/storage/rdd/[rdd-id] 给定 RDD 的存储状态详细信息。
/applications/[base-app-id]/logs 以压缩文件的形式下载给定应用程序所有尝试的事件日志。
/applications/[base-app-id]/[attempt-id]/logs 以压缩文件的形式下载特定应用程序尝试的事件日志。
/applications/[app-id]/streaming/statistics 流式上下文的统计信息。
/applications/[app-id]/streaming/receivers 所有流式接收器的列表。
/applications/[app-id]/streaming/receivers/[stream-id] 给定接收器的详细信息。
/applications/[app-id]/streaming/batches 所有保留批次的列表。
/applications/[app-id]/streaming/batches/[batch-id] 给定批次的详细信息。
/applications/[app-id]/streaming/batches/[batch-id]/operations 给定批次的所有输出操作列表。
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] 给定批次和给定操作的详细信息。
/applications/[app-id]/sql 给定应用程序的所有查询的列表。
?details=[true (默认) | false] 列出/隐藏 Spark 计划节点的详细信息。
?planDescription=[true (默认) | false] 在物理计划大小较大时按需启用/禁用物理 planDescription
?offset=[offset]&length=[len] 列出给定范围内的查询。
/applications/[app-id]/sql/[execution-id] 给定查询的详细信息。
?details=[true (默认) | false] 除给定查询详细信息外,还列出/隐藏度量详细信息。
?planDescription=[true (默认) | false] 在物理计划大小较大时为给定查询按需启用/禁用物理 planDescription
/applications/[app-id]/environment 给定应用程序的环境详细信息。
/version 获取当前的 Spark 版本。

可以检索的作业和阶段的数量受到独立Spark UI相同的保留机制的限制; "spark.ui.retainedJobs" 定义了触发作业垃圾收集的阈值, spark.ui.retainedStages 则针对阶段。请注意,垃圾收集发生在回放时:通过增加这些值并重启历史服务器,可以检索更多条目。

执行器任务指标

REST API暴露了由Spark执行器收集的任务指标的值,粒度为任务执行。这些指标可用于性能故障排除和工作负载特征化。可用指标的列表及简短描述如下:

Spark Executor Task Metric name 简短描述
executorRunTime 执行器在运行此任务上花费的时间。这包括获取洗牌数据的时间。值以毫秒表示。
executorCpuTime 执行器在运行此任务上花费的 CPU 时间。这包括获取洗牌数据的时间。值以纳秒表示。
executorDeserializeTime 反序列化此任务所花费的时间。值以毫秒表示。
executorDeserializeCpuTime 在执行器上反序列化此任务所花费的 CPU 时间。值以纳秒表示。
resultSize 此任务传输回驱动程序作为 TaskResult 的字节数。
jvmGCTime JVM 在执行此任务期间用于垃圾收集的时间。值以毫秒表示。
resultSerializationTime 序列化任务结果所花费的时间。值以毫秒表示。
memoryBytesSpilled 此任务溢出的内存字节数。
diskBytesSpilled 此任务溢出的磁盘字节数。
peakExecutionMemory 在洗牌、聚合和连接过程中创建的内部数据结构所使用的峰值内存。该累加器的值应大致等于在此任务中创建的所有此类数据结构的峰值大小之和。对于 SQL 作业,这仅跟踪所有不安全操作符和 ExternalSort。
inputMetrics.* 与从 org.apache.spark.rdd.HadoopRDD 或持久化数据读取数据相关的指标。
.bytesRead 读取的字节总数。
.recordsRead 读取的记录总数。
outputMetrics.* 与外部写入数据(例如,写入分布式文件系统)相关的指标,仅在有输出的任务中定义。
.bytesWritten 写入的字节总数
.recordsWritten 写入的记录总数
shuffleReadMetrics.* 与洗牌读取操作相关的指标。
.recordsRead 在洗牌操作中读取的记录数
.remoteBlocksFetched 在洗牌操作中提取的远程块数
.localBlocksFetched 在洗牌操作中提取的本地(与从远程执行器读取相对)块数
.totalBlocksFetched 在洗牌操作中提取的块总数(本地和远程)
.remoteBytesRead 在洗牌操作中读取的远程字节数
.localBytesRead 在洗牌操作中从本地磁盘读取的字节数(与从远程执行器读取相对)
.totalBytesRead 在洗牌操作中读取的字节总数(本地和远程)
.remoteBytesReadToDisk 在洗牌操作中读取到磁盘的远程字节数。大块在洗牌读取操作中被提取到磁盘,而不是被读入内存,这是默认行为。
.fetchWaitTime 任务等待远程洗牌块的时间。这仅包括在洗牌输入数据上阻塞的时间。例如,如果块 B 在任务尚未完成处理块 A 时被提取,则不被视为在块 B 上阻塞。值以毫秒表示。
shuffleWriteMetrics.* 与写入洗牌数据的操作相关的指标。
.bytesWritten 在洗牌操作中写入的字节数
.recordsWritten 在洗牌操作中写入的记录数
.writeTime 在写入磁盘或缓冲区缓存时阻塞所花费的时间。值以纳秒表示。

执行器指标

执行器级别的指标作为心跳的一部分从每个执行器发送到驱动程序,以描述执行器自身的性能指标,如JVM堆内存、GC信息。 Executor指标值及其每个执行器的内存峰值通过REST API以JSON格式和Prometheus格式公开。 JSON端点位于: /applications/[app-id]/executors ,Prometheus端点位于: /metrics/executors/prometheus 。 Prometheus端点取决于一个配置参数: spark.ui.prometheus.enabled=true (默认值为 false )。 此外,如果 spark.eventLog.logStageExecutorMetrics 为真,则执行器内存指标的按阶段聚合峰值将写入事件日志。 Executor内存指标也通过基于 Dropwizard指标库 的Spark指标系统公开。 以下是可用指标的列表,附带简短描述:

执行器级别指标名称 简短描述
rddBlocks 此执行器的块管理器中的 RDD 块。
memoryUsed 此执行器使用的存储内存。
diskUsed 此执行器用于 RDD 存储的磁盘空间。
totalCores 此执行器中可用的核心数。
maxTasks 此执行器中可以并发运行的最大任务数。
activeTasks 当前正在执行的任务数。
failedTasks 此执行器中已失败的任务数。
completedTasks 此执行器中已完成的任务数。
totalTasks 此执行器中任务的总数(运行、失败和完成)。
totalDuration JVM 在此执行器中执行任务所花费的时间。 该值以毫秒为单位表示。
totalGCTime JVM 在此执行器中进行垃圾收集所花费的时间总和。 该值以毫秒为单位表示。
totalInputBytes 此执行器中输入的总字节数。
totalShuffleRead 此执行器中读取的总 shuffle 字节数。
totalShuffleWrite 此执行器中写入的总 shuffle 字节数。
maxMemory 可用于存储的总内存量(字节数)。
memoryMetrics.* 内存指标的当前值:
.usedOnHeapStorageMemory 当前用于存储的堆内存(字节数)。
.usedOffHeapStorageMemory 当前用于存储的非堆内存(字节数)。
.totalOnHeapStorageMemory 可用于存储的总堆内存(字节数)。此数量可能会随着时间而变化,具体取决于 MemoryManager 的实现。
.totalOffHeapStorageMemory 可用于存储的总非堆内存(字节数)。此数量可能会随着时间而变化,具体取决于 MemoryManager 的实现。
peakMemoryMetrics.* 内存(和 GC)指标的峰值:
.JVMHeapMemory 用于对象分配的堆的峰值内存使用量。 堆由一个或多个内存池组成。返回的内存使用量的使用和提交大小是所有堆内存池的这些值的总和,而返回的内存使用量的初始和最大大小表示堆内存的设置,这可能不是所有堆内存池的总和。 返回的内存使用量中使用的内存量是由活动对象和未被收集的垃圾对象(如果有)占用的内存量。
.JVMOffHeapMemory Java 虚拟机使用的非堆内存的峰值内存使用量。非堆内存由一个或多个内存池组成。返回的内存使用量的使用和提交大小是所有非堆内存池的这些值的总和,而返回的内存使用量的初始和最大大小表示非堆内存的设置,这可能不是所有非堆内存池的总和。
.OnHeapExecutionMemory 当前使用的峰值堆内存执行内存(字节数)。
.OffHeapExecutionMemory 当前使用的峰值非堆内存执行内存(字节数)。
.OnHeapStorageMemory 当前使用的峰值堆内存存储内存(字节数)。
.OffHeapStorageMemory 当前使用的峰值非堆内存存储内存(字节数)。
.OnHeapUnifiedMemory 当前使用的峰值堆内存(执行和存储)。
.OffHeapUnifiedMemory 当前使用的峰值非堆内存(执行和存储)。
.DirectPoolMemory JVM 用于直接缓冲池的峰值内存( java.lang.management.BufferPoolMXBean
.MappedPoolMemory JVM 用于映射缓冲池的峰值内存( java.lang.management.BufferPoolMXBean
.ProcessTreeJVMVMemory 虚拟内存大小(字节数)。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
.ProcessTreeJVMRSSMemory 常驻集大小:进程在实际内存中拥有的页面数量。 这只是计算文本、数据或堆栈空间的页面。 这不包括未按需加载的页面或已交换出的页面。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
.ProcessTreePythonVMemory Python 的虚拟内存大小(字节数)。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
.ProcessTreePythonRSSMemory Python 的常驻集大小。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
.ProcessTreeOtherVMemory 其他类型进程的虚拟内存大小(字节数)。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
.ProcessTreeOtherRSSMemory 其他类型进程的常驻集大小。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
.MinorGCCount 总小 GC 计数。例如,垃圾收集器是 Copy、PS Scavenge、ParNew、G1 Young Generation 等。
.MinorGCTime 经过的总小 GC 时间。 该值以毫秒为单位表示。
.MajorGCCount 总大 GC 计数。例如,垃圾收集器是 MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation 等。
.MajorGCTime 经过的总大 GC 时间。 该值以毫秒为单位表示。

RSS和Vmem的计算基于 proc(5)

API版本控制策略

这些端点已经进行了强版本控制,以便于在其上开发应用程序。特别是,Spark保证:

请注意,即使在检查正在运行的应用程序的用户界面时, applications/[app-id] 部分仍然是必需的,尽管只有一个可用的应用程序。例如,要查看正在运行的应用程序的作业列表,您可以访问 http://localhost:4040/api/v1/applications/[app-id]/jobs 。这是为了保持两种模式下路径的一致性。

指标

Spark 具有一个基于 Dropwizard Metrics Library 的可配置指标系统。 这使得用户可以将 Spark 指标报告到多种目的地,包括 HTTP、JMX 和 CSV 文件。指标由嵌入在 Spark 代码库中的源生成。它们为特定活动和 Spark 组件提供了仪器化。指标系统通过一个配置文件配置,Spark 希望该文件存在于 $SPARK_HOME/conf/metrics.properties 。可以通过 spark.metrics.conf 配置属性 指定自定义文件位置。可以使用以 spark.metrics.conf. 为前缀的一组配置参数,而不是使用配置文件。默认情况下,用于驱动程序或执行程序指标的根命名空间是 spark.app.id 的值。 然而,用户通常希望能够跨应用程序跟踪驱动程序和执行程序的指标,而这很难通过应用程序 ID(即 spark.app.id )来实现,因为它在每次调用应用程序时都会改变。对于这种用例,可以使用 spark.metrics.namespace 配置属性为指标报告指定自定义命名空间。如果用户希望将指标命名空间设置为应用程序的名称,他们可以将 spark.metrics.namespace 属性设置为像 ${spark.app.name} 这样的值。然后,Spark 会适当地扩展此值,并将其用作指标系统的根命名空间。非驱动程序和执行程序指标从未带有 spark.app.id 前缀, spark.metrics.namespace 属性对此类指标也没有任何影响。

Spark的指标被解耦为不同的 实例 ,对应于Spark组件。在每个实例中,您可以配置一组将指标报告到的汇聚器。当前支持以下实例:

每个实例可以报告给零个或多个 接收器 。接收器包含在 org.apache.spark.metrics.sink 包中:

Spark还支持一个Ganglia sink,由于许可限制,它没有被包含在默认构建中:

要安装 GangliaSink ,您需要执行 Spark 的自定义构建。 请注意,通过嵌入此库,您将在您的 Spark 包中包含 LGPL 许可的代码 。对于 sbt 用户,在构建之前设置 SPARK_GANGLIA_LGPL 环境变量。对于 Maven 用户,启用 -Pspark-ganglia-lgpl 配置文件。除了修改集群的 Spark 构建外,用户应用程序还需要链接到 spark-ganglia-lgpl 工件。

指标配置文件的语法以及每个接收器可用的参数在一个示例配置文件中定义, $SPARK_HOME/conf/metrics.properties.template

当使用 Spark 配置参数而不是指标配置文件时,相关参数名称由前缀 spark.metrics.conf. 以及配置细节组成,即参数的格式如下: spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name] 。 该示例展示了用于 Graphite sink 的 Spark 配置参数列表:

"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=秒
"spark.metrics.conf.*.sink.graphite.prefix"="可选前缀"
"spark.metrics.conf.*.sink.graphite.regex"="可选正则表达式用于发送匹配的指标"

Spark指标配置的默认值如下:

"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"

可以使用度量配置文件或配置参数 spark.metrics.conf.[component_name].source.jvm.class=[source_name] 配置附加源。目前,JVM 源是唯一可用的可选源。例如,以下配置参数激活 JVM 源: "spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"

可用指标提供者列表

Spark使用的指标有多种类型:计量器、计数器、直方图、计量器和定时器,详见 Dropwizard库文档 。以下组件和指标的列表报告了可用指标的名称及一些细节,按组件实例和源命名空间分组。在Spark仪器中使用的最常见的指标类型是计量器和计数器。计数器可以通过其后缀 .count 识别。定时器、计量器和直方图在列表中进行了注解,其余列表元素是计量器类型的指标。绝大多数指标在其父组件实例配置后会立即激活,有些指标还需要通过额外的配置参数来启用,详细信息在列表中报告。

组件实例 = 驱动程序

这是带有最大数量的仪表化指标的组件

组件实例 = 执行器

这些指标由Spark执行器公开。

来源 = JVM 来源

备注:

组件实例 = applicationMaster

注意:在YARN上运行时适用

组件实例 = mesos_cluster

注意:在运行于mesos时适用

组件实例 = master

注意:在以Spark独立模式作为主节点运行时适用

组件实例 = 应用程序源

注意:在以Spark独立模式作为主节点运行时适用

组件实例 = worker

注意:当在Spark独立模式下作为工作线程运行时适用

组件实例 = shuffleService

注意:适用于洗牌服务

高级仪器

可以使用几个外部工具来帮助分析Spark作业的性能:

Spark还提供了一个插件API,以便可以将自定义仪表代码添加到Spark应用程序中。可以用于将插件加载到Spark中的两个配置键:

两者都接受一个以逗号分隔的类名列表,这些类名实现了 org.apache.spark.api.plugin.SparkPlugin 接口。存在这两个名称是为了可以将一个列表放入 Spark 默认配置文件,使用户能够轻松地从命令行添加其他插件,而无需覆盖配置文件中的列表。重复的插件会被忽略。