Spark 配置

Spark提供了三个位置来配置系统:

Spark 属性

Spark 属性控制大多数应用程序设置,并为每个应用程序单独配置。这些属性可以直接在传递给您的 SparkConf 上设置。 SparkConf 允许您配置一些常见属性(例如,主 URL 和应用程序名称),以及通过 set() 方法配置任意键值对。例如,我们可以按照如下方式初始化一个具有两个线程的应用程序:

请注意,我们以 local[2] 运行,这意味着使用两个线程 - 这代表了“最小”并行性,这可以帮助检测仅在分布式环境中运行时存在的错误。

val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)

请注意,在本地模式下,我们可以拥有多个线程,在像 Spark Streaming 这样的情况下,我们实际上可能需要多个线程以防止任何类型的资源耗尽问题。

指定某个时间持续时间的属性应配置时间单位。接受以下格式:

25毫秒 (milliseconds)
5秒 (seconds)
10分钟或10min (minutes)
3小时 (hours)
5天 (days)
1年 (years)

指定字节大小的属性应配置为大小单位。接受以下格式:

1b(字节)
1k或1kb(基比特 = 1024字节)
1m或1mb(梅比特 = 1024基比特)
1g或1gb(吉比特 = 1024梅比特)
1t或1tb(特比特 = 1024吉比特)
1p或1pb(佩比特 = 1024特比特)

虽然没有单位的数字通常被解释为字节,但有一些会被解释为 KiB 或 MiB。请参阅各个配置属性的文档。在可能的情况下,指定单位是可取的。

动态加载 Spark 属性

在某些情况下,您可能希望避免在 SparkConf 中硬编码某些配置。例如,如果您想要在不同的主节点或不同的内存大小下运行相同的应用程序。Spark 允许您简单地创建一个空的配置:

val sc = new SparkContext(new SparkConf())

然后,您可以在运行时提供配置值:

./bin/spark-submit --name "我的应用" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark Shell 和 spark-submit 工具支持两种动态加载配置的方法。第一种是命令行选项, 例如 --master ,如上所示。 spark-submit 可以使用 --conf/-c 标志接受任何 Spark 属性,但对于在启动 Spark 应用程序时起作用的属性则使用特殊标志。 运行 ./bin/spark-submit --help 将显示这些选项的完整列表。

bin/spark-submit 还会从 conf/spark-defaults.conf 读取配置选项,其中每行由一个键和一个值组成,二者之间用空格分隔。例如:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

任何指定为标志或在属性文件中的值将被传递给应用程序,并与通过 SparkConf 指定的值合并。直接在 SparkConf 上设置的属性具有最高优先级,然后是传递给 spark-submit spark-shell 的标志,最后是 spark-defaults.conf 文件中的选项。由于 Spark 的早期版本,一些配置键已被重命名;在这种情况下,仍然接受旧的键名,但优先级低于任何新的键实例。

Spark 属性主要可以分为两种:一种与部署相关,比如 “spark.driver.memory”、“spark.executor.instances”,这种属性在运行时通过 SparkConf 进行编程设置时可能不会受到影响,或者其行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或 spark-submit 命令行选项进行设置;另一种主要与 Spark 运行时控制相关,比如 “spark.task.maxFailures”,这种属性可以通过两种方式进行设置。

查看Spark属性

应用程序的Web UI在 http:// :4040 列出了Spark属性,在“环境”标签中。 这是一个检查属性是否已正确设置的有用地方。请注意,只有通过 spark-defaults.conf SparkConf 或命令行明确指定的值会出现。对于所有其他配置属性,您可以假设使用的是默认值。

可用属性

控制内部设置的大多数属性都有合理的默认值。最常用的一些设置选项包括:

应用程序属性

属性名称 默认值 含义 自版本
spark.app.name (无) 您的应用程序的名称。这将在 UI 中和日志数据中显示。 0.9.0
spark.driver.cores 1 集群模式下用于驱动程序进程的核心数量。 1.3.0
spark.driver.maxResultSize 1g 每个 Spark 操作(例如 collect)所有分区序列化结果的总大小限制(以字节为单位)。应至少为 1M,或 0 表示无限制。如果总大小超出此限制,作业将被中止。 设置高限制可能会导致驱动程序的内存不足错误(取决于 spark.driver.memory 和 JVM 中对象的内存开销)。设置适当的限制可以保护驱动程序免受内存不足错误。 1.2.0
spark.driver.memory 1g 用于驱动程序进程的内存量,即初始化 SparkContext 的地方,以带有大小单位后缀的 JVM 内存字符串格式(“k”,“m”,“g”或“t”)(例如 512m , 2g )。
注意: 在客户端模式下,必须通过 SparkConf 直接在应用程序中设置此配置,因为在此时驱动程序 JVM 已经启动。 相反,请通过 --driver-memory 命令行选项或在默认属性文件中设置。
1.1.1
spark.driver.memoryOverhead driverMemory * spark.driver.memoryOverheadFactor , 最小为 384 在集群模式下,每个驱动程序进程分配的非堆内存量,以 MiB 为单位,除非另有说明。这是用于诸如 VM 开销、驻留字符串、其他本机开销等的内存。这通常随着容器大小而增长(通常为 6-10%)。 此选项当前在 YARN、Mesos 和 Kubernetes 上受支持。 注意: 非堆内存包括离堆内存(当 spark.memory.offHeap.enabled=true 时)和其他驱动程序进程使用的内存(例如与 PySpark 驱动程序一起使用的 python 进程)以及同一容器中运行的其他非驱动程序进程使用的内存。运行驱动程序的容器的最大内存大小由 spark.driver.memoryOverhead spark.driver.memory 的总和决定。 2.3.0
spark.driver.memoryOverheadFactor 0.10 每个驱动程序进程在集群模式下作为额外非堆内存分配的驱动程序内存的比例。这个内存包括 VM 开销、驻留字符串、其他本机开销等。这个值通常随着容器大小而增长。 默认情况下,除了 Kubernetes 非 JVM 作业外,此值默认为 0.10,Kubernetes 非 JVM 作业默认为 0.40。这是因为非 JVM 任务需要更多的非 JVM 堆空间,而此类任务通常会因“内存开销超限”错误而失败。这样可以提前避免错误,使用更高的默认值。 如果直接设置了 spark.driver.memoryOverhead ,则忽略此值。 3.3.0
spark.driver.resource.{resourceName}.amount 0 用于驱动程序的特定资源类型的数量。 如果使用此配置,您还必须指定 spark.driver.resource.{resourceName}.discoveryScript 以便驱动程序在启动时找到资源。 3.0.0
spark.driver.resource.{resourceName}.discoveryScript 驱动程序运行的脚本,用于发现特定资源类型。该脚本应写入 STDOUT,格式为 ResourceInformation 类的 JSON 字符串,包括名称和地址数组。对于客户端提交的驱动程序,发现脚本必须为该驱动程序分配与同一主机上其他驱动程序不同的资源地址。 3.0.0
spark.driver.resource.{resourceName}.vendor 用于驱动程序的资源的供应商。此选项当前只在 Kubernetes 上受支持,并且实际上既是供应商又是 following Kubernetes 设备插件命名约定的域名。(例如:对于 Kubernetes 上的 GPU,此配置应设置为 nvidia.com 或 amd.com) 3.0.0
spark.resources.discoveryPlugin org.apache.spark.resource.ResourceDiscoveryScriptPlugin 用于加载到应用程序中的类名的逗号分隔列表,这些类实现了 org.apache.spark.api.resource.ResourceDiscoveryPlugin。这是面向高级用户的,用于用自定义实现替换资源发现类。Spark 将尝试每个指定的类,直到其中一个返回该资源的资源信息。如果没有插件返回该资源的信息,它将最后尝试发现脚本。 3.0.0
spark.executor.memory 1g 每个执行器进程使用的内存量,格式与 JVM 内存字符串相同,带有大小单位后缀(“k”,“m”,“g”或“t”)(例如 512m , 2g )。 0.7.0
spark.executor.pyspark.memory 未设置 分配给每个执行器的 PySpark 内存量,以 MiB 为单位,除非另有说明。如果设置,执行器的 PySpark 内存将被限制在此数量。如果未设置,Spark 将不限制 Python 的内存使用,而应用程序必须避免超出与其他非 JVM 进程共享的开销内存空间。当 PySpark 在 YARN 或 Kubernetes 中运行时,此内存会被添加到执行器资源请求中。
注意: 此功能依赖于 Python 的 `resource` 模块;因此,行为和限制都是继承的。例如,Windows 不支持资源限制,MacOS 上的实际资源没有被限制。
2.4.0
spark.executor.memoryOverhead executorMemory * spark.executor.memoryOverheadFactor , 最小为 384 每个执行器进程分配的额外内存量,以 MiB 为单位,除非另有说明。这是用于诸如 VM 开销、驻留字符串、其他本机开销等的内存。这个值通常随着执行器大小而增长(通常为 6-10%)。此选项当前在 YARN 和 Kubernetes 上受支持。
注意: 额外内存包括 PySpark 执行器内存(当 spark.executor.pyspark.memory 未配置时)和其他非执行器进程在同一容器中使用的内存。运行执行器的容器的最大内存大小由 spark.executor.memoryOverhead spark.executor.memory spark.memory.offHeap.size spark.executor.pyspark.memory 的总和决定。
2.3.0
spark.executor.memoryOverheadFactor 0.10 每个执行器进程分配的额外非堆内存的执行器内存的比例。该内存包括 VM 开销、驻留字符串、其他本机开销等。该值通常随着容器大小而增长。这个值默认为 0.10,除了 Kubernetes 非 JVM 作业外,后者默认为 0.40。这样做是因为非 JVM 任务需要更多的非 JVM 堆空间,此类任务通常会因“内存开销超限”错误而失败。这样可以提前避免错误,使用更高的默认值。 如果直接设置了 spark.executor.memoryOverhead ,则忽略此值。 3.3.0
spark.executor.resource.{resourceName}.amount 0 用于每个执行器进程的特定资源类型的数量。 如果使用此配置,您还必须指定 spark.executor.resource.{resourceName}.discoveryScript 以便执行器在启动时找到资源。 3.0.0
spark.executor.resource.{resourceName}.discoveryScript 让执行器运行的脚本,用于发现特定资源类型。该脚本应写入 STDOUT,格式为 ResourceInformation 类的 JSON 字符串,包括名称和地址数组。 3.0.0
spark.executor.resource.{resourceName}.vendor 用于执行器的资源的供应商。此选项当前仅在 Kubernetes 上受支持,并且实际上既是供应商又是 following Kubernetes 设备插件命名约定的域名。(例如:对于 Kubernetes 上的 GPU,此配置应设置为 nvidia.com 或 amd.com) 3.0.0
spark.extraListeners (无) 实现 SparkListener 的类的逗号分隔列表;在初始化 SparkContext 时,这些类的实例将被创建并注册到 Spark 的监听器总线。如果一个类有一个接受 SparkConf 的单参数构造函数,将调用该构造函数;否则,将调用零参数构造函数。如果找不到有效的构造函数,SparkContext 创建将失败并抛出异常。 1.3.0
spark.local.dir /tmp 用于 Spark 的“临时”空间的目录,包括映射输出文件和存储在磁盘上的 RDD。这应该位于您系统中的快速本地磁盘上。它还可以是多个目录的以逗号分隔的列表,这些目录位于不同的磁盘上。
注意: 如果集群管理器设置了 SPARK_LOCAL_DIRS(独立),MESOS_SANDBOX(Mesos)或 LOCAL_DIRS(YARN)环境变量,则将覆盖此设置。
0.5.0
spark.logConf false 当启动 SparkContext 时记录有效的 SparkConf 为 INFO。 0.9.0
spark.master (无) 要连接的集群管理器。请参见 允许的主 URL 列表 0.9.0
spark.submit.deployMode client Spark 驱动程序程序的部署模式,可以是“client”或“cluster”, 这意味着在本地启动驱动程序程序(“client”)或在集群中的节点之一上远程启动驱动程序程序(“cluster”)。 1.5.0
spark.log.callerContext (无) 运行在 Yarn/HDFS 上时,将写入 Yarn RM 日志/HDFS 审计日志的应用程序信息。其长度取决于 Hadoop 配置 hadoop.caller.context.max.size 。应该简洁,通常可以有多达 50 个字符。 2.2.0
spark.log.level (无) 设置时,将覆盖任何用户定义的日志设置,就像在 Spark 启动时调用 SparkContext.setLogLevel() 一样。有效的日志级别包括:“ALL”、“DEBUG”、“ERROR”、“FATAL”、“INFO”、“OFF”、“TRACE”、“WARN”。 3.5.0
spark.driver.supervise false 如果为真,则在驱动程序以非零退出状态失败时自动重新启动驱动程序。 仅在 Spark 独立模式或 Mesos 集群部署模式下生效。 1.3.0
spark.driver.log.dfsDir (无) 如果 spark.driver.log.persistToDfs.enabled 为真,将同步 Spark 驱动程序日志的基础目录。在该基础目录中,每个应用程序将驱动程序日志记录到特定于应用程序的文件中。用户可能希望将其设置为统一位置,例如 HDFS 目录,以便驱动日志文件可以持久保存以供后续使用。该目录应允许任何 Spark 用户读写文件,并允许 Spark 历史服务器用户删除文件。此外,由于 spark.history.fs.driverlog.cleaner.enabled 为真,较旧的日志会通过 Spark 历史服务器 进行清理,如果它们的年龄超过通过 spark.history.fs.driverlog.cleaner.maxAge 设置的最大年龄。 3.0.0
spark.driver.log.persistToDfs.enabled false 如果为真,在客户端模式下运行的 spark 应用程序将把驱动日志写入持久存储中,该存储在 spark.driver.log.dfsDir 中配置。如果未配置 spark.driver.log.dfsDir ,则驱动日志不会被持久保存。此外,通过在 Spark 历史服务器 中将 spark.history.fs.driverlog.cleaner.enabled 设置为真,可以启用清理功能。 3.0.0
spark.driver.log.layout %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex 同步到 spark.driver.log.dfsDir 的驱动程序日志的布局。如果未配置,则使用 log4j2.properties 中定义的第一个追加器的布局。如果也未配置,驱动程序日志将使用默认布局。 3.0.0
spark.driver.log.allowErasureCoding false 是否允许驱动日志使用纠删码。在 HDFS 上,纠删码文件的更新速度不会像常规复制文件那样快,因此可能需要更长时间才能反映应用程序写入的更改。请注意,即使这是正确的,Spark 仍然不会强制文件使用纠删码,它只会使用文件系统默认值。 3.0.0
spark.decommission.enabled false 启用弃用时,Spark 将尽力优雅地关闭执行器。Spark 将尝试在启用 spark.storage.decommission.rddBlocks.enabled 时将所有 RDD 块迁移(受控制)从弃用的执行器迁移到远程执行器,并在启用 spark.storage.decommission.shuffleBlocks.enabled 时进行处理。启用弃用时,当 spark.dynamicAllocation.enabled 启用时,Spark 也会弃用执行器,而不是杀死它。 3.1.0
spark.executor.decommission.killInterval (无) 弃用执行器后,外部(例如非 Spark)服务将强制关闭的持续时间。 3.1.0
spark.executor.decommission.forceKillTimeout (无) 强制 Spark 关闭弃用执行器的持续时间。对于大多数情况,这应该设置为一个较高的值,因为较低的值将阻止块迁移有足够的时间完成。 3.2.0
spark.executor.decommission.signal PWR 用于触发执行器开始弃用的信号。 3.2.0
spark.executor.maxNumFailures numExecutors * 2,最小为 3 在应用程序失败之前执行器失败的最大次数。此配置仅在 YARN 上生效,或在 `spark.kubernetes.allocation.pods.allocator` 设置为 'direct' 时的 Kubernetes 上。 3.5.0
spark.executor.failuresValidityInterval (无) 执行器失败被视为独立的时间间隔,并且不累积到尝试计数中。此配置仅在 YARN 上生效,或在 `spark.kubernetes.allocation.pods.allocator` 设置为 'direct' 时的 Kubernetes 上。 3.5.0

除了这些,以下属性也可用,并且在某些情况下可能会有用:

运行环境

属性名称 默认值 含义 自版本起
spark.driver.extraClassPath (无) 在驱动程序的类路径前面添加额外的类路径条目。
注意: 在客户端模式下,该配置不得通过 SparkConf 直接在应用程序中设置,因为此时驱动程序 JVM 已经启动。 相反,请通过 --driver-class-path 命令行选项或在 默认属性文件中设置此项。
1.0.0
spark.driver.defaultJavaOptions (无) 要在 spark.driver.extraJavaOptions 前面添加的默认 JVM 选项字符串。 这主要由管理员设置。 例如,GC 设置或其他日志。 请注意,使用此选项设置最大堆大小 (-Xmx) 设置是非法的。最大堆 大小设置可以在集群模式下通过 spark.driver.memory 设置,在客户端模式下 通过 --driver-memory 命令行选项设置。
注意: 在客户端模式下,该配置不得通过 SparkConf 直接在应用程序中设置,因为此时驱动程序 JVM 已经启动。 相反,请通过 --driver-java-options 命令行选项或在 默认属性文件中设置此项。
3.0.0
spark.driver.extraJavaOptions (无) 要传递给驱动程序的额外 JVM 选项字符串。此项旨在由用户设置。 例如,GC 设置或其他日志。 请注意,使用此选项设置最大堆大小 (-Xmx) 设置是非法的。最大堆 大小设置可以在集群模式下通过 spark.driver.memory 设置,在客户端模式下 通过 --driver-memory 命令行选项设置。
注意: 在客户端模式下,该配置不得通过 SparkConf 直接在应用程序中设置,因为此时驱动程序 JVM 已经启动。 相反,请通过 --driver-java-options 命令行选项或在 默认属性文件中设置此项。 spark.driver.defaultJavaOptions 将前置于此配置。
1.0.0
spark.driver.extraLibraryPath (无) 在启动驱动程序 JVM 时设置特殊的库路径。
注意: 在客户端模式下,该配置不得通过 SparkConf 直接在应用程序中设置,因为此时驱动程序 JVM 已经启动。 相反,请通过 --driver-library-path 命令行选项或在 默认属性文件中设置此项。
1.0.0
spark.driver.userClassPathFirst false (实验性)在驱动程序加载类时,用户添加的 jars 是否优先于 Spark 自己的 jars。 此功能可用于减轻 Spark 的依赖项与用户依赖项之间的冲突。 这当前是一个实验性功能。 仅在集群模式下使用。 1.3.0
spark.executor.extraClassPath (无) 在执行器的类路径前面添加额外的类路径条目。这个设置主要是为了 兼容旧版本的 Spark。用户通常不需要设置此选项。 1.0.0
spark.executor.defaultJavaOptions (无) 要在 spark.executor.extraJavaOptions 前面添加的默认 JVM 选项字符串。 这主要由管理员设置。 例如,GC 设置或其他日志。 请注意,使用此选项设置 Spark 属性或最大堆大小 (-Xmx) 设置是非法的。 Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件设置。 最大堆大小设置可以通过 spark.executor.memory 设置。 以下符号(如果存在)将被插值:将被替换为 应用程序 ID, 将被替换为执行器 ID。例如,要启用 以应用程序的执行器 ID 命名的文件在 /tmp 中进行详细的 gc 日志,传递的“值”为: -verbose:gc -Xloggc:/tmp/-.gc 3.0.0
spark.executor.extraJavaOptions (无) 要传递给执行器的额外 JVM 选项字符串。此项旨在由用户设置。 例如,GC 设置或其他日志。 请注意,使用此选项设置 Spark 属性或最大堆大小 (-Xmx) 设置是非法的。 Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件设置。 最大堆大小设置可以通过 spark.executor.memory 设置。 以下符号(如果存在)将被插值:将被替换为 应用程序 ID, 将被替换为执行器 ID。例如,要启用 以应用程序的执行器 ID 命名的文件在 /tmp 中进行详细的 gc 日志,传递的“值”为: -verbose:gc -Xloggc:/tmp/-.gc spark.executor.defaultJavaOptions 将前置于此配置。 1.0.0
spark.executor.extraLibraryPath (无) 在启动执行器 JVM 时设置特殊的库路径。 1.0.0
spark.executor.logs.rolling.maxRetainedFiles -1 设置系统将保留的最新滚动日志文件的数量。 较旧的日志文件将被删除。 默认禁用。 1.1.0
spark.executor.logs.rolling.enableCompression false 启用执行器日志压缩。如果启用,滚动的执行器日志将被压缩。 默认禁用。 2.0.2
spark.executor.logs.rolling.maxSize 1024 * 1024 设置执行器日志文件的最大大小(以字节为单位),该日志将被滚动。 默认禁用滚动。有关旧日志自动清理,请参见 spark.executor.logs.rolling.maxRetainedFiles 1.4.0
spark.executor.logs.rolling.strategy (无) 设置执行器日志的滚动策略。默认情况下禁用。可以 设置为“时间”(基于时间滚动)或“大小”(基于大小滚动)。对于“时间”, 使用 spark.executor.logs.rolling.time.interval 设置滚动间隔。 对于“大小”,使用 spark.executor.logs.rolling.maxSize 设置 滚动的最大文件大小。 1.1.0
spark.executor.logs.rolling.time.interval 每日 设置执行器日志的滚动时间间隔。 默认情况下禁用滚动。有效值为 daily hourly minutely 或 以秒为单位的任何间隔。请参见 spark.executor.logs.rolling.maxRetainedFiles 有关旧日志的自动清理。 1.1.0
spark.executor.userClassPathFirst false (实验性)与 spark.driver.userClassPathFirst 相同功能,但 应用于执行器实例。 1.3.0
spark.executorEnv.[EnvironmentVariableName] (无) 将由 EnvironmentVariableName 指定的环境变量添加到执行器 进程中。用户可以指定多个环境变量。 0.9.0
spark.redaction.regex (?i)secret|password|token|access[.]key 正则表达式,用于决定驱动程序和 执行器环境中的哪些 Spark 配置属性和环境变量包含敏感信息。当此正则表达式匹配属性键或 值时,该值将从环境 UI 以及 YARN 和事件日志等各种日志中删除。 2.1.2
spark.redaction.string.regex (无) 正则表达式,用于决定 Spark 生成的字符串的哪些部分包含敏感信息。 当此正则表达式匹配字符串部分时,该字符串部分将被替换为一个虚拟值。 目前这用于隐藏 SQL 解释命令的输出。 2.2.0
spark.python.profile false 在 Python 工作器中启用分析,分析结果将在 sc.show_profiles() 中显示, 或在驱动程序退出前显示。它也可以通过 sc.dump_profiles(path) 导出到磁盘。如果某些分析结果已手动显示, 它们不会在驱动程序退出前自动显示。 默认使用 pyspark.profiler.BasicProfiler ,但可以通过将分析器类作为参数传递给 SparkContext 构造函数来覆盖。 1.2.0
spark.python.profile.dump (无) 用于在驱动程序退出前导出分析结果的目录。 结果将作为每个 RDD 的单独文件导出。它们可以通过 pstats.Stats() 加载。如果指定了此项,分析结果将不会自动显示。 1.2.0
spark.python.worker.memory 512m 在聚合期间每个 Python 工作进程使用的内存量,格式与带大小单位后缀的 JVM 内存字符串相同 (“k”,“m”,“g”或“t”) (例如 512m 2g )。 如果聚合期间使用的内存超过此数量,将会将数据溢出到磁盘。 1.1.0
spark.python.worker.reuse true 是否重用 Python 工作器。如果是,它将使用固定数量的 Python 工作器, 不需要为每个任务 fork() 一个 Python 进程。如果有大量广播,这将非常有用, 因为广播将不需要从 JVM 转移到 Python 工作器。 1.2.0
spark.files 要放置在每个执行器的工作目录中的文件的逗号分隔列表。支持通配符。 1.0.0
spark.submit.pyFiles 在 Python 应用程序的 PYTHONPATH 上放置的 .zip、.egg 或 .py 文件的逗号分隔列表。支持通配符。 1.0.1
spark.jars 要包含在驱动程序和执行器类路径中的 jars 的逗号分隔列表。支持通配符。 0.9.0
spark.jars.packages 要包含在驱动程序和执行器 类路径中的 jars 的 Maven 坐标的逗号分隔列表。坐标应为 groupId:artifactId:version。如果 spark.jars.ivySettings 被指定,工件将根据文件中的配置进行解析,否则将搜索本地 Maven 仓库,然后是 Maven 中央仓库,最后是命令行选项 --repositories 指定的任何其他远程 仓库。有关详细信息,请参见 高级依赖项管理 1.5.0
spark.jars.excludes 在解析 spark.jars.packages 中提供的依赖关系以避免依赖冲突时,要排除的 groupId:artifactId 的逗号分隔列表。 1.5.0
spark.jars.ivy 指定 Ivy 用户目录的路径,用于本地 Ivy 缓存和来自 spark.jars.packages 的包文件。这将覆盖 Ivy 属性 ivy.default.ivy.user.dir 的默认值 ~/.ivy2。 1.3.0
spark.jars.ivySettings 指向 Ivy 设置文件的路径,用于自定义解析使用 spark.jars.packages 指定的 jars,而不是内置的默认值,例如 Maven 中央仓库。由命令行给出的额外仓库 选项 --repositories spark.jars.repositories 也将被包含。 这对于允许 Spark 从防火墙后解析工件很有用,例如通过内部的 工件服务器(如 Artifactory)。有关设置文件格式的详细信息,请参考 设置文件 。 仅支持 file:// 方案的路径。不带方案的路径默认假设为 file:// 方案。

在 YARN 集群模式下运行时,此文件也将本地化到远程驱动程序,以便在 SparkContext#addJar 中进行依赖项解析
2.2.0
spark.jars.repositories 额外远程仓库的逗号分隔列表,用于查找与 --packages spark.jars.packages 给予的 Maven 坐标。 2.3.0
spark.archives 要提取到每个执行器工作目录中的档案的逗号分隔列表。 支持 .jar、.tar.gz、.tgz 和 .zip。您可以通过在要解压缩的文件名后添加 # 来指定要解压缩的目录名,例如 file.zip#directory 。 此配置为实验性。 3.1.0
spark.pyspark.driver.python 用于驱动程序的 PySpark 的 Python 二进制可执行文件。 (默认值为 spark.pyspark.python 2.1.0
spark.pyspark.python 用于驱动程序和执行器的 PySpark 的 Python 二进制可执行文件。 2.1.0

洗牌行为

属性名称 默认值 含义 自版本起
spark.reducer.maxSizeInFlight 48m 从每个reduce任务同时获取的map输出的最大大小,以MiB为单位,除非另有说明。由于每个输出需要我们创建一个缓冲区来接收,因此这表示每个reduce任务的固定内存开销,因此除非你有大量内存,否则应保持其较小。 1.4.0
spark.reducer.maxReqsInFlight Int.MaxValue 此配置限制在任何给定时间从远程请求中获取的块的数量。当集群中的主机数量增加时,这可能会导致一个或多个节点的入站连接数量非常庞大,造成工作节点在负载下失败。通过允许限制获取请求的数量,可以减轻这种情况。 2.0.0
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue 此配置限制从给定主机端口进行的每个reduce任务被提取的远程块的数量。当从给定地址在单次提取或同时请求大量块时,这可能会导致服务执行程序或节点管理器崩溃。当启用外部洗牌时,这特别有助于减少节点管理器的负担。通过将其设置为较低值,可以缓解此问题。 2.2.1
spark.shuffle.compress true 是否压缩map输出文件。通常这是个好主意。压缩将使用 spark.io.compression.codec 0.6.0
spark.shuffle.file.buffer 32k 每个洗牌文件输出流的内存缓冲区大小,以KiB为单位,除非另有说明。这些缓冲区减少了在创建中间洗牌文件时所做的磁盘查找和系统调用的数量。 1.4.0
spark.shuffle.unsafe.file.output.buffer 32k 在不安全洗牌写入器中每个分区写入后的这个缓冲区大小的文件系统。以KiB为单位,除非另有说明。 2.3.0
spark.shuffle.spill.diskWriteBufferSize 1024 * 1024 将已排序记录写入磁盘文件时使用的缓冲区大小,以字节为单位。 2.3.0
spark.shuffle.io.maxRetries 3 (仅限Netty)由于IO相关异常而失败的提取会自动重试,如果设置为非零值。该重试逻辑有助于在长时间的GC暂停或瞬态网络连接问题面前稳定大规模洗牌。 1.2.0
spark.shuffle.io.numConnectionsPerPeer 1 (仅限Netty)主机之间的连接被重用,以减少大型集群的连接积累。对于拥有许多硬盘和少数主机的集群,这可能导致没有足够的并发来饱和所有硬盘,因此用户可能会考虑增加此值。 1.2.1
spark.shuffle.io.preferDirectBufs true (仅限Netty)使用非堆缓冲区来减少洗牌和缓存块传输期间的垃圾收集。对于非堆内存有限的环境,用户可能希望关闭此选项,以强制从Netty中分配所有内存。 1.2.0
spark.shuffle.io.retryWait 5s (仅限Netty)提取重试之间的等待时间。由于重试引起的最大延迟默认情况下为15秒,计算方法为 maxRetries * retryWait 1.2.1
spark.shuffle.io.backLog -1 洗牌服务的接受队列长度。对于大型应用程序,此值可能需要增加,以便在服务无法跟上短时间内大量到达的连接时,不会丢弃传入连接。此设置需要在洗牌服务本身运行的地方进行配置,可能在应用程序外部(请参见下方的 spark.shuffle.service.enabled 选项)。如果设置为小于1,则将回退到Netty定义的操作系统默认值 io.netty.util.NetUtil#SOMAXCONN 1.1.1
spark.shuffle.io.connectionTimeout 值为 spark.network.timeout 用于标记洗牌服务器与客户端之间已建立连接的超时,如果仍有未完成的提取请求,但在通道上没有流量,至少要持续 `connectionTimeout`。 1.2.0
spark.shuffle.io.connectionCreationTimeout 值为 spark.shuffle.io.connectionTimeout 用于在洗牌服务器和客户端之间建立连接的超时。 3.2.0
spark.shuffle.service.enabled false 启用外部洗牌服务。此服务保持执行程序写入的洗牌文件,例如,以便可以安全地移除执行程序,或者在执行程序失败的情况下可以继续进行洗牌提取。必须设置外部洗牌服务以启用它。有关更多信息,请参见 动态分配配置和设置文档 1.2.0
spark.shuffle.service.port 7337 外部洗牌服务将运行的端口。 1.2.0
spark.shuffle.service.name spark_shuffle 客户端应与之通信的Spark洗牌服务的配置名称。这必须与YARN NodeManager配置中的洗牌配置名称( yarn.nodemanager.aux-services )匹配。仅在 spark.shuffle.service.enabled 设置为true时生效。 3.2.0
spark.shuffle.service.index.cache.size 100m 限制为指定内存足迹的缓存条目,以字节为单位,除非另有说明。 2.3.0
spark.shuffle.service.removeShuffle false 当不再需要洗牌时,是否使用ExternalShuffleService删除已释放执行程序的洗牌块。如果没有启用此选项,则被释放的执行程序上的洗牌数据将在应用程序结束之前保留在磁盘上。 3.3.0
spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE 在洗牌服务上允许同时传输的最大块数。请注意,当达到最大数量时,新的传入连接将被关闭。如果达到这些限制,任务将因提取失败而失败。 2.3.0
spark.shuffle.sort.bypassMergeThreshold 200 (高级)在基于排序的洗牌管理器中,如果没有map侧聚合且reduce分区数最多为该数,则避免合并排序数据。 1.1.1
spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO 用于洗牌IO的类的名称。 3.0.0
spark.shuffle.spill.compress true 是否压缩在洗牌期间溢出的数据。压缩将使用 spark.io.compression.codec 0.9.0
spark.shuffle.accurateBlockThreshold 100 * 1024 * 1024 在HighlyCompressedMapStatus中,超过此阈值的洗牌块大小将被准确记录。这有助于通过避免在获取洗牌块时低估洗牌块大小来防止OOM。 2.2.1
spark.shuffle.registration.timeout 5000 注册外部洗牌服务的超时时间,单位为毫秒。 2.3.0
spark.shuffle.registration.maxAttempts 3 当我们无法注册到外部洗牌服务时,将重试最大次数maxAttempts。 2.3.0
spark.shuffle.reduceLocality.enabled true 是否计算reduce任务的本地性偏好。 1.5.0
spark.shuffle.mapOutput.minSizeForBroadcast 512k 使用广播发送map输出状态到执行程序的大小。 2.0.0
spark.shuffle.detectCorrupt true 是否检测提取块中的任何损坏。 2.2.0
spark.shuffle.detectCorrupt.useExtraMemory false 如果启用,压缩/加密流的一部分将使用额外内存进行解压缩/解密,以便及早检测损坏。抛出的任何IOException将导致任务重试一次,如果再次以相同异常失败,则将抛出FetchFailedException以重试先前的阶段。 3.0.0
spark.shuffle.useOldFetchProtocol false 在执行洗牌块提取时,是否使用旧协议。仅在需要在新Spark版本任务从旧版本外部洗牌服务提取洗牌块的场景中启用。 3.0.0
spark.shuffle.readHostLocalDisk true 如果启用(并且 spark.shuffle.useOldFetchProtocol 被禁用),从与这些块管理器在同一主机上运行的机器请求的洗牌块将直接从磁盘读取,而不是通过网络作为远程块提取。 3.0.0
spark.files.io.connectionTimeout 值为 spark.network.timeout 用于标记在Spark RPC环境中提取文件所建立连接的超时,如果仍有正在下载的文件,但在通道上没有流量,至少要持续 `connectionTimeout`。 1.6.0
spark.files.io.connectionCreationTimeout 值为 spark.files.io.connectionTimeout 用于在Spark RPC环境中提取文件时建立连接的超时。 3.2.0
spark.shuffle.checksum.enabled true 是否计算洗牌数据的校验和。如果启用,Spark将为map输出文件中的每个分区数据计算校验和并将值存储在磁盘上的校验和文件中。当检测到洗牌数据损坏时,Spark将尝试通过使用校验和文件来诊断损坏的原因(例如,网络问题、磁盘问题等)。 3.2.0
spark.shuffle.checksum.algorithm ADLER32 用于计算洗牌校验和的算法。目前,它仅支持JDK的内置算法,例如ADLER32、CRC32。 3.2.0
spark.shuffle.service.fetch.rdd.enabled false 是否使用ExternalShuffleService提取磁盘持久化的RDD块。在动态分配的情况下,如果启用此功能,则在 spark.dynamicAllocation.executorIdleTimeout 后,只有磁盘持久化块的执行程序将被视为空闲并相应释放。 3.0.0
spark.shuffle.service.db.enabled true 是否在ExternalShuffleService中使用数据库。请注意,这仅影响独立模式。 3.0.0
spark.shuffle.service.db.backend LEVELDB 指定在洗牌服务本地数据库中使用的基于磁盘的存储。设置为LEVELDB或ROCKSDB。 3.4.0

Spark 用户界面

属性名称 默认值 含义 版本
spark.eventLog.logBlockUpdates.enabled false 如果 spark.eventLog.enabled 为真,是否记录每个块更新的事件。 *警告*: 这将显著增加事件日志的大小。 2.3.0
spark.eventLog.longForm.enabled false 如果为真,使用事件日志中的调用位置长格式。否则使用短格式。 2.4.0
spark.eventLog.compress false 如果 spark.eventLog.enabled 为真,是否压缩记录的事件。 1.0.0
spark.eventLog.compression.codec zstd 压缩记录事件的编解码器。默认情况下,Spark提供四种编解码器: lz4 lzf snappy zstd 。 您还可以使用完全限定类名来指定编解码器,例如: org.apache.spark.io.LZ4CompressionCodec org.apache.spark.io.LZFCompressionCodec org.apache.spark.io.SnappyCompressionCodec , 和 org.apache.spark.io.ZStdCompressionCodec 3.0.0
spark.eventLog.erasureCoding.enabled false 是否允许事件日志使用纠删码,或者关闭纠删码,无论文件系统的默认设置如何。 在HDFS上,纠删码文件的更新速度将不如常规复制文件快,因此应用程序更新将在历史服务器上显示得更慢。 注意,即使这为真,Spark仍将不强制文件使用纠删码,它将简单地使用文件系统的默认值。 3.0.0
spark.eventLog.dir file:///tmp/spark-events Spark事件记录的基本目录,如果 spark.eventLog.enabled 为真。 在此基本目录中,Spark为每个应用程序创建一个子目录,并在此目录中记录特定于应用程序的事件。用户可能希望将其设置为统一的位置,如HDFS目录,以便历史文件能够被历史服务器读取。 1.0.0
spark.eventLog.enabled false 是否记录Spark事件,有助于在应用程序完成后重建Web UI。 1.0.0
spark.eventLog.overwrite false 是否覆盖任何现有文件。 1.0.0
spark.eventLog.buffer.kb 100k 写入输出流时使用的缓冲区大小,以KiB为单位,除非另有说明。 1.0.0
spark.eventLog.rolling.enabled false 是否启用事件日志文件滚动。如果设置为真,则将每个事件日志文件切割到配置的大小。 3.0.0
spark.eventLog.rolling.maxFileSize 128m spark.eventLog.rolling.enabled=true 时,指定事件日志文件在滚动之前的最大大小。 3.0.0
spark.ui.dagGraph.retainedRootRDDs Int.MaxValue Spark UI和状态API在垃圾收集之前记住多少DAG图节点。 2.1.0
spark.ui.enabled true 是否为Spark应用程序运行Web UI。 1.1.1
spark.ui.store.path None 用于缓存实时UI的应用程序信息的本地目录。 默认情况下未设置,意味着所有应用程序信息将保留在内存中。 3.4.0
spark.ui.killEnabled true 允许从Web UI终止作业和阶段。 1.0.0
spark.ui.liveUpdate.period 100ms 更新实时实体的频率。-1表示在回放应用程序时“永不更新”,意味着只会发生最后一次写入。对于实时应用程序,这避免了在快速处理传入任务事件时能省略的一些操作。 2.3.0
spark.ui.liveUpdate.minFlushPeriod 1s 刷新过时UI数据之前经过的最短时间。当传入任务事件不频繁触发时,这可以避免UI变得过时。 2.4.2
spark.ui.port 4040 应用程序仪表板的端口,显示内存和工作负载数据。 0.7.0
spark.ui.retainedJobs 1000 Spark UI和状态API在回收之前记住多少个作业。 这是一个目标最大值,某些情况下可能会保留更少的元素。 1.2.0
spark.ui.retainedStages 1000 Spark UI和状态API在回收之前记住多少个阶段。 这是一个目标最大值,某些情况下可能会保留更少的元素。 0.9.0
spark.ui.retainedTasks 100000 Spark UI和状态API在回收之前记住一个阶段中的多少个任务。 这是一个目标最大值,某些情况下可能会保留更少的元素。 2.0.1
spark.ui.reverseProxy false 启用将Spark Master作为工作节点和应用程序UI的反向代理。在此模式下,Spark master将反向代理工作节点和应用程序UI,以便在不需要直接访问其主机的情况下访问它们。请谨慎使用,因为工作节点和应用程序UI将无法直接访问,您只能通过spark master/proxy公共URL访问它们。此设置会影响集群中运行的所有工作节点和应用程序UI,必须在所有工作节点、驱动程序和master上设置。 2.1.0
spark.ui.reverseProxyUrl 如果Spark UI应该通过另一个前端反向代理提供服务,这就是通过该反向代理访问Spark master UI的URL。 这在运行身份验证代理时很有用,例如OAuth代理。URL可以包含路径前缀,如 http://mydomain.com/path/to/spark/ ,允许您通过相同的虚拟主机和端口为多个Spark集群和其他Web应用程序提供服务。 通常,这应该是一个包含方案(http/https)、主机和端口的绝对URL。 在这里,可以指定一个以"/"开头的相对URL。在这种情况下,Spark UI和Spark REST API生成的所有URL将是服务器相对链接——这仍然有效,因为整个Spark UI通过相同的主机和端口提供服务。
此设置会影响Spark UI中的链接生成,但前端反向代理负责
  • 在转发请求之前去掉路径前缀,
  • 重写直接指向Spark master的重定向,
  • 将访问从 http://mydomain.com/path/to/spark 重定向到 http://mydomain.com/path/to/spark/ (路径前缀后的尾部斜杠);否则主页面上的相对链接将无法正确工作。
此设置会影响集群中运行的所有工作节点和应用程序UI,必须在所有工作节点、驱动程序和master上相同地设置。仅当 spark.ui.reverseProxy 被开启时才有效。当Spark master web UI可以直接访问时,此设置不需要。
注意,设置的值在通过“/”拆分后不能包含关键字 `proxy` 或 `history`。Spark UI依赖这两个关键字从URI获取REST API端点。
2.1.0
spark.ui.proxyRedirectUri 当Spark在代理后面运行时,重定向的目标地址。这将使Spark修改重定向响应,使其指向代理服务器,而不是Spark UI自己的地址。这应该只是服务器的地址,没有任何应用程序的前缀路径;前缀应该由代理服务器本身设置(通过添加 X-Forwarded-Context 请求头),或者通过在Spark应用程序的配置中设置代理基础。 3.0.0
spark.ui.showConsoleProgress false 在控制台中显示进度条。进度条显示运行超过500毫秒的阶段的进度。如果多个阶段同时运行,将在同一行上显示多个进度条。
注意: 在shell环境中,spark.ui.showConsoleProgress的默认值为true。
1.2.1
spark.ui.custom.executor.log.url (none) 指定自定义的Spark执行器日志URL,以支持外部日志服务,而不是使用集群管理器在Spark UI中的应用程序日志URL。Spark将通过模式支持一些路径变量,这些变量可能因集群管理器而异。请检查您的集群管理器的文档,以查看支持哪些模式(如果有)。

请注意,此配置还会替换事件日志中的原始日志URL,在访问历史服务器上的应用程序时也会生效。新日志URL必须是永久性的,否则您可能会遇到执行器日志URL的死链接。

目前,仅YARN和K8s集群管理器支持此配置。
3.0.0
spark.worker.ui.retainedExecutors 1000 Spark UI和状态API在回收之前记住多少个已完成的执行器。 1.5.0
spark.worker.ui.retainedDrivers 1000 Spark UI和状态API在回收之前记住多少个已完成的驱动程序。 1.5.0
spark.sql.ui.retainedExecutions 1000 Spark UI和状态API在回收之前记住多少个已完成的执行。 1.5.0
spark.streaming.ui.retainedBatches 1000 Spark UI和状态API在回收之前记住多少个已完成的批次。 1.0.0
spark.ui.retainedDeadExecutors 100 Spark UI和状态API在回收之前记住多少个死去的执行器。 2.0.0
spark.ui.filters None 要应用于Spark Web UI的过滤器类名称的以逗号分隔的列表。过滤器应该是一个标准的 javax servlet Filter
可以通过在配置中设置形如 spark.<过滤器类名>.param.<参数名>=<值> 的配置条目来指定过滤器参数。
例如:
spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
1.0.0
spark.ui.requestHeaderSize 8k HTTP请求头允许的最大大小,以字节为单位,除非另有说明。 此设置也适用于Spark历史服务器。 2.2.3
spark.ui.timelineEnabled true 是否在UI页面上显示事件时间线数据。 3.4.0
spark.ui.timeline.executors.maximum 250 在事件时间线上显示的最大执行器数量。 3.2.0
spark.ui.timeline.jobs.maximum 500 在事件时间线上显示的最大作业数量。 3.2.0
spark.ui.timeline.stages.maximum 500 在事件时间线上显示的最大阶段数量。 3.2.0
spark.ui.timeline.tasks.maximum 1000 在事件时间线上显示的最大任务数量。 1.4.0
spark.appStatusStore.diskStoreDir None 本地目录,用于存储SQL执行的诊断信息。此配置仅适用于实时UI。 3.4.0

压缩与序列化

属性名称 默认值 含义 自版本
spark.broadcast.compress true 在发送广播变量之前是否进行压缩。通常是个好主意。 压缩将使用 spark.io.compression.codec 0.6.0
spark.checkpoint.compress false 是否压缩 RDD 检查点。通常是个好主意。 压缩将使用 spark.io.compression.codec 2.2.0
spark.io.compression.codec lz4 用于压缩内部数据如 RDD 分区、事件日志、广播变量和洗牌输出的编解码器。默认情况下,Spark 提供四种编解码器: lz4 lzf snappy zstd 。您也可以使用完全限定的类名指定编解码器,例如: org.apache.spark.io.LZ4CompressionCodec org.apache.spark.io.LZFCompressionCodec org.apache.spark.io.SnappyCompressionCodec , 和 org.apache.spark.io.ZStdCompressionCodec 0.8.0
spark.io.compression.lz4.blockSize 32k 使用 LZ4 压缩时的块大小。当使用 LZ4 压缩编解码器时,降低此块大小也会降低使用 LZ4 时的洗牌内存使用。默认单位是字节,除非另有说明。此配置仅适用于 `spark.io.compression.codec`。 1.4.0
spark.io.compression.snappy.blockSize 32k 使用 Snappy 压缩时的块大小。当使用 Snappy 压缩编解码器时,降低此块大小也会降低使用 Snappy 时的洗牌内存使用。默认单位是字节,除非另有说明。此配置仅适用于 `spark.io.compression.codec`。 1.4.0
spark.io.compression.zstd.level 1 Zstd 压缩编解码器的压缩级别。增加压缩级别会导致更好的压缩,但代价是更多的 CPU 和内存。此配置仅适用于 `spark.io.compression.codec`。 2.3.0
spark.io.compression.zstd.bufferSize 32k 使用 Zstd 压缩时的缓冲区大小(以字节为单位)。降低此大小将降低使用 Zstd 时的洗牌内存使用,但可能会因为过多的 JNI 调用开销而增加压缩成本。此配置仅适用于 `spark.io.compression.codec`。 2.3.0
spark.io.compression.zstd.bufferPool.enabled true 如果为真,则启用 ZSTD JNI 库的缓冲池。 3.2.0
spark.kryo.classesToRegister (无) 如果您使用 Kryo 序列化,请提供以逗号分隔的自定义类名列表,以在 Kryo 中注册。 有关更多详细信息,请参见 调优指南 1.2.0
spark.kryo.referenceTracking true 在使用 Kryo 序列化数据时,是否跟踪对同一对象的引用,如果您的对象图中有循环并且含有多个相同对象的副本,则这是必需的,并且有助于提高效率。如果您知道不是这种情况,则可以禁用以提高性能。 0.8.0
spark.kryo.registrationRequired false 是否需要在 Kryo 中注册。如果设置为 'true',在序列化未注册的类时,Kryo 将抛出异常。如果设置为 false(默认值),Kryo 将与每个对象一起写入未注册的类名。写入类名可能会导致显著的性能开销,因此启用此选项可以严格要求用户没有遗漏类的注册。 1.1.0
spark.kryo.registrator (无) 如果您使用 Kryo 序列化,请提供以逗号分隔的类列表,以注册您的自定义类。这一属性在您需要以自定义方式注册类时很有用,例如,指定自定义字段序列化器。否则 spark.kryo.classesToRegister 更简单。它应该设置为扩展 KryoRegistrator 的类。 有关更多详细信息,请参见 调优指南 0.5.0
spark.kryo.unsafe true 是否使用基于 Unsafe 的 Kryo 序列化器。使用基于 Unsafe 的 IO 可以显著加快速度。 2.1.0
spark.kryoserializer.buffer.max 64m Kryo 序列化缓冲区的最大允许大小,以 MiB 为单位,除非另有说明。它必须大于您尝试序列化的任何对象并且小于 2048m。如果您在 Kryo 内部收到 "buffer limit exceeded" 异常,请增加此值。 1.4.0
spark.kryoserializer.buffer 64k Kryo 序列化缓冲区的初始大小,以 KiB 为单位,除非另有说明。请注意,每个工作节点上将有一个缓冲区 每个核心 。如果需要,此缓冲区将增长到 spark.kryoserializer.buffer.max 1.4.0
spark.rdd.compress false 是否压缩序列化的 RDD 分区(例如,对于 StorageLevel.MEMORY_ONLY_SER 在 Java 和 Scala 中或 StorageLevel.MEMORY_ONLY 在 Python 中)。 可以节省大量空间,但会增加一些额外的 CPU 时间。 压缩将使用 spark.io.compression.codec 0.6.0
spark.serializer org.apache.spark.serializer.
JavaSerializer
用于序列化将通过网络发送或需要以序列化形式进行缓存的对象的类。Java 序列化的默认值适用于任何可序列化的 Java 对象,但速度相对较慢,因此我们建议在需要速度时 使用 org.apache.spark.serializer.KryoSerializer 并配置 Kryo 序列化 。可以是 org.apache.spark.Serializer 的任何子类。 0.5.0
spark.serializer.objectStreamReset 100 使用 org.apache.spark.serializer.JavaSerializer 进行序列化时,序列化器缓存对象以防止写入冗余数据,但这会阻止对这些对象的垃圾回收。通过调用 'reset',您可以从序列化器中冲刷掉该信息,并允许旧对象被回收。要关闭此定期重置,将其设置为 -1。默认情况下,它将在每 100 个对象之后重置序列化器。 1.0.0

内存管理

属性名称 默认值 含义 自版本起
spark.memory.fraction 0.6 用于执行和存储的(堆空间 - 300MB)所占的比例。这个值越低,溢出和缓存数据驱逐发生的频率就越高。此配置的目的是为内部元数据、用户数据结构和稀疏、异常大记录情况下的不精确大小估算预留内存。建议将此值保持为默认值。有关更多详细信息,包括增加此值时正确调优JVM垃圾回收的重要信息,请参见 此描述 1.6.0
spark.memory.storageFraction 0.5 免于被驱逐的存储内存量,以 spark.memory.fraction 所预留的区域大小的比例表示。这个值越高,可用于执行的工作内存越少,任务可能更频繁地溢出到磁盘。建议将此值保持为默认值。有关更多详细信息,请参见 此描述 1.6.0
spark.memory.offHeap.enabled false 如果为true,Spark将尝试使用堆外内存进行某些操作。如果启用了堆外内存使用,则 spark.memory.offHeap.size 必须为正值。 1.6.0
spark.memory.offHeap.size 0 可用于堆外分配的绝对内存量,以字节为单位,除非另有说明。此设置对堆内存使用没有影响,因此如果您的执行器总内存消耗必须在某个硬限制内,请确保相应缩小您的JVM堆大小。当 spark.memory.offHeap.enabled=true 时,此值必须设置为正值。 1.6.0
spark.storage.unrollMemoryThreshold 1024 * 1024 在展开任何块之前请求的初始内存。 1.1.0
spark.storage.replication.proactive false 启用RDD块的主动块复制。由于执行器故障而丢失的缓存RDD块副本,如果存在任何可用副本,则会被补充。这尝试将块的复制级别恢复到初始数量。 2.2.0
spark.storage.localDiskByExecutors.cacheSize 1000 存储本地目录的执行器最大数量。此大小适用于驱动程序和执行器端,以避免存储不受限制。此缓存将用于避免在从同一主机获取磁盘持久化的RDD块或洗牌块时(当 spark.shuffle.readHostLocalDisk 已设置时)使用网络。 3.0.0
spark.cleaner.periodicGC.interval 30min 控制触发垃圾回收的频率。

此上下文清理器仅在弱引用被垃圾回收时触发清理。在具有大驱动程序JVM的长时间运行的应用程序中,如果驱动程序的内存压力很小,这可能非常偶尔发生,甚至根本不发生。完全不清理可能会导致执行器在一段时间后用尽磁盘空间。
1.6.0
spark.cleaner.referenceTracking true 启用或禁用上下文清理。 1.0.0
spark.cleaner.referenceTracking.blocking true 控制清理线程是否应该在清理任务上阻塞(洗牌除外,这由 spark.cleaner.referenceTracking.blocking.shuffle Spark属性控制)。 1.0.0
spark.cleaner.referenceTracking.blocking.shuffle false 控制清理线程是否应该在洗牌清理任务上阻塞。 1.1.1
spark.cleaner.referenceTracking.cleanCheckpoints false 控制是否在引用超出范围时清理检查点文件。 1.4.0

执行行为

属性名称 默认值 含义 自版本起
spark.broadcast.blockSize 4m TorrentBroadcastFactory 中每个块的每一部分的大小,单位为KiB,除非另有说明。过大的值会降低广播时的并行性(使其变慢);但是,如果值太小, BlockManager 可能会受到性能影响。 0.5.0
spark.broadcast.checksum true 是否启用广播的校验和。如果启用,广播将包含一个校验和,可以帮助检测损坏的块,但会额外消耗一定的计算和发送数据的成本。如果网络有其他机制保证数据在广播期间不会损坏,可以禁用此功能。 2.1.1
spark.broadcast.UDFCompressionThreshold 1 * 1024 * 1024 用户自定义函数(UDF)和 Python RDD 命令以字节为单位通过广播压缩的阈值,除非另有说明。 3.0.0
spark.executor.cores 在 YARN 模式中为 1,在独立和 Mesos 粗粒度模式中为工作节点上可用的所有核心。 每个执行器使用的核心数量。 在独立和 Mesos 粗粒度模式中,详情见 此描述 1.0.0
spark.default.parallelism 对于分布式洗牌操作,如 reduceByKey join ,在父 RDD 中的最大分区数。对于像 parallelize 这样的没有父 RDD 的操作,它取决于集群管理器:
  • 本地模式:本地机器上的核心数
  • Mesos 精细粒度模式:8
  • 其他:所有执行节点上的核心总数或 2,以较大者为准
用户未设置时,变换如 join reduceByKey parallelize 返回的 RDD 的默认分区数。 0.5.0
spark.executor.heartbeatInterval 10s 每个执行器与驱动程序的心跳间隔。心跳让驱动程序知道执行器仍然存活,并为正在进行的任务更新指标。spark.executor.heartbeatInterval 应显著小于 spark.network.timeout 1.1.0
spark.files.fetchTimeout 60s 从驱动程序获取通过 SparkContext.addFile() 添加的文件时使用的通信超时。 1.0.0
spark.files.useFetchCache true 如果设置为 true(默认),文件获取将使用一个由属于同一应用程序的执行器共享的本地缓存,这可以提高在同一主机上运行多个执行器时的任务启动性能。如果设置为 false,这些缓存优化将被禁用,所有执行器将获取自己的文件副本。为了使用位于 NFS 文件系统上的 Spark 本地目录,可能会禁用此优化(有关更多详情,见 SPARK-6313 )。 1.2.2
spark.files.overwrite false 启动时是否覆盖任何存在的文件。即使该选项设置为 true ,用户也无法覆盖通过 SparkContext.addFile SparkContext.addJar 添加的文件。 1.0.0
spark.files.ignoreCorruptFiles false 是否忽略损坏的文件。如果为 true,当遇到损坏或不存在的文件时,Spark 作业将继续运行,已读取的内容仍将返回。 2.1.0
spark.files.ignoreMissingFiles false 是否忽略缺失的文件。如果为 true,当遇到缺失的文件时,Spark 作业将继续运行,已读取的内容仍将返回。 2.4.0
spark.files.maxPartitionBytes 134217728 (128 MiB) 读取文件时可以打包到单个分区的最大字节数。 2.1.0
spark.files.openCostInBytes 4194304 (4 MiB) 打开一个文件的估计成本,按可以同时扫描的字节数计算。当将多个文件放入一个分区时使用。最好高估此值,这样包含小文件的分区会比包含大文件的分区更快。 2.1.0
spark.hadoop.cloneConf false 如果设置为 true,则为每个任务克隆一个新的 Hadoop Configuration 对象。为了处理 Configuration 线程安全问题,应该启用此选项(有关更多详情,见 SPARK-2546 )。默认情况下禁用,以避免对不受这些问题影响的作业造成意外的性能退化。 1.0.3
spark.hadoop.validateOutputSpecs true 如果设置为 true,则验证用于 saveAsHadoopFile 和其他变体的输出规格(例如检查输出目录是否已存在)。可以禁用此功能以消除因输出目录已存在而导致的异常。我们建议用户不要禁用此功能,除非试图实现与以前版本 Spark 的兼容性。只需手动使用 Hadoop 的 FileSystem API 删除输出目录。对于通过 Spark Streaming 的 StreamingContext 生成的作业,此设置将被忽略,因为在检查点恢复期间可能需要重写现有输出目录。 1.0.1
spark.storage.memoryMapThreshold 2m 从磁盘读取块时,Spark 开启内存映射的块大小。默认单位为字节,除非另有说明。这防止 Spark 对非常小的块进行内存映射。一般而言,内存映射对于接近或低于操作系统页面大小的块开销较大。 0.9.2
spark.storage.decommission.enabled false 是否在弃用执行器时弃用块管理器。 3.1.0
spark.storage.decommission.shuffleBlocks.enabled true 在块管理器弃用期间是否转移洗牌块。需要可迁移的洗牌解析器(如基于排序的洗牌)。 3.1.0
spark.storage.decommission.shuffleBlocks.maxThreads 8 在迁移洗牌文件时使用的最大线程数。 3.1.0
spark.storage.decommission.rddBlocks.enabled true 在块管理器弃用期间是否转移 RDD 块。 3.1.0
spark.storage.decommission.fallbackStorage.path (none) 块管理器弃用期间后备存储的位置。例如, s3a://spark-storage/ 。如果为空,则禁用后备存储。应由 TTL 管理,因为 Spark 不会清理它。 3.1.0
spark.storage.decommission.fallbackStorage.cleanUp false 如果为 true,Spark 在关闭期间清理其后备存储数据。 3.2.0
spark.storage.decommission.shuffleBlocks.maxDiskSize (none) 在拒绝远程洗牌块之前,用于存储洗牌块的最大磁盘空间。拒绝远程洗牌块意味着执行器将不会接收任何洗牌迁移,如果没有其他执行器可用于迁移,则洗牌块将丢失,除非配置了 spark.storage.decommission.fallbackStorage.path 3.2.0
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1 文件输出提交者算法版本,有效算法版本号:1 或 2。请注意,2 可能会导致正确性问题,例如 MAPREDUCE-7282。 2.2.0

执行者指标

属性名称 默认值 含义 自版本起
spark.eventLog.logStageExecutorMetrics false 是否将每个阶段的执行者指标峰值(针对每个执行者)写入事件日志。
注意: 这些指标是在执行者心跳中轮询(收集)并发送的,始终会执行;此配置仅用于确定是否将汇总的指标峰值写入事件日志。
3.0.0
spark.executor.processTreeMetrics.enabled false 在收集执行者指标时,是否收集进程树指标(来自 /proc 文件系统)。
注意: 仅在存在 /proc 文件系统时才会收集进程树指标。
3.0.0
spark.executor.metrics.pollingInterval 0 收集执行者指标的频率(以毫秒为单位)。
如果为 0,则在执行者心跳时进行轮询(因此在心跳间隔内,指定为 spark.executor.heartbeatInterval )。 如果为正值,则在此间隔内进行轮询。
3.0.0
spark.eventLog.gcMetrics.youngGenerationGarbageCollectors Copy,PS Scavenge,ParNew,G1 Young Generation 支持的年轻代垃圾收集器的名称。名称通常是 GarbageCollectorMXBean.getName 的返回值。 内置的年轻代垃圾收集器是 Copy,PS Scavenge,ParNew,G1 Young Generation。 3.0.0
spark.eventLog.gcMetrics.oldGenerationGarbageCollectors MarkSweepCompact,PS MarkSweep,ConcurrentMarkSweep,G1 Old Generation 支持的老年代垃圾收集器的名称。名称通常是 GarbageCollectorMXBean.getName 的返回值。 内置的老年代垃圾收集器是 MarkSweepCompact,PS MarkSweep,ConcurrentMarkSweep,G1 Old Generation。 3.0.0
spark.executor.metrics.fileSystemSchemes file,hdfs 在执行者指标中报告的文件系统方案。 3.1.0

网络

属性名称 默认值 含义 自版本起
spark.rpc.message.maxSize 128 允许在“控制平面”通信中最大消息大小(以MiB为单位);通常仅适用于在执行器和驱动程序之间发送的映射输出大小信息。如果您正在运行成千上万的映射和减少任务,并看到关于RPC消息大小的消息,请增加此值。 2.0.0
spark.blockManager.port (随机) 所有块管理器监听的端口。这些在驱动程序和执行器上都存在。 1.1.0
spark.driver.blockManager.port (值为spark.blockManager.port) 驱动程序特定的块管理器监听端口,适用于无法使用与执行器相同配置的情况。 2.1.0
spark.driver.bindAddress (值为spark.driver.host) 绑定监听套接字的主机名或IP地址。此配置覆盖SPARK_LOCAL_IP环境变量(见下文)。
它还允许向执行器或外部系统广告与本地不同的地址。这在运行具有桥接网络的容器时非常有用。为了使其正常工作,驱动程序使用的不同端口(RPC、块管理器和UI)需要从容器的主机转发。
2.1.0
spark.driver.host (本地主机名) 驱动程序的主机名或IP地址。 这用于与执行器和独立主控进行通信。 0.7.0
spark.driver.port (随机) 驱动程序监听的端口。 这用于与执行器和独立主控进行通信。 0.7.0
spark.rpc.io.backLog 64 RPC服务器的接受队列长度。对于大型应用程序,此值可能 需要增加,以便在短时间内到达大量连接时,输入连接不会被丢弃。 3.0.0
spark.network.timeout 120秒 所有网络交互的默认超时。此配置将替代 spark.storage.blockManagerHeartbeatTimeoutMs , spark.shuffle.io.connectionTimeout , spark.rpc.askTimeout spark.rpc.lookupTimeout 如果它们未被配置。 1.3.0
spark.network.timeoutInterval 60秒 驱动程序检查并使死掉的执行器超时的时间间隔。 1.3.2
spark.network.io.preferDirectBufs true 如果启用,则共享分配器优先使用堆外缓冲区分配。 堆外缓冲区用于减少在洗牌和缓存 块传输期间的垃圾收集。在堆外内存严重受限的环境中,用户可能希望 关闭此选项以强制所有分配在堆内。 3.0.0
spark.port.maxRetries 16 在放弃之前继续绑定端口的最大重试次数。 当端口指定了一个特定值(非0)时,每次后续重试都将 将上一个尝试中使用的端口递增1后再重试。这 本质上允许它尝试从指定的起始端口到端口 + maxRetries的一系列端口。 1.1.1
spark.rpc.askTimeout spark.network.timeout RPC询问操作在超时之前等待的持续时间。 1.4.0
spark.rpc.lookupTimeout 120秒 RPC远程端点查找操作在超时之前等待的持续时间。 1.4.0
spark.network.maxRemoteBlockSizeFetchToMem 200m 当块的大小超过此阈值时,将远程块提取到磁盘 以字节为单位。这是为了避免一个巨大的请求占用太多内存。请注意此配置 将影响洗牌提取和块管理器远程块提取。 对于启用了外部洗牌服务的用户,此功能只能在 外部洗牌服务至少为2.3.0时工作。 3.0.0
spark.rpc.io.connectionTimeout 值为 spark.network.timeout 超时用于标记RPC对等体之间建立的连接为空闲状态并关闭 如果存在未决的RPC请求但在通道上至少没有流量 `connectionTimeout`。 1.2.0
spark.rpc.io.connectionCreationTimeout 值为 spark.rpc.io.connectionTimeout 建立RPC对等体之间连接的超时。 3.2.0

调度

属性名称 默认值 含义 版本
spark.cores.max (未设置) 独立部署集群 处于“粗粒度”共享模式的 Mesos 集群 上运行时,申请的应用程序的最大 CPU 核心数(不是每台机器的核心数)。如果未设置,则默认值将是 Spark 的独立集群管理器上的 spark.deploy.defaultCores ,或者在 Mesos 上为无限(所有可用核心)。 0.6.0
spark.locality.wait 3s 在放弃并在较少本地的节点上启动任务之前,等待启动数据本地任务的时间。相同的等待时间将用于跨多个本地性级别(进程本地、节点本地、机架本地,然后是任意)。还可以通过设置 spark.locality.wait.node 等自定义每个级别的等待时间。如果您的任务较长并且本地性差,则应增加此设置,但默认值通常效果很好。 0.5.0
spark.locality.wait.node spark.locality.wait 为节点本地性自定义本地性等待。例如,您可以将其设置为 0,以跳过节点本地性并立即搜索机架本地性(如果您的集群有机架信息)。 0.8.0
spark.locality.wait.process spark.locality.wait 为进程本地性自定义本地性等待。这会影响试图在特定执行器进程中访问缓存数据的任务。 0.8.0
spark.locality.wait.rack spark.locality.wait 为机架本地性自定义本地性等待。 0.8.0
spark.scheduler.maxRegisteredResourcesWaitingTime 30s 在调度开始之前,等待资源注册的最长时间。 1.1.1
spark.scheduler.minRegisteredResourcesRatio 0.8 对于 KUBERNETES 模式; 0.8 对于 YARN 模式; 0.0 对于独立模式和 Mesos 粗粒度模式 注册资源的最小比例(注册资源 / 总预期资源)(资源是在 YARN 模式和 Kubernetes 模式下的执行器,独立模式和 Mesos 粗粒度模式下的 CPU 核心 ['spark.cores.max' 值是 Mesos 粗粒度模式的总预期资源]),以在调度开始之前等待。指定为 0.0 到 1.0 之间的双精度数。不管是否达到了资源的最小比例,调度开始之前的最长等待时间由配置 spark.scheduler.maxRegisteredResourcesWaitingTime 控制。 1.1.1
spark.scheduler.mode FIFO 在提交到同一 SparkContext 的作业之间的 调度模式 。可以设置为 FAIR 来使用公平共享,而不是一个接一个地排队作业。适用于多用户服务。 0.8.0
spark.scheduler.revive.interval 1s 调度程序恢复运行任务的工作资源请求的时间间隔。 0.8.1
spark.scheduler.listenerbus.eventqueue.capacity 10000 事件队列的默认容量。Spark 会首先尝试使用 `spark.scheduler.listenerbus.eventqueue.queueName.capacity` 指定的容量初始化事件队列。如果未配置,Spark 将使用此配置指定的默认容量。请注意,容量必须大于 0。如果观察到侦听器事件丢失,请考虑增加值(例如 20000)。增加此值可能会导致驱动程序使用更多内存。 2.3.0
spark.scheduler.listenerbus.eventqueue.shared.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark 侦听器总线中共享事件队列的容量,该队列存储注册到侦听器总线的外部侦听器的事件。如果相应于共享队列的侦听器事件丢失,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.listenerbus.eventqueue.appStatus.capacity spark.scheduler.listenerbus.eventqueue.capacity appStatus 事件队列的容量,该队列存储内部应用程序状态侦听器的事件。如果相应于 appStatus 队列的侦听器事件丢失,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark 监听器总线中 executorManagement 事件队列的容量,该队列存储内部执行器管理侦听器的事件。如果相应于 executorManagement 队列的侦听器事件丢失,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.listenerbus.eventqueue.eventLog.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark 侦听器总线中 eventLog 队列的容量,该队列存储写入事件日志的事件记录侦听器的事件。如果相应于 eventLog 队列的侦听器事件丢失,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.listenerbus.eventqueue.streams.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark 侦听器总线中 streams 队列的容量,该队列存储内部流处理侦听器的事件。如果相应于 streams 队列的侦听器事件丢失,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.resource.profileMergeConflicts false 如果设置为“true”,当 RDD 中指定了不同的配置文件,并且它们组合到单个阶段时,Spark 将合并 ResourceProfiles。合并时,Spark 选择每个资源的最大值并创建一个新的 ResourceProfile。默认值为 false,这会导致 Spark 抛出异常,如果在进入同一阶段的 RDD 中发现多个不同的 ResourceProfiles。 3.1.0
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout 120s 在因所有执行器因任务失败而被排除的情况下,等待获取新执行器并调度任务的超时时间(以秒为单位)。 2.4.1
spark.standalone.submit.waitAppCompletion false 如果设置为 true,当 RDD 中指定了不同的配置文件并且组合到单个阶段时,Spark 会合并 ResourceProfiles。当它们合并时,Spark 选择每个资源的最大值并创建一个新的 ResourceProfile。默认值为 false,这会导致 Spark 抛出异常,如果在进入同一阶段的 RDD 中发现多个不同的 ResourceProfiles。 3.1.0
spark.excludeOnFailure.enabled false 如果设置为“true”,防止 Spark 在由于过多任务失败而被排除的执行器上调度任务。使用的算法可以通过其他“spark.excludeOnFailure”配置选项进一步控制。 2.1.0
spark.excludeOnFailure.timeout 1h (实验性)在整个应用程序中,为某个节点或执行器排除的时间,之后其将无条件从排除列表中移除以尝试运行新任务。 2.1.0
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor 1 (实验性)对于给定的任务,它可以在一个执行器上重试的次数,在该任务被排除之前。 2.1.0
spark.excludeOnFailure.task.maxTaskAttemptsPerNode 2 (实验性)对于给定的任务,它可以在一个节点上重试的次数,在该节点因该任务而被排除之前。 2.1.0
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor 2 (实验性)在一个阶段中,必须在一个执行器上失败的不同任务的数量,才能在该阶段中将执行器排除。 2.1.0
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode 2 (实验性)在给定阶段中被标记为排除的不同执行器的数量,才能将整节点标记为该阶段失败。 2.1.0
spark.excludeOnFailure.application.maxFailedTasksPerExecutor 2 (实验性)在成功的任务集中,必须在一个执行器上失败的不同任务的数量,才能在整个应用程序中将执行器排除。被排除的执行器将在由 spark.excludeOnFailure.timeout 指定的超时之后自动添加回可用资源池中。需要注意的是,使用动态分配时,执行器可能会被标记为空闲并被集群管理器回收。 2.2.0
spark.excludeOnFailure.application.maxFailedExecutorsPerNode 2 (实验性)必须为整个应用程序排除的不同执行器的数量,才能将节点排除为整个应用程序。被排除的节点将在由 spark.excludeOnFailure.timeout 指定的超时之后自动添加回可用资源池中。需要注意的是,使用动态分配时,节点上的执行器可能会被标记为空闲并被集群管理器回收。 2.2.0
spark.excludeOnFailure.killExcludedExecutors false (实验性)如果设置为“true”,允许 Spark 在因获取失败或因整个应用程序被排除时自动杀死执行器,此行为由 spark.killExcludedExecutors.application.* 控制。需要注意的是,当整个节点被添加为排除时,该节点上的所有执行器将被杀死。 2.2.0
spark.excludeOnFailure.application.fetchFailure.enabled false (实验性)如果设置为“true”,当发生获取失败时,Spark 将立即排除执行器。如果启用了外部 Shuffle 服务,则整个节点将被排除。 2.3.0
spark.speculation false 如果设置为“true”,执行任务的猜测执行。这意味着如果在一个阶段中一个或多个任务运行较慢,则将重新启动它们。 0.6.0
spark.speculation.interval 100ms Spark 检查任务以进行推测的频率。 0.6.0
spark.speculation.multiplier 1.5 任务比中位数慢多少倍才能被认为是推测的候选。 0.6.0
spark.speculation.quantile 0.75 在特定阶段启用推测之前,必须完成的任务比例。 0.6.0
spark.speculation.minTaskRuntime 100ms 一个任务在被考虑进行推测之前运行的最短时间。这可用于避免对非常短的任务启动推测副本。 3.2.0
spark.speculation.task.duration.threshold 调度程序在任务持续时间超过阈值后将尝试进行推测运行任务。如果提供,当当前阶段的任务数量小于或等于单个执行器上的槽位数量,并且任务花费的时间超过阈值时,任务将被猜测性地运行。此配置帮助推测任务非常少的阶段。如果执行器槽位足够大,常规的推测配置可能也适用。例如,如果成功的运行次数足够,任务可能会被重新启动,即使阈值未达到。槽位的数量是根据 spark.executor.cores spark.task.cpus 的配置值计算的,最小值为 1。默认单位为字节,除非另有指定。 3.0.0
spark.speculation.efficiency.processRateMultiplier 0.75 在评估低效任务时使用的乘数。乘数越高,将可能被视为低效的任务越多。 3.4.0
spark.speculation.efficiency.longRunTaskFactor 2 只要任务的持续时间超过乘以因子的值和时间阈值(可以是 spark.speculation.multiplier * successfulTaskDurations.median 或 spark.speculation.minTaskRuntime ),任务将不管数据处理速率好坏而被推测。这避免了在任务速度慢与数据处理速率无关时错过低效任务。 3.4.0
spark.speculation.efficiency.enabled true 设置为 true 时,Spark 将通过阶段任务指标或其持续时间评估任务处理的效率,并只需要推测低效任务。当一个任务被认为是低效时,1)其数据处理速率低于阶段中所有成功任务的平均数据处理速率与一个乘数的乘积,或 2)其持续时间超过乘以 spark.speculation.efficiency.longRunTaskFactor 和时间阈值(可以是 spark.speculation.multiplier * successfulTaskDurations.median 或 spark.speculation.minTaskRuntime )。 3.4.0
spark.task.cpus 1 分配给每个任务的核心数量。 0.5.0
spark.task.resource.{resourceName}.amount 1 为每个任务分配的特定资源类型的数量,请注意这可以是一个双精度数。如果指定此项,您还必须提供执行器配置 spark.executor.resource.{resourceName}.amount 和任何相应的发现配置,以便您的执行器以该资源类型创建。除了整个数量外,还可以指定分数(例如,0.25,表示 1/4 个资源)。分数必须小于或等于 0.5,换句话说,资源共享的最小量是每个资源 2 个任务。此外,分数会向下取整以分配资源槽位(例如,0.2222 的配置,或 1/0.2222 的槽位将变成 4 个任务/资源,而不是 5 个)。 3.0.0
spark.task.maxFailures 4 在放弃作业之前,任何特定任务的连续失败次数。不同任务之间的总失败不会导致作业失败;必须特定任务在不同尝试中连续失败。如果任何尝试成功,则该任务的失败计数将重置。应大于或等于 1。允许的重试次数 = 此值 - 1。 0.8.0
spark.task.reaper.enabled false 启用对被杀死/中断任务的监控。当设置为 true 时,任何被杀死的任务将在执行器中监控,直到该任务实际完成执行。有关此监控的确切行为的详细信息,请参见其他 spark.task.reaper.* 配置。当设置为 false(默认值)时,任务杀死将使用缺乏此类监控的旧代码路径。 2.0.3
spark.task.reaper.pollingInterval 10s spark.task.reaper.enabled = true 时,此设置控制执行器轮询被杀死任务状态的频率。如果在轮询时被杀死的任务仍在运行,则将记录警告,默认情况下,任务的线程转储将被记录(此线程转储可通过 spark.task.reaper.threadDump 设置禁用,详见下文)。 2.0.3
spark.task.reaper.threadDump true spark.task.reaper.enabled = true 时,此设置控制是否在轮询被杀死的任务期间记录任务线程转储。将其设置为 false 以禁用线程转储的收集。 2.0.3
spark.task.reaper.killTimeout -1 spark.task.reaper.enabled = true 时,此设置指定了一个超时,在此超时后,如果被杀死的任务仍未停止运行,则执行器 JVM 将自行终止。默认值 -1 禁用此机制,并防止执行器自我销毁。此设置的目的是作为一个安全网,防止无法取消的任务使执行器无法使用。 2.0.3
spark.stage.maxConsecutiveAttempts 4 在阶段被中止之前允许的连续阶段尝试次数。 2.2.0
spark.stage.ignoreDecommissionFetchFailure false 是否忽略因执行器退役而导致的阶段获取失败,当计数 spark.stage.maxConsecutiveAttempts 3.4.0

屏障执行模式

属性名称 默认值 含义 自版本
spark.barrier.sync.timeout 365d 每次来自障碍任务的 barrier() 调用的超时时间(以秒为单位)。如果协调者在配置的时间内未接收到来自障碍任务的所有同步消息,则抛出 SparkException 以使所有任务失败。默认值设置为 31536000(3600 * 24 * 365),因此 barrier() 调用将等待一年。 2.4.0
spark.scheduler.barrier.maxConcurrentTasksCheck.interval 15s 在最大并发任务检查失败和下一个检查之间等待的时间(以秒为单位)。最大并发任务检查确保集群可以启动比作业提交的障碍阶段所需的更多并发任务。如果集群刚刚启动且没有足够的执行程序注册,则检查可能会失败,因此我们稍等一会儿并尝试再次进行检查。如果检查失败次数超过为作业配置的最大失败次数,则当前作业提交将失败。请注意,此配置仅适用于包含一个或多个障碍阶段的作业,我们不会对非障碍作业执行检查。 2.4.0
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures 40 在提交作业之前允许的最大并发任务检查失败次数。最大并发任务检查确保集群可以启动比作业提交的障碍阶段所需的更多并发任务。如果集群刚刚启动且没有足够的执行程序注册,则检查可能会失败,因此我们稍等一会儿并尝试再次进行检查。如果检查失败次数超过为作业配置的最大失败次数,则当前作业提交将失败。请注意,此配置仅适用于包含一个或多个障碍阶段的作业,我们不会对非障碍作业执行检查。 2.4.0

动态分配

属性名称 默认值 含义 自版本以来
spark.dynamicAllocation.enabled false 是否使用动态资源分配,该分配根据工作负载上下调登记的执行者数量。 更多详细信息,请参见描述 这里

这要求满足以下一个条件: 1) 通过 spark.shuffle.service.enabled 启用外部洗牌服务,或 2) 通过 spark.dynamicAllocation.shuffleTracking.enabled 启用洗牌跟踪,或 3) 通过 spark.decommission.enabled spark.storage.decommission.shuffleBlocks.enabled 启用洗牌块退役,或 4) (实验性) 配置 spark.shuffle.sort.io.plugin.class 以使用自定义 ShuffleDataIO ,其 ShuffleDriverComponents 支持可靠存储。 以下配置也相关: spark.dynamicAllocation.minExecutors , spark.dynamicAllocation.maxExecutors , 和 spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio
1.2.0
spark.dynamicAllocation.executorIdleTimeout 60s 如果启用动态分配且一个执行者在此持续时间内保持空闲, 则该执行者将被移除。更多详细信息,请参见 描述 1.2.0
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 如果启用动态分配且一个包含缓存数据块的执行者在此持续时间内保持空闲, 则该执行者将被移除。更多详细信息,请参见 描述 1.4.0
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 如果启用动态分配,则运行的初始执行者数量。

如果设置了 `--num-executors`(或 `spark.executor.instances`)并且大于此值,则将 它用作初始执行者数量。
1.3.0
spark.dynamicAllocation.maxExecutors infinity 如果启用动态分配,则执行者数量的上限。 1.2.0
spark.dynamicAllocation.minExecutors 0 如果启用动态分配,则执行者数量的下限。 1.2.0
spark.dynamicAllocation.executorAllocationRatio 1 默认情况下,动态分配会请求足够的执行者,以根据要处理的任务数量最大化 并行性。虽然这可以最小化作业的延迟,但对于小任务,这些设置可能会因 执行者分配的开销而浪费大量资源,因为某些执行者可能根本不执行任何工作。 此设置允许设置一个比例,用于减少执行者数量相对于完整并行的数量。 默认值为 1.0,以实现最大并行性。 0.5 会将目标执行者数量除以 2。 动态分配计算的目标执行者数量仍然可以被 spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors 设置覆盖。 2.4.0
spark.dynamicAllocation.schedulerBacklogTimeout 1s 如果启用动态分配并且有待处理的任务在此持续时间内积压,则将请求新的执行者。 更多详细信息,请参见此 描述 1.2.0
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout spark.dynamicAllocation.schedulerBacklogTimeout 相同,但仅用于 后续执行者请求。更多详细信息,请参见此 描述 1.2.0
spark.dynamicAllocation.shuffleTracking.enabled true 为执行者启用洗牌文件跟踪,这允许动态分配 而无需外部洗牌服务。此选项将尝试保持活跃的执行者 来存储活动作业的洗牌数据。 3.0.0
spark.dynamicAllocation.shuffleTracking.timeout infinity 当启用洗牌跟踪时,控制持有洗牌数据的执行者的超时时间。 默认值表示 Spark 将依赖于洗牌的垃圾回收,以释放执行者。 如果由于某种原因,垃圾回收没有快速清理洗牌, 则可以使用此选项控制在执行者即使存储洗牌数据时的超时。 3.0.0

线程配置

根据作业和集群配置,我们可以在Spark的多个地方设置线程数,以有效利用可用资源,从而获得更好的性能。在Spark 3.0之前,这些线程配置适用于Spark的所有角色,例如驱动程序、执行程序、工作节点和主节点。从Spark 3.0开始,我们可以在更细粒度的层面配置线程,从驱动程序和执行程序开始。以下表格以RPC模块为例。对于其他模块,如shuffle,只需在属性名称中将“rpc”替换为“shuffle”,除了 spark.{driver|executor}.rpc.netty.dispatcher.numThreads ,该属性仅适用于RPC模块。

属性名称 默认值 含义 自版本
spark.{driver|executor}.rpc.io.serverThreads 退回到 spark.rpc.io.serverThreads 服务器线程池中使用的线程数 1.6.0
spark.{driver|executor}.rpc.io.clientThreads 退回到 spark.rpc.io.clientThreads 客户端线程池中使用的线程数 1.6.0
spark.{driver|executor}.rpc.netty.dispatcher.numThreads 退回到 spark.rpc.netty.dispatcher.numThreads RPC消息调度程序线程池中使用的线程数 3.0.0

线程相关配置键的默认值是请求的驱动程序或执行程序的核心数量与可用的JVM核心数量(硬编码的上限为8)中的最小值,或者在没有该值的情况下。

Spark Connect

服务器配置

服务器配置在 Spark Connect 服务器中设置,例如,当您使用 ./sbin/start-connect-server.sh 启动 Spark Connect 服务器时。它们通常通过配置文件和命令行选项设置,使用 --conf/-c

属性名称 默认值 含义 自版本
spark.connect.grpc.binding.port 15002 Spark Connect 服务器绑定的端口。 3.4.0
spark.connect.grpc.interceptor.classes (无) 必须实现 io.grpc.ServerInterceptor 接口的类名的逗号分隔列表 3.4.0
spark.connect.grpc.arrow.maxBatchSize 4m 使用 Apache Arrow 时,限制可从服务器端发送到客户端的单个箭头批次的最大大小。当前,我们保守地使用其 70% 因为大小并不准确,而是估计的。 3.4.0
spark.connect.grpc.maxInboundMessageSize 134217728 设置 gRPC 请求的最大入站消息大小。负载更大的请求将失败。 3.4.0
spark.connect.extensions.relation.classes (无) 实现特性 org.apache.spark.sql.connect.plugin.RelationPlugin 的类的逗号分隔列表,以支持 proto 中的自定义关系类型。 3.4.0
spark.connect.extensions.expression.classes (无) 实现特性 org.apache.spark.sql.connect.plugin.ExpressionPlugin 的类的逗号分隔列表,以支持 proto 中的自定义表达式类型。 3.4.0
spark.connect.extensions.command.classes (无) 实现特性 org.apache.spark.sql.connect.plugin.CommandPlugin 的类的逗号分隔列表,以支持 proto 中的自定义命令类型。 3.4.0

安全

请查看 安全 页面以获取有关如何保护不同Spark子系统的可用选项。

Spark SQL

运行时 SQL 配置

运行时 SQL 配置是每个会话的可变 Spark SQL 配置。可以通过配置文件和以 --conf/-c 开头的命令行选项设置初始值,或者通过设置用于创建 SparkSession SparkConf 。此外, 可以通过 SET 命令设置和查询它们,并通过 RESET 命令将它们重置为初始值,或通过运行时的 SparkSession.conf 的 setter 和 getter 方法。

属性名称 默认值 含义 自版本
spark.sql.adaptive.advisoryPartitionSizeInBytes ( spark.sql.adaptive.shuffle.targetPostShuffleInputSize 的值)

自适应优化过程中shuffle分区的建议字节大小(当spark.sql.adaptive.enabled为true时)。当Spark合并小的shuffle分区或拆分偏斜的shuffle分区时生效。

3.0.0
spark.sql.adaptive.autoBroadcastJoinThreshold (无)

配置在执行连接时将广播到所有工作节点的表的最大字节大小。通过将此值设置为-1,可以禁用广播。默认值与spark.sql.autoBroadcastJoinThreshold相同。请注意,此配置仅在自适应框架中使用。

3.2.0
spark.sql.adaptive.coalescePartitions.enabled true

当为真且'spark.sql.adaptive.enabled'为真时,Spark将根据目标大小(由'spark.sql.adaptive.advisoryPartitionSizeInBytes'指定)合并连续的shuffle分区,以避免过多的小任务。

3.0.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (无)

合并前的shuffle分区初始数量。如果未设置,则等于spark.sql.shuffle.partitions。仅在'spark.sql.adaptive.enabled'和'spark.sql.adaptive.coalescePartitions.enabled'均为真时,此配置才生效。

3.0.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB

合并后shuffle分区的最小大小。当自适应计算的目标大小在分区合并时过小时,这非常有用。

3.2.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true

当为真时,Spark在合并连续的shuffle分区时不遵循由'spark.sql.adaptive.advisoryPartitionSizeInBytes'(默认为64MB)指定的目标大小,而是根据Spark集群的默认并行度自适应计算目标大小。计算出的大小通常小于配置的目标大小。这是为了最大化并行度,避免在启用自适应查询执行时性能退化。建议将此配置设置为false,并遵循配置的目标大小。

3.2.0
spark.sql.adaptive.customCostEvaluatorClass (无)

用于自适应执行的自定义成本评估器类。如果未设置,Spark将默认使用其自有的SimpleCostEvaluator。

3.2.0
spark.sql.adaptive.enabled true

当为真时,启用自适应查询执行,根据准确的运行时统计信息在查询执行中重新优化查询计划。

1.6.0
spark.sql.adaptive.forceOptimizeSkewedJoin false

当为真时,即使引入额外的shuffle,也强制启用OptimizeSkewedJoin。

3.3.0
spark.sql.adaptive.localShuffleReader.enabled true

当为真且'spark.sql.adaptive.enabled'为真时,Spark尝试使用本地shuffle读取器来读取shuffle数据,当不需要shuffle分区时,例如,在将排序合并连接转换为广播哈希连接后。

3.0.0
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0b

配置允许构建本地哈希映射的每个分区的最大字节大小。如果该值不小于spark.sql.adaptive.advisoryPartitionSizeInBytes且所有分区大小不大于此配置,则连接选择更倾向于使用shuffle哈希连接而不是排序合并连接,而不考虑spark.sql.join.preferSortMergeJoin的值。

3.2.0
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true

当为真且'spark.sql.adaptive.enabled'为真时,Spark将在RebalancePartitions中优化偏斜的shuffle分区,并根据目标大小(由'spark.sql.adaptive.advisoryPartitionSizeInBytes'指定)将其拆分为更小的分区,以避免数据偏斜。

3.2.0
spark.sql.adaptive.optimizer.excludedRules (无)

配置将在自适应优化器中禁用的规则列表,这些规则由其规则名称指定并用逗号分隔。优化器将记录确实被排除的规则。

3.1.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2

如果分区的大小小于该因子乘以spark.sql.adaptive.advisoryPartitionSizeInBytes,则在拆分过程中将合并一个分区。

3.3.0
spark.sql.adaptive.skewJoin.enabled true

当为真且'spark.sql.adaptive.enabled'为真时,Spark通过拆分(并在需要时复制)偏斜的分区来动态处理shuffle连接中的偏斜(排序合并和shuffle哈希)。

3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0

如果分区的大小大于该因子乘以中位数分区大小,并且也大于'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes',则将该分区视为偏斜。

3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB

如果分区的大小(以字节为单位)大于此阈值,并且也大于'spark.sql.adaptive.skewJoin.skewedPartitionFactor'乘以中位数分区大小,则将该分区视为偏斜。理想情况下,此配置应该设置为大于'spark.sql.adaptive.advisoryPartitionSizeInBytes'。

3.0.0
spark.sql.allowNamedFunctionArguments true

如果为true,Spark将为所有已实现命名参数的函数开启命名参数的支持。

3.5.0
spark.sql.ansi.doubleQuotedIdentifiers false

当为真且'spark.sql.ansi.enabled'为真时,Spark SQL将双引号(")括起来的字面量读作标识符。当为假时,它们被读作字符串字面量。

3.4.0
spark.sql.ansi.enabled false

当为真时,Spark SQL使用符合ANSI标准的方言,而不是Hive兼容。例如,当SQL运算符/函数的输入无效时,Spark将在运行时抛出异常,而不是返回空结果。有关该方言的完整详细信息,请参见Spark文档的“ANSI合规性”部分。某些ANSI方言特性可能不是直接来自ANSI SQL标准,但它们的行为与ANSI SQL风格保持一致。

3.0.0
spark.sql.ansi.enforceReservedKeywords false

当为真且'spark.sql.ansi.enabled'为真时,Spark SQL解析器强制执行ANSI保留关键字,并禁止使用保留关键字作为别名和/或表、视图、函数等的标识符的SQL查询。

3.3.0
spark.sql.ansi.relationPrecedence false

当为真且'spark.sql.ansi.enabled'为真时,在组合关系时JOIN优先于逗号。例如, t1, t2 JOIN t3 的结果应该是 t1 X (t2 X t3) 。如果该配置为假,结果是 (t1 X t2) X t3

3.4.0
spark.sql.autoBroadcastJoinThreshold 10MB

配置在执行连接时将广播到所有工作节点的表的最大字节大小。通过将此值设置为-1,可以禁用广播。请注意,目前仅支持对Hive Metastore表进行统计,其中已运行命令 ANALYZE TABLE COMPUTE STATISTICS noscan ,以及在数据文件上直接计算统计信息的基于文件的数据源表。

1.1.0
spark.sql.avro.compression.codec snappy

写入AVRO文件时使用的压缩编解码器。支持的编解码器:uncompressed、deflate、snappy、bzip2、xz和zstandard。默认编解码器为snappy。

2.4.0
spark.sql.avro.deflate.level -1

在写入AVRO文件时,使用deflate编解码器的压缩级别。有效值必须在1到9(包含)或-1的范围内。默认值为-1,对应于当前实现中的6级。

2.4.0
spark.sql.avro.filterPushdown.enabled true

当为真时,启用对Avro数据源的过滤器下推。

3.1.0
spark.sql.broadcastTimeout 300

在广播连接中广播等待时间的超时时间(以秒为单位)。

1.3.0
spark.sql.bucketing.coalesceBucketsInJoin.enabled false

当为真时,如果两个具有不同桶数的桶化表被连接,具有更多桶的侧将合并为与另一侧相同的桶数。更多的桶数可以被较小的桶数整除。桶合并适用于排序合并连接和shuffle哈希连接。注意:合并桶化表可以避免连接中的不必要shuffle,但也会降低并行性,并可能导致shuffle哈希连接的OOM。

3.1.0
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio 4

两个被合并的桶的数量比应小于等于此值,才能应用桶合并。仅在'spark.sql.bucketing.coalesceBucketsInJoin.enabled'设置为true时,此配置才有效。

3.1.0
spark.sql.catalog.spark_catalog (无)

将用作Spark内置v1目录的v2接口的目录实现:spark_catalog。该目录与spark_catalog共享其标识符命名空间,必须与之保持一致;例如,如果表可以由spark_catalog加载,则该目录也必须返回表的元数据。为了将操作委托给spark_catalog,实施可以扩展'CatalogExtension'。

3.0.0
spark.sql.cbo.enabled false

启用CBO以估算计划统计信息时设置为真。

2.2.0
spark.sql.cbo.joinReorder.dp.star.filter false

将星连接过滤器启发式应用于基于成本的连接枚举。

2.2.0
spark.sql.cbo.joinReorder.dp.threshold 12

动态规划算法中允许的连接节点的最大数量。

2.2.0
spark.sql.cbo.joinReorder.enabled false

启用CBO中的连接重排序。

2.2.0
spark.sql.cbo.planStats.enabled false

当为真时,逻辑计划将从目录获取行数和列统计信息。

3.0.0
spark.sql.cbo.starSchemaDetection false

当为真时,基于星型模式检测启用连接重排。

2.2.0
spark.sql.charAsVarchar false

当为真时,Spark在CREATE/REPLACE/ALTER TABLE命令中将CHAR类型替换为VARCHAR类型,因此新创建/更新的表将不再具有CHAR类型的列/字段。现有的CHAR类型列/字段的表不受此配置的影响。

3.3.0
spark.sql.chunkBase64String.enabled true

是否截断由 Base64 函数生成的字符串。当为真时,由base64函数生成的base64字符串被拆分为最多76个字符的行。当为假时,base64字符串不被拆分。

3.5.2
spark.sql.cli.print.header false

当设置为true时,spark-sql CLI在查询输出中打印列的名称。

3.2.0
spark.sql.columnNameOfCorruptRecord _corrupt_record

存储未解析的原始JSON和CSV记录的内部列名称,这些记录解析失败。

1.2.0
spark.sql.csv.filterPushdown.enabled true

当为真时,启用对CSV数据源的过滤器下推。

3.0.0
spark.sql.datetime.java8API.enabled false

如果配置属性设置为true,则使用Java 8 API的java.time.Instant和java.time.LocalDate类作为Catalyst的TimestampType和DateType的外部类型。如果设置为false,则使用java.sql.Timestamp和java.sql.Date作为相同的目的。

3.0.0
spark.sql.debug.maxToStringFields 25

在调试输出中,可以转换为字符串的序列类型条目的最大字段数量。超出限制的任何元素将被丢弃,并替换为"... N more fields"占位符。

3.0.0
spark.sql.defaultCatalog spark_catalog

默认目录的名称。如果用户尚未显式设置当前目录,则这将是当前目录。

3.0.0
spark.sql.error.messageFormat PRETTY

当为PRETTY时,错误信息由错误类、消息和查询上下文的文本表示组成。MINIMAL和STANDARD格式是漂亮的JSON格式,其中STANDARD包含一个附加的JSON字段 message 。此配置属性影响在运行查询时,Thrift Server和SQL CLI的错误消息。

3.4.0
spark.sql.execution.arrow.enabled false

(自Spark 3.0已弃用,请设置'spark.sql.execution.arrow.pyspark.enabled'。)

2.3.0
spark.sql.execution.arrow.fallback.enabled true

(自Spark 3.0已弃用,请设置'spark.sql.execution.arrow.pyspark.fallback.enabled'。)

2.4.0
spark.sql.execution.arrow.localRelationThreshold 48MB

在将Arrow批转换为Spark DataFrame时,如果Arrow批的字节大小小于此阈值,则在驱动程序端使用本地集合。否则,Arrow批将被发送到执行器并反序列化为Spark内部行。

3.4.0
spark.sql.execution.arrow.maxRecordsPerBatch 10000

在使用Apache Arrow时,限制可以写入单个ArrowRecordBatch中的记录的最大数量。此配置对诸如DataFrame(.cogroup).groupby.applyInPandas之类的分组API无效,因为每个组都会变成每个ArrowRecordBatch。如果设置为零或负数,则没有限制。

2.3.0
spark.sql.execution.arrow.pyspark.enabled ( spark.sql.execution.arrow.enabled 的值)

当为真时,利用Apache Arrow在PySpark中进行列式数据传输。此优化适用于:1. pyspark.sql.DataFrame.toPandas。2. pyspark.sql.SparkSession.createDataFrame当其输入为Pandas DataFrame或NumPy ndarray时。以下数据类型不受支持:TimestampType的ArrayType。

3.0.0
spark.sql.execution.arrow.pyspark.fallback.enabled ( spark.sql.execution.arrow.fallback.enabled 的值)

当为真时,由'spark.sql.execution.arrow.pyspark.enabled'启用的优化将在发生错误时自动回退到未优化的实现。

3.0.0
spark.sql.execution.arrow.pyspark.selfDestruct.enabled false

(实验)当为真时,利用Apache Arrow的自毁和分块选项在PySpark中进行列式数据传输,在从Arrow转换为Pandas时。这会减少内存使用,但会消耗一些CPU时间。此优化适用于:在设置'spark.sql.execution.arrow.pyspark.enabled'时,pyspark.sql.DataFrame.toPandas。

3.2.0
spark.sql.execution.arrow.sparkr.enabled false

当为真时,利用Apache Arrow在SparkR中进行列式数据传输。此优化适用于:1. createDataFrame当其输入为R DataFrame时 2. collect 3. dapply 4. gapply 以下数据类型不受支持:FloatType、BinaryType、ArrayType、StructType和MapType。

3.0.0
spark.sql.execution.pandas.structHandlingMode legacy

创建pandas DataFrame时结构类型的转换模式。当为"legacy"时,1. 当禁用Arrow优化时,转换为Row对象;2. 当启用Arrow优化时,转换为字典或在嵌套字段名称重复时引发异常。当为"row"时,无论Arrow优化如何,总是转换为Row对象。当为"dict"时,转换为字典并使用后缀的键名,例如,a_0、a_1,如果存在重复的嵌套字段名称,则不考虑Arrow优化。

3.5.0
spark.sql.execution.pandas.udf.buffer.size ( spark.buffer.size 的值)

spark.buffer.size 相同,但仅适用于Pandas UDF执行。如果未设置,则后备为 spark.buffer.size 。请注意,Pandas执行需要超过4个字节。降低此值可能会使小Pandas UDF批次迭代并管道化;但是,它可能会降低性能。请参见SPARK-27870。

3.0.0
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled true

当为真时,Python UDF的回溯被简化。它在回溯中隐藏了Python工作线程、(反)序列化等,仅显示UDF的异常消息。请注意,这仅与CPython 3.7及更高版本兼容。

3.1.0
spark.sql.execution.pythonUDF.arrow.enabled false

在常规Python UDF中启用Arrow优化。此优化仅在给定函数至少需要一个参数时才能启用。

3.4.0
spark.sql.execution.pythonUDTF.arrow.enabled false

启用Python UDTF的Arrow优化。

3.5.0
spark.sql.execution.topKSortFallbackThreshold 2147483632

在带有LIMIT的SQL查询中,例如'SELECT x FROM t ORDER BY y LIMIT m',如果m在此阈值下,则在内存中进行K最大排序;否则进行全局排序,如有必要可溢出到磁盘。

2.4.0
spark.sql.files.ignoreCorruptFiles false

是否忽略损坏的文件。如果为真,当遇到损坏的文件时,Spark作业将继续运行,并且已读取的内容仍然会返回。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。

2.1.1
spark.sql.files.ignoreMissingFiles false

是否忽略丢失的文件。如果为真,当遇到丢失的文件时,Spark作业将继续运行,并且已读取的内容仍然会返回。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。

2.3.0
spark.sql.files.maxPartitionBytes 128MB

读取文件时打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。

2.0.0
spark.sql.files.maxPartitionNum (无)

建议的(不保证)最大分片文件分区数量。如果设置,Spark将重新调整每个分区的大小,以使分区数量接近此值(如果初始分区数量超过该值)。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。

3.5.0
spark.sql.files.maxRecordsPerFile 0

写入单个文件的最大记录数。如果该值为零或负数,则没有限制。

2.2.0
spark.sql.files.minPartitionNum (无)

建议的(不保证)最小划分文件分区数量。如果未设置,则默认值为 spark.sql.leafNodeDefaultParallelism 。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。

3.1.0
spark.sql.function.concatBinaryAsString false

当此选项设置为false且所有输入均为二进制时, functions.concat 返回的输出为二进制。否则,它返回字符串。

2.3.0
spark.sql.function.eltOutputAsString false

当此选项设置为false且所有输入均为二进制时, elt 返回的输出为二进制。否则,它返回字符串。

2.3.0
spark.sql.groupByAliases true

当为真时,选择列表中的别名可以在group by子句中使用。当为假时,如果出现这种情况,将引发分析异常。

2.2.0
spark.sql.groupByOrdinal true

当为真时,group by子句中的序数被视为选择列表中的位置。当为假时,将忽略序数。

2.0.0
spark.sql.hive.convertInsertingPartitionedTable true

当设置为true,并且 spark.sql.hive.convertMetastoreParquet spark.sql.hive.convertMetastoreOrc 为真时,使用内置ORC/Parquet写入器处理插入到使用Hive SQL语法创建的分区ORC/Parquet表的操作。

3.0.0
spark.sql.hive.convertMetastoreCtas true

当设置为true时,Spark将尝试在CTAS中使用内置数据源写入器,而不是Hive serde。此标志仅在 spark.sql.hive.convertMetastoreParquet spark.sql.hive.convertMetastoreOrc 分别为Parquet和ORC格式启用时有效。

3.0.0
spark.sql.hive.convertMetastoreInsertDir true

当设置为true时,Spark将尝试在INSERT OVERWRITE DIRECTORY中使用内置数据源写入器,而不是Hive serde。此标志仅在 spark.sql.hive.convertMetastoreParquet spark.sql.hive.convertMetastoreOrc 分别为Parquet和ORC格式启用时有效。

3.3.0
spark.sql.hive.convertMetastoreOrc true

当设置为true时,使用内置ORC读写器处理使用HiveQL语法创建的ORC表,而不是Hive serde。

2.0.0
spark.sql.hive.convertMetastoreParquet true

当设置为true时,使用内置Parquet读写器处理使用HiveQL语法创建的Parquet表,而不是Hive serde。

1.1.1
spark.sql.hive.convertMetastoreParquet.mergeSchema false

当为真时,也尝试合并不同但兼容的Parquet模式。在“spark.sql.hive.convertMetastoreParquet”设置为true时,此配置才有效。

1.3.1
spark.sql.hive.dropPartitionByName.enabled false

当为真时,Spark将获取分区名称而不是分区对象以删除分区,这可以提高删除分区的性能。

3.4.0
spark.sql.hive.filesourcePartitionFileCacheSize 262144000

当非零时,在内存中启用分区文件元数据的缓存。所有表共享一个缓存,可以使用指定的字节数用于文件元数据。此配置仅在启用Hive文件源分区管理时生效。

2.1.1
spark.sql.hive.manageFilesourcePartitions true

当为真时,也为文件源表启用元存储分区管理。这包括数据源和转换的Hive表。当启用分区管理时,数据源表将把分区存储在Hive元存储中,并在查询计划期间使用元存储修剪分区,当'spark.sql.hive.metastorePartitionPruning'设置为true时。

2.1.1
spark.sql.hive.metastorePartitionPruning true

当为真时,一些谓词将被推送到Hive元存储中,以便未匹配的分区可以更早被消除。

1.5.0
spark.sql.hive.metastorePartitionPruningFallbackOnException false

在遇到来自元存储的MetaException时,是否回退以从Hive元存储获取所有分区并在Spark客户端侧执行分区修剪。请注意,如果启用此项且有很多分区待列出,Spark查询性能可能降低。如果禁用此配置,Spark将使查询失败。

3.3.0
spark.sql.hive.metastorePartitionPruningFastFallback false

当启用此配置时,如果谓词不被Hive支持或由于遇到来自元存储的MetaException而导致Spark回退,Spark将首先通过获取分区名称然后在客户端上评估过滤表达式来修剪分区。请注意,不支持具有TimeZoneAwareExpression的谓词。

3.3.0
spark.sql.hive.thriftServer.async true

当设置为true时,Hive Thrift服务器以异步方式执行SQL查询。

1.5.0
spark.sql.hive.verifyPartitionPath false

当为真时,在读取存储在HDFS中的数据时,会检查表的根目录下的所有分区路径。此配置将在未来版本中弃用,并替换为spark.files.ignoreMissingFiles。

1.4.0
spark.sql.inMemoryColumnarStorage.batchSize 10000

控制列式缓存的批次大小。更大的批大小可以提高内存利用率和压缩,但在缓存数据时可能面临OOM风险。

1.1.1
spark.sql.inMemoryColumnarStorage.compressed true

当设置为true时,Spark SQL将根据数据的统计信息自动选择每列的压缩编解码器。

1.0.1
spark.sql.inMemoryColumnarStorage.enableVectorizedReader true

启用列式缓存的向量化读取器。

2.3.1
spark.sql.json.filterPushdown.enabled true

当为真时,启用对JSON数据源的过滤器下推。

3.1.0
spark.sql.jsonGenerator.ignoreNullFields true

在JSON数据源和JSON函数(如to_json)中生成JSON对象时,忽略null字段的设置。如果为假,则在JSON对象中为null字段生成null。

3.0.0
spark.sql.leafNodeDefaultParallelism (无)

生成数据的Spark SQL叶子节点的默认并行度,如文件扫描节点、本地数据扫描节点、范围节点等。此配置的默认值为'SparkContext#defaultParallelism'。

3.2.0
spark.sql.mapKeyDedupPolicy EXCEPTION

在内置函数:CreateMap、MapFromArrays、MapFromEntries、StringToMap、MapConcat和TransformKeys中去重映射键的策略。当为EXCEPTION时,如果检测到重复的映射键,查询将失败。当为LAST_WIN时,最后插入的映射键优先。

3.0.0
spark.sql.maven.additionalRemoteRepositories https://maven-central.storage-download.googleapis.com/maven2/

可选的额外远程Maven镜像仓库的逗号分隔字符串配置。这仅在IsolatedClientLoader中的Hive jars 下载时使用,如果默认的Maven Central repo无法访问。

3.0.0
spark.sql.maxMetadataStringLength 100

元数据字符串的最大字符数。例如,在 DataSourceScanExec 中的文件位置,如果超过长度,则每个值将被缩写。

3.1.0
spark.sql.maxPlanStringLength 2147483632

计划字符串的最大字符数。如果计划过长,进一步输出将被截断。默认设置总是生成完整的计划。如果设置为较低的值,例如8k,计划字符串可能会占用过多内存或在驱动程序或UI进程中导致OutOfMemory错误。

3.0.0
spark.sql.maxSinglePartitionBytes 9223372036854775807b

单个分区允许的最大字节数。否则,规划者会引入shuffle以提高并行性。

3.4.0
spark.sql.optimizer.collapseProjectAlwaysInline false

是否始终合并两个相邻的投影并内联表达式,即使导致额外的重复。

3.3.0
spark.sql.optimizer.dynamicPartitionPruning.enabled true

当为真时,当将分区列用作连接键时,我们将为其生成谓词。

3.0.0
spark.sql.optimizer.enableCsvExpressionOptimization true

是否在SQL优化器中优化CSV表达式。这包括从from_csv中修剪不必要的列。

3.2.0
spark.sql.optimizer.enableJsonExpressionOptimization true

是否在SQL优化器中优化JSON表达式。这包括从from_json中修剪不必要的列,简化from_json + to_json,to_json + named_struct(from_json.col1, from_json.col2, ....)。

3.1.0
spark.sql.optimizer.excludedRules (无)

配置将在优化器中禁用的规则列表,这些规则由其规则名称指定并用逗号分隔。无法保证所有在此配置中的规则最终都会被排除,因为某些规则对于正确性是必要的。优化器将记录确实被排除的规则。

2.4.0
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold 10GB

Bloom过滤器应用侧计划的聚合扫描大小的字节阈值。Bloom过滤器应用侧的聚合扫描字节大小需要超过此值才能注入Bloom过滤器。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold 10MB

用于Bloom过滤器创建侧计划的大小阈值。估计大小必须小于此值,以尝试注入Bloom过滤器。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.enabled true

当为真且shuffle连接的一侧存在选择性谓词时,我们尝试在另一侧插入Bloom过滤器,以减少shuffle数据的量。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems 1000000

运行时Bloom过滤器的默认预期项数量。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.maxNumBits 67108864

用于运行时Bloom过滤器的最大位数。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.maxNumItems 4000000

运行时Bloom过滤器的最大预期项数量。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.numBits 8388608

运行时Bloom过滤器的默认位数。

3.3.0
spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled true

启用基于组的行级操作的运行时组过滤。替换数据组(例如,文件、分区)的数据源在计划行级操作扫描时可以使用提供的数据源过滤器修剪整个组。但是,这种过滤是有限的,因为并非所有表达式都可以转换为数据源过滤器,并且某些表达式只能由Spark进行评估(例如子查询)。由于重写组是昂贵的,因此Spark可以在运行时执行查询,以确定哪些记录满足行级操作条件。匹配记录的信息将传递回行级操作扫描,允许数据源丢弃无需重写的组。

3.4.0
spark.sql.optimizer.runtimeFilter.number.threshold 10

针对单个查询的注入运行时过滤器(非DPP)的总数量。这是为了防止由于过多的Bloom过滤器而导致的驱动程序内存溢出(OOM)。

3.3.0
spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled false

当为真且shuffle连接的一侧存在选择性谓词时,我们尝试在另一侧插入半连接,以减少shuffle数据的量。

3.3.0
spark.sql.orc.aggregatePushdown false

如果为真,将聚合推送到ORC以进行优化。支持MIN、MAX和COUNT作为聚合表达式。对于MIN/MAX,支持布尔型、整数型、浮点型和日期型。对于COUNT,支持所有数据类型。如果任何ORC文件页脚缺失统计信息,将抛出异常。

3.3.0
spark.sql.orc.columnarReaderBatchSize 4096

ORC向量化读取器批中包含的行数。此数字应仔细选择,以最大限度地减少开销并避免在读取数据时出现OOM。

2.4.0
spark.sql.orc.columnarWriterBatchSize 1024

ORC向量化写入器批中包含的行数。此数字应仔细选择,以最大限度地减少开销并避免在写入数据时出现OOM。

3.4.0
spark.sql.orc.compression.codec snappy

设置写入ORC文件时使用的压缩编解码器。如果在表特定选项/属性中指定了 compression orc.compress ,则优先级顺序为 compression orc.compress spark.sql.orc.compression.codec 。可接受的值包括:none、uncompressed、snappy、zlib、lzo、zstd、lz4。

2.3.0
spark.sql.orc.enableNestedColumnVectorizedReader true

启用嵌套列的向量化ORC解码。

3.2.0
spark.sql.orc.enableVectorizedReader true

启用向量化ORC解码。

2.3.0
spark.sql.orc.filterPushdown true

当为真时,启用对ORC文件的过滤器下推。

1.4.0
spark.sql.orc.mergeSchema false

当为真时,ORC数据源合并从所有数据文件收集的模式,否则模式将从随机数据文件中选择。

3.0.0
spark.sql.orderByOrdinal true

当为真时,序数被视为选择列表中的位置。当为假时,排序/排序子句中的序数被忽略。

2.0.0
spark.sql.parquet.aggregatePushdown false

如果为真,将聚合推送到Parquet以进行优化。支持MIN、MAX和COUNT作为聚合表达式。对于MIN/MAX,支持布尔型、整数型、浮点型和日期型。对于COUNT,支持所有数据类型。如果任何Parquet文件页脚缺失统计信息,将抛出异常。

3.3.0
spark.sql.parquet.binaryAsString false

一些其他生成Parquet的系统,特别是Impala和旧版本的Spark SQL,在写出Parquet模式时不区分二进制数据和字符串。此标志告诉Spark SQL将二进制数据解释为字符串,以提供与这些系统的兼容性。

1.1.1
spark.sql.parquet.columnarReaderBatchSize 4096

Parquet向量化读取器批中包含的行数。此数字应仔细选择,以最大限度地减少开销并避免在读取数据时出现OOM。

2.4.0
spark.sql.parquet.compression.codec snappy

设置写入Parquet文件时使用的压缩编解码器。如果在表特定选项/属性中指定了 compression parquet.compression ,则优先级顺序为 compression parquet.compression spark.sql.parquet.compression.codec 。 可接受的值包括:none、uncompressed、snappy、gzip、lzo、brotli、lz4、lz4raw、lz4_raw、zstd。

1.1.1
spark.sql.parquet.enableNestedColumnVectorizedReader true

启用嵌套列的向量化Parquet解码(例如,结构、列表、映射)。需要启用spark.sql.parquet.enableVectorizedReader。

3.3.0
spark.sql.parquet.enableVectorizedReader true

启用向量化Parquet解码。

2.0.0
spark.sql.parquet.fieldId.read.enabled false

字段ID是Parquet模式规范的本机字段。当启用时,如果所请求的Spark模式中存在字段ID,Parquet读取器将使用字段ID查找Parquet字段,而不是使用列名。

3.3.0
spark.sql.parquet.fieldId.read.ignoreMissing false

当Parquet文件没有任何字段ID但Spark读取模式使用字段ID进行读取时,如果启用此标志,我们将默默返回null,否则将引发错误。

3.3.0
spark.sql.parquet.fieldId.write.enabled true

字段ID是Parquet模式规范的本机字段。启用后,Parquet写入器将填充Spark模式中的字段ID元数据(如果存在)到Parquet模式中。

3.3.0
spark.sql.parquet.filterPushdown true

在设置为true时,启用Parquet的过滤器下推优化。

1.2.0
spark.sql.parquet.inferTimestampNTZ.enabled true

当启用时,带有注释isAdjustedToUTC = false的Parquet时间戳列在模式推断期间将被推断为TIMESTAMP_NTZ类型。否则,所有Parquet时间戳列都被推断为TIMESTAMP_LTZ类型。请注意,Spark在写入文件时将输出模式写入Parquet的页脚元数据,并在读取文件时利用它。因此,此配置仅影响未由Spark写入的Parquet文件的模式推断。

3.4.0
spark.sql.parquet.int96AsTimestamp true

一些生成Parquet的系统,特别是Impala,将时间戳存储为INT96。Spark也会将时间戳存储为INT96,因为我们需要避免纳秒字段的精度丢失。此标志告诉Spark SQL将INT96数据解释为时间戳,以提供与这些系统的兼容性。

1.3.0
spark.sql.parquet.int96TimestampConversion false

此项控制在转换为时间戳时是否应将时间戳调整应用于INT96数据,对于由Impala写入的数据。这是必要的,因为Impala以不同于Hive和Spark的时区偏移存储INT96数据。

2.3.0
spark.sql.parquet.mergeSchema false

当为真时,Parquet数据源合并从所有数据文件收集的模式;否则,从摘要文件或如果没有摘要文件可用,则从随机数据文件中选择模式。

1.5.0
spark.sql.parquet.outputTimestampType INT96

设置当Spark将数据写入Parquet文件时使用的Parquet时间戳类型。INT96是一种非标准但常用的Parquet时间戳类型。TIMESTAMP_MICROS是Parquet中的标准时间戳类型,它存储从Unix纪元开始的微秒数。TIMESTAMP_MILLIS也是标准的,但具有毫秒精度,这意味着Spark必须截断其时间戳值的微秒部分。

2.3.0
spark.sql.parquet.recordLevelFilter.enabled false

如果为真,允许使用下推过滤器启用Parquet的本机记录级过滤。此配置仅在启用'spark.sql.parquet.filterPushdown'时生效,并且未使用向量化读取器。您可以通过将'spark.sql.parquet.enableVectorizedReader'设置为false来确保未使用向量化读取器。

2.3.0
spark.sql.parquet.respectSummaryFiles false

当为真时,我们假设所有Parquet的部分文件与摘要文件一致,并在合并模式时将其忽略。否则,如果为假,这是默认值,我们将合并所有部分文件。此操作应视为仅供专家使用的选项,并且在确切了解其含义之前不应启用。

1.5.0
spark.sql.parquet.writeLegacyFormat false

如果为真,则以Spark 1.4及更早版本的方式写入数据。例如,小数将按Apache Parquet的固定长度字节数组格式写入,其他系统(如Apache Hive和Apache Impala)也使用此格式。如果为假,则将使用Parquet中的较新格式。例如,小数将按基于整数的格式写入。如果Parquet输出是打算与不支持此新格式的系统一起使用,请设置为true。

1.6.0
spark.sql.parser.quotedRegexColumnNames false

当为真时,SELECT语句中的带引号标识符(使用反引号)被解释为正则表达式。

2.3.0
spark.sql.pivotMaxValues 10000

当进行透视而未为透视列指定值时,这是将收集的最大(不同)值数量,而不出错。

1.6.0
spark.sql.pyspark.inferNestedDictAsStruct.enabled false

PySpark的SparkSession.createDataFrame默认将嵌套字典推断为映射。当设置为true时,将嵌套字典推断为结构。

3.3.0
spark.sql.pyspark.jvmStacktrace.enabled false

当为真时,它在用户面对的PySpark异常中显示JVM堆栈跟踪以及Python堆栈跟踪。默认情况下,它被禁用,以隐藏JVM堆栈跟踪,仅显示对Python友好的异常。请注意,这与日志级别设置无关。

3.0.0
spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled false

PySpark的SparkSession.createDataFrame默认从数组中的所有值推断数组的元素类型。如果此配置设置为true,则恢复仅从第一个数组元素推断类型的传统行为。

3.4.0
spark.sql.readSideCharPadding true

当为真时,Spark在读取CHAR类型列/字段时应用字符串填充,除了写入侧的填充外。此配置默认为true,以更好地在外部表等情况下强制执行CHAR类型语义。

3.4.0
spark.sql.redaction.options.regex (?i)url

正则表达式,用于决定Spark SQL命令的选项映射中的哪些键包含敏感信息。与此正则表达式匹配的选项名称的值将在解释输出中被删除。这项删除将在spark.redaction.regex定义的全局删除配置之上应用。

2.2.2
spark.sql.redaction.string.regex ( spark.redaction.string.regex 的值)

正则表达式,用于决定Spark生成的字符串的哪些部分包含敏感信息。当此正则表达式与字符串部分匹配时,该字符串部分将被替换为虚拟值。当前用于删除SQL解释命令的输出。当此配置未设置时,将使用 spark.redaction.string.regex 中的值。

2.3.0
spark.sql.repl.eagerEval.enabled false

启用急切评估或不。为真时,仅在REPL支持急切评估时才会显示数据集的前K行。当前,PySpark和SparkR支持急切评估。在PySpark中,像Jupyter这样的笔记本将返回HTML表(由 repr_html 生成)。对于普通Python REPL,返回的输出格式类似于dataframe.show()。在SparkR中,返回的输出显示类似于R数据帧。

2.4.0
spark.sql.repl.eagerEval.maxNumRows 20

通过急切评估返回的最大行数。仅在设置 spark.sql.repl.eagerEval.enabled 为真时生效。此配置的有效范围是从0到(Int.MaxValue - 1),因此无效配置如负数和大于(Int.MaxValue - 1)的值将被规范化为0和(Int.MaxValue - 1)。

2.4.0
spark.sql.repl.eagerEval.truncate 20

通过急切评估返回的每个单元格的最大字符数。仅在设置 spark.sql.repl.eagerEval.enabled 为真时生效。

2.4.0
spark.sql.session.localRelationCacheThreshold 67108864

序列化后,驱动程序端缓存的本地关系大小(以字节为单位)的阈值。

3.5.0
spark.sql.session.timeZone (本地时区的值)

会话本地时区的ID,格式为区域基础的时区ID或时区偏移量。区域ID必须为'区域/城市'的形式,例如'America/Los_Angeles'。时区偏移量必须为'(+|-)HH','(+|-)HH:mm'或'(+|-)HH:mm:ss'的格式,例如'-08'、'+01:00'或'-13:33:33'。此外,'UTC'和'Z'也支持作为'+00:00'的别名。其他简短名称不推荐使用,因为它们可能会引起歧义。

2.2.0
spark.sql.shuffle.partitions 200

在为连接或聚合shuffle数据时使用的默认分区数。注意:对于结构化流,配置在从相同检查点位置查询重新启动之间无法更改。

1.1.0
spark.sql.shuffledHashJoinFactor 3

如果小侧的数据大小乘以此因子仍小于大侧,则可以选择shuffle哈希连接。

3.3.0
spark.sql.sources.bucketing.autoBucketedScan.enabled true

当为真时,根据查询计划自动决定是否对输入表执行桶扫描。如果查询没有运算符利用桶化(例如连接、分组等),或者如果这些运算符与表扫描之间存在交换运算符,则不要使用桶扫描。注意当'spark.sql.sources.bucketing.enabled'设置为false时,此配置将不生效。

3.1.0
spark.sql.sources.bucketing.enabled true

当为假时,我们将把桶化表视为普通表。

2.0.0
spark.sql.sources.bucketing.maxBuckets 100000

允许的最大桶数。

2.4.0
spark.sql.sources.default parquet

输入/输出中使用的默认数据源。

1.3.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32

驱动程序端列出文件时允许的最大路径数。如果在分区发现期间检测到的路径数量超过此值,则尝试通过另一个Spark分布式作业来列出文件。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。

1.5.0
spark.sql.sources.partitionColumnTypeInference.enabled true

当为真时,自动推断分区列的数据类型。

1.5.0
spark.sql.sources.partitionOverwriteMode STATIC

在INSERT OVERWRITE到分区数据源表时,目前支持两种模式:静态和动态。在静态模式中,Spark在覆盖之前删除与INSERT语句中的分区规范匹配的所有分区(例如PARTITION(a=1,b))。在动态模式中,Spark不会提前删除分区,只覆盖在运行时写入数据的那些分区。默认情况下,我们使用静态模式,以保持与Spark 2.3之前的相同行为。请注意,此配置不影响Hive serde表,因为它们始终以动态模式覆盖。这也可以通过使用关键字partitionOverwriteMode(优先于此设置)设置为数据源的输出选项,例如dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)。

2.3.0
spark.sql.sources.v2.bucketing.enabled false

类似于spark.sql.sources.bucketing.enabled,此配置用于为V2数据源启用桶化。当启用时,Spark将通过SupportsReportPartitioning识别V2数据源报告的特定分布,并根据需要避免shuffle。

3.3.0
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled false

在存储分区连接期间,是否允许输入分区部分聚类,当连接的两侧都是KeyGroupedPartitioning时。在计划期间,Spark将根据表统计信息选择数据较少的一侧,将其分组并复制以匹配另一侧。这是对偏斜连接的优化,有助于减少某些分区分配大量数据时的数据偏斜。此配置要求同时启用spark.sql.sources.v2.bucketing.enabled和spark.sql.sources.v2.bucketing.pushPartValues.enabled。

3.4.0
spark.sql.sources.v2.bucketing.pushPartValues.enabled false

在启用'spark.sql.sources.v2.bucketing.enabled'时,是否推送常见的分区值。当启用时,如果连接的两侧都是KeyGroupedPartitioning,并且共享兼容的分区键,即使它们没有完全相同的分区值,Spark也会计算分区值的超集并将该信息推送到扫描节点,这将使用缺失的分区值的空分区。在任一侧。这可以帮助消除不必要的shuffle。

3.4.0
spark.sql.statistics.fallBackToHdfs false

当为真时,如果无法从表元数据获取表统计信息,将回退到HDFS。这对于确定表是否足够小以使用广播连接非常有用。此标志仅对非分区Hive表有效。对于非分区数据源表,如果不可用,则将自动重新计算其表统计信息。对于分区数据源和分区Hive表,如果无法获取表统计信息,则为'spark.sql.defaultSizeInBytes'。

2.0.0
spark.sql.statistics.histogram.enabled false

在计算列统计信息时,如果启用,则生成直方图。直方图可以提供更好的估计精度。当前,Spark仅支持等高直方图。请注意,收集直方图会产生额外成本。例如,收集列统计信息通常只需一次表扫描,但生成等高直方图将导致额外的表扫描。

2.3.0
spark.sql.statistics.size.autoUpdate.enabled false

启用在表数据更改后自动更新表大小。请注意,如果表的文件总数非常庞大,则这可能会很昂贵,并可能减慢数据更改命令。

2.3.0
spark.sql.storeAssignmentPolicy ANSI

当将值插入不同数据类型的列时,Spark将执行类型强制转换。目前,我们为类型强制转换规则支持三种策略:ANSI、遗留和严格。使用ANSI策略,Spark根据ANSI SQL执行类型强制转换。实际上,行为与PostgreSQL非常相似。它不允许某些不合理的类型转换,例如将 string 转换为 int double 转换为 boolean 。使用遗留策略,Spark只要是有效的 Cast ,就允许类型强制转换,这在很大程度上很宽松。例如,允许将 string 转换为 int double 转换为 boolean 。这也是Spark 2.x中的唯一行为,并且与Hive兼容。使用严格策略,Spark不允许在类型强制转换中任何可能的精度损失或数据截断,例如,不允许将 double 转换为 int decimal 转换为 double

3.0.0
spark.sql.streaming.checkpointLocation (无)

流查询的检查点数据的默认存储位置。

2.0.0
spark.sql.streaming.continuous.epochBacklogQueueSize 10000

排队等待迟到时间戳的最大条目数。如果此参数被队列的大小所超出,流将以错误停止。

3.0.0
spark.sql.streaming.disabledV2Writers

完全限定数据源注册类名称的逗号分隔列表,StreamWriteSupport被禁用。对此数据源的写入将回退到V1 Sinks。

2.3.1
spark.sql.streaming.fileSource.cleaner.numThreads 1

在文件源完成的文件清理器中使用的线程数。

3.0.0
spark.sql.streaming.forceDeleteTempCheckpointLocation false

当为真时,启用临时检查点位置强制删除。

3.0.0
spark.sql.streaming.metricsEnabled false <pNone

静态 SQL 配置

静态 SQL 配置是跨会话的,不可变的 Spark SQL 配置。它们可以通过配置文件和命令行选项以最终值进行设置,命令行选项以 --conf/-c 为前缀,或通过设置 SparkConf 来创建 SparkSession 。外部用户可以通过 SparkSession.conf 或通过设置命令查询静态 SQL 配置值,例如 SET spark.sql.extensions; ,但不能设置/取消设置这些值。

属性名称 默认值 含义 自版本起
spark.sql.cache.serializer org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer

实现了 org.apache.spark.sql.columnar.CachedBatchSerializer 的类的名称。它将用于将 SQL 数据转换为可以更有效地被缓存的格式。底层 API 可能会发生变化,因此请谨慎使用。不能指定多个类。该类必须具有无参构造函数。

3.1.0
spark.sql.catalog.spark_catalog.defaultDatabase 默认

会话目录的默认数据库。

3.4.0
spark.sql.event.truncate.length 2147483647

超出此 SQL 长度的阈值将在添加到事件之前被截断。默认为不截断。如果设置为 0,将记录调用位置。

3.0.0
spark.sql.extensions (无)

实现函数 Function1[SparkSessionExtensions, Unit] 的类的以逗号分隔的列表,用于配置 Spark 会话扩展。类必须具有无参构造函数。如果指定多个扩展,它们按照指定顺序应用。针对规则和计划者策略的情况,它们按指定顺序应用。对于解析器的情况,使用最后一个解析器,每个解析器可以委托给其前任。对于函数名称冲突的情况,使用最后注册的函数名称。

2.2.0
spark.sql.hive.metastore.barrierPrefixes

应为每个与 Spark SQL 通信的 Hive 版本显式重新加载的类前缀的以逗号分隔的列表。例如,通常会共享的前缀中声明的 Hive UDF。

1.4.0
spark.sql.hive.metastore.jars builtin

用于实例化 HiveMetastoreClient 的 jar 文件位置。 该属性可以是以下四个选项之一: 1. "builtin" 使用 Hive 2.3.9,它与 Spark 组合在一起,当 -Phive 被启用时。当选择此选项时, spark.sql.hive.metastore.version 必须是 2.3.9 或未定义。 2. "maven" 使用从 Maven 存储库下载的指定版本的 Hive jar。 3. "path" 使用由 spark.sql.hive.metastore.jars.path 配置的以逗号分隔的格式的 Hive jar。支持本地或远程路径。提供的 jar 应与 spark.sql.hive.metastore.version 相同版本。 4. Hive 和 Hadoop 的标准格式的类路径。提供的 jar 应与 spark.sql.hive.metastore.version 相同版本。

1.4.0
spark.sql.hive.metastore.jars.path

用于实例化 HiveMetastoreClient 的 jar 文件的以逗号分隔的路径。 此配置仅在将 spark.sql.hive.metastore.jars 设置为 path 时有效。 路径可以是以下任一格式: 1. file://path/to/jar/foo.jar 2. hdfs://nameservice/path/to/jar/foo.jar 3. /path/to/jar/ (没有 URI 方案的路径遵循配置 fs.defaultFS 的 URI 模式) 4. [http/https/ftp]://path/to/jar/foo.jar 注意 1、2 和 3 支持通配符。例如: 1. file://path/to/jar/ ,file://path2/to/jar/ / .jar 2. hdfs://nameservice/path/to/jar/ ,hdfs://nameservice2/path/to/jar/ / .jar

3.1.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc

应使用 Spark SQL 和特定版本的 Hive 之间共享的类加载器加载的类前缀的以逗号分隔的列表。应共享的类的示例是与元存储进行通信所需的 JDBC 驱动程序。其他需要共享的类是与已共享的类进行交互的类。例如,log4j 使用的自定义附加器。

1.4.0
spark.sql.hive.metastore.version 2.3.9

Hive 元存储的版本。可用选项为 0.12.0 2.3.9 以及 3.0.0 3.1.3

1.4.0
spark.sql.hive.thriftServer.singleSession false

设置为 true 时,Hive Thrift 服务器在单会话模式下运行。所有 JDBC/ODBC 连接共享临时视图、函数注册表、SQL 配置和当前数据库。

1.6.0
spark.sql.hive.version 2.3.9

与 Spark 发行版捆绑的编译的,亦称为内置的 Hive 版本。请注意,这是只读配置,仅用于报告内置 Hive 版本。如果您想让 Spark 调用不同的元存储客户端,请参阅 spark.sql.hive.metastore.version。

1.1.1
spark.sql.metadataCacheTTLSeconds -1000ms

元数据缓存的生存时间 (TTL) 值:分区文件元数据缓存和会话目录缓存。仅在此值具有正值 (> 0) 时,此配置才会生效。它还需要将 'spark.sql.catalogImplementation' 设置为 hive ,将 'spark.sql.hive.filesourcePartitionFileCacheSize' 设置为 > 0,并将 'spark.sql.hive.manageFilesourcePartitions' 设置为 true ,以应用于分区文件元数据缓存。

3.1.0
spark.sql.queryExecutionListeners (无)

实现 QueryExecutionListener 的类名列表,将自动添加到新创建的会话中。类应具有无参构造函数,或者带有 SparkConf 参数的构造函数。

2.3.0
spark.sql.sources.disabledJdbcConnProviderList

配置禁用的 JDBC 连接提供程序的列表。列表包含以逗号分隔的 JDBC 连接提供程序的名称。

3.1.0
spark.sql.streaming.streamingQueryListeners (无)

实现 StreamingQueryListener 的类名列表,将自动添加到新创建的会话中。类应具有无参构造函数,或者带有 SparkConf 参数的构造函数。

2.4.0
spark.sql.streaming.ui.enabled true

当启用 Spark Web UI 时,是否为 Spark 应用程序运行结构化流处理 Web 界面。

3.0.0
spark.sql.streaming.ui.retainedProgressUpdates 100

保留的结构化流处理 UI 的流查询的进度更新数量。

3.0.0
spark.sql.streaming.ui.retainedQueries 100

保留的结构化流处理 UI 的非活动查询数量。

3.0.0
spark.sql.ui.retainedExecutions 1000

在 Spark UI 中保留的执行次数。

1.5.0
spark.sql.warehouse.dir (value of $PWD/spark-warehouse )

管理数据库和表的默认位置。

2.0.0

火花流处理

属性名称 默认值 含义 自版本
spark.streaming.backpressure.enabled false 启用或禁用 Spark Streaming 的内部背压机制(自 1.5 版本起)。 这使得 Spark Streaming 能够根据当前批调度延迟和处理时间来控制接收速率,从而使系统的接收速度仅与其处理速度相匹配。内部地,这动态地设置接收器的最大接收速率。这个速率被 spark.streaming.receiver.maxRate spark.streaming.kafka.maxRatePerPartition 的值上限限制(如果它们被设置的话)。 1.5.0
spark.streaming.backpressure.initialRate 未设置 当启用背压机制时,每个接收器在处理第一批数据时的初始最大接收速率。 2.0.0
spark.streaming.blockInterval 200毫秒 Spark Streaming 接收器接收到的数据处理成数据块并存储到 Spark 中的时间间隔。推荐的最小值为 50 毫秒。有关更多详细信息,请参见 Spark Streaming 编程指南中的 性能调优 部分。 0.8.0
spark.streaming.receiver.maxRate 未设置 每个接收器接收数据的最大速率(每秒记录数)。实际上,每个流每秒最多消耗此数量的记录。将此配置设置为 0 或负数将不限制速率。有关更多详细信息,请参见 Spark Streaming 编程指南中的 部署指南 1.0.2
spark.streaming.receiver.writeAheadLog.enable false 启用接收器的预写日志。所有通过接收器接收到的输入数据都将保存到预写日志中,以便在驱动程序故障后进行恢复。有关更多详细信息,请参见 Spark Streaming 编程指南中的 部署指南 1.2.1
spark.streaming.unpersist true 强制 Spark Streaming 生成并持久化的 RDD 被自动从 Spark 的内存中取消持久化。Spark Streaming 接收到的原始输入数据也会自动清除。将此设置为 false 将允许原始数据和持久化的 RDD 在流应用程序之外可访问,因为它们不会被自动清除。但这会导致 Spark 中更高的内存使用。 0.9.0
spark.streaming.stopGracefullyOnShutdown false 如果为 true ,Spark 在 JVM 关闭时将 StreamingContext 优雅地关闭,而不是立即关闭。 1.4.0
spark.streaming.kafka.maxRatePerPartition 未设置 使用新的 Kafka 直连流 API 时,从每个 Kafka 分区读取数据的最大速率(每秒记录数)。有关更多详细信息,请参见 Kafka 集成指南 1.3.0
spark.streaming.kafka.minRatePerPartition 1 使用新的 Kafka 直连流 API 时,从每个 Kafka 分区读取数据的最小速率(每秒记录数)。 2.4.0
spark.streaming.ui.retainedBatches 1000 Spark Streaming UI 和状态 API 在垃圾回收之前记住的批次数量。 1.0.0
spark.streaming.driver.writeAheadLog.closeFileAfterWrite false 在驱动程序上写入预写日志记录后是否关闭文件。希望使用 S3(或任何不支持刷新功能的文件系统)作为驱动程序上的元数据 WAL 时,将此设置为 'true'。 1.6.0
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite false 在接收器上写入预写日志记录后是否关闭文件。希望使用 S3(或任何不支持刷新功能的文件系统)作为接收器上的数据 WAL 时,将此设置为 'true'。 1.6.0

SparkR

属性名称 默认值 含义 自版本
spark.r.numRBackendThreads 2 RBackend用于处理来自SparkR包的RPC调用的线程数量。 1.4.0
spark.r.command Rscript 在集群模式下用于执行R脚本的可执行文件,适用于驱动程序和工作节点。 1.5.3
spark.r.driver.command spark.r.command 在客户端模式下用于执行R脚本的可执行文件。 在集群模式下被忽略。 1.5.3
spark.r.shell.command R 在客户端模式下用于执行sparkR shell的可执行文件。 在集群模式下被忽略。 它与环境变量 SPARKR_DRIVER_R 相同,但优先级更高。 spark.r.shell.command 用于sparkR shell,而 spark.r.driver.command 用于运行R脚本。 2.1.0
spark.r.backendConnectionTimeout 6000 R进程在连接到RBackend时设置的连接超时时间(秒)。 2.1.0
spark.r.heartBeatInterval 100 从SparkR后端发送到R进程的心跳间隔,以防止连接超时。 2.1.0

图形X

属性名称 默认值 含义 从版本
spark.graphx.pregel.checkpointInterval -1 Pregel中图和消息的检查点间隔。它用于避免因长的继承链在经过多次迭代后造成的stackOverflowError。默认情况下检查点是禁用的。 2.2.0

部署

属性名称 默认 含义 自版本
spark.deploy.recoveryMode NONE 恢复模式设置,用于在Spark作业失败并重新启动时恢复提交的Spark作业。 这仅适用于在Standalone或Mesos下运行的集群模式。 0.8.1
spark.deploy.zookeeper.url None 当`spark.deploy.recoveryMode`设置为ZOOKEEPER时,此配置用于设置连接到的zookeeper URL。 0.8.1
spark.deploy.zookeeper.dir None 当`spark.deploy.recoveryMode`设置为ZOOKEEPER时,此配置用于设置zookeeper目录以存储恢复状态。 0.8.1

集群管理器

Spark中的每个集群管理器都有额外的配置选项。配置可以在每种模式的页面上找到:

YARN

Mesos

Kubernetes

独立模式

环境变量

某些 Spark 设置可以通过环境变量进行配置,这些变量从 conf/spark-env.sh 脚本中读取,该脚本位于 Spark 安装目录中(在 Windows 上为 conf/spark-env.cmd )。在独立和 Mesos 模式下,该文件可以提供特定于机器的信息,例如主机名。当运行本地 Spark 应用程序或提交脚本时,它也会被引用。

请注意, conf/spark-env.sh 在安装Spark时默认并不存在。 但是,您可以复制 conf/spark-env.sh.template 来创建它。 确保您将复制的文件设为可执行。

可以在 spark-env.sh 中设置以下变量:

环境变量 含义
JAVA_HOME Java 安装的位置(如果不在默认的 PATH 中)。
PYSPARK_PYTHON 在驱动程序和工作节点中用于 PySpark 的 Python 可执行文件(如果可用,默认是 python3 ,否则是 python )。 属性 spark.pyspark.python 如果设置则优先使用
PYSPARK_DRIVER_PYTHON 仅在驱动程序中用于 PySpark 的 Python 可执行文件(默认是 PYSPARK_PYTHON )。 属性 spark.pyspark.driver.python 如果设置则优先使用
SPARKR_DRIVER_R 用于 SparkR shell 的 R 可执行文件(默认是 R )。 属性 spark.r.shell.command 如果设置则优先使用
SPARK_LOCAL_IP 要绑定的机器的 IP 地址。
SPARK_PUBLIC_DNS 您的 Spark 程序将向其他机器广播的主机名。

除了上述内容,还有一些选项可以设置Spark的 独立集群脚本 ,例如每台机器使用的核心数量和最大内存。

由于 spark-env.sh 是一个 shell 脚本,某些设置可以通过编程方式进行 – 例如,您可能会通过查找特定网络接口的 IP 来计算 SPARK_LOCAL_IP

注意:在 cluster 模式下运行 Spark 时,需要使用 spark.yarn.appMasterEnv.[EnvironmentVariableName] 属性在您的 conf/spark-defaults.conf 文件中设置环境变量。在 spark-env.sh 中设置的环境变量在 cluster 模式下不会反映在 YARN Application Master 进程中。有关更多信息,请参见 与 YARN 相关的 Spark 属性

配置日志

Spark 使用 log4j 进行日志记录。你可以通过在 conf 目录中添加一个 log4j2.properties 文件来进行配置。一个开始的方式是复制那里现有的 log4j2.properties.template

默认情况下,Spark 将 1 条记录添加到 MDC(映射诊断上下文): mdc.taskName ,显示类似于 task 1.0 in stage 0.0 的内容。您可以将 %X{mdc.taskName} 添加到您的 patternLayout 中,以便在日志中打印它。 此外,您可以使用 spark.sparkContext.setLocalProperty(s"mdc.$name", "value") 将用户特定的数据添加到 MDC 中。 MDC 中的关键字将是字符串“mdc.$name”。

覆盖配置目录

要指定一个不同于默认 “SPARK_HOME/conf” 的配置目录,您可以设置 SPARK_CONF_DIR。Spark 将使用此目录中的配置文件 (spark-defaults.conf, spark-env.sh, log4j2.properties 等)。

继承Hadoop集群配置

如果您计划使用Spark从HDFS读取和写入,应该在Spark的类路径中包含两个Hadoop配置文件:

这些配置文件的位置在不同的Hadoop版本中有所不同,但一个常见的位置是在 /etc/hadoop/conf 目录下。有些工具会动态创建配置,但提供下载其副本的机制。

为了让这些文件对Spark可见,请在 HADOOP_CONF_DIR 中设置 $SPARK_HOME/conf/spark-env.sh 为包含配置文件的位置。

自定义 Hadoop/Hive 配置

如果您的 Spark 应用程序正在与 Hadoop、Hive 或两者交互,那么在 Spark 的类路径中可能有 Hadoop/Hive 配置文件。

多个运行中的应用程序可能需要不同的Hadoop/Hive客户端配置。您可以复制并修改 hdfs-site.xml core-site.xml yarn-site.xml hive-site.xml 以用于每个应用程序的Spark类路径。在运行在YARN上的Spark集群中,这些配置文件是集群范围内设置的,应用程序不能安全地进行更改。

更好的选择是使用以 spark.hadoop.* 形式的 Spark Hadoop 属性,并使用以 spark.hive.* 形式的 Spark Hive 属性。 例如,添加配置 “spark.hadoop.abc.def=xyz” 代表添加 Hadoop 属性 “abc.def=xyz”, 而添加配置 “spark.hive.abc=xyz” 代表添加 Hive 属性 “hive.abc=xyz”。 它们可以被视为可以在 $SPARK_HOME/conf/spark-defaults.conf 中设置的普通 Spark 属性。

在某些情况下,您可能想要避免在 SparkConf 中硬编码某些配置。例如,Spark允许您简单地创建一个空的配置并设置 spark/spark hadoop/spark hive 属性。

val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)

此外,您可以在运行时修改或添加配置:

./bin/spark-submit \
--name "我的应用" \
--master 本地[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
--conf spark.hive.abc=xyz
  myApp.jar

自定义资源调度和配置概述

GPU和其他加速器已被广泛用于加速特定的工作负载,例如深度学习和信号处理。Spark 现在支持请求和调度通用资源,如 GPUs,但有一些注意事项。目前的实现要求资源必须具有调度器可以分配的地址。它需要您的集群管理器支持并正确配置这些资源。

可用的配置用于请求驱动程序的资源: spark.driver.resource.{resourceName}.amount ,请求执行器的资源: spark.executor.resource.{resourceName}.amount ,并为每个任务指定要求: spark.task.resource.{resourceName}.amount 。在 YARN、Kubernetes 和 Spark Standalone 的客户端驱动程序上, spark.driver.resource.{resourceName}.discoveryScript 配置是必需的。 spark.executor.resource.{resourceName}.discoveryScript 配置是 YARN 和 Kubernetes 所必需的。Kubernetes 还要求 spark.driver.resource.{resourceName}.vendor 和/或 spark.executor.resource.{resourceName}.vendor 。有关每个配置的更多信息,请参见上述配置描述。

Spark将使用指定的配置首先向集群管理器请求具有相应资源的容器。一旦获得容器,Spark将在该容器中启动一个Executor,该Executor将发现容器具有的资源及与每个资源关联的地址。Executor将向Driver注册并报告该Executor可用的资源。然后,Spark调度程序可以将任务调度到每个Executor,并根据用户指定的资源需求分配特定的资源地址。用户可以使用 TaskContext.get().resources API查看分配给任务的资源。在driver上,用户可以通过SparkContext resources 调用查看分配的资源。然后由用户决定使用分配的地址进行他们想要的处理或将这些地址传递到他们正在使用的ML/AI框架中。

请查看您集群管理器的特定页面以获取每个- YARN Kubernetes 独立模式 的要求和详细信息。当前不支持 Mesos 或本地模式。请注意,带有多个工作线程的本地集群模式不受支持(请参见独立文档)。

阶段级调度概述

阶段级调度功能允许用户在阶段级别上指定任务和执行器的资源需求。这允许不同的阶段使用具有不同资源的执行器运行。一个典型的例子是一个ETL阶段使用仅有CPU的执行器运行,而下一个阶段是需要GPU的机器学习阶段。阶段级调度允许用户在机器学习阶段运行时请求具有GPU的不同执行器,而不是在应用程序开始时就获得具有GPU的执行器,这样它们在运行ETL阶段时将处于空闲状态。此功能仅在Scala、Java和Python的RDD API中可用。在启用动态分配的情况下,它可在YARN、Kubernetes和Standalone上使用。当禁用动态分配时,它允许用户在阶段级别上指定不同的任务资源需求,目前在YARN、Kubernetes和Standalone集群上支持此功能。有关更多实现细节,请参见 YARN 页面或 Kubernetes 页面或 Standalone 页面。

有关使用此功能,请参见 RDD.withResources ResourceProfileBuilder API。当动态分配被禁用时,具有不同任务资源要求的任务将与 DEFAULT_RESOURCE_PROFILE 共享执行器。而当动态分配启用时,当前实现将为每个创建的 ResourceProfile 获取新的执行器,并且当前必须完全匹配。Spark 不会尝试将需要与执行器创建时不同的 ResourceProfile 的任务放入执行器中。未使用的执行器将根据动态分配逻辑超时闲置。此功能的默认配置是每个阶段只允许一个 ResourceProfile。如果用户将多个 ResourceProfile 关联到 RDD,Spark 将默认抛出异常。有关控制该行为,请参见配置 spark.scheduler.resource.profileMergeConflicts 。当 spark.scheduler.resource.profileMergeConflicts 启用时,Spark 实施的当前合并策略是简单地选择冲突的 ResourceProfiles 中每个资源的最大值。Spark 将创建一个新的 ResourceProfile,其中包含每个资源的最大值。

基于推送的洗牌概述

基于推送的洗牌有助于提高Spark洗牌的可靠性和性能。它采取了一种尽力而为的方法,将由映射任务生成的洗牌块推送到远程外部洗牌服务,以便按洗牌分区进行合并。减少任务获取合并后的洗牌分区和原始洗牌块的组合作为输入数据,从而将外部洗牌服务的小随机磁盘读取转换为大的顺序读取。对减少任务来说,更好的数据本地性可能还有助于最小化网络IO。在某些场景下,如当合并输出可用时,基于推送的洗牌优先于批量获取。

基于推送的洗牌提高了长时间运行的作业/查询的性能,这些作业/查询在洗牌期间涉及大量磁盘 I/O。目前,它不太适合快速运行、处理较少洗牌数据的作业/查询。未来的版本中将进一步改进这一点。

目前基于推送的洗牌仅支持在 YARN 上运行的 Spark,并且需要外部洗牌服务。

外部洗牌服务(服务器)端配置选项

属性名称 默认值 含义 版本
spark.shuffle.push.server.mergedShuffleFileManagerImpl org.apache.spark.network.shuffle.
NoOpMergedShuffleFileManager
管理基于推送的洗牌的 MergedShuffleFileManager 实现的类名。此配置作为服务器端配置,用于禁用或启用基于推送的洗牌。默认情况下,服务器端禁用基于推送的洗牌。

要在服务器端启用基于推送的洗牌,请将此配置设置为 org.apache.spark.network.shuffle.RemoteBlockPushResolver

3.2.0
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile 2m

在基于推送的洗牌中,将合并的洗牌文件划分为多个块时,块的最小大小。合并的洗牌文件由多个小洗牌块组成。一次性在单个磁盘I/O中获取完整的合并洗牌文件会增加客户端和外部洗牌服务的内存需求。相反,外部洗牌服务按 MB大小的块 提供合并文件。
此配置控制块的最大大小。将为每个合并洗牌文件生成一个相应的索引文件,指示块边界。

将此设置得过高会增加客户端和外部洗牌服务的内存需求。

将此设置得过低会不必要地增加对外部洗牌服务的RPC请求的总体数量。

3.2.0
spark.shuffle.push.server.mergedIndexCacheSize 100m 在基于推送的洗牌中用于存储合并索引文件的内存缓存的最大大小。此缓存是额外于通过 spark.shuffle.service.index.cache.size 配置的缓存。 3.2.0

客户端配置选项

属性名称 默认值 含义 自版本起
spark.shuffle.push.enabled false 设置为 true 以启用基于推送的 shuffle,在客户端侧启用并与服务器端标志 spark.shuffle.push.server.mergedShuffleFileManagerImpl 一起工作。 3.2.0
spark.shuffle.push.finalize.timeout 10s 驱动程序在所有映射器完成特定的 shuffle map 阶段后等待的时间(以秒为单位),然后再向远程外部 shuffle 服务发送合并完成请求。这为外部 shuffle 服务提供了额外的时间来合并块。将此设置得过长可能会导致性能下降。 3.2.0
spark.shuffle.push.maxRetainedMergerLocations 500 针对基于推送的 shuffle 缓存的最大合并位置数。目前,合并位置是处理推送块的外部 shuffle 服务的主机,负责合并它们并为后续的 shuffle 提供合并块。 3.2.0
spark.shuffle.push.mergersMinThresholdRatio 0.05 用于根据 reducer 阶段的分区数量计算所需的最小 shuffle 合并位置数量的比例。例如,具有 100 个分区的 reduce 阶段,使用默认值 0.05,至少需要 5 个唯一的合并位置来启用基于推送的 shuffle。 3.2.0
spark.shuffle.push.mergersMinStaticThreshold 5 为了启用某个阶段的基于推送的 shuffle,必须有静态合并位置的阈值。请注意,此配置与 spark.shuffle.push.mergersMinThresholdRatio 一起工作。最大值为 spark.shuffle.push.mergersMinStaticThreshold spark.shuffle.push.mergersMinThresholdRatio 所需的合并器数量,以启用某个阶段的基于推送的 shuffle。例如: 对于 child 阶段的 1000 个分区,设定 spark.shuffle.push.mergersMinStaticThreshold 为 5, spark.shuffle.push.mergersMinThresholdRatio 设置为 0.05,我们需要至少 50 个合并器才能为该阶段启用基于推送的 shuffle。 3.2.0
spark.shuffle.push.numPushThreads (无) 指定块推送池中的线程数。这些线程协助创建连接并将块推送到远程外部 shuffle 服务。 默认情况下,线程池大小等于 spark 执行器核心的数量。 3.2.0
spark.shuffle.push.maxBlockSizeToPush 1m

要推送到远程外部 shuffle 服务的单个块的最大大小。大于此阈值的块不会被推送以进行远程合并。这些 shuffle 块将以原始方式提取。

将此值设置得过高会导致更多的块被推送到远程外部 shuffle 服务,但这样的块已经通过现有机制高效提取,会导致将大块推送到远程外部 shuffle 服务的额外开销。建议将 spark.shuffle.push.maxBlockSizeToPush 设置为小于 spark.shuffle.push.maxBlockBatchSize 配置的值。

将此值设置得过低会导致更少的块被合并,直接从映射器外部 shuffle 服务提取结果会导致更高的小随机读取,从而影响整体磁盘 I/O 性能。

3.2.0
spark.shuffle.push.maxBlockBatchSize 3m 一批 shuffle 块被组合到一个推送请求中的最大大小。默认设置为 3m ,以便稍微高于 spark.storage.memoryMapThreshold 的默认值,即 2m ,因为每批块都被内存映射,从而导致更高的开销。 3.2.0
spark.shuffle.push.merge.finalizeThreads 8 驱动程序用于最终确定 shuffle 合并的线程数。由于对于大型 shuffle 最终确定可能需要几秒钟,具有多个线程有助于驱动程序在启用基于推送的 shuffle 时处理并发的 shuffle 合并最终请求。 3.3.0
spark.shuffle.push.minShuffleSizeToWait 500m 驱动程序仅在总 shuffle 数据大小超过此阈值时才会等待合并最终确定完成。如果总 shuffle 大小较小,驱动程序会立即最终确定 shuffle 输出。 3.3.0
spark.shuffle.push.minCompletedPushRatio 1.0 在基于推送的 shuffle 中,驱动程序在开始进行 shuffle 合并最终确定之前,应该有的最小映射分区推送完成的比例。 3.3.0