最佳实践

利用 PySpark API

Pandas API on Spark 在底层使用 Spark;因此,许多功能和性能优化在 Pandas API on Spark 中同样可用。利用并结合这些尖端功能与 Pandas API on Spark。

在Spark上的pandas API中,现有的Spark上下文和Spark会话可以直接使用。如果您已经有自己的配置好的Spark上下文或会话在运行,pandas API将在Spark中使用它们。

如果您的环境中没有运行 Spark 上下文或会话(例如,普通的 Python 解释器),可以将这些配置设置为 SparkContext 和/或 SparkSession 。一旦创建了 Spark 上下文和/或会话,Spark 上的 pandas API 可以自动使用这个上下文和/或会话。例如,如果您想在 Spark 中配置执行程序内存,可以如下操作:

from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.set('spark.executor.memory', '2g')
# Pandas API on Spark automatically uses this Spark context with the configurations set.
SparkContext(conf=conf)

import pyspark.pandas as ps
...

另一个常见的配置可能是PySpark中的Arrow优化。在SQL配置的情况下,可以在Spark会话中设置如下:

from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("pandas-on-spark")
builder = builder.config("spark.sql.execution.arrow.pyspark.enabled", "true")
# Pandas API on Spark automatically uses this Spark session with the configurations set.
builder.getOrCreate()

import pyspark.pandas as ps
...

所有Spark功能,例如历史服务器、网页UI和部署模式,可以与Spark上的pandas API一起使用。如果您对性能调优感兴趣,请参阅 调优Spark

检查执行计划

通过利用 PySpark API DataFrame.spark.explain() 在实际计算之前,可以预测昂贵的操作,因为 Spark 上的 pandas API 是基于惰性执行的。例如,见下文。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Filter (id#1L > 5)
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]

每当您对这种情况不确定时,您可以检查实际的执行计划并预见到昂贵的情况。

尽管Spark上的pandas API尽力通过利用Spark优化器来优化和减少这样的洗牌操作,但在应用程序端尽可能避免洗牌是最好的选择。

使用检查点

在对 Spark 对象上的 pandas API 进行了一系列操作后,底层的 Spark 规划器可能由于计划庞大而变慢。如果 Spark 计划变得庞大或规划时间过长, DataFrame.spark.checkpoint() DataFrame.spark.local_checkpoint() 将会有所帮助。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf['id'] = psdf['id'] + (10 * psdf['id'] + psdf['id'])
>>> psdf = psdf.groupby('id').head(2)
>>> psdf.spark.explain()
== Physical Plan ==
*(3) Project [__index_level_0__#0L, id#31L]
+- *(3) Filter (isnotnull(__row_number__#44) AND (__row_number__#44 <= 2))
   +- Window [row_number() windowspecdefinition(__groupkey_0__#36L, __natural_order__#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __row_number__#44], [__groupkey_0__#36L], [__natural_order__#16L ASC NULLS FIRST]
      +- *(2) Sort [__groupkey_0__#36L ASC NULLS FIRST, __natural_order__#16L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(__groupkey_0__#36L, 200), true, [id=#33]
            +- *(1) Project [__index_level_0__#0L, (id#1L + ((id#1L * 10) + id#1L)) AS __groupkey_0__#36L, (id#1L + ((id#1L * 10) + id#1L)) AS id#31L, __natural_order__#16L]
               +- *(1) Project [__index_level_0__#0L, id#1L, monotonically_increasing_id() AS __natural_order__#16L]
                  +- *(1) Filter (id#1L > 5)
                     +- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]

>>> psdf = psdf.spark.local_checkpoint()  # or psdf.spark.checkpoint()
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#0L, id#31L]
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#31L,__natural_order__#59L]

正如您所看到的,之前的 Spark 计划被丢弃并开始于一个简单的计划。 调用 DataFrame.spark.checkpoint() 时,之前的 DataFrame 的结果被存储在配置的文件系统中, 或者在调用 DataFrame.spark.local_checkpoint() 时存储在执行器中。

避免洗牌

在并行或分布式环境中,某些操作,例如 sort_values ,比在单台机器的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。请参见下面的示例。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)}).sort_values(by="id")
>>> psdf.spark.explain()
== Physical Plan ==
*(2) Sort [id#9L ASC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#9L ASC NULLS LAST, 200), true, [id=#18]
   +- *(1) Scan ExistingRDD[__index_level_0__#8L,id#9L]

如您所见,它需要 Exchange ,这需要一次洗牌,并且可能很昂贵。

避免在单个分区上进行计算

另一个常见的情况是在单个分区上的计算。目前,一些API,例如 DataFrame.rank 在没有指定分区规范的情况下使用PySpark的Window。这会将所有数据移动到单台机器的单个分区中,可能导致严重的性能下降。 对于非常大的数据集,应避免使用此类API。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf.rank().spark.explain()
== Physical Plan ==
*(4) Project [__index_level_0__#16L, id#24]
+- Window [avg(cast(_w0#26 as bigint)) windowspecdefinition(id#17L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS id#24], [id#17L]
   +- *(3) Project [__index_level_0__#16L, _w0#26, id#17L]
      +- Window [row_number() windowspecdefinition(id#17L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#26], [id#17L ASC NULLS FIRST]
         +- *(2) Sort [id#17L ASC NULLS FIRST], false, 0
            +- Exchange SinglePartition, true, [id=#48]
               +- *(1) Scan ExistingRDD[__index_level_0__#16L,id#17L]

相反,使用 GroupBy.rank 因为它的开销更小,因为数据可以被分布并为每个组计算。

避免保留列名

__ 开头和以 __ 结尾的列在Spark中的pandas API中是保留的。为了处理内部行为,比如索引,Spark中的pandas API使用一些内部列。因此,不建议使用这样的列名,并且无法保证它们能正常工作。

不要使用重复的列名

不允许使用重复的列名,因为Spark SQL通常不允许这样。Pandas API在Spark中继承了这种行为。例如,请参见下面:

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})
>>> psdf.columns = ["a", "a"]
...
Reference 'a' is ambiguous, could be: a, a.;

此外,强烈不建议使用区分大小写的列名。Spark上的Pandas API默认不允许这样做。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
...
Reference 'a' is ambiguous, could be: a, a.;

然而,您可以在Spark配置中启用 spark.sql.caseSensitive ,以便自行使用,但风险自负。

>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("pandas-on-spark")
>>> builder = builder.config("spark.sql.caseSensitive", "true")
>>> builder.getOrCreate()

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
>>> psdf
   a  A
0  1  3
1  2  4

在将Spark DataFrame转换为pandas-on-Spark DataFrame时指定索引列

当pandas-on-Spark DataFrame从Spark DataFrame转换时,它会丢失索引信息,这导致在Spark DataFrame上使用pandas API时使用默认索引。与明确指定索引列相比,默认索引通常效率较低。尽可能指定索引列。

查看 使用PySpark

使用 distributed distributed-sequence 默认索引

使用pandas-on-Spark的用户常见的问题是由于默认索引导致的性能缓慢。当索引未知时,Spark上的Pandas API会附加一个默认索引,例如,当Spark DataFrame被直接转换为pandas-on-Spark DataFrame时。

请注意, sequence 需要在单个分区上进行计算,这种做法是不推荐的。如果您计划处理生产中的大数据,请通过将默认索引配置为 distributed distributed-sequence 来实现分布式。

有关配置默认索引的更多详细信息,请参见 Default Index Type

减少对不同 DataFrame/Series 的操作

在Spark上的Pandas API默认不允许对不同的DataFrame(或Series)进行操作,以防止昂贵的操作。 它内部执行联合操作,这通常是昂贵的,因此不被鼓励。尽可能地, 应避免此操作。

有关更多详细信息,请参见 对不同DataFrame的操作

尽可能直接在Spark上使用pandas API

尽管Spark上的pandas API具有大多数与pandas等效的API,但仍有一些API尚未实现或明确不支持。

作为一个例子,pandas API 在 Spark 中没有实现 __iter__() 以防止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 Python 内置函数如 min、max、sum 等,要求给定的参数是可迭代的。在 pandas 的情况下,它可以正常工作,如下所示:

>>> import pandas as pd
>>> max(pd.Series([1, 2, 3]))
3
>>> min(pd.Series([1, 2, 3]))
1
>>> sum(pd.Series([1, 2, 3]))
6

pandas 数据集生活在单台机器上,并且在同一台机器内自然可迭代。 然而,pandas-on-Spark 数据集分布在多台机器上,并以分布式方式进行计算。 这使得本地可迭代变得困难,并且用户很可能在不知情的情况下将整个数据收集到客户端。 因此,最好坚持使用 pandas-on-Spark API。 上述示例可以转换如下:

>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]).max()
3
>>> ps.Series([1, 2, 3]).min()
1
>>> ps.Series([1, 2, 3]).sum()
6

另一个来自pandas用户的常见模式可能是依赖于列表推导或生成器表达式。 但是,它也假设数据集在内部是可迭代的。 因此,它在pandas中可以无缝工作,如下所示:

>>> import pandas as pd
>>> data = []
>>> countries = ['London', 'New York', 'Helsinki']
>>> pser = pd.Series([20., 21., 12.], index=countries)
>>> for temperature in pser:
...     assert temperature > 0
...     if temperature > 1000:
...         temperature = None
...     data.append(temperature ** 2)
...
>>> pd.Series(data, index=countries)
London      400.0
New York    441.0
Helsinki    144.0
dtype: float64

然而,对于Spark上的pandas API,它由于上述相同的原因无法工作。 上面的示例也可以直接更改为如下使用pandas-on-Spark API:

>>> import pyspark.pandas as ps
>>> import numpy as np
>>> countries = ['London', 'New York', 'Helsinki']
>>> psser = ps.Series([20., 21., 12.], index=countries)
>>> def square(temperature) -> np.float64:
...     assert temperature > 0
...     if temperature > 1000:
...         temperature = None
...     return temperature ** 2
...
>>> psser.apply(square)
London      400.0
New York    441.0
Helsinki    144.0
dtype: float64