性能调优

对于某些工作负载,可以通过在内存中缓存数据或启用一些实验性选项来提高性能。

在内存中缓存数据

Spark SQL 可以通过调用 spark.catalog.cacheTable("tableName") dataFrame.cache() 使用内存列式格式缓存表。然后 Spark SQL 将仅扫描所需的列,并将自动调整压缩以最小化内存使用和 GC 压力。您可以调用 spark.catalog.uncacheTable("tableName") dataFrame.unpersist() 将表从内存中移除。

内存缓存的配置可以通过 SparkSession 上的 setConf 方法完成,或者通过使用 SQL 运行 SET key=value 命令。

属性名称 默认值 含义 从版本
spark.sql.inMemoryColumnarStorage.compressed true 设置为true时,Spark SQL将根据数据的统计信息自动选择每列的压缩编码。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式缓存的批大小。更大的批大小可以提高内存利用率和压缩,但在缓存数据时可能会面临OOM风险。 1.1.1

其他配置选项

以下选项也可以用于调整查询执行的性能。可能在未来的版本中这些选项会被弃用,因为更多的优化将会自动执行。

属性名称 默认 含义 自版本
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 读取文件时打包到单个分区的最大字节数。 此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) 打开文件的估计成本,按同一时间内可以扫描的字节数来衡量。 当将多个文件放入一个分区时使用。最好高估, 这样小文件的分区会比大文件(优先调度)更快。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 2.0.0
spark.sql.files.minPartitionNum 默认并行度 建议的(不保证)最小拆分文件分区数。如果未设置,默认值为 `spark.sql.leafNodeDefaultParallelism`。 此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 3.1.0
spark.sql.files.maxPartitionNum 建议的(不保证)最大拆分文件分区数。如果设置, 当初始分区数超过该值时,Spark 将重新调整每个分区的规模,使分区数接近该值。 此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 3.5.0
spark.sql.broadcastTimeout 300

广播连接中广播等待时间的超时时间(以秒为单位)

1.3.0
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置将广播到所有工作节点的表的最大字节大小进行连接。通过将此值设置为 -1,可以禁用广播。 注意,目前仅支持对运行过命令 ANALYZE TABLE COMPUTE STATISTICS noscan 的 Hive Metastore 表进行统计。 1.1.0
spark.sql.shuffle.partitions 200 配置在为连接或聚合洗牌数据时使用的分区数量。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 配置启用作业输入路径并行列出的阈值。如果输入路径的数量大于该阈值,Spark 将通过使用 Spark 分布式作业列出文件。 否则,将回退到顺序列出。此配置仅在使用基于文件的数据源(如 Parquet、ORC 和 JSON)时有效。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 配置作业输入路径的最大列出并行性。如果输入路径的数量大于该值,将被限制使用该值。 此配置仅在使用基于文件的数据源(如 Parquet、ORC 和 JSON)时有效。 2.1.1

SQL查询的连接策略提示

连接策略提示,即 BROADCAST MERGE SHUFFLE_HASH SHUFFLE_REPLICATE_NL ,指示Spark在连接指定的关系时使用提示的策略。例如,当在表‘t1’上使用 BROADCAST 提示时,广播连接(无论是广播哈希连接还是广播嵌套循环连接,具体取决于是否存在任何等值连接键)以‘t1’作为构建侧将优先被Spark选择,即使表‘t1’的大小在统计数据中超过了配置 spark.sql.autoBroadcastJoinThreshold

当在连接的两侧指定不同的连接策略提示时,Spark 优先考虑 BROADCAST 提示,其次是 MERGE 提示,再其次是 SHUFFLE_HASH 提示,最后是 SHUFFLE_REPLICATE_NL 提示。当两侧都指定了 BROADCAST 提示或 SHUFFLE_HASH 提示时,Spark 将根据连接类型和关联的大小选择构建侧。

请注意,Spark并不能保证会选择提示中指定的连接策略,因为特定的策略可能不支持所有连接类型。

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
src <- sql("从 src 中选择所有项")
records <- sql("从 records 中选择所有项")
head(join(src, hint(records, "广播"), src$key == records$key))
-- 我们接受 BROADCAST, BROADCASTJOIN 和 MAPJOIN 作为广播提示
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

有关更多详细信息,请参阅 连接提示 的文档。

SQL查询的合并提示

合并提示允许Spark SQL用户像在Dataset API中的 coalesce repartition repartitionByRange 一样控制输出文件的数量,它们可以用于性能调优和减少输出文件的数量。“COALESCE”提示只有一个作为参数的分区号。“REPARTITION”提示可以有一个分区号、列,或者同时/都没有它们作为参数。“REPARTITION_BY_RANGE”提示必须有列名称,分区号是可选的。“REBALANCE”提示有一个初始分区号、列,或同时/都没有它们作为参数。

选择 /*+ COALESCE(3) */ *  t;
选择 /*+ REPARTITION(3) */ *  t;
选择 /*+ REPARTITION(c) */ *  t;
选择 /*+ REPARTITION(3, c) */ *  t;
选择 /*+ REPARTITION */ *  t;
选择 /*+ REPARTITION_BY_RANGE(c) */ *  t;
选择 /*+ REPARTITION_BY_RANGE(3, c) */ *  t;
选择 /*+ REBALANCE */ *  t;
选择 /*+ REBALANCE(3) */ *  t;
选择 /*+ REBALANCE(c) */ *  t;
选择 /*+ REBALANCE(3, c) */ *  t;

有关更多详细信息,请参阅 分区提示 的文档。

自适应查询执行

自适应查询执行 (AQE) 是 Spark SQL 中的一种优化技术,它利用运行时统计信息来选择最有效的查询执行计划,自 Apache Spark 3.2.0 起默认启用。Spark SQL 可以通过 spark.sql.adaptive.enabled 作为一个总配置来启用或禁用 AQE。自 Spark 3.0 起,AQE 中有三个主要特性:包括合并后置洗牌分区,将排序合并连接转换为广播连接,以及倾斜连接优化。

合并后洗牌分区

此功能在两个配置 spark.sql.adaptive.enabled spark.sql.adaptive.coalescePartitions.enabled 为 true 时,根据映射输出统计数据合并后洗牌分区。此功能简化了运行查询时洗牌分区数量的调整。您不需要设置适合您的数据集的合适洗牌分区数量。一旦通过 spark.sql.adaptive.coalescePartitions.initialPartitionNum 配置设置了足够大的初始洗牌分区数量,Spark 可以在运行时选择合适的洗牌分区数量。

属性名称 默认值 含义 自版本起
spark.sql.adaptive.coalescePartitions.enabled true 当为true且 spark.sql.adaptive.enabled 为true时,Spark将根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)合并连续的shuffle分区,以避免过多的小任务。 3.0.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true 当为true时,Spark在合并连续的shuffle分区时忽略由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目标大小(默认64MB),仅尊重由 spark.sql.adaptive.coalescePartitions.minPartitionSize 指定的最小分区大小(默认1MB),以最大化并行性。这是为了避免在启用自适应查询执行时出现性能回退。建议将此配置设置为false,并尊重由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目标大小。 3.2.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 合并后shuffle分区的最小大小。它的值最多可以是 spark.sql.adaptive.advisoryPartitionSizeInBytes 的20%。在合并分区时,如果目标大小被忽略,这一点非常有用,这是默认情况。 3.2.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (无) 合并前shuffle分区的初始数量。如果未设置,则等于 spark.sql.shuffle.partitions 。此配置仅在 spark.sql.adaptive.enabled spark.sql.adaptive.coalescePartitions.enabled 都启用时有效。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 在自适应优化期间shuffle分区的建议字节大小(当 spark.sql.adaptive.enabled 为true时)。当Spark合并小的shuffle分区或拆分倾斜的shuffle分区时生效。 3.0.0

拆分偏斜的洗牌分区

属性名称 默认值 含义 自版本起
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true 当为真且 spark.sql.adaptive.enabled 为真时,Spark将在RebalancePartitions中优化偏斜的shuffle分区,并根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)将其拆分为更小的分区,以避免数据倾斜。 3.2.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2 如果一个分区的大小小于该因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes ,则在拆分时将合并该分区。 3.3.0

将排序合并连接转换为广播连接

AQE将在任何连接一侧的运行时统计数据小于自适应广播哈希连接阈值时,将排序-合并连接转换为广播哈希连接。这并不像一开始就规划广播哈希连接那样高效,但比继续进行排序-合并连接要好,因为我们可以节省两个连接一侧的排序,并本地读取混洗文件以节省网络流量(如果 spark.sql.adaptive.localShuffleReader.enabled 为true)

属性名称 默认值 含义 自版本起
spark.sql.adaptive.autoBroadcastJoinThreshold (无) 配置在执行连接时将广播到所有工作节点的表的最大字节大小。通过将此值设置为 -1,可以禁用广播。默认值与 spark.sql.autoBroadcastJoinThreshold 相同。请注意,此配置仅在自适应框架中使用。 3.2.0
spark.sql.adaptive.localShuffleReader.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 尝试使用本地 shuffle 读取器读取 shuffle 数据,当 shuffle 分区不需要时,例如,在将排序合并连接转换为广播哈希连接后。 3.0.0

将排序合并连接转换为打乱的哈希连接

AQE在所有后洗牌分区都小于阈值时将排序合并连接转换为洗牌哈希连接,最大阈值可以查看配置 spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

属性名称 默认值 含义 自版本起
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 配置每个分区允许构建本地哈希映射的最大字节数。如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 且所有分区的大小不大于此配置,则连接选择更倾向于使用洗牌哈希连接而不是排序归并连接,无论 spark.sql.join.preferSortMergeJoin 的值如何。 3.2.0

优化倾斜连接

数据倾斜会严重降低连接查询的性能。此功能通过将倾斜的任务拆分(如果需要则复制)为大致均匀大小的任务,动态处理排序合并连接中的倾斜。此功能在 spark.sql.adaptive.enabled spark.sql.adaptive.skewJoin.enabled 配置均启用时生效。

属性名称 默认值 含义 自版本
spark.sql.adaptive.skewJoin.enabled true 当为true且 spark.sql.adaptive.enabled 为true时,Spark通过拆分(如有必要,还会复制)倾斜的分区动态处理排序合并连接中的倾斜情况。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0 如果一个分区的大小大于这个因子乘以中位数分区大小且大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes ,则该分区被视为倾斜的。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 如果一个分区的大小(字节数)大于这个阈值且大于 spark.sql.adaptive.skewJoin.skewedPartitionFactor 乘以中位数分区大小,则该分区被视为倾斜的。理想情况下,这个配置应设置为大于 spark.sql.adaptive.advisoryPartitionSizeInBytes 3.0.0
spark.sql.adaptive.forceOptimizeSkewedJoin false 当为true时,强制启用OptimizeSkewedJoin,这是一个自适应规则,用于优化倾斜连接,以避免任务拉胯,即使这会引入额外的洗牌。 3.3.0

杂项

属性名称 默认值 含义 自版本
spark.sql.adaptive.optimizer.excludedRules (无) 配置要在自适应优化器中禁用的一系列规则,这些规则通过规则名称指定并用逗号分隔。优化器将记录已被排除的规则。 3.1.0
spark.sql.adaptive.customCostEvaluatorClass (无) 用于自适应执行的自定义成本评估器类。如果未设置,Spark 将默认使用其自己的 SimpleCostEvaluator 3.2.0