Python 软件包管理

当你想在集群上运行你的PySpark应用程序时,例如YARN、Kubernetes、Mesos等,你需要确保你的代码和所有使用的库在执行器上可用。

例如,假设您可能想运行 Pandas UDF 示例 。因为它使用pyarrow作为底层实现,我们需要确保在集群的每个执行器上都安装了pyarrow。否则,您可能会遇到类似于 ModuleNotFoundError: 没有 名为 'pyarrow' 模块 的错误。

这是上一个示例中将在集群上执行的脚本 app.py

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession

def main(spark):
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    print(df.groupby("id").agg(mean_udf(df['v'])).collect())


if __name__ == "__main__":
    main(SparkSession.builder.getOrCreate())

在集群中管理Python依赖的方式有多种:

  • 使用 PySpark 原生功能

  • 使用 Conda

  • 使用 Virtualenv

  • 使用 PEX

使用PySpark原生功能

PySpark 允许通过以下任一方式将 Python 文件 ( .py ), 压缩的 Python 包 ( .zip ), 和 Egg 文件 ( .egg ) 上传到执行器:

这是将额外的自定义Python代码发送到集群的简单方法。您可以单独添加文件或将整个包压缩并上传。使用 pyspark.SparkContext.addPyFile() 允许您在启动作业后上传代码。

然而,它不允许添加构建为 Wheels 的包,因此不允许包含具有本机代码的依赖项。

使用Conda

Conda 是最广泛使用的 Python 包管理系统之一。 PySpark 用户可以直接使用 Conda 环境通过利用 conda-pack 来分发他们的第三方 Python 包,该工具是一个创建可重定位 Conda 环境的命令行工具。

下面的示例创建了一个Conda环境,以供驱动程序和执行器使用,并将其打包到一个归档文件中。这个归档文件捕获了Python的Conda环境,并存储Python解释器及其所有相关依赖项。

conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz

之后,您可以通过使用 --archives 选项或 spark.archives 配置 ( spark.yarn.dist.archives 在 YARN 中) 将其与脚本一起打包或在代码中打包。它会自动在执行程序上解压缩存档。

spark-submit 脚本的情况下,您可以如下使用它:

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py

请注意,上述 PYSPARK_DRIVER_PYTHON 不应在 YARN 或 Kubernetes 的集群模式中设置。

如果您在常规的Python shell或笔记本中,可以按如下所示进行尝试:

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_conda_env.tar.gz#environment").getOrCreate()
main(spark)

对于 pyspark shell:

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment

使用 Virtualenv

Virtualenv 是一个用于创建隔离Python环境的Python工具。自Python 3.3以来,它的部分功能已作为标准库集成到Python中,位于 venv 模块下。PySpark用户可以通过使用 venv-pack 以与conda-pack类似的方式来管理其集群中的Python依赖。

可以创建一个虚拟环境供驱动程序和执行器使用,如下所示。 它将当前虚拟环境打包成一个归档文件,并包含Python解释器及其依赖项。 然而,它要求集群中的所有节点都安装相同的Python解释器,因为 venv-pack将Python解释器打包为符号链接

python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install pyarrow pandas venv-pack
venv-pack -o pyspark_venv.tar.gz

您可以直接传递/解压缩归档文件,并通过利用 --archives 选项或 spark.archives 配置(在 YARN 中为 spark.yarn.dist.archives )在执行器上启用环境。

对于 spark-submit ,您可以通过运行以下命令来使用它。还要注意,在Kubernetes或YARN集群模式下, PYSPARK_DRIVER_PYTHON 必须被解除设置。

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment app.py

对于普通的Python命令行或笔记本:

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_venv.tar.gz#environment").getOrCreate()
main(spark)

在pyspark shell的情况下:

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_venv.tar.gz#environment

使用 PEX

PySpark 还可以使用 PEX 来打包 Python 包。 PEX 是一个创建自包含 Python 环境的工具。这类似于 Conda 或 virtualenv,但 .pex 文件可以独立执行。

以下示例为驱动程序和执行程序创建一个 .pex 文件。该文件包含使用 pex 命令指定的 Python 依赖项。

pip install pyarrow pandas pex
pex pyspark pyarrow pandas -o pyspark_pex_env.pex

该文件的行为与常规的Python解释器类似。

./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
1.1.5

然而, .pex 文件本身并不包含Python解释器,因此集群中的所有节点都应该安装相同的Python解释器。

为了在集群中传输和使用 .pex 文件,您应该通过 spark.files 配置 ( spark.yarn.dist.files 在 YARN 中) 或 --files 选项进行传输,因为它们是常规文件,而不是目录或归档文件。

对于申请提交,您可以运行如下命令。 请注意, PYSPARK_DRIVER_PYTHON 不应在 YARN 或 Kubernetes 的集群模式下设置。

export PYSPARK_DRIVER_PYTHON=python  # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_pex_env.pex
spark-submit --files pyspark_pex_env.pex app.py

对于普通的Python命令行或笔记本:

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
spark = SparkSession.builder.config(
    "spark.files",  # 'spark.yarn.dist.files' in YARN.
    "pyspark_pex_env.pex").getOrCreate()
main(spark)

对于交互式 pyspark shell,命令几乎是相同的:

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./pyspark_pex_env.pex
pyspark --files pyspark_pex_env.pex

一个端到端的Docker示例,用于部署一个独立的PySpark,使用 SparkSession.builder 和 PEX,可以在 这里 找到 - 它使用了cluster-pack,这是一个建立在PEX之上的库,自动化了手动创建和上传PEX的中间步骤。