使用 Polars 进行数据验证¶
在 0.19.0 中新增
Polars 是一个用于操作结构化数据的极快的 DataFrame 库。由于核心是用 Rust 编写的,因此您可以获得 C/C++ 的性能,同时在其他语言(如 Python)中提供 SDK。
用法¶
通过 polars 集成,您可以定义 pandera 模式来验证 Python 中的 polars 数据框。首先,使用 polars 附加安装 pandera:
pip install 'pandera[polars]'
注意
截至 pandera >= 0.21.0,仅支持 polars >= 1.0.0。
重要
如果您使用的是 Apple Silicon 机器,您需要通过 pip install polars-lts-cpu 安装 polars。
如果已经安装了 polars,您可能需要删除它:
pip uninstall polars
pip install polars-lts-cpu
然后您可以使用 pandera 模式来验证 polars 数据帧。在下面的示例中,我们将使用 基于类的 API 来定义一个 DataFrameModel,然后我们使用它来验证一个 polars.LazyFrame 对象。
import pandera.polars as pa
import polars as pl
class Schema(pa.DataFrameModel):
state: str
city: str
price: int = pa.Field(in_range={"min_value": 5, "max_value": 20})
lf = pl.LazyFrame(
{
'state': ['FL','FL','FL','CA','CA','CA'],
'city': [
'Orlando',
'Miami',
'Tampa',
'San Francisco',
'Los Angeles',
'San Diego',
],
'price': [8, 12, 10, 16, 20, 18],
}
)
Schema.validate(lf).collect()
| 状态 | 城市 | 价格 |
|---|---|---|
| 字符串 | 字符串 | 整型64 |
| "FL" | "奥兰多" | 8 |
| "FL" | "迈阿密" | 12 |
| "FL" | "坦帕" | 10 |
| "CA" | "旧金山" | 16 |
| "CA" | "洛杉矶" | 20 |
| "加利福尼亚" | "圣地亚哥" | 18 |
您还可以使用 check_types() 装饰器在运行时验证 polars LazyFrame 函数注解:
from pandera.typing.polars import LazyFrame
@pa.check_types
def function(lf: LazyFrame[Schema]) -> LazyFrame[Schema]:
return lf.filter(pl.col("state").eq("CA"))
function(lf).collect()
| 状态 | 城市 | 价格 |
|---|---|---|
| str | str | 整型64 |
| "CA" | "旧金山" | 16 |
| "CA" | "洛杉矶" | 20 |
| "加利福尼亚" | "圣地亚哥" | 18 |
当然,您可以使用基于对象的API来定义一个
DataFrameSchema:
schema = pa.DataFrameSchema({
"state": pa.Column(str),
"city": pa.Column(str),
"price": pa.Column(int, pa.Check.in_range(min_value=5, max_value=20))
})
schema.validate(lf).collect()
| 状态 | 城市 | 价格 |
|---|---|---|
| str | str | 整型64 |
| "FL" | "奥兰多" | 8 |
| "FL" | "迈阿密" | 12 |
| "FL" | "坦帕" | 10 |
| "CA" | "旧金山" | 16 |
| "CA" | "洛杉矶" | 20 |
| "加利福尼亚" | "圣地亚哥" | 18 |
您还可以验证 polars.DataFrame 对象,这些对象
会立即执行计算。在后台, pandera 将
polars.DataFrame 转换为 polars.LazyFrame 然后进行验证。这是为了
使 pandera 实现的内部验证例程能够利用 polars 懒惰 API 提供的优化。
df: pl.DataFrame = lf.collect()
schema.validate(df)
| 状态 | 城市 | 价格 |
|---|---|---|
| str | str | i64 |
| "FL" | "奥兰多" | 8 |
| "FL" | "迈阿密" | 12 |
| "FL" | "坦帕" | 10 |
| "CA" | "旧金山" | 16 |
| "CA" | "洛杉矶" | 20 |
| "加利福尼亚" | "圣地亚哥" | 18 |
为测试合成数据¶
它是如何工作的¶
与pandera处理pandas数据框的方式相比,pandera尽可能利用polars 惰性 API,以利用其查询优化的好处。
在高层次上,这就是模式验证期间发生的事情:
应用解析器: 如果
add_missing_columns=True,添加缺失的列,coerce=True时强制数据类型,strict="filter"时过滤列,如果default=设置默认值。应用检查: 在数据上运行所有核心、内置和自定义检查。元数据的检查是在没有
.collect()操作的情况下进行的,但检查数据值的检查是需要的。引发错误: 如果发现数据错误,将引发一个
SchemaError。如果validate(..., lazy=True),将引发一个SchemaErrors异常,其中包含数据中所有的验证错误。返回经过验证的输出: 如果未发现数据错误,则返回经过验证的对象
注意
在 pl.LazyFrame 对象上进行数据类型强制转换时,无需进行 .collect() 操作,但在 pl.DataFrame 上的强制转换将会导致更多的信息性错误消息,因为所有失败案例都可以被报告。
pandera的验证行为与polars处理惰性与急切操作的方式一致。当你在polars.LazyFrame上调用schema.validate()时,pandera将应用所有可以在不进行任何collect()操作的情况下完成的解析器和检查。这意味着它仅在模式级别进行验证,例如,列名和数据类型。
然而,如果您验证一个 polars.DataFrame, pandera 会执行
模式级和数据级验证。
注意
在后台, pandera 会将 polars.DataFrame 转换为
polars.LazyFrame 以便进行验证。这是为了在验证过程中利用
polars 延迟 API。虽然此功能在 pandera 库中尚未完全优化,但此设计决策为未来的性能改进奠定了基础。
LazyFrame 方法链¶
import pandera.polars as pa
import polars as pl
schema = pa.DataFrameSchema({"a": pa.Column(int)})
df = (
pl.LazyFrame({"a": [1.0, 2.0, 3.0]})
.cast({"a": pl.Int64})
.pipe(schema.validate) # this only validates schema-level properties
.with_columns(b=pl.lit("a"))
# do more lazy operations
.collect()
)
print(df)
shape: (3, 2)
┌─────┬─────┐
│ a ┆ b │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════╡
│ 1 ┆ a │
│ 2 ┆ a │
│ 3 ┆ a │
└─────┴─────┘
import pandera.polars as pa
import polars as pl
class SimpleModel(pa.DataFrameModel):
a: int
df = (
pl.LazyFrame({"a": [1.0, 2.0, 3.0]})
.cast({"a": pl.Int64})
.pipe(SimpleModel.validate) # this only validates schema-level properties
.with_columns(b=pl.lit("a"))
# do more lazy operations
.collect()
)
print(df)
shape: (3, 2)
┌─────┬─────┐
│ a ┆ b │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════╡
│ 1 ┆ a │
│ 2 ┆ a │
│ 3 ┆ a │
└─────┴─────┘
DataFrame 方法链¶
schema = pa.DataFrameSchema({"a": pa.Column(int)})
df = (
pl.DataFrame({"a": [1.0, 2.0, 3.0]})
.cast({"a": pl.Int64})
.pipe(schema.validate) # this validates schema- and data- level properties
.with_columns(b=pl.lit("a"))
# do more eager operations
)
print(df)
shape: (3, 2)
┌─────┬─────┐
│ a ┆ b │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════╡
│ 1 ┆ a │
│ 2 ┆ a │
│ 3 ┆ a │
└─────┴─────┘
class SimpleModel(pa.DataFrameModel):
a: int
df = (
pl.DataFrame({"a": [1.0, 2.0, 3.0]})
.cast({"a": pl.Int64})
.pipe(SimpleModel.validate) # this validates schema- and data- level properties
.with_columns(b=pl.lit("a"))
# do more eager operations
)
print(df)
shape: (3, 2)
┌─────┬─────┐
│ a ┆ b │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════╡
│ 1 ┆ a │
│ 2 ┆ a │
│ 3 ┆ a │
└─────┴─────┘
错误报告¶
在验证错误发生时, pandera 将主动引发一个 SchemaError。
class SimpleModel(pa.DataFrameModel):
a: int
invalid_lf = pl.LazyFrame({"a": pl.Series(["1", "2", "3"], dtype=pl.Utf8)})
try:
SimpleModel.validate(invalid_lf)
except pa.errors.SchemaError as exc:
print(exc)
expected column 'a' to have type Int64, got String
如果使用惰性验证,pandera 将抛出一个 SchemaErrors 异常。这在你想收集数据中所有的验证错误时特别有用。
注意
惰性验证在pandera中与polars中的惰性API不同,这是一个不幸的名称冲突。惰性验证意味着在引发SchemaErrors异常之前,对数据应用所有解析器和检查。polars中的惰性API允许您构建计算图,而无需实际在线执行它,在这里您调用.collect()来真正执行计算。
默认情况下,pl.LazyFrame 验证将仅验证架构层级属性:
class ModelWithChecks(pa.DataFrameModel):
a: int
b: str = pa.Field(isin=[*"abc"])
c: float = pa.Field(ge=0.0, le=1.0)
invalid_lf = pl.LazyFrame({
"a": pl.Series(["1", "2", "3"], dtype=pl.Utf8),
"b": ["d", "e", "f"],
"c": [0.0, 1.1, -0.1],
})
ModelWithChecks.validate(invalid_lf, lazy=True)
Traceback (most recent call last):
...
pandera.errors.SchemaErrors: {
"SCHEMA": {
"WRONG_DATATYPE": [
{
"schema": "ModelWithChecks",
"column": "a",
"check": "dtype('Int64')",
"error": "expected column 'a' to have type Int64, got String"
}
]
}
}
默认情况下,pl.DataFrame 验证将验证架构级别和数据级别属性:
class ModelWithChecks(pa.DataFrameModel):
a: int
b: str = pa.Field(isin=[*"abc"])
c: float = pa.Field(ge=0.0, le=1.0)
invalid_lf = pl.DataFrame({
"a": pl.Series(["1", "2", "3"], dtype=pl.Utf8),
"b": ["d", "e", "f"],
"c": [0.0, 1.1, -0.1],
})
ModelWithChecks.validate(invalid_lf, lazy=True)
Traceback (most recent call last):
...
pandera.errors.SchemaErrors: {
"SCHEMA": {
"WRONG_DATATYPE": [
{
"schema": "ModelWithChecks",
"column": "a",
"check": "dtype('Int64')",
"error": "expected column 'a' to have type Int64, got String"
}
]
},
"DATA": {
"DATAFRAME_CHECK": [
{
"schema": "ModelWithChecks",
"column": "b",
"check": "isin(['a', 'b', 'c'])",
"error": "Column 'b' failed validator number 0: <Check isin: isin(['a', 'b', 'c'])> failure case examples: [{'b': 'd'}, {'b': 'e'}, {'b': 'f'}]"
},
{
"schema": "ModelWithChecks",
"column": "c",
"check": "greater_than_or_equal_to(0.0)",
"error": "Column 'c' failed validator number 0: <Check greater_than_or_equal_to: greater_than_or_equal_to(0.0)> failure case examples: [{'c': -0.1}]"
},
{
"schema": "ModelWithChecks",
"column": "c",
"check": "less_than_or_equal_to(1.0)",
"error": "Column 'c' failed validator number 1: <Check less_than_or_equal_to: less_than_or_equal_to(1.0)> failure case examples: [{'c': 1.1}]"
}
]
}
}
支持的数据类型¶
pandera 目前支持所有的
polars 数据类型。
内置的 python 类型如 str、int、float 和 bool 将被
以 polars 处理它们的方式进行处理:
assert pl.Series([1,2,3], dtype=int).dtype == pl.Int64
assert pl.Series([*"abc"], dtype=str).dtype == pl.Utf8
assert pl.Series([1.0, 2.0, 3.0], dtype=float).dtype == pl.Float64
因此,以下架构是等价的:
schema1 = pa.DataFrameSchema({
"a": pa.Column(int),
"b": pa.Column(str),
"c": pa.Column(float),
})
schema2 = pa.DataFrameSchema({
"a": pa.Column(pl.Int64),
"b": pa.Column(pl.Utf8),
"c": pa.Column(pl.Float64),
})
assert schema1 == schema2
嵌套类型¶
Polars 嵌套数据类型也通过 参数化数据类型 得到支持。请参见下面的示例,了解通过基于对象和基于类的 API 指定这一点的不同方法:
schema = pa.DataFrameSchema(
{
"list_col": pa.Column(pl.List(pl.Int64())),
"array_col": pa.Column(pl.Array(pl.Int64(), 3)),
"struct_col": pa.Column(pl.Struct({"a": pl.Utf8(), "b": pl.Float64()})),
},
)
try:
from typing import Annotated # python 3.9+
except ImportError:
from typing_extensions import Annotated
class ModelWithAnnotated(pa.DataFrameModel):
list_col: Annotated[pl.List, pl.Int64()]
array_col: Annotated[pl.Array, pl.Int64(), 3]
struct_col: Annotated[pl.Struct, {"a": pl.Utf8(), "b": pl.Float64()}]
class ModelWithDtypeKwargs(pa.DataFrameModel):
list_col: pl.List = pa.Field(dtype_kwargs={"inner": pl.Int64()})
array_col: pl.Array = pa.Field(dtype_kwargs={"inner": pl.Int64(), "width": 3})
struct_col: pl.Struct = pa.Field(dtype_kwargs={"fields": {"a": pl.Utf8(), "b": pl.Float64()}})
与时间无关的日期时间¶
在某些用例中,包含 pl.DateTime 数据的列是否具有时区可能无关紧要。在这种情况下,您可以使用 pandera-native polars 数据类型:
from pandera.engines.polars_engine import DateTime
schema = pa.DataFrameSchema({
"created_at": pa.Column(DateTime(time_zone_agnostic=True)),
})
from pandera.engines.polars_engine import DateTime
class DateTimeModel(pa.DataFrameModel):
created_at: Annotated[DateTime, True, "us", None]
.
注意
对于 Annotated 类型,您需要传入所有的位置参数和关键字参数。
from pandera.engines.polars_engine import DateTime
class DateTimeModel(pa.DataFrameModel):
created_at: DateTime = pa.Field(dtype_kwargs={"time_zone_agnostic": True})
自定义检查¶
所有内置的 Check 方法在 polars 集成中都受支持。
要创建自定义检查,您可以创建以PolarsData命名元组作为输入并生成polars.LazyFrame作为输出的函数。PolarsData包含两个属性:
一个
lazyframe属性,其中包含您想要验证的polars.LazyFrame对象。一个
key属性,包含您想要验证的列名称。这将在数据框级检查中是None。
逐元素检查也可以通过设置 element_wise=True 来支持。这将需要一个函数,该函数接受列/数据框中的单个元素并返回一个布尔标量,指示传递的值是否符合要求。
警告
在内部,元素级检查使用 map_elements 函数,这比原生的polars表达式API慢。
列级检查¶
这是一个列级自定义检查的示例:
from pandera.polars import PolarsData
def is_positive_vector(data: PolarsData) -> pl.LazyFrame:
"""Return a LazyFrame with a single boolean column."""
return data.lazyframe.select(pl.col(data.key).gt(0))
def is_positive_scalar(data: PolarsData) -> pl.LazyFrame:
"""Return a LazyFrame with a single boolean scalar."""
return data.lazyframe.select(pl.col(data.key).gt(0).all())
def is_positive_element_wise(x: int) -> bool:
"""Take a single value and return a boolean scalar."""
return x > 0
schema_with_custom_checks = pa.DataFrameSchema({
"a": pa.Column(
int,
checks=[
pa.Check(is_positive_vector),
pa.Check(is_positive_scalar),
pa.Check(is_positive_element_wise, element_wise=True),
]
)
})
lf = pl.LazyFrame({"a": [1, 2, 3]})
validated_df = lf.collect().pipe(schema_with_custom_checks.validate)
print(validated_df)
shape: (3, 1)
┌─────┐
│ a │
│ --- │
│ i64 │
╞═════╡
│ 1 │
│ 2 │
│ 3 │
└─────┘
from pandera.polars import PolarsData
class ModelWithCustomChecks(pa.DataFrameModel):
a: int
@pa.check("a")
def is_positive_vector(cls, data: PolarsData) -> pl.LazyFrame:
"""Return a LazyFrame with a single boolean column."""
return data.lazyframe.select(pl.col(data.key).gt(0))
@pa.check("a")
def is_positive_scalar(cls, data: PolarsData) -> pl.LazyFrame:
"""Return a LazyFrame with a single boolean scalar."""
return data.lazyframe.select(pl.col(data.key).gt(0).all())
@pa.check("a", element_wise=True)
def is_positive_element_wise(cls, x: int) -> bool:
"""Take a single value and return a boolean scalar."""
return x > 0
validated_df = lf.collect().pipe(ModelWithCustomChecks.validate)
print(validated_df)
shape: (3, 1)
┌─────┐
│ a │
│ --- │
│ i64 │
╞═════╡
│ 1 │
│ 2 │
│ 3 │
└─────┘
对于列级检查,自定义检查函数应返回一个
polars.LazyFrame,该框架包含一个布尔列或一个布尔标量。
数据框级检查¶
如果你需要验证整个数据框中的值,可以在数据框级别指定一个检查。期望的输出是一个 polars.LazyFrame,包含多个布尔列、单个布尔列或一个标量布尔值。
def col1_gt_col2(data: PolarsData, col1: str, col2: str) -> pl.LazyFrame:
"""Return a LazyFrame with a single boolean column."""
return data.lazyframe.select(pl.col(col1).gt(pl.col(col2)))
def is_positive_df(data: PolarsData) -> pl.LazyFrame:
"""Return a LazyFrame with multiple boolean columns."""
return data.lazyframe.select(pl.col("*").gt(0))
def is_positive_element_wise(x: int) -> bool:
"""Take a single value and return a boolean scalar."""
return x > 0
schema_with_df_checks = pa.DataFrameSchema(
columns={
"a": pa.Column(int),
"b": pa.Column(int),
},
checks=[
pa.Check(col1_gt_col2, col1="a", col2="b"),
pa.Check(is_positive_df),
pa.Check(is_positive_element_wise, element_wise=True),
]
)
lf = pl.LazyFrame({"a": [2, 3, 4], "b": [1, 2, 3]})
validated_df = lf.collect().pipe(schema_with_df_checks.validate)
print(validated_df)
shape: (3, 2)
┌─────┬─────┐
│ a ┆ b │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 2 ┆ 1 │
│ 3 ┆ 2 │
│ 4 ┆ 3 │
└─────┴─────┘
class ModelWithDFChecks(pa.DataFrameModel):
a: int
b: int
@pa.dataframe_check
def cola_gt_colb(cls, data: PolarsData) -> pl.LazyFrame:
"""Return a LazyFrame with a single boolean column."""
return data.lazyframe.select(pl.col("a").gt(pl.col("b")))
@pa.dataframe_check
def is_positive_df(cls, data: PolarsData) -> pl.LazyFrame:
"""Return a LazyFrame with multiple boolean columns."""
return data.lazyframe.select(pl.col("*").gt(0))
@pa.dataframe_check(element_wise=True)
def is_positive_element_wise(cls, x: int) -> bool:
"""Take a single value and return a boolean scalar."""
return x > 0
validated_df = lf.collect().pipe(ModelWithDFChecks.validate)
print(validated_df)
shape: (3, 2)
┌─────┬─────┐
│ a ┆ b │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 2 ┆ 1 │
│ 3 ┆ 2 │
│ 4 ┆ 3 │
└─────┴─────┘
使用LazyFrames进行数据级验证¶
如本页面之前提到的,默认情况下在 pl.LazyFrame 上调用 schema.validate 只会执行模式层级的验证检查。如果您想验证 pl.LazyFrame 上的数据层级属性,建议的方式是先调用 .collect():
class SimpleModel(pa.DataFrameModel):
a: int
lf: pl.LazyFrame = (
pl.LazyFrame({"a": [1.0, 2.0, 3.0]})
.cast({"a": pl.Int64})
.collect() # convert to pl.DataFrame
.pipe(SimpleModel.validate)
.lazy() # convert back to pl.LazyFrame
# do more lazy operations
)
这个语法很好,因为仅从阅读代码就能清楚地了解发生了什么。Pandera 模式作为方法链中数据实现的一个清晰点。
然而,如果你不介意一点魔法 🪄,你可以将PANDERA_VALIDATION_DEPTH环境变量设置为SCHEMA_AND_DATA,以在polars.LazyFrame上验证数据级属性。这将等同于上面的显式代码:
export PANDERA_VALIDATION_DEPTH=SCHEMA_AND_DATA
lf: pl.LazyFrame = (
pl.LazyFrame({"a": [1.0, 2.0, 3.0]})
.cast({"a": pl.Int64})
.pipe(SimpleModel.validate) # this will validate schema- and data-level properties
# do more lazy operations
)
在后台,验证过程将对LazyFrame进行.collect()调用,以运行数据级别的验证检查,并在验证完成后仍将返回pl.LazyFrame。
支持和不支持的功能¶
由于pandera-polars集成的成熟度低于pandas支持,因此pandera与pandas DataFrame提供的一些功能尚不支持polars DataFrame。
这里是支持和不支持功能的列表。您可以参考支持功能矩阵来查看哪些功能在polars验证后端中实现。