Spark 连接概述

构建客户端Spark应用程序

在Apache Spark 3.4中,Spark Connect引入了一种解耦的客户端-服务器架构,允许通过DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端和服务器之间的分离使得Spark及其开放生态系统可以在任何地方利用。它可以嵌入到现代数据应用程序、IDE、笔记本和编程语言中。

要开始,请查看 快速入门:Spark Connect

Spark 连接 API 图示

Spark Connect 是如何工作的

Spark Connect 客户端库旨在简化 Spark 应用程序的开发。它是一个薄型 API,可以嵌入到任何地方:在应用程序服务器、IDE、笔记本和编程语言中。Spark Connect API 基于 Spark 的 DataFrame API,使用未解决的逻辑计划作为客户端与 Spark 驱动程序之间的语言无关协议。

Spark Connect 客户端将 DataFrame 操作翻译为未解决的逻辑查询计划,这些计划使用协议缓冲区进行编码。它们通过 gRPC 框架发送到服务器。

嵌入在Spark Server上的Spark Connect端点接收并 将未解析的逻辑计划转换为Spark的逻辑计划运算符。 这类似于解析SQL查询,其中属性和关系被 解析并构建初始解析计划。从那里,标准的Spark 执行过程开始,确保Spark Connect利用了所有的 Spark优化和增强。结果通过gRPC以Apache Arrow编码的行批次流回客户端。

Spark 连接通信

Spark Connect的操作优势

通过这种新架构,Spark Connect 缓解了若干多租户操作问题:

稳定性 : 使用过多内存的应用程序现在只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端定义自己的依赖项,无需担心与Spark驱动程序之间的潜在冲突。

可升级性 : Spark驱动程序现在可以独立于应用程序无缝升级,例如可以受益于性能提升和安全修复。这意味着应用程序可以向前兼容,只要服务器端RPC定义设计为向后兼容。

调试能力和可观察性 : Spark Connect 允许在开发过程中直接从你最喜欢的 IDE 进行交互式调试。同样,应用程序可以使用应用程序的框架本地指标和日志库进行监控。

如何使用 Spark Connect

从 Spark 3.4 开始,Spark Connect 可用,并支持 PySpark 和 Scala 应用。我们将介绍如何使用 Spark Connect 运行 Apache Spark 服务器,并通过 Spark Connect 客户端库从客户端应用程序连接到它。

下载并启动带有 Spark Connect 的 Spark 服务器

首先,从 下载 Apache Spark 页面下载 Spark。Spark Connect是在 Apache Spark 版本 3.4 中引入的,因此请确保在页面顶部的发布下拉菜单中选择 3.4.0 或更新版本。然后选择您的软件包类型,通常选择“为 Apache Hadoop 3.3 及更高版本预构建”,然后单击链接进行下载。

现在提取你刚刚下载的Spark包,例如:

tar -xvf spark-3.5.3-bin-hadoop3.tgz

在终端窗口中,前往您之前提取Spark的 spark 文件夹,并运行 start-connect-server.sh 脚本以启动带有Spark Connect的Spark服务器,如下面的示例所示:

./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.3

请注意,我们包含一个 Spark Connect 包 ( spark-connect_2.12:3.5.3 ),当启动 Spark 服务器时。这是使用 Spark Connect 所必需的。确保使用与您之前下载的 Spark 版本相同的包版本。在这个示例中, Spark 3.5.3 与 Scala 2.12。

现在 Spark 服务器正在运行,并准备接受来自客户端应用程序的 Spark Connect 会话。在下一节中,我们将讲解如何在编写客户端应用程序时使用 Spark Connect。

使用 Spark Connect 进行交互式分析

创建Spark会话时,您可以指定要使用Spark Connect,有几种方式可以做到这一点,如下所述。

如果您不使用这里概述的机制,您的Spark会话将像以前一样工作,而不利用Spark Connect。

设置 SPARK_REMOTE 环境变量

如果您在运行 Spark 客户端应用程序的客户端机器上设置 SPARK_REMOTE 环境变量,并创建一个新的 Spark 会话,如以下示例所示,则该会话将成为一个 Spark Connect 会话。使用这种方法,不需要更改代码即可开始使用 Spark Connect。

在终端窗口中,将 SPARK_REMOTE 环境变量设置为指向您之前在计算机上启动的本地Spark服务器:

export SPARK_REMOTE="sc://localhost"

并像往常一样启动Spark shell:

./bin/pyspark

PySpark shell 现在通过 Spark Connect 连接到 Spark,如欢迎消息所示:

Client connected to the Spark Connect server at localhost

在创建 Spark 会话时指定 Spark 连接

您还可以在创建 Spark 会话时明确指定要使用 Spark Connect。

例如,您可以启动带有Spark Connect的PySpark外壳,如此处所示。

要使用 Spark Connect 启动 PySpark shell,只需包含 remote 参数并指定 Spark 服务器的位置。在这个例子中,我们使用 localhost 连接到之前启动的本地 Spark 服务器:

./bin/pyspark --remote "sc://localhost"

你会注意到,PySpark shell 欢迎信息告诉你,你已通过 Spark Connect 连接到 Spark:

Client connected to the Spark Connect server at localhost

您还可以检查 Spark 会话类型。如果它包含 .connect. ,您正在使用 Spark Connect,如下面的示例所示:

SparkSession available as 'spark'.
>>> type(spark)
<class 'pyspark.sql.connect.session.SparkSession'>

现在你可以在命令行中运行PySpark代码,以查看Spark Connect的实际效果:

>>> columns = ["id","name"]
>>> data = [(1,"Sarah"),(2,"Maria")]
>>> df = spark.createDataFrame(data).toDF(*columns)
>>> df.show()
+---+-----+
| id| name|
+---+-----+
| 1|Sarah|
| 2|Maria|
+---+-----+

对于Scala shell,我们使用基于Ammonite的REPL,该REPL目前不包含在Apache Spark包中。

要设置新的 Scala shell,首先下载并安装 Coursier CLI 。然后,在终端窗口中使用以下命令安装 REPL:

cs install –-contrib spark-connect-repl

现在,您可以像这样启动基于 Ammonite 的 Scala REPL/外壳,以连接到您的 Spark 服务器:

spark-connect-repl

当 REPL 成功初始化时,将出现一条问候消息:

Spark session available as 'spark'.
   _____                  __      ______                            __
  / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
  \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
 ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
/____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
    /_/

默认情况下,REPL 将尝试连接到本地 Spark 服务器。
在 shell 中运行以下 Scala 代码以查看 Spark Connect 的实际效果:

@ spark.range(10).count
res0: Long = 10L

配置客户端-服务器连接

默认情况下,REPL 将尝试连接到本地 Spark 服务器,端口为 15002。 但是,连接可以通过多种方式进行配置,如此配置 参考 所述。

设置 SPARK_REMOTE 环境变量

可以在客户端机器上设置SPARK_REMOTE环境变量,以自定义在REPL启动时初始化的客户端-服务器连接。

export SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG"
spark-connect-repl

或者

SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG" spark-connect-repl

使用CLI参数

自定义设置也可以通过以下所示的CLI参数传递:

spark-connect-repl --host myhost.com --port 443 --token ABCDEFG

支持的CLI参数列表可以在 此处 找到。

通过连接字符串进行程序化配置

连接也可以通过编程方式使用 SparkSession#builder 创建,如下例所示:

@ import org.apache.spark.sql.SparkSession
@ val spark = SparkSession.builder.remote("sc://localhost:443/;token=ABCDEFG").build()

在独立应用程序中使用 Spark Connect

首先,使用 pip install pyspark[connect]==3.5.0 安装 PySpark,或者如果构建一个打包的 PySpark 应用程序/库,请将其添加到你的 setup.py 文件中,如下所示:

install_requires=[
'pyspark[connect]==3.5.0'
]

在编写自己代码时,创建Spark会话时,请包含带有对您的Spark服务器引用的 remote 函数,如以下示例所示:

from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

为了说明,我们将创建一个简单的 Spark Connect 应用程序 SimpleApp.py:

"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # 应该是你系统上的某个文件
spark = SparkSession.builder.remote("sc://localhost").appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()

该程序仅计算文本文件中包含'a'的行数和包含'b'的行数。 注意,您需要将YOUR_SPARK_HOME替换为Spark安装的位置。

我们可以使用常规的Python解释器运行这个应用程序,如下所示:

# 使用Python解释器运行你的应用程序
$ python SimpleApp.py
...
 with a: 72,  with b: 39

要在Scala应用程序/项目中使用Spark Connect,我们首先需要包含正确的依赖项。
sbt 构建系统为例,我们将以下依赖项添加到 build.sbt 文件中:

libraryDependencies += "org.apache.spark" %% "spark-sql-api" % "3.5.0"
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.0"

在编写自己代码时,创建Spark会话时,请包含带有对您的Spark服务器引用的 remote 函数,如以下示例所示:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().remote("sc://localhost").build()

注意 : 参考用户定义代码的操作,如 UDF、过滤、映射等,需要注册一个 ClassFinder 以便拾取和上传任何所需的类文件。此外,任何 JAR 依赖项必须使用 SparkSession#AddArtifact 上传到服务器。

示例:

import org.apache.spark.sql.connect.client.REPLClassDirMonitor
// 注册一个 ClassFinder 以监控并上传构建输出的类文件。
val classFinder = new REPLClassDirMonitor(<ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR>)
spark.registerClassFinder(classfinder)
// 上传 JAR 依赖
spark.addArtifact(<ABSOLUTE_PATH_JAR_DEP>)

在这里, ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR 是构建系统写入类文件的输出目录, ABSOLUTE_PATH_JAR_DEP 是本地文件系统中JAR的位置。

提供的 REPLClassDirMonitor 是对 ClassFinder 的一个实现,它监视一个特定的目录,但您可以实现自己的类,扩展 ClassFinder 以进行自定义搜索和监视。

客户端应用程序认证

虽然 Spark Connect 没有内置的身份验证,但它旨在与您现有的身份验证基础设施无缝协作。其 gRPC HTTP/2 接口允许使用身份验证代理,这使得在不必直接在 Spark 中实现身份验证逻辑的情况下保护 Spark Connect 成为可能。

Spark 3.4 支持什么

PySpark : 在 Spark 3.4 中,Spark Connect 支持大多数 PySpark API,包括 DataFrame , 函数 ,以及 。然而, 一些 API,例如 SparkContext RDD 不受支持。 您可以在 API 参考 文档中查看当前支持哪些 API。 支持的 API 被标记为“支持 Spark Connect”,因此您可以在将现有代码迁移到 Spark Connect 之前检查您使用的 API 是否可用。

Scala :在 Spark 3.5 中,Spark Connect 支持大多数 Scala API,包括 Dataset , functions , Column , Catalog KeyValueGroupedDataset

用户定义函数 (UDFs) 是支持的,默认情况下用于 shell 和在独立应用程序中,需要额外的设置要求。

大部分流媒体API都得到了支持,包括 DataStreamReader , DataStreamWriter , StreamingQuery StreamingQueryListener

诸如 SparkContext RDD 的API在所有Spark Connect版本中已被弃用。

计划在即将发布的Spark版本中支持更多API。