选项和设置

Pandas API在Spark上有一个选项系统,允许你自定义其行为的一些方面,用户最有可能调整的是与显示相关的选项。

选项具有完整的“点状风格”,不区分大小写的名称(例如 display.max_rows )。 您可以直接将选项作为顶层 options 属性的属性来获取/设置:

>>> import pyspark.pandas as ps
>>> ps.options.display.max_rows
1000
>>> ps.options.display.max_rows = 10
>>> ps.options.display.max_rows
10

该API由3个相关函数组成,可直接从 pandas_on_spark 命名空间使用:

注意: 开发者可以查看 pyspark.pandas/config.py 以获取更多信息。

>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 101)
>>> ps.get_option("display.max_rows")
101

获取和设置选项

如上所述, get_option() set_option() 可从 pandas_on_spark 命名空间中访问。 要更改选项,请调用 set_option('option name', new_value)

>>> import pyspark.pandas as ps
>>> ps.get_option('compute.max_rows')
1000
>>> ps.set_option('compute.max_rows', 2000)
>>> ps.get_option('compute.max_rows')
2000

所有选项也都有一个默认值,你可以使用 reset_option 来做到这一点:

>>> import pyspark.pandas as ps
>>> ps.reset_option("display.max_rows")
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 999)
>>> ps.get_option("display.max_rows")
999
>>> ps.reset_option("display.max_rows")
>>> ps.get_option("display.max_rows")
1000

option_context 上下文管理器已通过顶层 API 暴露,允许您使用给定的选项值执行代码。选项值在您退出 with 块时会自动恢复:

>>> with ps.option_context("display.max_rows", 10, "compute.max_rows", 5):
...    print(ps.get_option("display.max_rows"))
...    print(ps.get_option("compute.max_rows"))
10
5
>>> print(ps.get_option("display.max_rows"))
>>> print(ps.get_option("compute.max_rows"))
1000
1000

不同DataFrame上的操作

Pandas API在Spark上默认禁止对不同的DataFrame(或Series)执行操作,以防止昂贵的操作。它在内部执行一个连接操作,这通常是昂贵的。

通过将 compute.ops_on_diff_frames 设置为 True 来启用此功能,以允许此类情况。
请参阅下面的示例。

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf1 = ps.range(5)
>>> psdf2 = ps.DataFrame({'id': [5, 4, 3]})
>>> (psdf1 - psdf2).sort_index()
    id
0 -5.0
1 -3.0
2 -1.0
3  NaN
4  NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf = ps.range(5)
>>> psser_a = ps.Series([1, 2, 3, 4])
>>> # 'psser_a' is not from 'psdf' DataFrame. So it is considered as a Series not from 'psdf'.
>>> psdf['new_col'] = psser_a
>>> psdf
   id  new_col
0   0      1.0
1   1      2.0
3   3      4.0
2   2      3.0
4   4      NaN
>>> ps.reset_option('compute.ops_on_diff_frames')

默认索引类型

在Spark的pandas API中,默认索引在多个情况下使用,例如,当Spark DataFrame转换为pandas-on-Spark DataFrame时。在这种情况下,Spark的pandas API在内部会为pandas-on-Spark DataFrame附加一个默认索引。

可以通过 compute.default_index_type 配置几种类型的默认索引,如下所示:

sequence :它实现了一个逐一增加的序列,通过PySpark的窗口函数而不指定分区。因此,它可能最终在单个节点中结束一个完整的分区。当数据量大时,应避免使用这种索引类型。请参见下面的示例:

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([0, 1, 2], dtype='int64')

这在概念上等同于以下的PySpark示例:

>>> from pyspark.sql import functions as sf, Window
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> sequential_index = sf.row_number().over(
...    Window.orderBy(sf.monotonically_increasing_id().asc())) - 1
>>> spark_df.select(sequential_index).rdd.map(lambda r: r[0]).collect()
[0, 1, 2]

分布式序列 (默认):它实现了一个以分布式方式按组聚合和按组映射逐一递增的序列。它仍然在全局生成顺序索引。如果默认索引必须是大型数据集中的序列,则必须使用此索引。请参见下面的示例:

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed-sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([0, 1, 2], dtype='int64')

这在概念上等同于下面的PySpark示例:

>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.rdd.zipWithIndex().map(lambda p: p[1]).collect()
[0, 1, 2]

分布式 : 它通过使用PySpark的 monotonically_increasing_id 函数以完全分布的方式实现一个单调递增的序列。值是不可确定的。如果索引不必是逐一递增的序列,应该使用这个索引。从性能角度来看,这个索引与其他类型的索引几乎没有任何惩罚。请参考下面的例子:

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([25769803776, 60129542144, 94489280512], dtype='int64')

这在概念上等同于下面的PySpark示例:

>>> from pyspark.sql import functions as sf
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.select(sf.monotonically_increasing_id()) \
...     .rdd.map(lambda r: r[0]).collect()
[25769803776, 60129542144, 94489280512]

警告

这种类型的索引用于计算两个不同的数据框是不太可能的,因为它不保证在两个数据框中具有相同的索引。如果您使用此默认索引并开启 compute.ops_on_diff_frames ,则在两个不同的 DataFrame 之间的操作结果可能会因为不确定的索引值而产生意外的输出。

可用选项

选项

默认值

描述

display.max_rows

1000

这设置了 pandas-on-Spark 打印各种输出时的最大行数。例如,这个值决定了在 dataframe 的 repr() 中显示的行数。设置 None 以取消输入长度的限制。默认是 1000。

compute.max_rows

1000

‘compute.max_rows’ 设置当前 pandas-on-Spark DataFrame 的限制。设置 None 以取消输入长度的限制。当限制被设置时,它通过快捷方式收集数据到驱动程序中,然后使用 pandas API 执行。如果限制未设置,操作将通过 PySpark 执行。默认是 1000。

compute.shortcut_limit

1000

‘compute.shortcut_limit’ 设置快捷方式的限制。它计算指定数量的行并使用其模式。当 dataframe 长度大于此限制时,pandas-on-Spark 使用 PySpark 进行计算。

compute.ops_on_diff_frames

False

这决定是否在两个不同的 dataframes 之间进行操作。例如,‘combine_frames’ 函数内部执行一个联接操作,这通常是昂贵的。因此,如果 compute.ops_on_diff_frames 变量不为 True,那个方法将抛出一个异常。

compute.default_index_type

‘distributed-sequence’

这设置了默认索引类型:sequence、distributed 和 distributed-sequence。

compute.default_index_cache

‘MEMORY_AND_DISK_SER’

这设置了临时 RDD 在 distributed-sequence 索引中的默认存储级别:‘NONE’、‘DISK_ONLY’、‘DISK_ONLY_2’、‘DISK_ONLY_3’、‘MEMORY_ONLY’、‘MEMORY_ONLY_2’、‘MEMORY_ONLY_SER’、‘MEMORY_ONLY_SER_2’、‘MEMORY_AND_DISK’、‘MEMORY_AND_DISK_2’、‘MEMORY_AND_DISK_SER’、‘MEMORY_AND_DISK_SER_2’、‘OFF_HEAP’、‘LOCAL_CHECKPOINT’。

compute.ordered_head

False

‘compute.ordered_head’ 设置是否对 head 进行自然排序。pandas-on-Spark 不保证行的顺序,因此 head 可能会从分布式分区中返回一些行。如果 ‘compute.ordered_head’ 设置为 True,pandas-on-Spark 将提前执行自然排序,但这会造成性能开销。

compute.eager_check

True

‘compute.eager_check’ 设置是否仅为了验证启动一些 Spark 作业。如果 ‘compute.eager_check’ 设置为 True,pandas-on-Spark 将提前进行验证,但这会造成性能开销。否则,pandas-on-Spark 将跳过验证,可能会与 pandas 有细微不同。受影响的 API: Series.dot Series.asof Series.compare FractionalExtensionOps.astype IntegralExtensionOps.astype FractionalOps.astype DecimalOps.astype 统计函数的 skipna

compute.isin_limit

80

‘compute.isin_limit’ 设置通过‘Column.isin(list)’ 过滤的限制。如果‘list’的长度超过限制,将使用广播联接以获得更好的性能。

plotting.max_rows

1000

‘plotting.max_rows’ 设置基于 top-n 的图形的可视限制,例如 plot.bar plot.pie 。如果设置为 1000,将使用前 1000 个数据点进行绘图。默认是 1000。

plotting.sample_ratio

None

‘plotting.sample_ratio’ 设置用于基于样本的绘图(如 plot.line plot.area )的绘图数据比例。此选项默认为 ‘plotting.max_rows’ 选项。

plotting.backend

‘plotly’

用于绘图的后端。默认是 plotly。支持任何具有顶级 .plot 方法的包。已知选项有:[matplotlib, plotly]。