Spark 独立模式

除了在Mesos或YARN集群管理器上运行,Spark还提供了一种简单的独立部署模式。您可以手动启动独立集群,通过手动启动一个主节点和工作节点,或者使用我们提供的 启动脚本 。也可以在单台机器上运行这些守护程序进行测试。

安全

安全特性如认证默认情况下未启用。部署一个对互联网或不受信任网络开放的集群时,确保集群的访问安全非常重要,以防止未经授权的应用程序在集群上运行。在运行Spark之前,请参见 Spark安全性 以及本文件中的具体安全部分。

将 Spark 独立安装到集群

要安装Spark独立模式,您只需将Spark的已编译版本放在集群中的每个节点上。您可以通过每个版本获得预构建的Spark版本或 自己构建

手动启动集群

您可以通过执行以下操作来启动一个独立的主服务器:

./sbin/start-master.sh

一旦启动,主节点将为自己打印出一个 spark://HOST:PORT URL,你可以用它来连接工作节点,或者作为“master”参数传递给 SparkContext 。你还可以在主节点的网页用户界面上找到这个URL,默认情况下是 http://localhost:8080

同样,您可以启动一个或多个工作进程,并通过以下方式将它们连接到主节点:

./sbin/start-worker.sh 

一旦您启动了一个工作节点,请查看主节点的网络界面(默认地址为 http://localhost:8080 )。您应该在那里看到新节点的列表,连同它的CPU数量和内存(减去留给操作系统的一个千兆字节)。

最后,可以将以下配置选项传递给主节点和工作节点:

参数 含义
-h HOST , --host HOST 监听的主机名
-i HOST , --ip HOST 监听的主机名(已弃用,请使用 -h 或 --host)
-p PORT , --port PORT 服务监听的端口(默认:7077用于主节点,随机用于工作节点)
--webui-port PORT Web UI的端口(默认:8080用于主节点,8081用于工作节点)
-c CORES , --cores CORES 允许Spark应用在机器上使用的总CPU核心数(默认:所有可用核心);仅用于工作节点
-m MEM , --memory MEM 允许Spark应用在机器上使用的总内存,以1000M或2G的格式(默认:您的机器总RAM减去1 GiB);仅用于工作节点
-d DIR , --work-dir DIR 用于临时空间和作业输出日志的目录(默认:SPARK_HOME/work);仅用于工作节点
--properties-file FILE 要加载的自定义Spark属性文件的路径(默认:conf/spark-defaults.conf)

集群启动脚本

要使用启动脚本启动 Spark 独立集群,您应该在 Spark 目录中创建一个名为 conf/workers 的文件,该文件必须包含您打算启动 Spark 工作节点的所有机器的主机名,每行一个。如果 conf/workers 不存在,启动脚本默认使用单台机器(localhost),这对于测试很有用。请注意,主机通过 ssh 访问每个工作节点。默认情况下,ssh 在并行中运行,并要求设置无密码(使用私钥)访问。如果您没有无密码设置,您可以设置环境变量 SPARK_SSH_FOREGROUND,并为每个工作节点串行提供密码。

一旦您设置好这个文件,您可以使用以下基于Hadoop的部署脚本的shell脚本来启动或停止您的集群,这些脚本可在 SPARK_HOME/sbin 中找到:

请注意,这些脚本必须在您想要运行 Spark 主控的机器上执行,而不是您的本地机器。

您可以通过在 conf/spark-env.sh 中设置环境变量来进一步配置集群。通过以 conf/spark-env.sh.template 开始创建此文件,并且 将其复制到所有工作机器 以使设置生效。以下设置可用:

环境变量 含义
SPARK_MASTER_HOST 将主节点绑定到特定的主机名或IP地址,例如一个公共的地址。
SPARK_MASTER_PORT 在不同的端口上启动主节点(默认:7077)。
SPARK_MASTER_WEBUI_PORT 主节点Web UI的端口(默认:8080)。
SPARK_MASTER_OPTS 仅适用于主节点的配置属性,格式为"-Dx=y"(默认:无)。以下是可能选项的列表。
SPARK_LOCAL_DIRS 用于Spark中的“临时”空间的目录,包括映射输出文件和存储在磁盘上的RDD。这应该位于系统中的快速本地磁盘上。它还可以是多个不同磁盘上的目录的以逗号分隔的列表。
SPARK_WORKER_CORES 允许Spark应用程序在该机器上使用的总核心数(默认:所有可用核心)。
SPARK_WORKER_MEMORY 允许Spark应用程序在机器上使用的总内存量,例如 1000m 2g (默认:总内存减去1 GiB);请注意,每个应用程序的 单独 内存是通过其 spark.executor.memory 属性配置的。
SPARK_WORKER_PORT 在特定端口上启动Spark工作节点(默认:随机)。
SPARK_WORKER_WEBUI_PORT 工作节点Web UI的端口(默认:8081)。
SPARK_WORKER_DIR 运行应用程序的目录,其中将包括日志和临时空间(默认:SPARK_HOME/work)。
SPARK_WORKER_OPTS 仅适用于工作节点的配置属性,格式为"-Dx=y"(默认:无)。以下是可能选项的列表。
SPARK_DAEMON_MEMORY 分配给Spark主节点和工作节点守护进程本身的内存(默认:1g)。
SPARK_DAEMON_JAVA_OPTS 用于Spark主节点和工作节点守护进程本身的JVM选项,格式为"-Dx=y"(默认:无)。
SPARK_DAEMON_CLASSPATH Spark主节点和工作节点守护进程本身的类路径(默认:无)。
SPARK_PUBLIC_DNS Spark主节点和工作节点的公共DNS名称(默认:无)。

注意: 启动脚本当前不支持Windows。要在Windows上运行Spark集群,请手动启动主节点和工作节点。

SPARK_MASTER_OPTS 支持以下系统属性:

属性名称 默认值 含义 版本号
spark.master.ui.port 8080 指定主控Web UI端点的端口号。 1.1.0
spark.master.ui.decommission.allow.mode LOCAL 指定主控Web UI的/workers/kill端点的行为。可能的选择是: LOCAL 表示允许来自运行主控的机器上的IP调用此端点, DENY 表示完全禁用此端点, ALLOW 表示允许来自任何IP调用此端点。 3.1.0
spark.master.rest.enabled false 是否使用主控REST API端点。 1.3.0
spark.master.rest.port 6066 指定主控REST API端点的端口号。 1.3.0
spark.deploy.retainedApplications 200 要显示的完成的应用程序的最大数量。为了维持此限制,较旧的应用程序将从UI中删除。
0.8.0
spark.deploy.retainedDrivers 200 要显示的完成的驱动程序的最大数量。为了维持此限制,较旧的驱动程序将从UI中删除。
1.1.0
spark.deploy.spreadOut true 独立集群管理器是否应将应用程序分散到节点之间,还是尽量将它们集中到尽可能少的节点上。通常在HDFS中,分散更有利于数据本地化,但对于计算密集型工作负载,集中处理更有效。
0.6.1
spark.deploy.defaultCores (无限) 在Spark的独立模式中,如果应用程序没有设置 spark.cores.max ,则分配给应用程序的默认核心数量。如果未设置,应用程序将始终获得所有可用核心,除非它们自己配置 spark.cores.max 。在共享集群上将此设置为较低值,以防止用户默认抓取整个集群。
0.9.0
spark.deploy.maxExecutorRetries 10 在独立集群管理器删除一个有故障的应用程序之前,允许的连续执行器失败的最大数量限制。如果应用程序有任何正在运行的执行器,则不会被删除。如果一个应用程序在不到 spark.deploy.maxExecutorRetries 的情况下经历多个失败,并且在这些失败中之间没有成功启动的执行器,并且应用程序没有正在运行的执行器,那么独立集群管理器将删除该应用程序并将其标记为失败。要禁用此自动删除,请将 spark.deploy.maxExecutorRetries 设置为 -1
1.6.3
spark.worker.timeout 60 如果在此秒数后工作者未发送心跳,则独立部署主控将其视为丢失。 0.6.2
spark.worker.resource.{name}.amount (无) 在工作节点上使用的特定资源的数量。 3.0.0
spark.worker.resource.{name}.discoveryScript (无) 资源发现脚本的路径,该脚本在工作节点启动时用于查找特定资源。脚本的输出应格式化为 ResourceInformation 类的格式。 3.0.0
spark.worker.resourcesFile (无) 资源文件的路径,该文件在工作节点启动时用于查找各种资源。资源文件的内容应格式化为 [{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}] 。如果在资源文件中未找到特定资源,则将使用发现脚本查找该资源。如果发现脚本也未找到资源,则工作节点将无法启动。 3.0.0

SPARK_WORKER_OPTS 支持以下系统属性:

属性名称 默认值 含义 自版本
spark.worker.cleanup.enabled false 启用对工作节点/应用程序目录的定期清理。注意这只影响独立模式,因为YARN的工作方式不同。仅停止的应用程序的目录会被清理。 如果spark.shuffle.service.db.enabled为“true”,则应启用此选项。 1.0.0
spark.worker.cleanup.interval 1800 (30分钟) 控制工作节点在本地机器上清理旧应用程序工作目录的间隔,单位为秒。 1.0.0
spark.worker.cleanup.appDataTtl 604800 (7天, 7 * 24 * 3600) 保留每个工作节点上应用程序工作目录的秒数。这是一个生存时间,应该根据您可用的磁盘空间量来决定。应用程序日志和jar文件会被下载到每个应用程序工作目录中。随着时间的推移,工作目录可能迅速填满磁盘空间,特别是如果您非常频繁地运行作业。 1.0.0
spark.shuffle.service.db.enabled true 将外部Shuffle服务状态存储在本地磁盘上,以便在外部Shuffle服务重启时,它将自动重新加载当前执行器的信息。这仅影响独立模式(YARN始终启用此行为)。您还应该启用 spark.worker.cleanup.enabled ,以确保状态最终被清理。该配置可能在未来被移除。 3.0.0
spark.shuffle.service.db.backend LEVELDB spark.shuffle.service.db.enabled 为true时,用户可以使用此设置指定在Shuffle服务状态存储中使用的磁盘存储类型。目前支持`LEVELDB`和`ROCKSDB`,默认值为`LEVELDB`。 原始数据存储在`LevelDB/RocksDB`中现在不会自动转换为另一种存储类型。 3.4.0
spark.storage.cleanupFilesAfterExecutorExit true 启用在执行器退出后清理工作节点目录中的非Shuffle文件(例如临时Shuffle块、缓存的RDD/广播块、溢出文件等)。注意这与`spark.worker.cleanup.enabled`不重叠,因为这使得清理一个死执行器的本地目录中的非Shuffle文件,而`spark.worker.cleanup.enabled`则使得清理已停止和超时应用程序的所有文件/子目录。 这仅影响独立模式,未来可能会添加对其他集群管理器的支持。 2.4.0
spark.worker.ui.compressedLogFileLengthCacheSize 100 对于压缩日志文件,只有通过解压缩文件才能计算出未压缩的文件。Spark缓存压缩日志文件的未压缩文件大小。此属性控制缓存大小。 2.0.2

资源分配和配置概述

请确保您已阅读 配置页面 上的自定义资源调度和配置概述部分。本节仅讨论Spark Standalone特定的资源调度方面。

Spark Standalone 有两个部分,第一个是为 Worker 配置资源,第二个是为特定应用分配资源。

用户必须配置 Workers,使其具有一组可用的资源,以便将这些资源分配给 Executors。 spark.worker.resource.{resourceName}.amount 用于控制工作节点分配的每种资源的数量。用户还必须指定 spark.worker.resourcesFile spark.worker.resource.{resourceName}.discoveryScript 以指定 Worker 如何发现其分配的资源。请参见上面的描述,以了解哪种方法最适合您的设置。

第二部分是在 Spark Standalone 上运行应用程序。从标准的 Spark 资源配置中唯一特殊的情况是,当您以客户端模式运行 Driver 时。对于客户端模式下的 Driver,用户可以通过 spark.driver.resourcesFile spark.driver.resource.{resourceName}.discoveryScript 指定其使用的资源。如果 Driver 与其他 Driver 在同一主机上运行,请确保资源文件或发现脚本仅返回与在同一节点上运行的其他 Driver 不冲突的资源。

注意,用户在提交申请时不需要指定发现脚本,因为工作者将使用分配给它的资源启动每个执行器。

将应用程序连接到集群

要在Spark集群上运行应用程序,只需将主节点的 spark://IP:PORT URL 传递给 SparkContext 构造函数

要在集群上运行交互式 Spark shell,运行以下命令:

./bin/spark-shell --master spark://IP:PORT

您还可以传递一个选项 --total-executor-cores 来控制 spark-shell 在集群上使用的核心数量。

客户端属性

Spark 应用程序支持以下特定于独立模式的配置属性:

属性名称 默认值 含义 自版本起
spark.standalone.submit.waitAppCompletion false 在独立集群模式下,控制客户端是否等待应用程序完成后再退出。 如果设置为 true ,客户端进程将保持活跃状态,轮询驱动程序的状态。 否则,客户端进程将在提交后退出。 3.1.0

启动 Spark 应用程序

火花协议

spark-submit 脚本 提供了将编译好的Spark应用程序提交到集群的最简单方法。对于独立集群,Spark目前支持两种部署模式。在 client 模式下,驱动程序与提交应用程序的客户端在同一个进程中启动。然而,在 cluster 模式下,驱动程序是从集群内部的一个Worker进程中启动的,客户端进程在完成提交应用程序的责任后立即退出,而不需等待应用程序的完成。

如果您的应用程序是通过 Spark 提交启动的,那么应用程序的 jar 文件会自动分发到所有工作节点。对于您的应用程序所依赖的任何其他 jar,您应该通过 --jars 标志指定它们,并使用逗号作为分隔符(例如 --jars jar1,jar2 )。要控制应用程序的配置或执行环境,请参见 Spark 配置

此外,独立的 cluster 模式支持在应用程序以非零退出代码退出时自动重启您的应用程序。要使用此功能,您可以在启动您的应用程序时将 --supervise 标志传递给 spark-submit 。然后,如果您希望终止一个重复失败的应用程序,您可以通过以下方式做到:

./bin/spark-class org.apache.spark.deploy.Client kill  

您可以通过独立的主控网页用户界面找到驱动程序ID,网址为 http:// :8080

REST API

如果 spark.master.rest.enabled 被启用,Spark 主节点通过 http://[host:port]/[version]/submissions/[action] 提供额外的 REST API,其中 host 是主节点主机, port 是由 spark.master.rest.port 指定的端口号(默认: 6066), version 是协议版本,今天的版本为 v1 action 是以下支持的操作之一。

命令 描述 HTTP 方法 自版本
create 通过 cluster 模式创建一个 Spark 驱动程序。 POST 1.3.0
kill 终止一个单独的 Spark 驱动程序。 POST 1.3.0
status 检查 Spark 作业的状态。 GET 1.3.0

以下是一个使用 curl 的 CLI 命令示例,包括 pi.py 和 REST API。

$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
  "appResource": "",
  "sparkProperties": {
    "spark.master": "spark://master:7077",
    "spark.app.name": "Spark Pi",
    "spark.driver.memory": "1g",
    "spark.driver.cores": "1",
    "spark.jars": ""
  },
  "clientSparkVersion": "",
  "mainClass": "org.apache.spark.deploy.SparkSubmit",
  "environmentVariables": { },
  "action": "CreateSubmissionRequest",
  "appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'

以下是上述 create 请求的REST API响应。

{
"action" : "CreateSubmissionResponse",
  "message" : "驱动成功提交为 driver-20231124153531-0000",
  "serverSparkVersion" : "3.5.1",
  "submissionId" : "driver-20231124153531-0000",
  "success" : true
}

资源调度

当前独立集群模式仅支持跨应用程序的简单 FIFO 调度器。 然而,为了允许多个并发用户,您可以控制每个应用程序将使用的最大资源数量。 默认情况下,它将获取集群中的 所有 核心,这只有在您一次运行一个应用程序时才有意义。您可以通过在您的 SparkConf 中设置 spark.cores.max 来限制核心数量。例如:

val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)

此外,您可以在集群主进程中配置 spark.deploy.defaultCores 来更改默认值,以便对那些未将 spark.cores.max 设置为小于无限值的应用程序生效。通过将以下内容添加到 conf/spark-env.sh 来实现:

导出 SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores="

这在共享集群上很有用,因为用户可能没有单独配置最大核心数。

执行者调度

分配给每个执行器的核心数量是可配置的。当 spark.executor.cores 被明确设置时,如果工作节点具有足够的核心和内存,来自同一应用程序的多个执行器可能会在同一个工作节点上启动。否则,每个执行器将默认占用工作节点上所有可用的核心,在这种情况下,每个应用程序在每个工作节点上在单次调度迭代中只能启动一个执行器。

阶段级调度概述

独立模式支持阶段级调度:

注意事项

正如在 动态资源分配 中提到的,如果未明确指定每个执行器的核心数并启用了动态分配,Spark 可能会获取比预期更多的执行器。因此,建议您在使用阶段级调度时明确为每个资源配置设置执行器核心数。

监控与日志记录

Spark的独立模式提供了一个基于Web的用户界面来监控集群。主节点和每个工作节点都有自己的Web UI,显示集群和作业统计信息。默认情况下,您可以通过端口8080访问主节点的Web UI。可以在配置文件中或通过命令行选项更改此端口。

此外,每个作业的详细日志输出也会写入每个工作节点的工作目录(默认为 SPARK_HOME/work )。您将会看到每个作业的两个文件, stdout stderr ,其中包含它写入控制台的所有输出。

与Hadoop并行运行

您可以通过在相同机器上将其作为单独的服务启动,来将 Spark 与现有的 Hadoop 集群一起运行。要从 Spark 访问 Hadoop 数据,只需使用 hdfs:// URL(通常是 hdfs:// :9000/path ,但您可以在 Hadoop Namenode 的网页用户界面上找到正确的 URL)。或者,您可以为 Spark 设置一个单独的集群,并仍然通过网络访问 HDFS;这将比直接访问磁盘慢,但如果您仍然在同一局域网中运行(例如,您在每个放有 Hadoop 的机架上放置几台 Spark 机器),这可能不是一个问题。

为网络安全配置端口

一般来说,Spark集群及其服务不部署在公共互联网中。它们通常是私有服务,应该仅在部署Spark的组织的网络内可访问。对Spark服务使用的主机和端口的访问应限制在需要访问这些服务的源主机内。

这对于使用独立资源管理器的集群尤为重要,因为它们不支持像其他资源管理器那样的细粒度访问控制。

有关要配置的端口的完整列表,请参见 安全页面

高可用性

默认情况下,独立调度集群对工作节点故障具有弹性(在一定程度上,Spark本身能够通过将工作转移到其他工作节点来抵御工作丢失)。然而,调度器使用一个Master来做出调度决策,默认情况下这会创建一个单点故障:如果Master崩溃,则无法创建新的应用程序。为了规避这个问题,我们有两种高可用性方案,详细内容如下。

带有ZooKeeper的备用主节点

概述

利用ZooKeeper提供领导者选举和某些状态存储,您可以在集群中启动多个连接到同一ZooKeeper实例的Master。其中一个将被选为“领导者”,其他的将保持待命模式。如果当前领导者失效,将会选举另一个Master,恢复旧Master的状态,然后重新开始调度。整个恢复过程(从第一个领导者崩溃开始)应该在1到2分钟之间完成。请注意,此延迟仅影响调度 应用程序——在Master故障转移期间已经运行的应用程序不受影响。

了解有关如何开始使用 ZooKeeper 的更多信息 这里

配置

为了启用此恢复模式,您可以通过配置 spark.deploy.recoveryMode 和相关的 spark.deploy.zookeeper.* 配置在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS。有关这些配置的更多信息,请参阅 配置文档

可能的陷阱:如果您的集群中有多个主节点但未能正确配置主节点以使用 ZooKeeper,则主节点将无法相互发现,并认为它们都是领导者。这将导致集群状态不健康(因为所有主节点将独立调度)。

详情

在您设置好ZooKeeper集群后,启用高可用性很简单。只需在不同的节点上启动多个具有相同ZooKeeper配置(ZooKeeper URL和目录)的Master进程即可。Master可以随时添加和移除。

为了调度新的应用程序或向集群中添加工作节点,它们需要知道当前领导者的IP地址。这可以通过简单地传入一个主节点的列表来实现,而不是传入单个主节点。例如,您可以启动您的 SparkContext 指向 spark://host1:port1,host2:port2 。这将导致您的 SparkContext 尝试向两个主节点注册 - 如果 host1 发生故障,这个配置仍然是正确的,因为我们会找到新的领导者 host2

在“向主节点注册”和正常操作之间有一个重要的区别。当启动时,应用程序或Worker需要能够找到并向当前的主节点注册。一旦成功注册,它就“在系统中”(即存储在ZooKeeper中)。如果发生故障转移,新的主节点将联系所有之前注册的应用程序和Worker,以通知他们领导权的变化,因此它们甚至不需要在启动时知道新的主节点的存在。

由于这个属性,可以在任何时候创建新的 Masters,而你需要担心的唯一事情是 new 应用程序和 Workers 能够找到它进行注册,以防它成为领导者。一旦注册,你就不需要担心了。

使用本地文件系统的单节点恢复

概述

ZooKeeper 是实现生产级高可用性的最佳选择,但如果您只想在 Master 崩溃时能够重启它,FILESYSTEM 模式可以解决这个问题。当应用程序和 Worker 注册时,它们会将足够的状态写入提供的目录,以便在 Master 进程重启时能够恢复。

配置

为了启用此恢复模式,您可以在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS,使用以下配置:

系统属性 含义 自版本起
spark.deploy.recoveryMode 设置为 FILESYSTEM 以启用单节点恢复模式(默认值:NONE)。 0.8.1
spark.deploy.recoveryDirectory Spark 将存储恢复状态的目录,从 Master 的角度可访问。 0.8.1

详情