转换并应用函数

有许多 API 允许用户对 pandas-on-Spark DataFrame 应用函数,例如 DataFrame.transform() , DataFrame.apply() , DataFrame.pandas_on_spark.transform_batch() , DataFrame.pandas_on_spark.apply_batch() , Series.pandas_on_spark.transform_batch() ,等等。每个都有不同的目的,并且内部工作方式不同。本节描述了用户常常感到困惑的它们之间的差异。

transform apply

DataFrame.transform() DataFrame.apply() 之间的主要区别在于前者要求返回与输入相同的长度,而后者不要求这一点。请参见下面的示例:

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
...     return pser + 1  # should always return the same length as input.
...
>>> psdf.transform(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
>>> def pandas_plus(pser):
...     return pser[pser % 2 == 1]  # allows an arbitrary length
...
>>> psdf.apply(pandas_plus)

在这种情况下,每个函数接受一个 pandas Series,并且 pandas API 在 Spark 上以分布式方式计算这些函数,如下所示。

transform and apply

在“列”轴的情况下,该函数将每一行作为一个 pandas Series。

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
...     return sum(pser)  # allows an arbitrary length
...
>>> psdf.apply(pandas_plus, axis='columns')

上面的例子计算了每一行的求和,结果作为一个 pandas Series。见下文:

apply axis

在上述示例中,为了简化没有使用类型提示,但建议使用它们以避免性能损失。请参阅API文档。

pandas_on_spark.transform_batch pandas_on_spark.apply_batch

DataFrame.pandas_on_spark.transform_batch() , DataFrame.pandas_on_spark.apply_batch() , Series.pandas_on_spark.transform_batch() 等, batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。这些 API 切片 pandas-on-Spark DataFrame 或 Series,然后将给定函数应用于 pandas DataFrame 或 Series 作为输入和输出。请参见下面的示例:

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
...     return pdf + 1  # should always return the same length as input.
...
>>> psdf.pandas_on_spark.transform_batch(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
...     return pdf[pdf.a > 1]  # allow arbitrary length
...
>>> psdf.pandas_on_spark.apply_batch(pandas_plus)

这两个示例中的函数将一个pandas DataFrame作为pandas-on-Spark DataFrame的一部分,并输出一个pandas DataFrame。 Spark上的Pandas API将pandas DataFrames组合为pandas-on-Spark DataFrame。

请注意, DataFrame.pandas_on_spark.transform_batch() 有长度限制 - 输入和输出的长度应该相同 - 而 DataFrame.pandas_on_spark.apply_batch() 则没有。然而,重要的是要知道当 DataFrame.pandas_on_spark.transform_batch() 返回一个系列时,输出属于同一个 DataFrame,并且您可以通过不同 DataFrame 之间的操作来避免洗牌。在 DataFrame.pandas_on_spark.apply_batch() 的情况下,它的输出总是被视为属于一个新的不同 DataFrame。有关更多详细信息,请参见 不同 DataFrame 之间的操作

pandas_on_spark.transform_batch and pandas_on_spark.apply_batch in Frame

如果使用 Series.pandas_on_spark.transform_batch() ,它也与 DataFrame.pandas_on_spark.transform_batch() 相似;然而,它接受一个 pandas Series 作为 pandas-on-Spark Series 的一块。

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
...     return pser + 1  # should always return the same length as input.
...
>>> psdf.a.pandas_on_spark.transform_batch(pandas_plus)

在后台,pandas-on-Spark Series 的每个批次被分割成多个 pandas Series,以下是每个函数的计算方式:

pandas_on_spark.transform_batch in Series

还有更多细节,例如类型推导和防止其性能惩罚。请参阅API文档。