Python 软件包管理 ¶
当你想在集群上运行你的PySpark应用程序时,例如YARN、Kubernetes、Mesos等,你需要确保你的代码和所有使用的库在执行器上可用。
例如,假设您可能想运行
Pandas UDF 示例
。因为它使用pyarrow作为底层实现,我们需要确保在集群的每个执行器上都安装了pyarrow。否则,您可能会遇到类似于
ModuleNotFoundError:
没有
名为
'pyarrow'
的
模块
的错误。
这是上一个示例中将在集群上执行的脚本
app.py
:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
def main(spark):
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
print(df.groupby("id").agg(mean_udf(df['v'])).collect())
if __name__ == "__main__":
main(SparkSession.builder.getOrCreate())
在集群中管理Python依赖的方式有多种:
-
使用 PySpark 原生功能
-
使用 Conda
-
使用 Virtualenv
-
使用 PEX
使用PySpark原生功能 ¶
PySpark 允许通过以下任一方式将 Python 文件 (
.py
), 压缩的 Python 包 (
.zip
), 和 Egg 文件 (
.egg
) 上传到执行器:
-
设置配置项
spark.submit.pyFiles
-
在 Spark 脚本中设置
--py-files
选项 -
在应用程序中直接调用
pyspark.SparkContext.addPyFile()
这是将额外的自定义Python代码发送到集群的简单方法。您可以单独添加文件或将整个包压缩并上传。使用
pyspark.SparkContext.addPyFile()
允许您在启动作业后上传代码。
然而,它不允许添加构建为 Wheels 的包,因此不允许包含具有本机代码的依赖项。
使用Conda ¶
Conda 是最广泛使用的 Python 包管理系统之一。 PySpark 用户可以直接使用 Conda 环境通过利用 conda-pack 来分发他们的第三方 Python 包,该工具是一个创建可重定位 Conda 环境的命令行工具。
下面的示例创建了一个Conda环境,以供驱动程序和执行器使用,并将其打包到一个归档文件中。这个归档文件捕获了Python的Conda环境,并存储Python解释器及其所有相关依赖项。
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
之后,您可以通过使用
--archives
选项或
spark.archives
配置 (
spark.yarn.dist.archives
在 YARN 中) 将其与脚本一起打包或在代码中打包。它会自动在执行程序上解压缩存档。
在
spark-submit
脚本的情况下,您可以如下使用它:
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py
请注意,上述
PYSPARK_DRIVER_PYTHON
不应在 YARN 或 Kubernetes 的集群模式中设置。
如果您在常规的Python shell或笔记本中,可以按如下所示进行尝试:
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_conda_env.tar.gz#environment").getOrCreate()
main(spark)
对于 pyspark shell:
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment
使用 Virtualenv ¶
Virtualenv 是一个用于创建隔离Python环境的Python工具。自Python 3.3以来,它的部分功能已作为标准库集成到Python中,位于 venv 模块下。PySpark用户可以通过使用 venv-pack 以与conda-pack类似的方式来管理其集群中的Python依赖。
可以创建一个虚拟环境供驱动程序和执行器使用,如下所示。 它将当前虚拟环境打包成一个归档文件,并包含Python解释器及其依赖项。 然而,它要求集群中的所有节点都安装相同的Python解释器,因为 venv-pack将Python解释器打包为符号链接 。
python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install pyarrow pandas venv-pack
venv-pack -o pyspark_venv.tar.gz
您可以直接传递/解压缩归档文件,并通过利用
--archives
选项或
spark.archives
配置(在 YARN 中为
spark.yarn.dist.archives
)在执行器上启用环境。
对于
spark-submit
,您可以通过运行以下命令来使用它。还要注意,在Kubernetes或YARN集群模式下,
PYSPARK_DRIVER_PYTHON
必须被解除设置。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment app.py
对于普通的Python命令行或笔记本:
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_venv.tar.gz#environment").getOrCreate()
main(spark)
在pyspark shell的情况下:
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_venv.tar.gz#environment
使用 PEX ¶
PySpark 还可以使用
PEX
来打包 Python 包。 PEX 是一个创建自包含 Python 环境的工具。这类似于 Conda 或 virtualenv,但
.pex
文件可以独立执行。
以下示例为驱动程序和执行程序创建一个
.pex
文件。该文件包含使用
pex
命令指定的 Python 依赖项。
pip install pyarrow pandas pex
pex pyspark pyarrow pandas -o pyspark_pex_env.pex
该文件的行为与常规的Python解释器类似。
./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
1.1.5
然而,
.pex
文件本身并不包含Python解释器,因此集群中的所有节点都应该安装相同的Python解释器。
为了在集群中传输和使用
.pex
文件,您应该通过
spark.files
配置 (
spark.yarn.dist.files
在 YARN 中) 或
--files
选项进行传输,因为它们是常规文件,而不是目录或归档文件。
对于申请提交,您可以运行如下命令。
请注意,
PYSPARK_DRIVER_PYTHON
不应在 YARN 或 Kubernetes 的集群模式下设置。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_pex_env.pex
spark-submit --files pyspark_pex_env.pex app.py
对于普通的Python命令行或笔记本:
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
spark = SparkSession.builder.config(
"spark.files", # 'spark.yarn.dist.files' in YARN.
"pyspark_pex_env.pex").getOrCreate()
main(spark)
对于交互式 pyspark shell,命令几乎是相同的:
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./pyspark_pex_env.pex
pyspark --files pyspark_pex_env.pex
一个端到端的Docker示例,用于部署一个独立的PySpark,使用
SparkSession.builder
和 PEX,可以在
这里
找到 - 它使用了cluster-pack,这是一个建立在PEX之上的库,自动化了手动创建和上传PEX的中间步骤。