使用 Polars 进行数据验证

在 0.19.0 中新增

Polars 是一个用于操作结构化数据的极快的 DataFrame 库。由于核心是用 Rust 编写的,因此您可以获得 C/C++ 的性能,同时在其他语言(如 Python)中提供 SDK。

用法

通过 polars 集成,您可以定义 pandera 模式来验证 Python 中的 polars 数据框。首先,使用 polars 附加安装 pandera

pip install 'pandera[polars]'

注意

截至 pandera >= 0.21.0,仅支持 polars >= 1.0.0

重要

如果您使用的是 Apple Silicon 机器,您需要通过 pip install polars-lts-cpu 安装 polars。

如果已经安装了 polars,您可能需要删除它:

pip uninstall polars
pip install polars-lts-cpu

然后您可以使用 pandera 模式来验证 polars 数据帧。在下面的示例中,我们将使用 基于类的 API 来定义一个 DataFrameModel,然后我们使用它来验证一个 polars.LazyFrame 对象。

import pandera.polars as pa
import polars as pl


class Schema(pa.DataFrameModel):
    state: str
    city: str
    price: int = pa.Field(in_range={"min_value": 5, "max_value": 20})


lf = pl.LazyFrame(
    {
        'state': ['FL','FL','FL','CA','CA','CA'],
        'city': [
            'Orlando',
            'Miami',
            'Tampa',
            'San Francisco',
            'Los Angeles',
            'San Diego',
        ],
        'price': [8, 12, 10, 16, 20, 18],
    }
)
Schema.validate(lf).collect()
shape: (6, 3)
状态城市价格
字符串字符串整型64
"FL""奥兰多"8
"FL""迈阿密"12
"FL""坦帕"10
"CA""旧金山"16
"CA""洛杉矶"20
"加利福尼亚""圣地亚哥"18

您还可以使用 check_types() 装饰器在运行时验证 polars LazyFrame 函数注解:

from pandera.typing.polars import LazyFrame

@pa.check_types
def function(lf: LazyFrame[Schema]) -> LazyFrame[Schema]:
    return lf.filter(pl.col("state").eq("CA"))

function(lf).collect()
shape: (3, 3)
状态城市价格
strstr整型64
"CA""旧金山"16
"CA""洛杉矶"20
"加利福尼亚""圣地亚哥"18

当然,您可以使用基于对象的API来定义一个 DataFrameSchema

schema = pa.DataFrameSchema({
    "state": pa.Column(str),
    "city": pa.Column(str),
    "price": pa.Column(int, pa.Check.in_range(min_value=5, max_value=20))
})
schema.validate(lf).collect()
shape: (6, 3)
状态城市价格
strstr整型64
"FL""奥兰多"8
"FL""迈阿密"12
"FL""坦帕"10
"CA""旧金山"16
"CA""洛杉矶"20
"加利福尼亚""圣地亚哥"18

您还可以验证 polars.DataFrame 对象,这些对象 会立即执行计算。在后台, panderapolars.DataFrame 转换为 polars.LazyFrame 然后进行验证。这是为了 使 pandera 实现的内部验证例程能够利用 polars 懒惰 API 提供的优化。

df: pl.DataFrame = lf.collect()
schema.validate(df)
shape: (6, 3)
状态城市价格
strstri64
"FL""奥兰多"8
"FL""迈阿密"12
"FL""坦帕"10
"CA""旧金山"16
"CA""洛杉矶"20
"加利福尼亚""圣地亚哥"18

为测试合成数据

警告

数据合成策略 功能尚不支持在 polars 集成中。此时,您可以使用 polars 原生 参数检验 函数来为 polars 生成测试数据。

它是如何工作的

pandera处理pandas数据框的方式相比,pandera尽可能利用polars 惰性 API,以利用其查询优化的好处。

在高层次上,这就是模式验证期间发生的事情:

  • 应用解析器: 如果 add_missing_columns=True,添加缺失的列,coerce=True 时强制数据类型,strict="filter" 时过滤列,如果 default= 设置默认值。

  • 应用检查: 在数据上运行所有核心、内置和自定义检查。元数据的检查是在没有 .collect() 操作的情况下进行的,但检查数据值的检查是需要的。

  • 引发错误: 如果发现数据错误,将引发一个 SchemaError。如果 validate(..., lazy=True),将引发一个 SchemaErrors 异常,其中包含数据中所有的验证错误。

  • 返回经过验证的输出: 如果未发现数据错误,则返回经过验证的对象

注意

pl.LazyFrame 对象上进行数据类型强制转换时,无需进行 .collect() 操作,但在 pl.DataFrame 上的强制转换将会导致更多的信息性错误消息,因为所有失败案例都可以被报告。

pandera的验证行为与polars处理惰性与急切操作的方式一致。当你在polars.LazyFrame上调用schema.validate()时,pandera将应用所有可以在不进行任何collect()操作的情况下完成的解析器和检查。这意味着它仅在模式级别进行验证,例如,列名和数据类型。

然而,如果您验证一个 polars.DataFramepandera 会执行 模式级和数据级验证。

注意

在后台, pandera 会将 polars.DataFrame 转换为 polars.LazyFrame 以便进行验证。这是为了在验证过程中利用 polars 延迟 API。虽然此功能在 pandera 库中尚未完全优化,但此设计决策为未来的性能改进奠定了基础。

LazyFrame 方法链

import pandera.polars as pa
import polars as pl

schema = pa.DataFrameSchema({"a": pa.Column(int)})

df = (
    pl.LazyFrame({"a": [1.0, 2.0, 3.0]})
    .cast({"a": pl.Int64})
    .pipe(schema.validate) # this only validates schema-level properties
    .with_columns(b=pl.lit("a"))
    # do more lazy operations
    .collect()
)
print(df)
shape: (3, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════╡
│ 1   ┆ a   │
│ 2   ┆ a   │
│ 3   ┆ a   │
└─────┴─────┘
import pandera.polars as pa
import polars as pl

class SimpleModel(pa.DataFrameModel):
    a: int

df = (
    pl.LazyFrame({"a": [1.0, 2.0, 3.0]})
    .cast({"a": pl.Int64})
    .pipe(SimpleModel.validate) # this only validates schema-level properties
    .with_columns(b=pl.lit("a"))
    # do more lazy operations
    .collect()
)
print(df)
shape: (3, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════╡
│ 1   ┆ a   │
│ 2   ┆ a   │
│ 3   ┆ a   │
└─────┴─────┘

DataFrame 方法链

schema = pa.DataFrameSchema({"a": pa.Column(int)})

df = (
    pl.DataFrame({"a": [1.0, 2.0, 3.0]})
    .cast({"a": pl.Int64})
    .pipe(schema.validate) # this validates schema- and data- level properties
    .with_columns(b=pl.lit("a"))
    # do more eager operations
)
print(df)
shape: (3, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════╡
│ 1   ┆ a   │
│ 2   ┆ a   │
│ 3   ┆ a   │
└─────┴─────┘
class SimpleModel(pa.DataFrameModel):
    a: int

df = (
    pl.DataFrame({"a": [1.0, 2.0, 3.0]})
    .cast({"a": pl.Int64})
    .pipe(SimpleModel.validate) # this validates schema- and data- level properties
    .with_columns(b=pl.lit("a"))
    # do more eager operations
)
print(df)
shape: (3, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════╡
│ 1   ┆ a   │
│ 2   ┆ a   │
│ 3   ┆ a   │
└─────┴─────┘

错误报告

在验证错误发生时, pandera 将主动引发一个 SchemaError

class SimpleModel(pa.DataFrameModel):
    a: int

invalid_lf = pl.LazyFrame({"a": pl.Series(["1", "2", "3"], dtype=pl.Utf8)})
try:
    SimpleModel.validate(invalid_lf)
except pa.errors.SchemaError as exc:
    print(exc)
expected column 'a' to have type Int64, got String

如果使用惰性验证,pandera 将抛出一个 SchemaErrors 异常。这在你想收集数据中所有的验证错误时特别有用。

注意

惰性验证在pandera中与polars中的惰性API不同,这是一个不幸的名称冲突。惰性验证意味着在引发SchemaErrors异常之前,对数据应用所有解析器和检查。polars中的惰性API允许您构建计算图,而无需实际在线执行它,在这里您调用.collect()来真正执行计算。

默认情况下,pl.LazyFrame 验证将仅验证架构层级属性:

class ModelWithChecks(pa.DataFrameModel):
    a: int
    b: str = pa.Field(isin=[*"abc"])
    c: float = pa.Field(ge=0.0, le=1.0)

invalid_lf = pl.LazyFrame({
    "a": pl.Series(["1", "2", "3"], dtype=pl.Utf8),
    "b": ["d", "e", "f"],
    "c": [0.0, 1.1, -0.1],
})
ModelWithChecks.validate(invalid_lf, lazy=True)
Traceback (most recent call last):
...
pandera.errors.SchemaErrors: {
    "SCHEMA": {
        "WRONG_DATATYPE": [
            {
                "schema": "ModelWithChecks",
                "column": "a",
                "check": "dtype('Int64')",
                "error": "expected column 'a' to have type Int64, got String"
            }
        ]
    }
}

默认情况下,pl.DataFrame 验证将验证架构级别和数据级别属性:

class ModelWithChecks(pa.DataFrameModel):
    a: int
    b: str = pa.Field(isin=[*"abc"])
    c: float = pa.Field(ge=0.0, le=1.0)

invalid_lf = pl.DataFrame({
    "a": pl.Series(["1", "2", "3"], dtype=pl.Utf8),
    "b": ["d", "e", "f"],
    "c": [0.0, 1.1, -0.1],
})
ModelWithChecks.validate(invalid_lf, lazy=True)
Traceback (most recent call last):
...
pandera.errors.SchemaErrors: {
    "SCHEMA": {
        "WRONG_DATATYPE": [
            {
                "schema": "ModelWithChecks",
                "column": "a",
                "check": "dtype('Int64')",
                "error": "expected column 'a' to have type Int64, got String"
            }
        ]
    },
    "DATA": {
        "DATAFRAME_CHECK": [
            {
                "schema": "ModelWithChecks",
                "column": "b",
                "check": "isin(['a', 'b', 'c'])",
                "error": "Column 'b' failed validator number 0: <Check isin: isin(['a', 'b', 'c'])> failure case examples: [{'b': 'd'}, {'b': 'e'}, {'b': 'f'}]"
            },
            {
                "schema": "ModelWithChecks",
                "column": "c",
                "check": "greater_than_or_equal_to(0.0)",
                "error": "Column 'c' failed validator number 0: <Check greater_than_or_equal_to: greater_than_or_equal_to(0.0)> failure case examples: [{'c': -0.1}]"
            },
            {
                "schema": "ModelWithChecks",
                "column": "c",
                "check": "less_than_or_equal_to(1.0)",
                "error": "Column 'c' failed validator number 1: <Check less_than_or_equal_to: less_than_or_equal_to(1.0)> failure case examples: [{'c': 1.1}]"
            }
        ]
    }
}

支持的数据类型

pandera 目前支持所有的 polars 数据类型。 内置的 python 类型如 strintfloatbool 将被 以 polars 处理它们的方式进行处理:

assert pl.Series([1,2,3], dtype=int).dtype == pl.Int64
assert pl.Series([*"abc"], dtype=str).dtype == pl.Utf8
assert pl.Series([1.0, 2.0, 3.0], dtype=float).dtype == pl.Float64

因此,以下架构是等价的:

schema1 = pa.DataFrameSchema({
    "a": pa.Column(int),
    "b": pa.Column(str),
    "c": pa.Column(float),
})

schema2 = pa.DataFrameSchema({
    "a": pa.Column(pl.Int64),
    "b": pa.Column(pl.Utf8),
    "c": pa.Column(pl.Float64),
})

assert schema1 == schema2

嵌套类型

Polars 嵌套数据类型也通过 参数化数据类型 得到支持。请参见下面的示例,了解通过基于对象和基于类的 API 指定这一点的不同方法:

schema = pa.DataFrameSchema(
    {
        "list_col": pa.Column(pl.List(pl.Int64())),
        "array_col": pa.Column(pl.Array(pl.Int64(), 3)),
        "struct_col": pa.Column(pl.Struct({"a": pl.Utf8(), "b": pl.Float64()})),
    },
)
try:
    from typing import Annotated  # python 3.9+
except ImportError:
    from typing_extensions import Annotated

class ModelWithAnnotated(pa.DataFrameModel):
    list_col: Annotated[pl.List, pl.Int64()]
    array_col: Annotated[pl.Array, pl.Int64(), 3]
    struct_col: Annotated[pl.Struct, {"a": pl.Utf8(), "b": pl.Float64()}]
class ModelWithDtypeKwargs(pa.DataFrameModel):
    list_col: pl.List = pa.Field(dtype_kwargs={"inner": pl.Int64()})
    array_col: pl.Array = pa.Field(dtype_kwargs={"inner": pl.Int64(), "width": 3})
    struct_col: pl.Struct = pa.Field(dtype_kwargs={"fields": {"a": pl.Utf8(), "b": pl.Float64()}})

与时间无关的日期时间

在某些用例中,包含 pl.DateTime 数据的列是否具有时区可能无关紧要。在这种情况下,您可以使用 pandera-native polars 数据类型:

from pandera.engines.polars_engine import DateTime


schema = pa.DataFrameSchema({
    "created_at": pa.Column(DateTime(time_zone_agnostic=True)),
})
from pandera.engines.polars_engine import DateTime


class DateTimeModel(pa.DataFrameModel):
    created_at: Annotated[DateTime, True, "us", None]

.

注意

对于 Annotated 类型,您需要传入所有的位置参数和关键字参数。

from pandera.engines.polars_engine import DateTime


class DateTimeModel(pa.DataFrameModel):
    created_at: DateTime = pa.Field(dtype_kwargs={"time_zone_agnostic": True})

自定义检查

所有内置的 Check 方法在 polars 集成中都受支持。

要创建自定义检查,您可以创建以PolarsData命名元组作为输入并生成polars.LazyFrame作为输出的函数。PolarsData包含两个属性:

  • 一个 lazyframe 属性,其中包含您想要验证的 polars.LazyFrame 对象。

  • 一个 key 属性,包含您想要验证的列名称。这将在数据框级检查中是 None

逐元素检查也可以通过设置 element_wise=True 来支持。这将需要一个函数,该函数接受列/数据框中的单个元素并返回一个布尔标量,指示传递的值是否符合要求。

警告

在内部,元素级检查使用 map_elements 函数,这比原生的polars表达式API慢。

列级检查

这是一个列级自定义检查的示例:

from pandera.polars import PolarsData


def is_positive_vector(data: PolarsData) -> pl.LazyFrame:
    """Return a LazyFrame with a single boolean column."""
    return data.lazyframe.select(pl.col(data.key).gt(0))

def is_positive_scalar(data: PolarsData) -> pl.LazyFrame:
    """Return a LazyFrame with a single boolean scalar."""
    return data.lazyframe.select(pl.col(data.key).gt(0).all())

def is_positive_element_wise(x: int) -> bool:
    """Take a single value and return a boolean scalar."""
    return x > 0

schema_with_custom_checks = pa.DataFrameSchema({
    "a": pa.Column(
        int,
        checks=[
            pa.Check(is_positive_vector),
            pa.Check(is_positive_scalar),
            pa.Check(is_positive_element_wise, element_wise=True),
        ]
    )
})

lf = pl.LazyFrame({"a": [1, 2, 3]})
validated_df = lf.collect().pipe(schema_with_custom_checks.validate)
print(validated_df)
shape: (3, 1)
┌─────┐
│ a   │
│ --- │
│ i64 │
╞═════╡
│ 1   │
│ 2   │
│ 3   │
└─────┘
from pandera.polars import PolarsData


class ModelWithCustomChecks(pa.DataFrameModel):
    a: int

    @pa.check("a")
    def is_positive_vector(cls, data: PolarsData) -> pl.LazyFrame:
        """Return a LazyFrame with a single boolean column."""
        return data.lazyframe.select(pl.col(data.key).gt(0))

    @pa.check("a")
    def is_positive_scalar(cls, data: PolarsData) -> pl.LazyFrame:
        """Return a LazyFrame with a single boolean scalar."""
        return data.lazyframe.select(pl.col(data.key).gt(0).all())

    @pa.check("a", element_wise=True)
    def is_positive_element_wise(cls, x: int) -> bool:
        """Take a single value and return a boolean scalar."""
        return x > 0

validated_df = lf.collect().pipe(ModelWithCustomChecks.validate)
print(validated_df)
shape: (3, 1)
┌─────┐
│ a   │
│ --- │
│ i64 │
╞═════╡
│ 1   │
│ 2   │
│ 3   │
└─────┘

对于列级检查,自定义检查函数应返回一个 polars.LazyFrame,该框架包含一个布尔列或一个布尔标量。

数据框级检查

如果你需要验证整个数据框中的值,可以在数据框级别指定一个检查。期望的输出是一个 polars.LazyFrame,包含多个布尔列、单个布尔列或一个标量布尔值。

def col1_gt_col2(data: PolarsData, col1: str, col2: str) -> pl.LazyFrame:
    """Return a LazyFrame with a single boolean column."""
    return data.lazyframe.select(pl.col(col1).gt(pl.col(col2)))

def is_positive_df(data: PolarsData) -> pl.LazyFrame:
    """Return a LazyFrame with multiple boolean columns."""
    return data.lazyframe.select(pl.col("*").gt(0))

def is_positive_element_wise(x: int) -> bool:
    """Take a single value and return a boolean scalar."""
    return x > 0

schema_with_df_checks = pa.DataFrameSchema(
    columns={
        "a": pa.Column(int),
        "b": pa.Column(int),
    },
    checks=[
        pa.Check(col1_gt_col2, col1="a", col2="b"),
        pa.Check(is_positive_df),
        pa.Check(is_positive_element_wise, element_wise=True),
    ]
)

lf = pl.LazyFrame({"a": [2, 3, 4], "b": [1, 2, 3]})
validated_df = lf.collect().pipe(schema_with_df_checks.validate)
print(validated_df)
shape: (3, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 2   ┆ 1   │
│ 3   ┆ 2   │
│ 4   ┆ 3   │
└─────┴─────┘
class ModelWithDFChecks(pa.DataFrameModel):
    a: int
    b: int

    @pa.dataframe_check
    def cola_gt_colb(cls, data: PolarsData) -> pl.LazyFrame:
        """Return a LazyFrame with a single boolean column."""
        return data.lazyframe.select(pl.col("a").gt(pl.col("b")))

    @pa.dataframe_check
    def is_positive_df(cls, data: PolarsData) -> pl.LazyFrame:
        """Return a LazyFrame with multiple boolean columns."""
        return data.lazyframe.select(pl.col("*").gt(0))

    @pa.dataframe_check(element_wise=True)
    def is_positive_element_wise(cls, x: int) -> bool:
        """Take a single value and return a boolean scalar."""
        return x > 0

validated_df = lf.collect().pipe(ModelWithDFChecks.validate)
print(validated_df)
shape: (3, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 2   ┆ 1   │
│ 3   ┆ 2   │
│ 4   ┆ 3   │
└─────┴─────┘

使用LazyFrames进行数据级验证

如本页面之前提到的,默认情况下在 pl.LazyFrame 上调用 schema.validate 只会执行模式层级的验证检查。如果您想验证 pl.LazyFrame 上的数据层级属性,建议的方式是先调用 .collect()

class SimpleModel(pa.DataFrameModel):
        a: int

lf: pl.LazyFrame = (
    pl.LazyFrame({"a": [1.0, 2.0, 3.0]})
    .cast({"a": pl.Int64})
    .collect()  # convert to pl.DataFrame
    .pipe(SimpleModel.validate)
    .lazy()     # convert back to pl.LazyFrame
    # do more lazy operations
)

这个语法很好,因为仅从阅读代码就能清楚地了解发生了什么。Pandera 模式作为方法链中数据实现的一个清晰点。

然而,如果你不介意一点魔法 🪄,你可以将PANDERA_VALIDATION_DEPTH环境变量设置为SCHEMA_AND_DATA,以在polars.LazyFrame上验证数据级属性。这将等同于上面的显式代码:

export PANDERA_VALIDATION_DEPTH=SCHEMA_AND_DATA
lf: pl.LazyFrame = (
    pl.LazyFrame({"a": [1.0, 2.0, 3.0]})
    .cast({"a": pl.Int64})
    .pipe(SimpleModel.validate)  # this will validate schema- and data-level properties
    # do more lazy operations
)

在后台,验证过程将对LazyFrame进行.collect()调用,以运行数据级别的验证检查,并在验证完成后仍将返回pl.LazyFrame

支持和不支持的功能

由于pandera-polars集成的成熟度低于pandas支持,因此pandera与pandas DataFrame提供的一些功能尚不支持polars DataFrame。

这里是支持和不支持功能的列表。您可以参考支持功能矩阵来查看哪些功能在polars验证后端中实现。