转换并应用函数 ¶
有许多 API 允许用户对 pandas-on-Spark DataFrame 应用函数,例如
DataFrame.transform()
,
DataFrame.apply()
,
DataFrame.pandas_on_spark.transform_batch()
,
DataFrame.pandas_on_spark.apply_batch()
,
Series.pandas_on_spark.transform_batch()
,等等。每个都有不同的目的,并且内部工作方式不同。本节描述了用户常常感到困惑的它们之间的差异。
transform
和
apply
¶
DataFrame.transform()
和
DataFrame.apply()
之间的主要区别在于前者要求返回与输入相同的长度,而后者不要求这一点。请参见下面的示例:
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return pser + 1 # should always return the same length as input.
...
>>> psdf.transform(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
>>> def pandas_plus(pser):
... return pser[pser % 2 == 1] # allows an arbitrary length
...
>>> psdf.apply(pandas_plus)
在这种情况下,每个函数接受一个 pandas Series,并且 pandas API 在 Spark 上以分布式方式计算这些函数,如下所示。

在“列”轴的情况下,该函数将每一行作为一个 pandas Series。
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return sum(pser) # allows an arbitrary length
...
>>> psdf.apply(pandas_plus, axis='columns')
上面的例子计算了每一行的求和,结果作为一个 pandas Series。见下文:

在上述示例中,为了简化没有使用类型提示,但建议使用它们以避免性能损失。请参阅API文档。
pandas_on_spark.transform_batch
和
pandas_on_spark.apply_batch
¶
在
DataFrame.pandas_on_spark.transform_batch()
,
DataFrame.pandas_on_spark.apply_batch()
,
Series.pandas_on_spark.transform_batch()
等,
batch
后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。这些 API 切片 pandas-on-Spark DataFrame 或 Series,然后将给定函数应用于 pandas DataFrame 或 Series 作为输入和输出。请参见下面的示例:
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
... return pdf + 1 # should always return the same length as input.
...
>>> psdf.pandas_on_spark.transform_batch(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
... return pdf[pdf.a > 1] # allow arbitrary length
...
>>> psdf.pandas_on_spark.apply_batch(pandas_plus)
这两个示例中的函数将一个pandas DataFrame作为pandas-on-Spark DataFrame的一部分,并输出一个pandas DataFrame。 Spark上的Pandas API将pandas DataFrames组合为pandas-on-Spark DataFrame。
请注意,
DataFrame.pandas_on_spark.transform_batch()
有长度限制 - 输入和输出的长度应该相同 - 而
DataFrame.pandas_on_spark.apply_batch()
则没有。然而,重要的是要知道当
DataFrame.pandas_on_spark.transform_batch()
返回一个系列时,输出属于同一个 DataFrame,并且您可以通过不同 DataFrame 之间的操作来避免洗牌。在
DataFrame.pandas_on_spark.apply_batch()
的情况下,它的输出总是被视为属于一个新的不同 DataFrame。有关更多详细信息,请参见
不同 DataFrame 之间的操作
。

如果使用
Series.pandas_on_spark.transform_batch()
,它也与
DataFrame.pandas_on_spark.transform_batch()
相似;然而,它接受一个 pandas Series 作为 pandas-on-Spark Series 的一块。
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return pser + 1 # should always return the same length as input.
...
>>> psdf.a.pandas_on_spark.transform_batch(pandas_plus)
在后台,pandas-on-Spark Series 的每个批次被分割成多个 pandas Series,以下是每个函数的计算方式:

还有更多细节,例如类型推导和防止其性能惩罚。请参阅API文档。