用户定义的 Python 函数
Polars 表达式非常强大且灵活,因此与其他库相比,对自定义 Python 函数的需求要少得多。不过,您可能需要将表达式的状态传递给第三方库,或者将您的黑盒函数应用于 Polars 中的数据。
在文档的这一部分,我们将使用两个API来实现这一点:
-
map_elements: 对Series中的每个值分别调用一个函数。 -
map_batches: 总是将完整的Series传递给函数。
使用 map_elements() 处理单个值
让我们从最简单的情况开始:我们希望单独处理Series中的每个值。这是我们的数据:
df = pl.DataFrame(
{
"keys": ["a", "a", "b", "b"],
"values": [10, 7, 1, 23],
}
)
print(df)
let df = df!(
"keys" => &["a", "a", "b", "b"],
"values" => &[10, 7, 1, 23],
)?;
println!("{}", df);
shape: (4, 2)
┌──────┬────────┐
│ keys ┆ values │
│ --- ┆ --- │
│ str ┆ i64 │
╞══════╪════════╡
│ a ┆ 10 │
│ a ┆ 7 │
│ b ┆ 1 │
│ b ┆ 23 │
└──────┴────────┘
我们将在每个单独的值上调用math.log():
import math
def my_log(value):
return math.log(value)
out = df.select(pl.col("values").map_elements(my_log, return_dtype=pl.Float64))
print(out)
shape: (4, 1)
┌──────────┐
│ values │
│ --- │
│ f64 │
╞══════════╡
│ 2.302585 │
│ 1.94591 │
│ 0.0 │
│ 3.135494 │
└──────────┘
虽然这可以工作,map_elements() 有两个问题:
- 仅限于单个项目: 通常你会希望有一个计算需要对整个
Series进行操作,而不是逐个处理单个项目。 - 性能开销: 即使您确实希望单独处理每个项目,为每个单独的项目调用函数也很慢;所有这些额外的函数调用都会增加很多开销。
让我们从解决第一个问题开始,然后我们将看看如何解决第二个问题。
使用 map_batches() 处理整个 Series
我们想要在整个Series的内容上运行一个自定义函数。为了演示目的,假设我们想要计算Series的平均值与每个值之间的差异。
我们可以使用map_batches() API在完整的Series或group_by()中的单个组上运行此函数:
def diff_from_mean(series):
# This will be very slow for non-trivial Series, since it's all Python
# code:
total = 0
for value in series:
total += value
mean = total / len(series)
return pl.Series([value - mean for value in series])
# Apply our custom function to a full Series with map_batches():
out = df.select(pl.col("values").map_batches(diff_from_mean))
print("== select() with UDF ==")
print(out)
# Apply our custom function per group:
print("== group_by() with UDF ==")
out = df.group_by("keys").agg(pl.col("values").map_batches(diff_from_mean))
print(out)
== select() with UDF ==
shape: (4, 1)
┌────────┐
│ values │
│ --- │
│ f64 │
╞════════╡
│ -0.25 │
│ -3.25 │
│ -9.25 │
│ 12.75 │
└────────┘
== group_by() with UDF ==
shape: (2, 2)
┌──────┬───────────────┐
│ keys ┆ values │
│ --- ┆ --- │
│ str ┆ list[f64] │
╞══════╪═══════════════╡
│ a ┆ [1.5, -1.5] │
│ b ┆ [-11.0, 11.0] │
└──────┴───────────────┘
使用用户定义函数进行快速操作
纯Python实现的问题在于它很慢。一般来说,如果你想要快速的结果,你需要尽量减少调用的Python代码量。
为了最大化速度,您需要确保使用的是用编译语言编写的函数。对于数值计算,Polars 支持由 NumPy 定义的一对接口,称为 "ufuncs" 和 "generalized ufuncs"。前者在每个项目上单独运行,后者接受整个 NumPy 数组,允许更灵活的操作。
NumPy 和其他库如 SciPy 提供了 预写的 ufuncs,你可以与 Polars 一起使用。例如:
import numpy as np
out = df.select(pl.col("values").map_batches(np.log))
print(out)
shape: (4, 1)
┌──────────┐
│ values │
│ --- │
│ f64 │
╞══════════╡
│ 2.302585 │
│ 1.94591 │
│ 0.0 │
│ 3.135494 │
└──────────┘
注意我们可以使用map_batches(),因为numpy.log()能够在单个项目和整个NumPy数组上运行。这意味着它将比我们最初的示例运行得更快,因为我们只有一个Python调用,然后所有处理都在快速的低级语言中进行。
示例:使用Numba的快速自定义函数
NumPy 提供的预写函数很有帮助,但我们的目标是编写自己的函数。
例如,假设我们想要一个上面 diff_from_mean() 示例的快速版本。在 Python 中编写这个的最简单方法是使用 Numba,它允许你在 Python 的(一个子集)中编写自定义函数,同时仍然获得编译代码的好处。
特别是,Numba 提供了一个名为
@guvectorize 的装饰器。
这通过将 Python 函数编译为快速机器代码来创建一个通用的 ufunc,使其能够被 Polars 使用。
在以下示例中,diff_from_mean_numba() 将在导入时编译为快速的机器代码,这需要一点时间。之后,所有对该函数的调用都将快速运行。Series 将在传递给函数之前转换为 NumPy 数组:
from numba import float64, guvectorize, int64
# This will be compiled to machine code, so it will be fast. The Series is
# converted to a NumPy array before being passed to the function. See the
# Numba documentation for more details:
# https://numba.readthedocs.io/en/stable/user/vectorize.html
@guvectorize([(int64[:], float64[:])], "(n)->(n)")
def diff_from_mean_numba(arr, result):
total = 0
for value in arr:
total += value
mean = total / len(arr)
for i, value in enumerate(arr):
result[i] = value - mean
out = df.select(pl.col("values").map_batches(diff_from_mean_numba))
print("== select() with UDF ==")
print(out)
out = df.group_by("keys").agg(pl.col("values").map_batches(diff_from_mean_numba))
print("== group_by() with UDF ==")
print(out)
== select() with UDF ==
shape: (4, 1)
┌────────┐
│ values │
│ --- │
│ f64 │
╞════════╡
│ -0.25 │
│ -3.25 │
│ -9.25 │
│ 12.75 │
└────────┘
== group_by() with UDF ==
shape: (2, 2)
┌──────┬───────────────┐
│ keys ┆ values │
│ --- ┆ --- │
│ str ┆ list[f64] │
╞══════╪═══════════════╡
│ a ┆ [1.5, -1.5] │
│ b ┆ [-11.0, 11.0] │
└──────┴───────────────┘
调用广义ufuncs时不允许缺失数据
在传递给用户定义的函数如diff_from_mean_numba()之前,Series将被转换为NumPy数组。不幸的是,NumPy数组没有缺失数据的概念。如果原始Series中存在缺失数据,这意味着生成的数组实际上不会与Series匹配。
如果你逐项计算结果,这并不重要。例如,numpy.log() 是分别对每个单独的值调用的,因此这些缺失值不会改变计算结果。但如果用户定义函数的结果依赖于 Series 中的多个值,那么对于缺失值应该如何处理就不清楚了。
因此,当调用诸如用@guvectorize装饰的Numba函数等广义ufunc时,如果你尝试传入一个包含缺失数据的Series,Polars将会报错。你如何去除缺失数据?在调用自定义函数之前,要么填充它,要么删除它。
合并多个列值
如果你想将多个列传递给用户定义的函数,你可以使用Struct,这在另一节中有详细说明。基本思路是将多个列组合成一个Struct,然后函数可以将这些列重新提取出来:
# Add two arrays together:
@guvectorize([(int64[:], int64[:], float64[:])], "(n),(n)->(n)")
def add(arr, arr2, result):
for i in range(len(arr)):
result[i] = arr[i] + arr2[i]
df3 = pl.DataFrame({"values1": [1, 2, 3], "values2": [10, 20, 30]})
out = df3.select(
# Create a struct that has two columns in it:
pl.struct(["values1", "values2"])
# Pass the struct to a lambda that then passes the individual columns to
# the add() function:
.map_batches(
lambda combined: add(
combined.struct.field("values1"), combined.struct.field("values2")
)
)
.alias("add_columns")
)
print(out)
shape: (3, 1)
┌─────────────┐
│ add_columns │
│ --- │
│ f64 │
╞═════════════╡
│ 11.0 │
│ 22.0 │
│ 33.0 │
└─────────────┘
流式计算
将完整的Series传递给用户定义的函数是有代价的:它可能会使用大量内存,因为其内容被复制到NumPy数组中。您可以使用is_elementwise=True参数来 map_batches将结果流式传输到函数中,这意味着它可能不会一次性获取所有值。
注意
is_elementwise 参数如果设置不正确,可能会导致错误的结果。
如果你设置 is_elementwise=True,请确保你的函数实际上是逐个元素操作的(例如“计算每个值的对数”)——例如,我们的示例函数 diff_from_mean() 就不是这样。
返回类型
自定义的Python函数通常是黑盒子;Polars不知道你的函数在做什么或它将返回什么。因此,返回的数据类型会自动推断。我们通过等待第一个非空值来实现这一点。然后,该值将用于确定结果Series的类型。
Python 类型到 Polars 数据类型的映射如下:
int->Int64float->Float64bool->Booleanstr->Stringlist[tp]->List[tp](其中内部类型使用相同的规则推断)dict[str, [tp]]->structAny->object(始终避免这种情况)
Rust 类型映射如下:
i32或i64->Int64f32或f64->Float64bool->BooleanString或str->StringVec->List[tp](其中内部类型使用相同的规则推断)
你可以传递一个return_dtype参数给
map_batches
如果你想覆盖推断的类型。