pandera.api.pyspark.model.DataFrameModel

class pandera.api.pyspark.model.DataFrameModel(*args, **kwargs)[source]

一个DataFrameSchema的定义。

0.16.0版本新增

请参阅 用户指南 以获取更多信息。

检查数据框中的所有列是否在模式中具有列。

Parameters:
  • check_obj – DataFrame 对象,即要验证的数据框。

  • head – 由于spark没有head或tail的概念,因此未使用

  • tail – 自从Spark没有头或尾的概念以来未使用

  • sample – 验证 n% 行的随机样本。值的范围为 0-1,例如可以使用设置值为 0.1 采样 10% 的行。请参阅以下文档。https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html

  • random_statesample 参数的随机种子。

  • lazy – 如果为 True,则懒惰地对数据框进行所有验证检查,并引发一个 SchemaErrors。否则,一旦发生错误,立即引发 SchemaError

  • inplace – 如果为True,应用强制转换到验证对象, 否则创建数据的副本。

Returns:

验证过的 DataFrame

Raises:

SchemaError – 当 DataFrame 违反内置或自定义检查时。

Example:

调用 schema.validate 返回数据框。

>>> import pandera.pyspark as psa
>>> from pyspark.sql import SparkSession
>>> import pyspark.sql.types as T
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> data = [("Bread", 9), ("Butter", 15)]
>>> spark_schema = T.StructType(
...         [
...             T.StructField("product", T.StringType(), False),
...             T.StructField("price", T.IntegerType(), False),
...         ],
...     )
>>> df = spark.createDataFrame(data=data, schema=spark_schema)
>>>
>>> schema_withchecks = psa.DataFrameSchema(
...         columns={
...             "product": psa.Column("str", checks=psa.Check.str_startswith("B")),
...             "price": psa.Column("int", checks=psa.Check.gt(5)),
...         },
...         name="product_schema",
...         description="schema for product info",
...         title="ProductSchema",
...     )
>>>
>>> schema_withchecks.validate(df).take(2)
    [Row(product='Bread', price=9), Row(product='Butter', price=15)]

方法

classmethod get_metadata()[source]

提供列和模式级别的元数据

Return type:

可选[字典, ]

classmethod pydantic_validate(schema_model)[source]

验证输入是否为兼容的数据框模型。

Return type:

DataFrameModel

classmethod to_ddl()[source]

将DataFrameModel的字段恢复为Pyspark DDL字符串。

Return type:

str

Returns:

当前模型字段的字符串,以紧凑的 DDL 格式表示。

classmethod to_schema()[source]

DataFrameModel 创建 DataFrameSchema

Return type:

DataFrameSchema

classmethod to_structtype()[source]

将DataFrameModel的字段恢复为Pyspark StructType对象。

Return type:

StructType

Returns:

具有当前模型字段的StructType对象。

classmethod to_yaml(stream=None)[source]

Schema 转换为 yaml 使用 io.to_yaml

classmethod validate(check_obj, head=None, tail=None, sample=None, random_state=None, lazy=True, inplace=False)[source]

检查数据框中的所有列是否在模式中具有列。

Parameters:
  • check_obj (DataFrame) – DataFrame对象,即要验证的数据框。

  • head (可选[int, None]) – 由于spark没有头或尾的概念,因此不使用

  • tail (可选[整数, ]) – 由于spark没有头或尾的概念,因此未被使用

  • 示例 (可选[整型, ]) – 验证n%行的随机样本。值范围从 0-1,例如可以通过将设置值设为0.1来抽取10%的行。请参见以下文档。https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html

  • random_state (可选[int, ]) – sample 参数的随机种子。

  • 懒惰 (bool) – 如果为 True,懒惰地对数据框进行所有验证检查,并引发一个 SchemaErrors。否则,立即抛出第一个出现的 SchemaError

  • inplace (bool) – 如果为 True,则对验证对象应用强制转换,否则创建数据的副本。

Return type:

可选的[数据框基类[~TDataFrameModel], ]

Returns:

验证过的 DataFrame

Raises:

SchemaError – 当 DataFrame 违反内置或自定义检查时。

Example:

调用 schema.validate 返回数据框。

>>> import pandera.pyspark as psa
>>> from pyspark.sql import SparkSession
>>> import pyspark.sql.types as T
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> data = [("Bread", 9), ("Butter", 15)]
>>> spark_schema = T.StructType(
...         [
...             T.StructField("product", T.StringType(), False),
...             T.StructField("price", T.IntegerType(), False),
...         ],
...     )
>>> df = spark.createDataFrame(data=data, schema=spark_schema)
>>>
>>> schema_withchecks = psa.DataFrameSchema(
...         columns={
...             "product": psa.Column("str", checks=psa.Check.str_startswith("B")),
...             "price": psa.Column("int", checks=psa.Check.gt(5)),
...         },
...         name="product_schema",
...         description="schema for product info",
...         title="ProductSchema",
...     )
>>>
>>> schema_withchecks.validate(df).take(2)
    [Row(product='Bread', price=9), Row(product='Butter', price=15)]