PySpark中的Apache Arrow ¶
Apache Arrow 是一种内存中的列式数据格式,用于在 Spark 中高效地在 JVM 和 Python 进程之间传输数据。这对使用 Pandas/NumPy 数据的 Python 用户最为有利。其使用并非自动化的,可能需要对配置或代码进行一些小调整,以便充分利用并确保兼容性。本指南将对如何在 Spark 中使用 Arrow 进行高层次描述,并强调使用支持 Arrow 的数据时的任何差异。
确保安装PyArrow ¶
要在PySpark中使用Apache Arrow,
推荐的PyArrow版本
应该被安装。如果您使用pip安装PySpark,则可以通过命令
pip
install
pyspark[sql]
将PyArrow作为SQL模块的额外依赖安装。否则,您必须确保PyArrow在所有集群节点上安装并可用。您可以通过pip或从conda-forge通道使用conda进行安装。有关详细信息,请参见PyArrow
安装
。
启用 Pandas 的转换 ¶
Arrow可作为将Spark DataFrame转换为Pandas DataFrame时的优化,使用调用
DataFrame.toPandas()
,以及在从Pandas DataFrame创建Spark DataFrame时,使用
SparkSession.createDataFrame()
。要在执行这些调用时使用Arrow,用户需要先将Spark配置
spark.sql.execution.arrow.pyspark.enabled
设置为
true
。默认情况下此功能是禁用的。
此外,由
spark.sql.execution.arrow.pyspark.enabled
使能的优化可以在实际计算之前如果发生错误时自动回退到非Arrow优化实现。这可以通过
spark.sql.execution.arrow.pyspark.fallback.enabled
进行控制。
import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
使用上述与Arrow的优化将产生与未启用Arrow时相同的结果。
请注意,即使使用Arrow,
DataFrame.toPandas()
也会将DataFrame中的所有记录收集到驱动程序中,并且应在数据的小子集上进行。并非所有Spark数据类型当前都得到支持,如果某列具有不支持的类型,则可能会引发错误。如果在
SparkSession.createDataFrame()
期间发生错误,Spark将退回到不使用Arrow创建DataFrame。
Pandas用户定义函数(又称向量化用户定义函数) ¶
Pandas UDF是由Spark执行的用户定义函数,它使用Arrow来传输数据,并使用Pandas来处理数据,从而允许向量化操作。Pandas UDF是使用
pandas_udf()
作为装饰器或包装函数来定义的,不需要额外的配置。Pandas UDF在一般情况下表现得像一个常规的PySpark函数API。
在Spark 3.0之前,Pandas UDF是使用
pyspark.sql.functions.PandasUDFType
定义的。从Spark 3.0开始,结合Python 3.6+,您还可以使用
Python类型提示
。建议使用Python类型提示,未来版本将弃用
pyspark.sql.functions.PandasUDFType
。
请注意,在所有情况下类型提示应使用
pandas.Series
,但有一种情况应使用
pandas.DataFrame
作为输入或输出的类型提示,当输入或输出列为
StructType
时。以下示例展示了一个 Pandas UDF,它接受长整型列、字符串列和结构列,并输出一个结构列。它要求函数指定
pandas.Series
和
pandas.DataFrame
的类型提示,如下所示:
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("col1 string, col2 long") # type: ignore[call-overload]
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
s3['col2'] = s1 + s2.str.len()
return s3
# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
[[1, "a string", ("a nested string",)]],
"long_col long, string_col string, struct_col struct<col1:string>")
df.printSchema()
# root
# |-- long_column: long (nullable = true)
# |-- string_column: string (nullable = true)
# |-- struct_column: struct (nullable = true)
# | |-- col1: string (nullable = true)
df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# | |-- col1: string (nullable = true)
# | |-- col2: long (nullable = true)
在接下来的章节中,它描述了支持的类型提示的组合。为了简单起见,
pandas.DataFrame
变体被省略。
系列到系列 ¶
类型提示可以表示为
pandas.Series
, … ->
pandas.Series
.
通过使用
pandas_udf()
并且函数具有上述类型提示,它创建了一个 Pandas UDF,其中给定的
函数接收一个或多个
pandas.Series
并输出一个
pandas.Series
。函数的输出应该
始终与输入的长度相同。在内部,PySpark 将通过将
列拆分为批次并对每个批次作为数据的子集调用该函数来执行 Pandas UDF,然后将
结果连接在一起。
以下示例展示了如何创建这个 Pandas UDF,用于计算 2 列的乘积。
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType()) # type: ignore[call-overload]
# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
有关详细用法,请参见
pandas_udf()
。
系列的迭代器到系列的迭代器 ¶
类型提示可以表示为
Iterator[pandas.Series]
->
Iterator[pandas.Series]
。
通过使用
pandas_udf()
和上述类型提示的函数,它创建了一个 Pandas UDF,给定的函数接受一个
pandas.Series
的迭代器,并输出一个
pandas.Series
的迭代器。函数的整个输出的长度应与整个输入的长度相同;因此,只要长度相同,它可以预先获取输入迭代器中的数据。在这种情况下,创建的 Pandas UDF 在调用时需要一个输入列。要使用多个输入列,需要不同的类型提示。请参阅多个系列的迭代器到系列的迭代器。
当 UDF 执行需要初始化某些状态时,这也是有用的,尽管在内部它的工作方式与系列到系列的情况完全相同。下面的伪代码展示了这个例子。
@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
# Do some expensive initialization with a state
state = very_expensive_initialization()
for x in iterator:
# Use that state for the whole iterator.
yield calculate_with_state(x, state)
df.select(calculate("value")).show()
以下示例展示了如何创建这个 Pandas UDF:
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# Declare the function and create the UDF
@pandas_udf("long") # type: ignore[call-overload]
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in iterator:
yield x + 1
df.select(plus_one("x")).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
有关详细用法,请参见
pandas_udf()
。
多个序列的迭代器到序列的迭代器 ¶
类型提示可以表示为
Iterator[Tuple[pandas.Series,
...]]
->
Iterator[pandas.Series]
.
通过使用
pandas_udf()
并具有上述类型提示的函数,它创建了一个 Pandas UDF,其中给定的函数接受一个包含多个
pandas.Series
的元组的迭代器,并输出一个
pandas.Series
的迭代器。在这种情况下,创建的 pandas UDF 在调用时需要与元组中的系列数量相同的多个输入列。否则,它具有与系列到系列的迭代器相同的特征和限制。
以下示例展示了如何创建这个 Pandas UDF:
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# Declare the function and create the UDF
@pandas_udf("long") # type: ignore[call-overload]
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
详细使用请参见
pandas_udf()
。
系列到标量 ¶
类型提示可以表示为
pandas.Series
, … ->
Any
.
通过使用
pandas_udf()
以及具有上述类型提示的函数,可以创建一个类似于 PySpark 聚合函数的 Pandas UDF。给定的函数接受
pandas.Series
并返回一个标量值。返回类型应为原始数据类型,返回的标量可以是 Python 原始类型,例如
int
或
float
,也可以是 numpy 数据类型,例如
numpy.int64
或
numpy.float64
。
Any
理想情况下应当是相应的具体标量类型。
此 UDF 也可以与
GroupedData.agg()
和
窗口
一起使用。它定义了从一个或多个
pandas.Series
到标量值的聚合,其中每个
pandas.Series
代表组或窗口中的一列。
请注意,这种类型的 UDF 不支持部分汇总,所有数据将被加载到内存中。此外,目前仅支持无界窗口的分组聚合 Pandas UDF。以下示例展示了如何使用这种类型的 UDF 通过分组和窗口操作计算均值:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double") # type: ignore[call-overload]
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
有关详细用法,请参见
pandas_udf()
。
Pandas 函数 API ¶
Pandas函数API可以直接应用一个Python本地函数于整个
DataFrame
,通过使用Pandas实例。内部它的工作方式与Pandas UDFs类似,使用Arrow传输数据,使用Pandas处理数据,这允许矢量化操作。然而,Pandas函数API在PySpark
DataFrame
下表现得像一个常规API,而不是
Column
,并且在Pandas函数API中的Python类型提示是可选的,目前不会影响其内部工作,虽然未来可能会被要求。
从 Spark 3.0 开始,分组映射 pandas UDF 现在被归类为一个单独的 Pandas 函数 API,
DataFrame.groupby().applyInPandas()
。仍然可以使用
pyspark.sql.functions.PandasUDFType
和
DataFrame.groupby().apply()
,但是更推荐直接使用
DataFrame.groupby().applyInPandas()
。在未来,使用
pyspark.sql.functions.PandasUDFType
将被弃用。
分组映射 ¶
支持使用Pandas实例进行分组映射操作,使用
DataFrame.groupby().applyInPandas()
这需要一个Python函数,该函数接受一个
pandas.DataFrame
并返回另一个
pandas.DataFrame
。它将每个组映射到Python函数中的每个
pandas.DataFrame
。
这个API实现了“拆分-应用-合并”模式,由三个步骤组成:
-
通过使用
DataFrame.groupBy()
将数据分组。 -
对每个组应用一个函数。函数的输入和输出都是
pandas.DataFrame
。输入数据包含每个组的所有行和列。 -
将结果合并到一个新的 PySpark
DataFrame
中。
要使用
DataFrame.groupBy().applyInPandas()
, 用户需要定义以下内容:
-
一个定义每个组的计算的 Python 函数。
-
一个
StructType
对象或一个字符串,定义输出 PySparkDataFrame
的模式。
返回的
pandas.DataFrame
的列标签必须与定义的输出架构中的字段名称匹配(如果指定为字符串),或者如果不是字符串,则通过位置匹配字段数据类型,例如整数索引。有关在构建
pandas.DataFrame
时如何标记列,请参见
pandas.DataFrame
。
请注意,所有数据在对组应用函数之前都将加载到内存中。这可能导致内存溢出异常,特别是在组大小不均衡的情况下。 maxRecordsPerBatch 的配置不适用于组,用户必须确保分组数据能够适应可用内存。
以下示例演示了如何使用
DataFrame.groupby().applyInPandas()
从组中的每个值中减去平均值。
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
有关详细用法,请参见 请参见
GroupedData.applyInPandas()
地图 ¶
使用 Pandas 实例的 Map 操作由
DataFrame.mapInPandas()
支持,它将
pandas.DataFrame
的迭代器映射到另一个
pandas.DataFrame
的迭代器,代表当前的 PySpark
DataFrame
并将结果返回为 PySpark
DataFrame
。该函数接受并输出一个
pandas.DataFrame
的迭代器。与某些 Pandas UDF 不同,它可以返回任意长度的输出,尽管在内部它与系列到系列的 Pandas UDF 的工作方式相似。
以下示例演示了如何使用
DataFrame.mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
有关详细用法,请参见
DataFrame.mapInPandas()
。
联合分组映射 ¶
通过
DataFrame.groupby().cogroup().applyInPandas()
支持与 Pandas 实例的共同分组映射操作,该操作允许两个 PySpark
DataFrame
通过一个公共键进行共同分组,然后对每个共同分组应用一个 Python 函数。它包含以下步骤:
-
打乱数据,使得每个共享键的dataframe组共同分组在一起。
-
对每个共同组应用一个函数。函数的输入是两个
pandas.DataFrame
(可选的元组表示键)。函数的输出是一个pandas.DataFrame
。 -
将所有组的
pandas.DataFrame
合并为一个新的 PySparkDataFrame
。
要使用
groupBy().cogroup().applyInPandas()
,用户需要定义以下内容:
-
一个定义每个协同组计算的Python函数。
-
一个
StructType
对象或一个定义输出PySparkDataFrame
模式的字符串。
返回的
pandas.DataFrame
的列标签必须与定义的输出模式中的字段名称匹配(如果指定为字符串),或者如果不是字符串,则按位置匹配字段数据类型,例如整数索引。请参阅
pandas.DataFrame
。有关构建
pandas.DataFrame
时如何标记列的信息。
请注意,在应用函数之前,所有的cogroup数据将被加载到内存中。这可能会导致内存溢出异常,特别是当组的大小不均匀时。 maxRecordsPerBatch 的配置未被应用,用户需要确保cogrouped数据能够适应可用内存。
下面的示例演示了如何使用
DataFrame.groupby().cogroup().applyInPandas()
在两个数据集之间执行 asof 连接。
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
return pd.merge_ordered(left, right)
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
merge_ordered, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+----+
# | time| id| v1| v2|
# +--------+---+---+----+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0|null|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0|null|
# +--------+---+---+----+
有关详细用法,请参见
PandasCogroupedOps.applyInPandas()
箭头 Python 用户定义函数 ¶
Arrow Python UDF是用户定义的函数,它逐行执行,利用Arrow进行高效的批量数据传输和序列化。要定义一个Arrow Python UDF,您可以使用
udf()
装饰器或将函数包装在
udf()
方法中,确保
useArrow
参数设置为True。此外,您可以通过将Spark配置
spark.sql
.execution.pythonUDF.arrow.enabled
设置为true,从而在整个SparkSession中启用Python UDF的Arrow优化。重要的是要注意,Spark配置仅在
useArrow
未设置或设置为None时生效。
Arrow Python UDF的类型提示应按与默认的、经过pickle处理的Python UDF相同的方式指定。
这里是一个示例,演示了默认的、经过序列化的Python用户定义函数(UDF)和Arrow Python用户定义函数(UDF)的用法:
from pyspark.sql.functions import udf
@udf(returnType='int') # A default, pickled Python UDF
def slen(s): # type: ignore[no-untyped-def]
return len(s)
@udf(returnType='int', useArrow=True) # An Arrow Python UDF
def arrow_slen(s): # type: ignore[no-untyped-def]
return len(s)
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name"), arrow_slen("name")).show()
# +----------+----------------+
# |slen(name)|arrow_slen(name)|
# +----------+----------------+
# | 8| 8|
# +----------+----------------+
与默认的、经过pickle序列化的Python UDF相比,Arrow Python UDF提供了更一致的类型强制机制。UDF类型强制在UDF返回的Python实例与用户指定的返回类型不一致时会带来挑战。默认的、经过pickle序列化的Python UDF的类型强制存在某些限制,例如依赖None作为类型不匹配的回退,导致潜在的歧义和数据丢失。此外,将日期、日期时间和元组转换为字符串可能会产生模糊的结果。另一方面,Arrow Python UDF利用Arrow的能力来标准化类型强制并有效解决这些问题。
使用说明 ¶
支持的SQL类型 ¶
目前,除了
ArrayType
和
TimestampType
之外,所有Spark SQL数据类型都支持基于Arrow的转换。
MapType
和嵌套的
ArrayType
只有在使用PyArrow 2.0.0及以上版本时才被支持。
设置箭头批处理大小 ¶
在Spark中,数据分区被转换为Arrow记录批,这可能暂时导致JVM中的高内存使用。为了避免可能的内存溢出异常,可以通过设置配置
spark.sql.execution.arrow.maxRecordsPerBatch
的值为一个整数来调整Arrow记录批的大小,这将决定每个批次的最大行数。默认值是每批10,000条记录。如果列数较多,应该相应地调整该值。使用此限制,每个数据分区将被制作成1个或多个记录批进行处理。
带时区语义的时间戳 ¶
Spark内部将时间戳存储为UTC值,未经指定时区引入的时间戳数据被转换为本地时间,以微秒精度转换为UTC。当在Spark中导出或显示时间戳数据时,使用会话时区来本地化时间戳值。会话时区通过配置
spark.sql.session.timeZone
设置,如果未设置,将默认为JVM系统本地时区。Pandas使用
datetime64
类型,具有纳秒精度,
datetime64[ns]
,可以按列选择是否使用时区。
当时间戳数据从 Spark 转移到 Pandas 时,它将被转换为纳秒,每列将被转换为 Spark 会话的时区,然后本地化为该时区,这会移除时区并以当地时间显示值。这将在调用
DataFrame.toPandas()
或
pandas_udf
时发生,带有时间戳列。
当时间戳数据从 Pandas 转移到 Spark 时,它将被转换为 UTC 微秒。这发生在使用
SparkSession.createDataFrame()
处理 Pandas DataFrame 时,或者当从
pandas_udf
返回时间戳时。这些转换是自动进行的,以确保 Spark 拥有期望格式的数据,因此您不需要自己进行任何这些转换。任何纳秒值将被截断。
请注意,标准的UDF(非Pandas)将会将时间戳数据加载为Python datetime对象,这与Pandas时间戳不同。当在
pandas_udf
中处理时间戳时,建议使用Pandas时间序列功能以获得最佳性能,详细信息请参见
这里
。
推荐的 Pandas 和 PyArrow 版本 ¶
对于使用pyspark.sql,Pandas的最低支持版本是1.0.5,PyArrow是4.0.0。可以使用更高版本,但不保证兼容性和数据正确性,应由用户自行验证。
设置箭头
self_destruct
以节省内存
¶
自 Spark 3.2 起,Spark 配置
spark.sql.execution.arrow.pyspark.selfDestruct.enabled
可用于启用 PyArrow 的
self_destruct
功能,这在通过
toPandas
创建 Pandas DataFrame 时可以节省内存,因为它在构建 Pandas DataFrame 时释放了 Arrow 分配的内存。这个选项是实验性的,一些操作可能会因为不可变的后备数组而在结果 Pandas DataFrame 上失败。通常,您会看到错误
ValueError:
buffer
source
array
is
read-only
。更新版本的 Pandas 可能通过改善对这种情况的支持来修复这些错误。您可以通过提前复制列来规避此错误。此外,这种转换可能会更慢,因为它是单线程的。