pandera.api.pyspark.container.DataFrameSchema

class pandera.api.pyspark.container.DataFrameSchema(columns=None, checks=None, dtype=None, coerce=False, strict=False, name=None, ordered=False, unique=None, report_duplicates='all', unique_column_names=False, title=None, description=None, metadata=None)[source]

一个轻量级的 PySpark DataFrame 验证器。

初始化 DataFrameSchema 校验器。

Parameters:
  • (映射 列名和列架构组件。) – 一个字典,其中键是列名,值是 列对象,指定特定列的数据类型和属性。

  • 检查 (可选[检查列表]) – 整个数据框的检查。

  • dtype (PySparkDtypeInputTypes) – 数据框的datatype。这将覆盖任何列中指定的数据类型。如果指定了一个字符串,则假定是有效的pyspark字符串值之一:https://spark.apache.org/docs/latest/sql-ref-datatypes.html

  • 强制转换 (布尔值) – 在验证时是否强制转换所有列。对 dtype=None 的列没有影响

  • 严格 (StrictType) – 确保架构中定义的所有列仅存在于数据框中。如果设置为‘过滤’,只有架构中的列会被传递到验证的数据框中。如果设置为过滤并且架构中定义的列不在数据框中,将抛出错误。

  • name (可选[str]) – 模式的名称。

  • ordered (bool) – 是否验证列的顺序。

  • unique (可选[合并[str, 列表[str]]]) – 一组应该共同唯一的列的列表。

  • report_duplicates (UniqueSettings) – 如何报告唯一错误 - exclude_first: 报告所有重复项,除了第一次出现 - exclude_last: 报告所有重复项,除了最后一次出现 - all: (默认)报告所有重复项

  • unique_column_names (bool) – 列名是否必须唯一。

  • 标题 (可选[str]) – 用于模式的人类可读标签。

  • 描述 (可选[str]) – 架构的任意文本描述。

  • 元数据 (可选[字典]) – 一个可选的键值数据。

Raises:

SchemaInitError – 如果无法从参数构建架构

Examples:

>>> import pandera.pyspark as psa
>>> import pyspark.sql.types as pt
>>>
>>> schema = psa.DataFrameSchema({
...     "str_column": psa.Column(str),
...     "float_column": psa.Column(float),
...     "int_column": psa.Column(int),
...     "date_column": psa.Column(pt.DateType),
... })

使用pyspark API定义检查,这需要一个具有以下签名的函数:ps.Dataframe -> Union[bool],输出包含布尔值。

>>> schema_withchecks = psa.DataFrameSchema({
...     "probability": psa.Column(
...         pt.DoubleType(), psa.Check.greater_than(0)),
...
...     # check that the "category" column contains a few discrete
...     # values, and the majority of the entries are dogs.
...     "category": psa.Column(
...         pt.StringType(), psa.Check.str_startswith("B"),
...            ),
... })

请查看 这里 获取更多使用详情。

属性

BACKEND_REGISTRY

coerce

是否将系列强制转换为指定类型。

dtype

获取 dtype 属性。

dtypes

一个字典,其中键是列名,值是该列的 DataType

properties

获取用于序列化的架构属性。

unique

应该共同唯一的列列表。

方法

__init__(columns=None, checks=None, dtype=None, coerce=False, strict=False, name=None, ordered=False, unique=None, report_duplicates='all', unique_column_names=False, title=None, description=None, metadata=None)[source]

初始化 DataFrameSchema 校验器。

Parameters:
  • (映射 列名和列架构组件。) – 一个字典,其中键是列名,值是 列对象,指定特定列的数据类型和属性。

  • 检查 (可选[检查列表]) – 整个数据框的检查。

  • dtype (PySparkDtypeInputTypes) – 数据框的datatype。这会覆盖任何列中指定的数据类型。如果指定了一个字符串,则假定为有效的pyspark字符串值之一:https://spark.apache.org/docs/latest/sql-ref-datatypes.html.

  • 强制转换 (布尔值) – 在验证时是否强制转换所有列。对 dtype=None 的列没有影响

  • 严格 (StrictType) – 确保架构中定义的所有列仅存在于数据框中。如果设置为‘过滤’,只有架构中的列会被传递到验证的数据框中。如果设置为过滤并且架构中定义的列不在数据框中,将抛出错误。

  • name (可选[str]) – 模式的名称。

  • ordered (bool) – 是否验证列的顺序。

  • unique (可选[联合[str, List[str]]]) – 应该共同唯一的列的列表。

  • report_duplicates (UniqueSettings) – 如何报告唯一的错误 - exclude_first: 报告所有重复项,除了第一次出现的 - exclude_last: 报告所有重复项,除了最后一次出现的 - all: (默认) 报告所有重复项

  • unique_column_names (bool) – 列名是否必须唯一。

  • 标题 (可选[str]) – 一个可供人类阅读的模式标签。

  • 描述 (可选[str]) – 模式的任意文本描述。

  • 元数据 (可选[字典]) – 一种可选的键值数据。

Raises:

SchemaInitError – 如果无法从参数构建架构

Examples:

>>> import pandera.pyspark as psa
>>> import pyspark.sql.types as pt
>>>
>>> schema = psa.DataFrameSchema({
...     "str_column": psa.Column(str),
...     "float_column": psa.Column(float),
...     "int_column": psa.Column(int),
...     "date_column": psa.Column(pt.DateType),
... })

使用pyspark API定义检查,它接受一个函数,签名为: ps.Dataframe -> Union[bool],其中输出包含布尔值。

>>> schema_withchecks = psa.DataFrameSchema({
...     "probability": psa.Column(
...         pt.DoubleType(), psa.Check.greater_than(0)),
...
...     # check that the "category" column contains a few discrete
...     # values, and the majority of the entries are dogs.
...     "category": psa.Column(
...         pt.StringType(), psa.Check.str_startswith("B"),
...            ),
... })

请查看 这里 获取更多使用详情。

coerce_dtype(check_obj)[source]

将对象强制转换为期望的类型。

Return type:

DataFrame

classmethod from_json(source)[source]

从json文件创建DataFrameSchema。

Parameters:

source – str,指向json模式的路径,或序列化的yaml字符串。

Return type:

DataFrameSchema

Returns:

数据框架模式。

classmethod from_yaml(yaml_schema)[source]

从yaml文件创建DataFrameSchema。

Parameters:

yaml_schema – str, yaml架构的路径,或序列化的yaml字符串。

Return type:

DataFrameSchema

Returns:

数据框架模式。

get_dtypes(dataframe)[source]

dtype属性相同,但根据提供的数据帧扩展regex == True的列。

Return type:

Dict[str, DataType]

Returns:

列及其关联数据类型的字典。

get_metadata()[source]

提供列和模式级别的元数据

Return type:

可选[字典, ]

static register_default_backends(check_obj_cls)[source]

注册默认后端。

此方法在get_backend方法中被调用,以便在验证时加载适当的验证后端,而不是在模式定义时。

该方法需要由模式子类实现。

to_ddl()[source]

将DataFrameSchema的字段恢复为Pyspark DDL字符串。

Return type:

str

Returns:

当前模式字段的字符串,以紧凑的DDL格式表示。

to_json(target: None = None, **kwargs) str[source]
to_json(target: PathLike, **kwargs) None

将 DataFrameSchema 写入 json 文件。

Parameters:

目标 (可选[路径类似, ]) – 要写入的文件目标。如果为无,则输出为字符串。

Return type:

可选的[str, None]

Returns:

如果目标是 None,则返回 json 字符串,否则返回 None。

to_script(fp=None)[source]

从yaml文件创建DataFrameSchema。

Parameters:

路径 – str,写入脚本的路径

Return type:

DataFrameSchema

Returns:

数据框架模式。

to_structtype()[source]

将DataFrameSchema的字段恢复为Pyspark StructType对象。

As the output of this method will be used to specify a read schema in Pyspark

(避免自动模式推断),False 可空 属性将被忽略,因为此检查将在数据集读取后由 Pandera 验证执行。

Return type:

StructType

Returns:

带有当前模式字段的StructType对象。

to_yaml(stream=None)[source]

将 DataFrameSchema 写入 yaml 文件。

Parameters:

(可选[路径类似, ]) – 要写入的文件流。如果为无,则转储为字符串。

Return type:

可选的[str, None]

Returns:

如果流为 None,则返回 yaml 字符串,否则返回 None。

validate(check_obj, head=None, tail=None, sample=None, random_state=None, lazy=True, inplace=False)[source]

检查数据框中的所有列是否在模式中具有列。

Parameters:
  • check_obj (DataFrame) – DataFrame对象,即要验证的数据框。

  • head (可选[int, None]) – 由于spark没有头或尾的概念,因此不使用

  • tail (可选[整数, ]) – 由于spark没有头或尾的概念,因此未被使用

  • sample (Optional[int, None]) – 验证 n% 行的随机样本。值的范围从 0-1,例如可以通过将设置值设为 0.1 来抽取 10% 的行。 请参阅以下文档。 https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html

  • random_state (可选[int, ]) – sample 参数的随机种子。

  • 懒惰 (bool) – 如果为 True,懒惰地对数据框进行所有验证检查,并引发一个 SchemaErrors。否则,立即抛出第一个出现的 SchemaError

  • inplace (bool) – 如果为 True,则对验证对象应用强制转换,否则创建数据的副本。

Returns:

验证过的 DataFrame

Raises:

SchemaError – 当 DataFrame 违反内置或自定义检查时。

Example:

调用 schema.validate 返回数据框。

>>> import pandera.pyspark as psa
>>> from pyspark.sql import SparkSession
>>> import pyspark.sql.types as T
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> data = [("Bread", 9), ("Butter", 15)]
>>> spark_schema = T.StructType(
...         [
...             T.StructField("product", T.StringType(), False),
...             T.StructField("price", T.IntegerType(), False),
...         ],
...     )
>>> df = spark.createDataFrame(data=data, schema=spark_schema)
>>>
>>> schema_withchecks = psa.DataFrameSchema(
...         columns={
...             "product": psa.Column("str", checks=psa.Check.str_startswith("B")),
...             "price": psa.Column("int", checks=psa.Check.gt(5)),
...         },
...         name="product_schema",
...         description="schema for product info",
...         title="ProductSchema",
...     )
>>>
>>> schema_withchecks.validate(df).take(2)
    [Row(product='Bread', price=9), Row(product='Butter', price=15)]
__call__(dataframe, head=None, tail=None, sample=None, random_state=None, lazy=True, inplace=False)[source]

表示DataFrameSchema.validate()方法的别名。

Parameters:
  • dataframe (DataFrame) – DataFrame对象,即要验证的dataframe。

  • head (int) – 由于spark没有head或tail的概念,因此未使用。

  • tail (int) – 由于spark没有head或tail的概念,因此未使用。

  • 示例 (可选[整型, ]) – 验证n%行的随机样本。值范围从 0-1,例如可以通过将设置值设为0.1来抽取10%的行。请参见以下文档。https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html

  • 懒惰 (bool) – 如果为 True,懒惰地对数据框进行所有验证检查,并引发一个 SchemaErrors。否则,立即抛出第一个出现的 SchemaError

  • inplace (bool) – 如果为 True,则对验证对象应用强制转换,否则创建数据的副本。