为PySpark做贡献 ¶
有许多类型的贡献,例如,帮助其他用户,测试版本,审查更改,文档贡献,错误报告,JIRA维护,代码更改等。这些在 一般指南 中有记载。本页面重点关注PySpark,并包括专门针对PySpark的额外细节。
通过测试发布进行贡献 ¶
在正式发布之前,PySpark的发布候选版本会在 dev@spark.apache.org 邮件列表中分享以进行投票。 这些发布候选版本可以通过pip轻松安装。例如,对于Spark 3.0.0 RC1,您可以如下安装:
pip install https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin/pyspark-3.0.0.tar.gz
发布文件的链接,如
https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin
可以在投票线程中找到。
测试和验证用户现有工作负载与发布候选版本之间的关系是对PySpark的重要贡献之一。 它可以防止在官方发布之前破坏用户现有的工作负载。 当出现回归、正确性问题或性能下降等值得放弃发布候选版本的问题时, 通常会放弃发布候选版本,社区会专注于修复该问题,以便在下一个发布候选版本中包含。
贡献文档变更 ¶
发布文档位于Spark的 docs 目录下。 README.md 描述了所需的依赖项和生成文档的步骤。 通常,PySpark文档通过以下命令在 docs 目录下进行测试:
SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 bundle exec jekyll serve --watch
PySpark 使用 Sphinx 来生成其发布的 PySpark 文档。因此,如果你只想构建 PySpark 文档, 你可以在 python/docs 目录下构建:
make html
它在
python/docs/build/html
目录下生成相应的 HTML。
最后,请确保通过手动在相应的 RST 文件中添加方法和/或类来记录新的 API,位于
python/docs/source/reference
下。否则,它们将不会在 PySpark 文档中被记录。
准备贡献代码更改 ¶
在开始在PySpark中编写代码之前,建议阅读 一般指南 。此外,还有一些额外的注意事项需要记住,当您在PySpark中贡献代码时:
-
- 遵循 Python 风格
-
参见 Python 之禅 。
-
- 与 Scala 和 Java 端匹配 API
-
Apache Spark 是一个统一的引擎,提供一致的 API 层。一般来说,API 在其他语言中一致地得到支持。
-
- 可以接受 PySpark 特定的 API
-
只要它们符合 Python 风格且不与其他现有 API 冲突,提出 API 请求是可以的,例如,UDF 的装饰器用法。
-
- 如果你扩展或修改公共 API,请调整相应的类型提示
-
有关详细信息,请参见 贡献和维护类型提示 。
如果您正在修复 pandas API 在 Spark (
pyspark.pandas
) 包,请考虑以下设计原则:
-
- 返回大数据的 pandas-on-Spark 数据结构,以及小数据的 pandas 数据结构
-
开发人员常常面临一个问题:特定功能是否应该返回 pandas-on-Spark DataFrame/Series,还是返回 pandas DataFrame/Series。原则是:如果返回的对象可能很大,使用 pandas-on-Spark DataFrame/Series。如果数据一定很小,使用 pandas DataFrame/Series。例如,
DataFrame.dtypes
返回一个 pandas Series,因为 DataFrame 中的列数是有限且较小的,而DataFrame.head()
或Series.unique()
返回一个 pandas-on-Spark DataFrame/Series,因为结果对象可能很大。
-
- 为常见数据科学任务提供可发现的 API
-
夸大其辞的风险下,有两种 API 设计方法:第一种侧重于为常见任务提供 API;第二种从抽象开始,通过组合原语使用户能够完成他们的任务。虽然世界并非黑白分明,但 pandas 更倾向于前者,而 Spark 则更多地采用后者。
一个例子是值计数(按某个关键列计数),这是数据科学中最常见的操作之一。pandas
DataFrame.value_counts()
按排序顺序返回结果,在 90% 的情况下,这是用户在探索数据时所偏好的,而 Spark 的结果不排序,更适合构建数据管道,因为用户可以通过添加显式的orderBy
来实现 pandas 的行为。与 pandas 类似,pandas API on Spark 也应该更加倾向于前者,为常见数据科学任务提供可发现的 API。在大多数情况下,这一原则通过简单实现 pandas 的 API 得到了很好的遵循。然而,也会有一些情况,pandas 的 API 无法满足特定需求,例如处理大数据的绘图。
-
- 防止用户自杀式操作的防护措施
-
在 pandas 中,某些操作随着数据规模的增加而变得极其昂贵,我们不想让用户产生依赖于此类操作的错觉,尤其是在 pandas API on Spark 中。也就是说,pandas API on Spark 中实现的方法在默认情况下应该能够安全地在大型数据集上执行。因此,以下能力未在 pandas API on Spark 中实现:
-
从根本上无法并行化的能力:例如,逐个元素地命令性循环
-
需要在单个节点内存中实现整个工作集的能力。这就是为什么我们不实现 pandas.DataFrame.to_xarray 的原因。另一个例子是
_repr_html_
调用将显示的记录总数限制为最大 1000,以防止用户仅仅通过在笔记本中键入 DataFrame 的名称而使其驱动节点崩溃。
然而,存在一些例外情况。“大数据科学”的一个常见模式是,虽然初始数据集很大,但随着分析的深入,工作集会变得更小。例如,数据科学家常常对数据集进行聚合,然后希望将聚合后的数据集转换为某种本地数据结构。为了帮助数据科学家,我们提供:
-
DataFrame.to_pandas
:返回一个 pandas DataFrame(仅适用于 pandas-on-Spark) -
DataFrame.to_numpy
:返回一个 numpy 数组,适用于 pandas 和 pandas API on Spark
请注意,从这些函数的名称中可以清楚看出,它们返回一些本地数据结构,这将需要在单个节点的内存中实现数据。对于这些函数,我们还明确在文档中注明了警告,结果数据结构必须很小。
-
环境设置 ¶
先决条件 ¶
PySpark 开发需要构建 Spark,这需要安装合适的 JDK 等。有关更多详细信息,请参见 Building Spark 。
注意,如果您打算为Python中的Spark Connect做贡献,必须使用
buf
版本
1.24.0
,有关更多详细信息,请参见
Buf安装
。
贡献和维护类型提示 ¶
PySpark 类型提示是内联的,以利用静态类型检查。
作为一个经验法则,只有公共API会被注释。
注释应该在可能的情况下:
-
反映底层JVM API的期望,以帮助避免Python解释器之外的类型相关失败。
-
在过于宽泛(
Any
)和过于狭窄的参数注释之间发生冲突时,优先选择后者,只要它覆盖了大多数典型用例。 -
使用
@overload
注释指示无意义的参数组合。例如,表示*Col
和*Cols
参数是互斥的:@overload def __init__( self, *, threshold: float = ..., inputCol: Optional[str] = ..., outputCol: Optional[str] = ... ) -> None: ... @overload def __init__( self, *, thresholds: Optional[List[float]] = ..., inputCols: Optional[List[str]] = ..., outputCols: Optional[List[str]] = ... ) -> None: ...
-
与当前稳定的MyPy版本兼容。
复杂的支持类型定义,应放置在专用的
_typing.pyi
存根中。可以参考
pyspark.sql._typing.pyi
。
可以使用
dev/lint-python
脚本或直接调用 mypy 来验证注释:
mypy --config python/mypy.ini python/pyspark
代码和文档字符串指南 ¶
代码规范 ¶
请遵循现有代码库的风格,基本上是PEP 8,唯一的例外是:行长度可以达到100个字符,而不是79个。
注意:
-
在 PySpark 中的方法和变量名称与 Python 本身的
threading
库类似,其中的 API 受到了 Java 的启发。PySpark 还遵循 camelCase 规则,以匹配 Scala 和 Java 的公开 API。 -
相比之下,
functions.py
使用 snake_case 以使 API 更加友好于 SQL(和 Python)。 -
此外,pandas-on-Spark (
pyspark.pandas
) 也使用 snake_case ,因为该包不需要与其他语言保持 API 一致性。
PySpark 利用像
pycodestyle
和
flake8
这样的代码检查工具,这些工具由
dev/lint-python
运行。因此,确保运行该脚本以进行再次确认。
文档测试约定 ¶
一般来说,文档测试应通过换行符逻辑上分组。
例如,第一个块是准备语句,第二个块是使用特定参数的函数,第三个块是另一个参数。作为示例,请参考 DataFrame.rsub 在 pandas 中。
这些代码块在PySpark文档测试中应一致分开,如果文档测试的覆盖率或展示的示例数量不足,应添加更多文档测试。
贡献错误和异常 ¶
为了抛出标准化的用户可见错误或异常,开发人员应该指定错误类和消息参数,而不是任意错误消息。
使用 ¶
-
检查 error_classes.py 中是否已经存在合适的错误类。 如果是,则使用该错误类并跳到第3步。
-
在 error_classes.py 中添加一个新类;请记住以下的不变条件。
-
检查异常类型是否已经扩展了 PySparkException 。 如果是,则跳到第5步。
-
将 PySparkException 混入异常中。
-
使用错误类和消息参数抛出异常。
之前
抛出任意错误消息:
raise ValueError("Problem A because B")
之后
error_classes.py
"PROBLEM_BECAUSE": {
"message": ["Problem <problem> because <cause>"]
}
exceptions.py
class PySparkTestError(PySparkException):
def __init__(self, error_class: str, message_parameters: Dict[str, str]):
super().__init__(error_class=error_class, message_parameters=message_parameters)
def getMessageParameters(self) -> Optional[Dict[str, str]]:
return super().getMessageParameters()
抛出带有错误类和消息参数:
raise PySparkTestError("PROBLEM_BECAUSE", {"problem": "A", "cause": "B"})
访问字段 ¶
要访问错误字段,请捕获扩展了
PySparkException
的异常,并通过
PySparkException.getErrorClass()
访问错误类别。
try:
...
except PySparkException as pe:
if pe.getErrorClass() == "PROBLEM_BECAUSE":
...
字段 ¶
错误类
错误类别是错误类别的简明易懂的表示。
未分类的错误可以被分配到一个以前的错误类别,前缀为 _LEGACY_ERROR_TEMP_ ,并带有一个未使用的顺序号,例如 _LEGACY_ERROR_TEMP_0053 。
不变性:
-
唯一
-
在不同版本中保持一致
-
按字母顺序排序
消息
错误信息提供了对错误的描述性、易读的表示。 消息格式通过C风格的printf语法接受字符串参数。
错误信息的质量应符合 Apache Spark 错误信息指南
不变式:
-
唯一