工作调度
概述
Spark 有多种设施用于在计算之间调度资源。首先,回顾一下,在 集群模式概述 中描述的内容,每个 Spark 应用程序(SparkContext 的实例)运行一组独立的执行进程。Spark 所运行的集群管理器提供了 跨应用调度 的功能。其次, 在 每个 Spark 应用程序内部,如果由不同线程提交,则可能会同时运行多个“作业”(Spark 操作)。如果您的应用程序通过网络服务请求,这种情况很常见。Spark 包括一个 公平调度器 ,用于在每个 SparkContext 内部调度资源。
跨应用程序调度
当在集群上运行时,每个Spark应用程序都会获得一组独立的执行器JVM,这些JVM只运行该应用程序的任务并存储数据。如果多个用户需要共享集群,则可以根据集群管理器选择不同的分配管理选项。
最简单的选项,适用于所有集群管理器,是资源的 静态分区 。使用这种方法,每个应用程序被分配一个可以使用的最大资源量,并在整个运行期间保持这些资源。这是Spark的 独立 和 YARN 模式中使用的方法,以及 粗粒度Mesos模式 。资源分配可以根据集群类型进行如下配置:
-
独立模式:
默认情况下,提交到独立模式集群的应用程序将按照 FIFO(先到先处理)顺序运行,每个应用程序将尽量使用所有可用节点。您可以通过在应用程序中设置
spark.cores.max配置属性来限制应用程序使用的节点数量,或者通过spark.deploy.defaultCores改变默认值,以便不设置此设置的应用程序使用。最后,除了控制核心,每个应用程序的spark.executor.memory设置控制其内存使用。 -
Mesos:
要在 Mesos 上使用静态分区,请将
spark.mesos.coarse配置属性设置为true,并可选择性地设置spark.cores.max来限制每个应用程序的资源共享,正如在独立模式中一样。您还应该设置spark.executor.memory来控制执行器内存。 -
YARN:
Spark YARN 客户端的
--num-executors选项控制在集群中分配多少个执行器(作为配置属性的spark.executor.instances),而--executor-memory(spark.executor.memory配置属性)和--executor-cores(spark.executor.cores配置属性)控制每个执行器的资源。有关更多信息,请参见 YARN Spark 属性 。
在Mesos上可用的第二个选项是
动态共享
CPU核心。在该模式下,每个Spark应用程序仍然具有固定和独立的内存分配(由
spark.executor.memory
设置),但是当应用程序在机器上不运行任务时,其他应用程序可以在这些核心上运行任务。这种模式在你预期有大量不太活跃的应用程序时非常有用,例如来自不同用户的shell会话。然而,这带来了不太可预测的延迟风险,因为当应用程序有工作要做时,可能需要一段时间才能在一个节点上重新获得核心。要使用此模式,只需使用
mesos://
URL并将
spark.mesos.coarse
设置为false。
请注意,目前没有任何模式提供跨应用程序的内存共享。如果您希望以这种方式共享数据,我们建议运行一个单一的服务器应用程序,该应用程序可以通过查询相同的 RDDs 来处理多个请求。
动态资源分配
Spark提供了一种机制,可以根据工作负载动态调整您的应用程序所占用的资源。这意味着,如果您的应用程序不再使用这些资源,可以将其返还给集群,并在有需求时再次请求它们。此功能在多个应用程序共享Spark集群中的资源时特别有用。
此功能默认是禁用的,并在所有粗粒度集群管理器上可用,即 独立模式 、 YARN模式 、 Mesos粗粒度模式 和 K8s模式 。
注意事项
-
在
独立模式
下,如果不显式设置
spark.executor.cores,每个执行程序将获取工作节点上所有可用的核心。在这种情况下,当启用动态分配时,Spark 可能会获得比预期多得多的执行程序。当你想在 独立模式 下使用动态分配时,建议在 SPARK-30299 问题被修复之前,显式为每个执行程序设置核心数。
配置和设置
使用此功能有几种方法。无论您选择哪种方法,您的应用程序必须首先将
spark.dynamicAllocation.enabled
设置为
true
,此外,
-
您的应用程序必须将
spark.shuffle.service.enabled设置为true,在同一集群中对每个工作节点设置 外部洗牌服务 后,或者 -
您的应用程序必须将
spark.dynamicAllocation.shuffleTracking.enabled设置为true,或者 -
您的应用程序必须同时将
spark.decommission.enabled和spark.storage.decommission.shuffleBlocks.enabled设置为true,或者 -
您的应用程序必须配置
spark.shuffle.sort.io.plugin.class以使用支持可靠存储的自定义ShuffleDataIO的ShuffleDriverComponents。
外部洗牌服务或洗牌跟踪或
ShuffleDriverComponents
支持可靠存储的目的是允许执行器被移除而不删除由它们写入的洗牌文件(更多细节在
下面
描述)。虽然启用洗牌跟踪很简单,但设置外部洗牌服务的方法在不同的集群管理器之间有所不同:
在独立模式下,只需将您的工作节点的
spark.shuffle.service.enabled
设置为
true
启动即可。
在 Mesos 粗粒度模式下,在所有工作节点上运行
$SPARK_HOME/sbin/start-mesos-shuffle-service.sh
,并将
spark.shuffle.service.enabled
设置为
true
。例如,您可以通过 Marathon 来实现。
在YARN模式下,请按照 这里 的说明进行操作。
所有其他相关配置都是可选的,并在
spark.dynamicAllocation.*
和
spark.shuffle.service.*
命名空间下。有关更多详细信息,请参见
配置页面
。
资源分配政策
在高层次上,Spark应该在不再使用时释放执行器,并在需要时获取执行器。由于没有明确的方法来预测即将被移除的执行器是否会在不久的将来运行任务,或者即将添加的新执行器是否实际上会空闲,我们需要一组启发式规则来确定何时移除和请求执行器。
请求策略
启用动态分配的 Spark 应用程序在有待调度的待处理任务时会请求额外的执行者。这个条件必然意味着现有的执行者集合不足以同时满足所有已提交但尚未完成的任务。
Spark 以轮次方式请求执行器。当有待处理任务达到
spark.dynamicAllocation.schedulerBacklogTimeout
秒时,实际请求将被触发,如果待处理任务的队列持续存在,之后每
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒再次触发。此外,每轮请求的执行器数量将从前一轮开始指数级增加。例如,一个应用程序将在第一轮中添加 1 个执行器,然后在随后的轮次中添加 2、4、8 等执行器。
指数增长策略的动机有两个方面。首先,一个应用在开始时应该谨慎地请求执行者,以防最终发现只需要少量额外的执行者。这呼应了TCP慢启动的理由。其次,应用应该能够及时增加其资源使用量,以防最终发现实际上需要许多执行者。
移除策略
移除执行器的策略要简单得多。当Spark应用程序的执行器闲置时间超过
spark.dynamicAllocation.executorIdleTimeout
秒时,它将被移除。请注意,在大多数情况下,这个条件与请求条件是相互排斥的,因为如果还有待调度的任务,执行器不应该闲置。
优雅地退役执行器
在动态分配之前,如果一个Spark执行器在关联的应用程序也已退出时退出,则与该执行器相关的所有状态不再需要,可以安全地丢弃。然而,在动态分配中,当执行器被显式移除时,应用程序仍在运行。如果应用程序尝试访问存储在执行器中或由执行器写入的状态,则必须重新计算该状态。因此,Spark需要一种机制在移除执行器之前,通过保留其状态来优雅地退役执行器。
这个要求对于洗牌操作尤其重要。在洗牌过程中,Spark 执行器首先将其自己的映射输出本地写入磁盘,然后在其他执行器尝试获取这些文件时充当这些文件的服务器。如果出现了拖后腿的任务(运行时间远长于其他任务),动态分配可能会在洗牌完成之前移除一个执行器,在这种情况下,该执行器写入的洗牌文件必须不必要地重新计算。
保存混洗文件的解决方案是使用外部混洗服务,该服务在Spark 1.2中被引入。此服务指的是在您的集群中每个节点上独立于您的Spark应用程序及其执行器运行的长期运行过程。如果启用该服务,Spark执行器将从服务中获取混洗文件,而不是相互之间获取。这意味着执行器写入的任何混洗状态可能会在执行器的生命周期之外继续提供。
除了写入洗牌文件,执行器还会将数据缓存到磁盘或内存中。然而,当一个执行器被移除时,所有缓存的数据将不再可访问。为了减轻这个问题,默认情况下,包含缓存数据的执行器永远不会被移除。您可以通过配置
spark.dynamicAllocation.cachedExecutorIdleTimeout
来设置此行为。当将
spark.shuffle.service.fetch.rdd.enabled
设置为
true
时,Spark 可以使用 ExternalShuffleService 来获取磁盘持久化的 RDD 块。如果启用此功能,则在动态分配的情况下,只有磁盘持久化块的执行器会在
spark.dynamicAllocation.executorIdleTimeout
之后被认为是闲置,并将相应地被释放。在未来的版本中,缓存的数据可能通过类似于外部洗牌服务如何保存洗牌文件的方式,通过离堆存储来保存。
应用程序中的调度
在给定的Spark应用程序(SparkContext实例)中,如果多个任务是从不同的线程提交的,则可以同时运行多个并行作业。在本节中,所说的“作业”是指一个Spark操作(例如:
save
,
collect
)以及需要运行的任何任务以评估该操作。Spark的调度器是完全线程安全的,并支持此用例,以启用服务多个请求的应用程序(例如:多个用户的查询)。
默认情况下,Spark的调度程序以FIFO方式运行作业。每个作业被划分为“阶段”(例如,map和reduce阶段),第一个作业在所有可用资源上具有优先权,只要其阶段有任务要启动,然后第二个作业获得优先权,依此类推。如果队列头部的作业不需要使用整个集群,那么后续作业可以立即开始运行,但如果队列头部的作业很大,那么后续作业可能会显著延迟。
从 Spark 0.8 开始,配置作业之间的公平共享也是可能的。在公平共享下,Spark 以“轮流”的方式在作业之间分配任务,使所有作业大致获得相等的集群资源。这意味着在长作业运行时提交的短作业可以立即开始获得资源,并且仍然能获得良好的响应时间,而无需等待长作业完成。这种模式最适合多用户环境。
此功能默认是禁用的,并且在所有粗粒度集群管理器上可用,即
独立模式
、
YARN模式
、
K8s模式
和
Mesos粗粒度模式
。要启用公平调度器,只需在配置SparkContext时将
spark.scheduler.mode
属性设置为
FAIR
:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
公平调度器池
公平调度器还支持将作业分组到 池 中,并为每个池设置不同的调度选项(例如权重)。这对于创建一个“高优先级”池,以便对更重要的作业给予优先级是有用的,例如,或将每个用户的作业分组在一起,并给予 用户 平等的份额,而不管他们有多少个并发作业,而不是给予 作业 平等的份额。这种方法是模仿 Hadoop公平调度器 。
没有任何干预,刚提交的任务会进入一个
默认池
,但是任务的池可以通过在提交任务的线程中将
spark.scheduler.pool
“本地属性”添加到SparkContext来设置。
操作步骤如下:
// 假设 sc 是你的 SparkContext 变量
sc.setLocalProperty("spark.scheduler.pool", "pool1")
在设置了这个本地属性后,
所有
在此线程中提交的作业(通过此线程中的调用至
RDD.save
、
count
、
collect
等)都将使用这个池名称。此设置是针对每个线程的,以便于让一个线程代表同一用户运行多个作业。如果您想清除一个线程所关联的池,只需调用:
sc.setLocalProperty("spark.scheduler.pool", null)
池的默认行为
默认情况下,每个池获得集群的均等份额(对默认池中的每个作业也同样均等),但在每个池内部,作业按照FIFO顺序运行。例如,如果您为每个用户创建一个池,这意味着每个用户将获得集群的均等份额,并且每个用户的查询将按顺序运行,而不是较晚的查询从该用户较早的查询中获取资源。
配置池属性
特定池的属性也可以通过配置文件进行修改。每个池支持三个属性:
-
schedulingMode:这可以是 FIFO 或 FAIR,以控制池中的作业是相互排队(默认)还是公平地共享池的资源。 -
weight:这控制池在集群中相对于其他池的份额。默认情况下,所有池的权重为 1。如果您给某个特定池一个权重为 2,例如,它将获得比其他活动池多 2 倍的资源。设置一个较高的权重,例如 1000,也可以在池之间实现 优先级 ——实质上,权重为 1000 的池将在其有作业活动时总是优先启动任务。 -
minShare:除了整体权重外,每个池还可以被赋予一个管理员希望其拥有的 最小份额 (作为 CPU 核心的数量)。公平调度程序始终尝试在根据权重重新分配额外资源之前,满足所有活动池的最小份额。因此,minShare属性可以成为确保池始终可以快速获取一定数量资源(例如 10 个核心)的另一种方式,而无需为其在整个集群中设定一个高优先级。默认情况下,每个池的minShare为 0。
可以通过创建一个 XML 文件来设置池属性,类似于
conf/fairscheduler.xml.template
,并将一个名为
fairscheduler.xml
的文件放在类路径上,或者在您的
SparkConf
中设置
spark.scheduler.allocation.file
属性。文件路径遵循 Hadoop 配置,可以是本地文件路径或 HDFS 文件路径。
// 本地的调度器文件
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// HDFS上的调度器文件
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
XML文件的格式对于每个池来说只是一个
元素,其中包含不同的元素用于各种设置。例如:
name="生产">
公平
1
2
name="测试">
先进先出
2
3
完整的示例也可以在
conf/fairscheduler.xml.template
中找到。请注意,任何未在 XML 文件中配置的池将会获得所有设置的默认值(调度模式 FIFO、权重 1 和 minShare 0)。
使用JDBC连接进行调度
要为JDBC客户端会话设置一个
公平调度器
池,用户可以设置
spark.sql.thriftserver.scheduler.pool
变量:
SET spark.sql.thriftserver.scheduler.pool=accounting;
PySpark中的并发作业
默认情况下,PySpark不支持将PVM线程与JVM线程同步,并且在多个PVM线程中启动多个作业并不能保证在每个对应的JVM线程中启动每个作业。由于这个限制,无法通过
sc.setJobGroup
在单独的PVM线程中设置不同的作业组,这也不允许稍后通过
sc.cancelJobGroup
取消该作业。
pyspark.InheritableThread
建议与 PVM 线程一起使用,以便 JVM 线程可以继承可继承的属性,例如本地属性。