为PySpark做贡献

有许多类型的贡献,例如,帮助其他用户,测试版本,审查更改,文档贡献,错误报告,JIRA维护,代码更改等。这些在 一般指南 中有记载。本页面重点关注PySpark,并包括专门针对PySpark的额外细节。

通过测试发布进行贡献

在正式发布之前,PySpark的发布候选版本会在 dev@spark.apache.org 邮件列表中分享以进行投票。 这些发布候选版本可以通过pip轻松安装。例如,对于Spark 3.0.0 RC1,您可以如下安装:

pip install https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin/pyspark-3.0.0.tar.gz

发布文件的链接,如 https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin 可以在投票线程中找到。

测试和验证用户现有工作负载与发布候选版本之间的关系是对PySpark的重要贡献之一。 它可以防止在官方发布之前破坏用户现有的工作负载。 当出现回归、正确性问题或性能下降等值得放弃发布候选版本的问题时, 通常会放弃发布候选版本,社区会专注于修复该问题,以便在下一个发布候选版本中包含。

贡献文档变更

发布文档位于Spark的 docs 目录下。 README.md 描述了所需的依赖项和生成文档的步骤。 通常,PySpark文档通过以下命令在 docs 目录下进行测试:

SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 bundle exec jekyll serve --watch

PySpark 使用 Sphinx 来生成其发布的 PySpark 文档。因此,如果你只想构建 PySpark 文档, 你可以在 python/docs 目录下构建:

make html

它在 python/docs/build/html 目录下生成相应的 HTML。

最后,请确保通过手动在相应的 RST 文件中添加方法和/或类来记录新的 API,位于 python/docs/source/reference 下。否则,它们将不会在 PySpark 文档中被记录。

准备贡献代码更改

在开始在PySpark中编写代码之前,建议阅读 一般指南 。此外,还有一些额外的注意事项需要记住,当您在PySpark中贡献代码时:

  • 遵循 Python 风格

    参见 Python 之禅

  • 与 Scala 和 Java 端匹配 API

    Apache Spark 是一个统一的引擎,提供一致的 API 层。一般来说,API 在其他语言中一致地得到支持。

  • 可以接受 PySpark 特定的 API

    只要它们符合 Python 风格且不与其他现有 API 冲突,提出 API 请求是可以的,例如,UDF 的装饰器用法。

  • 如果你扩展或修改公共 API,请调整相应的类型提示

    有关详细信息,请参见 贡献和维护类型提示

如果您正在修复 pandas API 在 Spark ( pyspark.pandas ) 包,请考虑以下设计原则:

  • 返回大数据的 pandas-on-Spark 数据结构,以及小数据的 pandas 数据结构

    开发人员常常面临一个问题:特定功能是否应该返回 pandas-on-Spark DataFrame/Series,还是返回 pandas DataFrame/Series。原则是:如果返回的对象可能很大,使用 pandas-on-Spark DataFrame/Series。如果数据一定很小,使用 pandas DataFrame/Series。例如, DataFrame.dtypes 返回一个 pandas Series,因为 DataFrame 中的列数是有限且较小的,而 DataFrame.head() Series.unique() 返回一个 pandas-on-Spark DataFrame/Series,因为结果对象可能很大。

  • 为常见数据科学任务提供可发现的 API

    夸大其辞的风险下,有两种 API 设计方法:第一种侧重于为常见任务提供 API;第二种从抽象开始,通过组合原语使用户能够完成他们的任务。虽然世界并非黑白分明,但 pandas 更倾向于前者,而 Spark 则更多地采用后者。

    一个例子是值计数(按某个关键列计数),这是数据科学中最常见的操作之一。pandas DataFrame.value_counts() 按排序顺序返回结果,在 90% 的情况下,这是用户在探索数据时所偏好的,而 Spark 的结果不排序,更适合构建数据管道,因为用户可以通过添加显式的 orderBy 来实现 pandas 的行为。

    与 pandas 类似,pandas API on Spark 也应该更加倾向于前者,为常见数据科学任务提供可发现的 API。在大多数情况下,这一原则通过简单实现 pandas 的 API 得到了很好的遵循。然而,也会有一些情况,pandas 的 API 无法满足特定需求,例如处理大数据的绘图。

  • 防止用户自杀式操作的防护措施

    在 pandas 中,某些操作随着数据规模的增加而变得极其昂贵,我们不想让用户产生依赖于此类操作的错觉,尤其是在 pandas API on Spark 中。也就是说,pandas API on Spark 中实现的方法在默认情况下应该能够安全地在大型数据集上执行。因此,以下能力未在 pandas API on Spark 中实现:

    • 从根本上无法并行化的能力:例如,逐个元素地命令性循环

    • 需要在单个节点内存中实现整个工作集的能力。这就是为什么我们不实现 pandas.DataFrame.to_xarray 的原因。另一个例子是 _repr_html_ 调用将显示的记录总数限制为最大 1000,以防止用户仅仅通过在笔记本中键入 DataFrame 的名称而使其驱动节点崩溃。

    然而,存在一些例外情况。“大数据科学”的一个常见模式是,虽然初始数据集很大,但随着分析的深入,工作集会变得更小。例如,数据科学家常常对数据集进行聚合,然后希望将聚合后的数据集转换为某种本地数据结构。为了帮助数据科学家,我们提供:

    • DataFrame.to_pandas :返回一个 pandas DataFrame(仅适用于 pandas-on-Spark)

    • DataFrame.to_numpy :返回一个 numpy 数组,适用于 pandas 和 pandas API on Spark

    请注意,从这些函数的名称中可以清楚看出,它们返回一些本地数据结构,这将需要在单个节点的内存中实现数据。对于这些函数,我们还明确在文档中注明了警告,结果数据结构必须很小。

环境设置

先决条件

PySpark 开发需要构建 Spark,这需要安装合适的 JDK 等。有关更多详细信息,请参见 Building Spark

注意,如果您打算为Python中的Spark Connect做贡献,必须使用 buf 版本 1.24.0 ,有关更多详细信息,请参见 Buf安装

Conda

如果您正在使用Conda,开发环境可以如下设置。

# Python 3.8+ is required
conda create --name pyspark-dev-env python=3.9
conda activate pyspark-dev-env
pip install --upgrade -r dev/requirements.txt

一旦设置完成,请确保在开始开发之前切换到 pyspark-dev-env

conda activate pyspark-dev-env

现在,您可以开始开发并 运行测试

pip

使用 Python 3.8+,可以如下使用 pip 来安装和设置开发环境。

pip install --upgrade -r dev/requirements.txt

现在,您可以开始开发并 运行测试

贡献和维护类型提示

PySpark 类型提示是内联的,以利用静态类型检查。

作为一个经验法则,只有公共API会被注释。

注释应该在可能的情况下:

  • 反映底层JVM API的期望,以帮助避免Python解释器之外的类型相关失败。

  • 在过于宽泛( Any )和过于狭窄的参数注释之间发生冲突时,优先选择后者,只要它覆盖了大多数典型用例。

  • 使用 @overload 注释指示无意义的参数组合。例如,表示 *Col *Cols 参数是互斥的:

    @overload
    def __init__(
        self,
        *,
        threshold: float = ...,
        inputCol: Optional[str] = ...,
        outputCol: Optional[str] = ...
    ) -> None: ...
    @overload
    def __init__(
        self,
        *,
        thresholds: Optional[List[float]] = ...,
        inputCols: Optional[List[str]] = ...,
        outputCols: Optional[List[str]] = ...
    ) -> None: ...
    
  • 与当前稳定的MyPy版本兼容。

复杂的支持类型定义,应放置在专用的 _typing.pyi 存根中。可以参考 pyspark.sql._typing.pyi

可以使用 dev/lint-python 脚本或直接调用 mypy 来验证注释:

mypy --config python/mypy.ini python/pyspark

代码和文档字符串指南

代码规范

请遵循现有代码库的风格,基本上是PEP 8,唯一的例外是:行长度可以达到100个字符,而不是79个。

注意:

  • 在 PySpark 中的方法和变量名称与 Python 本身的 threading 库类似,其中的 API 受到了 Java 的启发。PySpark 还遵循 camelCase 规则,以匹配 Scala 和 Java 的公开 API。

  • 相比之下, functions.py 使用 snake_case 以使 API 更加友好于 SQL(和 Python)。

  • 此外,pandas-on-Spark ( pyspark.pandas ) 也使用 snake_case ,因为该包不需要与其他语言保持 API 一致性。

PySpark 利用像 pycodestyle flake8 这样的代码检查工具,这些工具由 dev/lint-python 运行。因此,确保运行该脚本以进行再次确认。

文档字符串约定

PySpark遵循 NumPy文档风格

文档测试约定

一般来说,文档测试应通过换行符逻辑上分组。

例如,第一个块是准备语句,第二个块是使用特定参数的函数,第三个块是另一个参数。作为示例,请参考 DataFrame.rsub 在 pandas 中。

这些代码块在PySpark文档测试中应一致分开,如果文档测试的覆盖率或展示的示例数量不足,应添加更多文档测试。

贡献错误和异常

为了抛出标准化的用户可见错误或异常,开发人员应该指定错误类和消息参数,而不是任意错误消息。

使用

  1. 检查 error_classes.py 中是否已经存在合适的错误类。 如果是,则使用该错误类并跳到第3步。

  2. error_classes.py 中添加一个新类;请记住以下的不变条件。

  3. 检查异常类型是否已经扩展了 PySparkException 。 如果是,则跳到第5步。

  4. PySparkException 混入异常中。

  5. 使用错误类和消息参数抛出异常。

之前

抛出任意错误消息:

raise ValueError("Problem A because B")

之后

error_classes.py

"PROBLEM_BECAUSE": {
  "message": ["Problem <problem> because <cause>"]
}

exceptions.py

class PySparkTestError(PySparkException):
    def __init__(self, error_class: str, message_parameters: Dict[str, str]):
        super().__init__(error_class=error_class, message_parameters=message_parameters)

    def getMessageParameters(self) -> Optional[Dict[str, str]]:
        return super().getMessageParameters()

抛出带有错误类和消息参数:

raise PySparkTestError("PROBLEM_BECAUSE", {"problem": "A", "cause": "B"})

访问字段

要访问错误字段,请捕获扩展了 PySparkException 的异常,并通过 PySparkException.getErrorClass() 访问错误类别。

try:
    ...
except PySparkException as pe:
    if pe.getErrorClass() == "PROBLEM_BECAUSE":
        ...

字段

错误类

错误类别是错误类别的简明易懂的表示。

未分类的错误可以被分配到一个以前的错误类别,前缀为 _LEGACY_ERROR_TEMP_ ,并带有一个未使用的顺序号,例如 _LEGACY_ERROR_TEMP_0053

不变性:

  • 唯一

  • 在不同版本中保持一致

  • 按字母顺序排序

消息

错误信息提供了对错误的描述性、易读的表示。 消息格式通过C风格的printf语法接受字符串参数。

错误信息的质量应符合 Apache Spark 错误信息指南

不变式:

  • 唯一