PySpark中的Apache Arrow

Apache Arrow 是一种内存中的列式数据格式,用于在 Spark 中高效地在 JVM 和 Python 进程之间传输数据。这对使用 Pandas/NumPy 数据的 Python 用户最为有利。其使用并非自动化的,可能需要对配置或代码进行一些小调整,以便充分利用并确保兼容性。本指南将对如何在 Spark 中使用 Arrow 进行高层次描述,并强调使用支持 Arrow 的数据时的任何差异。

确保安装PyArrow

要在PySpark中使用Apache Arrow, 推荐的PyArrow版本 应该被安装。如果您使用pip安装PySpark,则可以通过命令 pip install pyspark[sql] 将PyArrow作为SQL模块的额外依赖安装。否则,您必须确保PyArrow在所有集群节点上安装并可用。您可以通过pip或从conda-forge通道使用conda进行安装。有关详细信息,请参见PyArrow 安装

启用 Pandas 的转换

Arrow可作为将Spark DataFrame转换为Pandas DataFrame时的优化,使用调用 DataFrame.toPandas() ,以及在从Pandas DataFrame创建Spark DataFrame时,使用 SparkSession.createDataFrame() 。要在执行这些调用时使用Arrow,用户需要先将Spark配置 spark.sql.execution.arrow.pyspark.enabled 设置为 true 。默认情况下此功能是禁用的。

此外,由 spark.sql.execution.arrow.pyspark.enabled 使能的优化可以在实际计算之前如果发生错误时自动回退到非Arrow优化实现。这可以通过 spark.sql.execution.arrow.pyspark.fallback.enabled 进行控制。

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))

使用上述与Arrow的优化将产生与未启用Arrow时相同的结果。

请注意,即使使用Arrow, DataFrame.toPandas() 也会将DataFrame中的所有记录收集到驱动程序中,并且应在数据的小子集上进行。并非所有Spark数据类型当前都得到支持,如果某列具有不支持的类型,则可能会引发错误。如果在 SparkSession.createDataFrame() 期间发生错误,Spark将退回到不使用Arrow创建DataFrame。

Pandas用户定义函数(又称向量化用户定义函数)

Pandas UDF是由Spark执行的用户定义函数,它使用Arrow来传输数据,并使用Pandas来处理数据,从而允许向量化操作。Pandas UDF是使用 pandas_udf() 作为装饰器或包装函数来定义的,不需要额外的配置。Pandas UDF在一般情况下表现得像一个常规的PySpark函数API。

在Spark 3.0之前,Pandas UDF是使用 pyspark.sql.functions.PandasUDFType 定义的。从Spark 3.0开始,结合Python 3.6+,您还可以使用 Python类型提示 。建议使用Python类型提示,未来版本将弃用 pyspark.sql.functions.PandasUDFType

请注意,在所有情况下类型提示应使用 pandas.Series ,但有一种情况应使用 pandas.DataFrame 作为输入或输出的类型提示,当输入或输出列为 StructType 时。以下示例展示了一个 Pandas UDF,它接受长整型列、字符串列和结构列,并输出一个结构列。它要求函数指定 pandas.Series pandas.DataFrame 的类型提示,如下所示:

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")  # type: ignore[call-overload]
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()
# root
# |-- long_column: long (nullable = true)
# |-- string_column: string (nullable = true)
# |-- struct_column: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)

在接下来的章节中,它描述了支持的类型提示的组合。为了简单起见, pandas.DataFrame 变体被省略。

系列到系列

类型提示可以表示为 pandas.Series , … -> pandas.Series .

通过使用 pandas_udf() 并且函数具有上述类型提示,它创建了一个 Pandas UDF,其中给定的 函数接收一个或多个 pandas.Series 并输出一个 pandas.Series 。函数的输出应该 始终与输入的长度相同。在内部,PySpark 将通过将 列拆分为批次并对每个批次作为数据的子集调用该函数来执行 Pandas UDF,然后将 结果连接在一起。

以下示例展示了如何创建这个 Pandas UDF,用于计算 2 列的乘积。

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())  # type: ignore[call-overload]

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

有关详细用法,请参见 pandas_udf()

系列的迭代器到系列的迭代器

类型提示可以表示为 Iterator[pandas.Series] -> Iterator[pandas.Series]

通过使用 pandas_udf() 和上述类型提示的函数,它创建了一个 Pandas UDF,给定的函数接受一个 pandas.Series 的迭代器,并输出一个 pandas.Series 的迭代器。函数的整个输出的长度应与整个输入的长度相同;因此,只要长度相同,它可以预先获取输入迭代器中的数据。在这种情况下,创建的 Pandas UDF 在调用时需要一个输入列。要使用多个输入列,需要不同的类型提示。请参阅多个系列的迭代器到系列的迭代器。

当 UDF 执行需要初始化某些状态时,这也是有用的,尽管在内部它的工作方式与系列到系列的情况完全相同。下面的伪代码展示了这个例子。

@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Do some expensive initialization with a state
    state = very_expensive_initialization()
    for x in iterator:
        # Use that state for the whole iterator.
        yield calculate_with_state(x, state)

df.select(calculate("value")).show()

以下示例展示了如何创建这个 Pandas UDF:

from typing import Iterator

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Declare the function and create the UDF
@pandas_udf("long")  # type: ignore[call-overload]
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

df.select(plus_one("x")).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

有关详细用法,请参见 pandas_udf()

多个序列的迭代器到序列的迭代器

类型提示可以表示为 Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series] .

通过使用 pandas_udf() 并具有上述类型提示的函数,它创建了一个 Pandas UDF,其中给定的函数接受一个包含多个 pandas.Series 的元组的迭代器,并输出一个 pandas.Series 的迭代器。在这种情况下,创建的 pandas UDF 在调用时需要与元组中的系列数量相同的多个输入列。否则,它具有与系列到系列的迭代器相同的特征和限制。

以下示例展示了如何创建这个 Pandas UDF:

from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Declare the function and create the UDF
@pandas_udf("long")  # type: ignore[call-overload]
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

详细使用请参见 pandas_udf()

系列到标量

类型提示可以表示为 pandas.Series , … -> Any .

通过使用 pandas_udf() 以及具有上述类型提示的函数,可以创建一个类似于 PySpark 聚合函数的 Pandas UDF。给定的函数接受 pandas.Series 并返回一个标量值。返回类型应为原始数据类型,返回的标量可以是 Python 原始类型,例如 int float ,也可以是 numpy 数据类型,例如 numpy.int64 numpy.float64 Any 理想情况下应当是相应的具体标量类型。

此 UDF 也可以与 GroupedData.agg() 窗口 一起使用。它定义了从一个或多个 pandas.Series 到标量值的聚合,其中每个 pandas.Series 代表组或窗口中的一列。

请注意,这种类型的 UDF 不支持部分汇总,所有数据将被加载到内存中。此外,目前仅支持无界窗口的分组聚合 Pandas UDF。以下示例展示了如何使用这种类型的 UDF 通过分组和窗口操作计算均值:

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")  # type: ignore[call-overload]
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

有关详细用法,请参见 pandas_udf()

Pandas 函数 API

Pandas函数API可以直接应用一个Python本地函数于整个 DataFrame ,通过使用Pandas实例。内部它的工作方式与Pandas UDFs类似,使用Arrow传输数据,使用Pandas处理数据,这允许矢量化操作。然而,Pandas函数API在PySpark DataFrame 下表现得像一个常规API,而不是 Column ,并且在Pandas函数API中的Python类型提示是可选的,目前不会影响其内部工作,虽然未来可能会被要求。

从 Spark 3.0 开始,分组映射 pandas UDF 现在被归类为一个单独的 Pandas 函数 API, DataFrame.groupby().applyInPandas() 。仍然可以使用 pyspark.sql.functions.PandasUDFType DataFrame.groupby().apply() ,但是更推荐直接使用 DataFrame.groupby().applyInPandas() 。在未来,使用 pyspark.sql.functions.PandasUDFType 将被弃用。

分组映射

支持使用Pandas实例进行分组映射操作,使用 DataFrame.groupby().applyInPandas() 这需要一个Python函数,该函数接受一个 pandas.DataFrame 并返回另一个 pandas.DataFrame 。它将每个组映射到Python函数中的每个 pandas.DataFrame

这个API实现了“拆分-应用-合并”模式,由三个步骤组成:

  • 通过使用 DataFrame.groupBy() 将数据分组。

  • 对每个组应用一个函数。函数的输入和输出都是 pandas.DataFrame 。输入数据包含每个组的所有行和列。

  • 将结果合并到一个新的 PySpark DataFrame 中。

要使用 DataFrame.groupBy().applyInPandas() , 用户需要定义以下内容:

  • 一个定义每个组的计算的 Python 函数。

  • 一个 StructType 对象或一个字符串,定义输出 PySpark DataFrame 的模式。

返回的 pandas.DataFrame 的列标签必须与定义的输出架构中的字段名称匹配(如果指定为字符串),或者如果不是字符串,则通过位置匹配字段数据类型,例如整数索引。有关在构建 pandas.DataFrame 时如何标记列,请参见 pandas.DataFrame

请注意,所有数据在对组应用函数之前都将加载到内存中。这可能导致内存溢出异常,特别是在组大小不均衡的情况下。 maxRecordsPerBatch 的配置不适用于组,用户必须确保分组数据能够适应可用内存。

以下示例演示了如何使用 DataFrame.groupby().applyInPandas() 从组中的每个值中减去平均值。

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

有关详细用法,请参见 请参见 GroupedData.applyInPandas()

地图

使用 Pandas 实例的 Map 操作由 DataFrame.mapInPandas() 支持,它将 pandas.DataFrame 的迭代器映射到另一个 pandas.DataFrame 的迭代器,代表当前的 PySpark DataFrame 并将结果返回为 PySpark DataFrame 。该函数接受并输出一个 pandas.DataFrame 的迭代器。与某些 Pandas UDF 不同,它可以返回任意长度的输出,尽管在内部它与系列到系列的 Pandas UDF 的工作方式相似。

以下示例演示了如何使用 DataFrame.mapInPandas()

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

有关详细用法,请参见 DataFrame.mapInPandas()

联合分组映射

通过 DataFrame.groupby().cogroup().applyInPandas() 支持与 Pandas 实例的共同分组映射操作,该操作允许两个 PySpark DataFrame 通过一个公共键进行共同分组,然后对每个共同分组应用一个 Python 函数。它包含以下步骤:

  • 打乱数据,使得每个共享键的dataframe组共同分组在一起。

  • 对每个共同组应用一个函数。函数的输入是两个 pandas.DataFrame (可选的元组表示键)。函数的输出是一个 pandas.DataFrame

  • 将所有组的 pandas.DataFrame 合并为一个新的 PySpark DataFrame

要使用 groupBy().cogroup().applyInPandas() ,用户需要定义以下内容:

  • 一个定义每个协同组计算的Python函数。

  • 一个 StructType 对象或一个定义输出PySpark DataFrame 模式的字符串。

返回的 pandas.DataFrame 的列标签必须与定义的输出模式中的字段名称匹配(如果指定为字符串),或者如果不是字符串,则按位置匹配字段数据类型,例如整数索引。请参阅 pandas.DataFrame 。有关构建 pandas.DataFrame 时如何标记列的信息。

请注意,在应用函数之前,所有的cogroup数据将被加载到内存中。这可能会导致内存溢出异常,特别是当组的大小不均匀时。 maxRecordsPerBatch 的配置未被应用,用户需要确保cogrouped数据能够适应可用内存。

下面的示例演示了如何使用 DataFrame.groupby().cogroup().applyInPandas() 在两个数据集之间执行 asof 连接。

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.merge_ordered(left, right)

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    merge_ordered, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+----+
# |    time| id| v1|  v2|
# +--------+---+---+----+
# |20000101|  1|1.0|   x|
# |20000102|  1|3.0|null|
# |20000101|  2|2.0|   y|
# |20000102|  2|4.0|null|
# +--------+---+---+----+

有关详细用法,请参见 PandasCogroupedOps.applyInPandas()

箭头 Python 用户定义函数

Arrow Python UDF是用户定义的函数,它逐行执行,利用Arrow进行高效的批量数据传输和序列化。要定义一个Arrow Python UDF,您可以使用 udf() 装饰器或将函数包装在 udf() 方法中,确保 useArrow 参数设置为True。此外,您可以通过将Spark配置 spark.sql .execution.pythonUDF.arrow.enabled 设置为true,从而在整个SparkSession中启用Python UDF的Arrow优化。重要的是要注意,Spark配置仅在 useArrow 未设置或设置为None时生效。

Arrow Python UDF的类型提示应按与默认的、经过pickle处理的Python UDF相同的方式指定。

这里是一个示例,演示了默认的、经过序列化的Python用户定义函数(UDF)和Arrow Python用户定义函数(UDF)的用法:

from pyspark.sql.functions import udf

@udf(returnType='int')  # A default, pickled Python UDF
def slen(s):  # type: ignore[no-untyped-def]
    return len(s)

@udf(returnType='int', useArrow=True)  # An Arrow Python UDF
def arrow_slen(s):  # type: ignore[no-untyped-def]
    return len(s)

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))

df.select(slen("name"), arrow_slen("name")).show()
# +----------+----------------+
# |slen(name)|arrow_slen(name)|
# +----------+----------------+
# |         8|               8|
# +----------+----------------+

与默认的、经过pickle序列化的Python UDF相比,Arrow Python UDF提供了更一致的类型强制机制。UDF类型强制在UDF返回的Python实例与用户指定的返回类型不一致时会带来挑战。默认的、经过pickle序列化的Python UDF的类型强制存在某些限制,例如依赖None作为类型不匹配的回退,导致潜在的歧义和数据丢失。此外,将日期、日期时间和元组转换为字符串可能会产生模糊的结果。另一方面,Arrow Python UDF利用Arrow的能力来标准化类型强制并有效解决这些问题。

使用说明

支持的SQL类型

目前,除了 ArrayType TimestampType 之外,所有Spark SQL数据类型都支持基于Arrow的转换。 MapType 和嵌套的 ArrayType 只有在使用PyArrow 2.0.0及以上版本时才被支持。

设置箭头批处理大小

在Spark中,数据分区被转换为Arrow记录批,这可能暂时导致JVM中的高内存使用。为了避免可能的内存溢出异常,可以通过设置配置 spark.sql.execution.arrow.maxRecordsPerBatch 的值为一个整数来调整Arrow记录批的大小,这将决定每个批次的最大行数。默认值是每批10,000条记录。如果列数较多,应该相应地调整该值。使用此限制,每个数据分区将被制作成1个或多个记录批进行处理。

带时区语义的时间戳

Spark内部将时间戳存储为UTC值,未经指定时区引入的时间戳数据被转换为本地时间,以微秒精度转换为UTC。当在Spark中导出或显示时间戳数据时,使用会话时区来本地化时间戳值。会话时区通过配置 spark.sql.session.timeZone 设置,如果未设置,将默认为JVM系统本地时区。Pandas使用 datetime64 类型,具有纳秒精度, datetime64[ns] ,可以按列选择是否使用时区。

当时间戳数据从 Spark 转移到 Pandas 时,它将被转换为纳秒,每列将被转换为 Spark 会话的时区,然后本地化为该时区,这会移除时区并以当地时间显示值。这将在调用 DataFrame.toPandas() pandas_udf 时发生,带有时间戳列。

当时间戳数据从 Pandas 转移到 Spark 时,它将被转换为 UTC 微秒。这发生在使用 SparkSession.createDataFrame() 处理 Pandas DataFrame 时,或者当从 pandas_udf 返回时间戳时。这些转换是自动进行的,以确保 Spark 拥有期望格式的数据,因此您不需要自己进行任何这些转换。任何纳秒值将被截断。

请注意,标准的UDF(非Pandas)将会将时间戳数据加载为Python datetime对象,这与Pandas时间戳不同。当在 pandas_udf 中处理时间戳时,建议使用Pandas时间序列功能以获得最佳性能,详细信息请参见 这里

设置箭头 self_destruct 以节省内存

自 Spark 3.2 起,Spark 配置 spark.sql.execution.arrow.pyspark.selfDestruct.enabled 可用于启用 PyArrow 的 self_destruct 功能,这在通过 toPandas 创建 Pandas DataFrame 时可以节省内存,因为它在构建 Pandas DataFrame 时释放了 Arrow 分配的内存。这个选项是实验性的,一些操作可能会因为不可变的后备数组而在结果 Pandas DataFrame 上失败。通常,您会看到错误 ValueError: buffer source array is read-only 。更新版本的 Pandas 可能通过改善对这种情况的支持来修复这些错误。您可以通过提前复制列来规避此错误。此外,这种转换可能会更慢,因为它是单线程的。