Spark 连接概述
构建客户端Spark应用程序
在Apache Spark 3.4中,Spark Connect引入了一种解耦的客户端-服务器架构,允许通过DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端和服务器之间的分离使得Spark及其开放生态系统可以在任何地方利用。它可以嵌入到现代数据应用程序、IDE、笔记本和编程语言中。
要开始,请查看 快速入门:Spark Connect 。
Spark Connect 是如何工作的
Spark Connect 客户端库旨在简化 Spark 应用程序的开发。它是一个薄型 API,可以嵌入到任何地方:在应用程序服务器、IDE、笔记本和编程语言中。Spark Connect API 基于 Spark 的 DataFrame API,使用未解决的逻辑计划作为客户端与 Spark 驱动程序之间的语言无关协议。
Spark Connect 客户端将 DataFrame 操作翻译为未解决的逻辑查询计划,这些计划使用协议缓冲区进行编码。它们通过 gRPC 框架发送到服务器。
嵌入在Spark Server上的Spark Connect端点接收并 将未解析的逻辑计划转换为Spark的逻辑计划运算符。 这类似于解析SQL查询,其中属性和关系被 解析并构建初始解析计划。从那里,标准的Spark 执行过程开始,确保Spark Connect利用了所有的 Spark优化和增强。结果通过gRPC以Apache Arrow编码的行批次流回客户端。
Spark Connect的操作优势
通过这种新架构,Spark Connect 缓解了若干多租户操作问题:
稳定性 : 使用过多内存的应用程序现在只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端定义自己的依赖项,无需担心与Spark驱动程序之间的潜在冲突。
可升级性 : Spark驱动程序现在可以独立于应用程序无缝升级,例如可以受益于性能提升和安全修复。这意味着应用程序可以向前兼容,只要服务器端RPC定义设计为向后兼容。
调试能力和可观察性 : Spark Connect 允许在开发过程中直接从你最喜欢的 IDE 进行交互式调试。同样,应用程序可以使用应用程序的框架本地指标和日志库进行监控。
如何使用 Spark Connect
从 Spark 3.4 开始,Spark Connect 可用,并支持 PySpark 和 Scala 应用。我们将介绍如何使用 Spark Connect 运行 Apache Spark 服务器,并通过 Spark Connect 客户端库从客户端应用程序连接到它。
下载并启动带有 Spark Connect 的 Spark 服务器
首先,从 下载 Apache Spark 页面下载 Spark。Spark Connect是在 Apache Spark 版本 3.4 中引入的,因此请确保在页面顶部的发布下拉菜单中选择 3.4.0 或更新版本。然后选择您的软件包类型,通常选择“为 Apache Hadoop 3.3 及更高版本预构建”,然后单击链接进行下载。
现在提取你刚刚下载的Spark包,例如:
tar -xvf spark-3.5.3-bin-hadoop3.tgz
在终端窗口中,前往您之前提取Spark的
spark
文件夹,并运行
start-connect-server.sh
脚本以启动带有Spark Connect的Spark服务器,如下面的示例所示:
./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.3
请注意,我们包含一个 Spark Connect 包 (
spark-connect_2.12:3.5.3
),当启动
Spark 服务器时。这是使用 Spark Connect 所必需的。确保使用与您之前下载的 Spark 版本相同的包版本。在这个示例中,
Spark 3.5.3 与 Scala 2.12。
现在 Spark 服务器正在运行,并准备接受来自客户端应用程序的 Spark Connect 会话。在下一节中,我们将讲解如何在编写客户端应用程序时使用 Spark Connect。
使用 Spark Connect 进行交互式分析
创建Spark会话时,您可以指定要使用Spark Connect,有几种方式可以做到这一点,如下所述。
如果您不使用这里概述的机制,您的Spark会话将像以前一样工作,而不利用Spark Connect。
设置 SPARK_REMOTE 环境变量
如果您在运行 Spark 客户端应用程序的客户端机器上设置
SPARK_REMOTE
环境变量,并创建一个新的 Spark 会话,如以下示例所示,则该会话将成为一个 Spark Connect 会话。使用这种方法,不需要更改代码即可开始使用 Spark Connect。
在终端窗口中,将
SPARK_REMOTE
环境变量设置为指向您之前在计算机上启动的本地Spark服务器:
export SPARK_REMOTE="sc://localhost"
并像往常一样启动Spark shell:
./bin/pyspark
PySpark shell 现在通过 Spark Connect 连接到 Spark,如欢迎消息所示:
Client connected to the Spark Connect server at localhost
在创建 Spark 会话时指定 Spark 连接
您还可以在创建 Spark 会话时明确指定要使用 Spark Connect。
例如,您可以启动带有Spark Connect的PySpark外壳,如此处所示。
要使用 Spark Connect 启动 PySpark shell,只需包含
remote
参数并指定 Spark 服务器的位置。在这个例子中,我们使用
localhost
连接到之前启动的本地 Spark 服务器:
./bin/pyspark --remote "sc://localhost"
你会注意到,PySpark shell 欢迎信息告诉你,你已通过 Spark Connect 连接到 Spark:
Client connected to the Spark Connect server at localhost
您还可以检查 Spark 会话类型。如果它包含
.connect.
,您正在使用 Spark Connect,如下面的示例所示:
SparkSession available as 'spark'.
>>> type(spark)
<class 'pyspark.sql.connect.session.SparkSession'>
现在你可以在命令行中运行PySpark代码,以查看Spark Connect的实际效果:
>>> columns = ["id","name"]
>>> data = [(1,"Sarah"),(2,"Maria")]
>>> df = spark.createDataFrame(data).toDF(*columns)
>>> df.show()
+---+-----+
| id| name|
+---+-----+
| 1|Sarah|
| 2|Maria|
+---+-----+
对于Scala shell,我们使用基于Ammonite的REPL,该REPL目前不包含在Apache Spark包中。
要设置新的 Scala shell,首先下载并安装 Coursier CLI 。然后,在终端窗口中使用以下命令安装 REPL:
cs install –-contrib spark-connect-repl
现在,您可以像这样启动基于 Ammonite 的 Scala REPL/外壳,以连接到您的 Spark 服务器:
spark-connect-repl
当 REPL 成功初始化时,将出现一条问候消息:
Spark session available as 'spark'.
_____ __ ______ __
/ ___/____ ____ ______/ /__ / ____/___ ____ ____ ___ _____/ /_
\__ \/ __ \/ __ `/ ___/ //_/ / / / __ \/ __ \/ __ \/ _ \/ ___/ __/
___/ / /_/ / /_/ / / / ,< / /___/ /_/ / / / / / / / __/ /__/ /_
/____/ .___/\__,_/_/ /_/|_| \____/\____/_/ /_/_/ /_/\___/\___/\__/
/_/
默认情况下,REPL 将尝试连接到本地 Spark 服务器。
在 shell 中运行以下 Scala 代码以查看 Spark Connect 的实际效果:
@ spark.range(10).count
res0: Long = 10L
配置客户端-服务器连接
默认情况下,REPL 将尝试连接到本地 Spark 服务器,端口为 15002。 但是,连接可以通过多种方式进行配置,如此配置 参考 所述。
设置 SPARK_REMOTE 环境变量
可以在客户端机器上设置SPARK_REMOTE环境变量,以自定义在REPL启动时初始化的客户端-服务器连接。
export SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG"
spark-connect-repl
或者
SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG" spark-connect-repl
使用CLI参数
自定义设置也可以通过以下所示的CLI参数传递:
spark-connect-repl --host myhost.com --port 443 --token ABCDEFG
支持的CLI参数列表可以在 此处 找到。
通过连接字符串进行程序化配置
连接也可以通过编程方式使用 SparkSession#builder 创建,如下例所示:
@ import org.apache.spark.sql.SparkSession
@ val spark = SparkSession.builder.remote("sc://localhost:443/;token=ABCDEFG").build()
在独立应用程序中使用 Spark Connect
首先,使用
pip install pyspark[connect]==3.5.0
安装 PySpark,或者如果构建一个打包的 PySpark 应用程序/库,请将其添加到你的 setup.py 文件中,如下所示:
install_requires=[
'pyspark[connect]==3.5.0'
]
在编写自己代码时,创建Spark会话时,请包含带有对您的Spark服务器引用的
remote
函数,如以下示例所示:
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
为了说明,我们将创建一个简单的 Spark Connect 应用程序 SimpleApp.py:
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # 应该是你系统上的某个文件
spark = SparkSession.builder.remote("sc://localhost").appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
该程序仅计算文本文件中包含'a'的行数和包含'b'的行数。 注意,您需要将YOUR_SPARK_HOME替换为Spark安装的位置。
我们可以使用常规的Python解释器运行这个应用程序,如下所示:
# 使用Python解释器运行你的应用程序
$ python SimpleApp.py
...
行 with a: 72, 行 with b: 39
要在Scala应用程序/项目中使用Spark Connect,我们首先需要包含正确的依赖项。
以
sbt
构建系统为例,我们将以下依赖项添加到
build.sbt
文件中:
libraryDependencies += "org.apache.spark" %% "spark-sql-api" % "3.5.0"
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.0"
在编写自己代码时,创建Spark会话时,请包含带有对您的Spark服务器引用的
remote
函数,如以下示例所示:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().remote("sc://localhost").build()
注意
: 参考用户定义代码的操作,如 UDF、过滤、映射等,需要注册一个
ClassFinder
以便拾取和上传任何所需的类文件。此外,任何 JAR 依赖项必须使用
SparkSession#AddArtifact
上传到服务器。
示例:
import org.apache.spark.sql.connect.client.REPLClassDirMonitor
// 注册一个 ClassFinder 以监控并上传构建输出的类文件。
val classFinder = new REPLClassDirMonitor(<ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR>)
spark.registerClassFinder(classfinder)
// 上传 JAR 依赖
spark.addArtifact(<ABSOLUTE_PATH_JAR_DEP>)
在这里,
ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR
是构建系统写入类文件的输出目录,
ABSOLUTE_PATH_JAR_DEP
是本地文件系统中JAR的位置。
提供的
REPLClassDirMonitor
是对
ClassFinder
的一个实现,它监视一个特定的目录,但您可以实现自己的类,扩展
ClassFinder
以进行自定义搜索和监视。
客户端应用程序认证
虽然 Spark Connect 没有内置的身份验证,但它旨在与您现有的身份验证基础设施无缝协作。其 gRPC HTTP/2 接口允许使用身份验证代理,这使得在不必直接在 Spark 中实现身份验证逻辑的情况下保护 Spark Connect 成为可能。
Spark 3.4 支持什么
PySpark : 在 Spark 3.4 中,Spark Connect 支持大多数 PySpark API,包括 DataFrame , 函数 ,以及 列 。然而, 一些 API,例如 SparkContext 和 RDD 不受支持。 您可以在 API 参考 文档中查看当前支持哪些 API。 支持的 API 被标记为“支持 Spark Connect”,因此您可以在将现有代码迁移到 Spark Connect 之前检查您使用的 API 是否可用。
Scala :在 Spark 3.5 中,Spark Connect 支持大多数 Scala API,包括 Dataset , functions , Column , Catalog 和 KeyValueGroupedDataset 。
用户定义函数 (UDFs) 是支持的,默认情况下用于 shell 和在独立应用程序中,需要额外的设置要求。
大部分流媒体API都得到了支持,包括 DataStreamReader , DataStreamWriter , StreamingQuery 和 StreamingQueryListener 。
诸如 SparkContext 和 RDD 的API在所有Spark Connect版本中已被弃用。
计划在即将发布的Spark版本中支持更多API。