最佳实践 ¶
利用 PySpark API ¶
Pandas API on Spark 在底层使用 Spark;因此,许多功能和性能优化在 Pandas API on Spark 中同样可用。利用并结合这些尖端功能与 Pandas API on Spark。
在Spark上的pandas API中,现有的Spark上下文和Spark会话可以直接使用。如果您已经有自己的配置好的Spark上下文或会话在运行,pandas API将在Spark中使用它们。
如果您的环境中没有运行 Spark 上下文或会话(例如,普通的 Python 解释器),可以将这些配置设置为
SparkContext
和/或
SparkSession
。一旦创建了 Spark 上下文和/或会话,Spark 上的 pandas API 可以自动使用这个上下文和/或会话。例如,如果您想在 Spark 中配置执行程序内存,可以如下操作:
from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.set('spark.executor.memory', '2g')
# Pandas API on Spark automatically uses this Spark context with the configurations set.
SparkContext(conf=conf)
import pyspark.pandas as ps
...
另一个常见的配置可能是PySpark中的Arrow优化。在SQL配置的情况下,可以在Spark会话中设置如下:
from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("pandas-on-spark")
builder = builder.config("spark.sql.execution.arrow.pyspark.enabled", "true")
# Pandas API on Spark automatically uses this Spark session with the configurations set.
builder.getOrCreate()
import pyspark.pandas as ps
...
所有Spark功能,例如历史服务器、网页UI和部署模式,可以与Spark上的pandas API一起使用。如果您对性能调优感兴趣,请参阅 调优Spark 。
检查执行计划 ¶
通过利用 PySpark API DataFrame.spark.explain() 在实际计算之前,可以预测昂贵的操作,因为 Spark 上的 pandas API 是基于惰性执行的。例如,见下文。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Filter (id#1L > 5)
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]
每当您对这种情况不确定时,您可以检查实际的执行计划并预见到昂贵的情况。
尽管Spark上的pandas API尽力通过利用Spark优化器来优化和减少这样的洗牌操作,但在应用程序端尽可能避免洗牌是最好的选择。
使用检查点 ¶
在对 Spark 对象上的 pandas API 进行了一系列操作后,底层的 Spark 规划器可能由于计划庞大而变慢。如果 Spark 计划变得庞大或规划时间过长,
DataFrame.spark.checkpoint()
或
DataFrame.spark.local_checkpoint()
将会有所帮助。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf['id'] = psdf['id'] + (10 * psdf['id'] + psdf['id'])
>>> psdf = psdf.groupby('id').head(2)
>>> psdf.spark.explain()
== Physical Plan ==
*(3) Project [__index_level_0__#0L, id#31L]
+- *(3) Filter (isnotnull(__row_number__#44) AND (__row_number__#44 <= 2))
+- Window [row_number() windowspecdefinition(__groupkey_0__#36L, __natural_order__#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __row_number__#44], [__groupkey_0__#36L], [__natural_order__#16L ASC NULLS FIRST]
+- *(2) Sort [__groupkey_0__#36L ASC NULLS FIRST, __natural_order__#16L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(__groupkey_0__#36L, 200), true, [id=#33]
+- *(1) Project [__index_level_0__#0L, (id#1L + ((id#1L * 10) + id#1L)) AS __groupkey_0__#36L, (id#1L + ((id#1L * 10) + id#1L)) AS id#31L, __natural_order__#16L]
+- *(1) Project [__index_level_0__#0L, id#1L, monotonically_increasing_id() AS __natural_order__#16L]
+- *(1) Filter (id#1L > 5)
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]
>>> psdf = psdf.spark.local_checkpoint() # or psdf.spark.checkpoint()
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#0L, id#31L]
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#31L,__natural_order__#59L]
正如您所看到的,之前的 Spark 计划被丢弃并开始于一个简单的计划。
调用
DataFrame.spark.checkpoint()
时,之前的 DataFrame 的结果被存储在配置的文件系统中,
或者在调用
DataFrame.spark.local_checkpoint()
时存储在执行器中。
避免洗牌 ¶
在并行或分布式环境中,某些操作,例如
sort_values
,比在单台机器的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。请参见下面的示例。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)}).sort_values(by="id")
>>> psdf.spark.explain()
== Physical Plan ==
*(2) Sort [id#9L ASC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#9L ASC NULLS LAST, 200), true, [id=#18]
+- *(1) Scan ExistingRDD[__index_level_0__#8L,id#9L]
如您所见,它需要
Exchange
,这需要一次洗牌,并且可能很昂贵。
避免在单个分区上进行计算 ¶
另一个常见的情况是在单个分区上的计算。目前,一些API,例如 DataFrame.rank 在没有指定分区规范的情况下使用PySpark的Window。这会将所有数据移动到单台机器的单个分区中,可能导致严重的性能下降。 对于非常大的数据集,应避免使用此类API。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf.rank().spark.explain()
== Physical Plan ==
*(4) Project [__index_level_0__#16L, id#24]
+- Window [avg(cast(_w0#26 as bigint)) windowspecdefinition(id#17L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS id#24], [id#17L]
+- *(3) Project [__index_level_0__#16L, _w0#26, id#17L]
+- Window [row_number() windowspecdefinition(id#17L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#26], [id#17L ASC NULLS FIRST]
+- *(2) Sort [id#17L ASC NULLS FIRST], false, 0
+- Exchange SinglePartition, true, [id=#48]
+- *(1) Scan ExistingRDD[__index_level_0__#16L,id#17L]
相反,使用 GroupBy.rank 因为它的开销更小,因为数据可以被分布并为每个组计算。
避免保留列名 ¶
以
__
开头和以
__
结尾的列在Spark中的pandas API中是保留的。为了处理内部行为,比如索引,Spark中的pandas API使用一些内部列。因此,不建议使用这样的列名,并且无法保证它们能正常工作。
不要使用重复的列名 ¶
不允许使用重复的列名,因为Spark SQL通常不允许这样。Pandas API在Spark中继承了这种行为。例如,请参见下面:
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})
>>> psdf.columns = ["a", "a"]
...
Reference 'a' is ambiguous, could be: a, a.;
此外,强烈不建议使用区分大小写的列名。Spark上的Pandas API默认不允许这样做。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
...
Reference 'a' is ambiguous, could be: a, a.;
然而,您可以在Spark配置中启用
spark.sql.caseSensitive
,以便自行使用,但风险自负。
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("pandas-on-spark")
>>> builder = builder.config("spark.sql.caseSensitive", "true")
>>> builder.getOrCreate()
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
>>> psdf
a A
0 1 3
1 2 4
在将Spark DataFrame转换为pandas-on-Spark DataFrame时指定索引列 ¶
当pandas-on-Spark DataFrame从Spark DataFrame转换时,它会丢失索引信息,这导致在Spark DataFrame上使用pandas API时使用默认索引。与明确指定索引列相比,默认索引通常效率较低。尽可能指定索引列。
查看 使用PySpark
使用
distributed
或
distributed-sequence
默认索引
¶
使用pandas-on-Spark的用户常见的问题是由于默认索引导致的性能缓慢。当索引未知时,Spark上的Pandas API会附加一个默认索引,例如,当Spark DataFrame被直接转换为pandas-on-Spark DataFrame时。
请注意,
sequence
需要在单个分区上进行计算,这种做法是不推荐的。如果您计划处理生产中的大数据,请通过将默认索引配置为
distributed
或
distributed-sequence
来实现分布式。
有关配置默认索引的更多详细信息,请参见 Default Index Type 。
减少对不同 DataFrame/Series 的操作 ¶
在Spark上的Pandas API默认不允许对不同的DataFrame(或Series)进行操作,以防止昂贵的操作。 它内部执行联合操作,这通常是昂贵的,因此不被鼓励。尽可能地, 应避免此操作。
有关更多详细信息,请参见 对不同DataFrame的操作 。
尽可能直接在Spark上使用pandas API ¶
尽管Spark上的pandas API具有大多数与pandas等效的API,但仍有一些API尚未实现或明确不支持。
作为一个例子,pandas API 在 Spark 中没有实现
__iter__()
以防止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 Python 内置函数如 min、max、sum 等,要求给定的参数是可迭代的。在 pandas 的情况下,它可以正常工作,如下所示:
>>> import pandas as pd
>>> max(pd.Series([1, 2, 3]))
3
>>> min(pd.Series([1, 2, 3]))
1
>>> sum(pd.Series([1, 2, 3]))
6
pandas 数据集生活在单台机器上,并且在同一台机器内自然可迭代。 然而,pandas-on-Spark 数据集分布在多台机器上,并以分布式方式进行计算。 这使得本地可迭代变得困难,并且用户很可能在不知情的情况下将整个数据收集到客户端。 因此,最好坚持使用 pandas-on-Spark API。 上述示例可以转换如下:
>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]).max()
3
>>> ps.Series([1, 2, 3]).min()
1
>>> ps.Series([1, 2, 3]).sum()
6
另一个来自pandas用户的常见模式可能是依赖于列表推导或生成器表达式。 但是,它也假设数据集在内部是可迭代的。 因此,它在pandas中可以无缝工作,如下所示:
>>> import pandas as pd
>>> data = []
>>> countries = ['London', 'New York', 'Helsinki']
>>> pser = pd.Series([20., 21., 12.], index=countries)
>>> for temperature in pser:
... assert temperature > 0
... if temperature > 1000:
... temperature = None
... data.append(temperature ** 2)
...
>>> pd.Series(data, index=countries)
London 400.0
New York 441.0
Helsinki 144.0
dtype: float64
然而,对于Spark上的pandas API,它由于上述相同的原因无法工作。 上面的示例也可以直接更改为如下使用pandas-on-Spark API:
>>> import pyspark.pandas as ps
>>> import numpy as np
>>> countries = ['London', 'New York', 'Helsinki']
>>> psser = ps.Series([20., 21., 12.], index=countries)
>>> def square(temperature) -> np.float64:
... assert temperature > 0
... if temperature > 1000:
... temperature = None
... return temperature ** 2
...
>>> psser.apply(square)
London 400.0
New York 441.0
Helsinki 144.0
dtype: float64