pandera.api.pyspark.container.DataFrameSchema¶
- class pandera.api.pyspark.container.DataFrameSchema(columns=None, checks=None, dtype=None, coerce=False, strict=False, name=None, ordered=False, unique=None, report_duplicates='all', unique_column_names=False, title=None, description=None, metadata=None)[source]¶
一个轻量级的 PySpark DataFrame 验证器。
初始化 DataFrameSchema 校验器。
- Parameters:
列 (映射 列名和列架构组件。) – 一个字典,其中键是列名,值是 列对象,指定特定列的数据类型和属性。
检查 (可选[检查列表]) – 整个数据框的检查。
dtype (PySparkDtypeInputTypes) – 数据框的datatype。这将覆盖任何列中指定的数据类型。如果指定了一个字符串,则假定是有效的pyspark字符串值之一:https://spark.apache.org/docs/latest/sql-ref-datatypes.html。
强制转换 (布尔值) – 在验证时是否强制转换所有列。对
dtype=None的列没有影响严格 (StrictType) – 确保架构中定义的所有列仅存在于数据框中。如果设置为‘过滤’,只有架构中的列会被传递到验证的数据框中。如果设置为过滤并且架构中定义的列不在数据框中,将抛出错误。
name (可选[str]) – 模式的名称。
ordered (bool) – 是否验证列的顺序。
report_duplicates (UniqueSettings) – 如何报告唯一错误 - exclude_first: 报告所有重复项,除了第一次出现 - exclude_last: 报告所有重复项,除了最后一次出现 - all: (默认)报告所有重复项
unique_column_names (bool) – 列名是否必须唯一。
标题 (可选[str]) – 用于模式的人类可读标签。
描述 (可选[str]) – 架构的任意文本描述。
元数据 (可选[字典]) – 一个可选的键值数据。
- Raises:
SchemaInitError – 如果无法从参数构建架构
- Examples:
>>> import pandera.pyspark as psa >>> import pyspark.sql.types as pt >>> >>> schema = psa.DataFrameSchema({ ... "str_column": psa.Column(str), ... "float_column": psa.Column(float), ... "int_column": psa.Column(int), ... "date_column": psa.Column(pt.DateType), ... })
使用pyspark API定义检查,这需要一个具有以下签名的函数:
ps.Dataframe -> Union[bool],输出包含布尔值。>>> schema_withchecks = psa.DataFrameSchema({ ... "probability": psa.Column( ... pt.DoubleType(), psa.Check.greater_than(0)), ... ... # check that the "category" column contains a few discrete ... # values, and the majority of the entries are dogs. ... "category": psa.Column( ... pt.StringType(), psa.Check.str_startswith("B"), ... ), ... })
请查看 这里 获取更多使用详情。
属性
BACKEND_REGISTRYcoerce是否将系列强制转换为指定类型。
dtype获取 dtype 属性。
dtypes一个字典,其中键是列名,值是该列的
DataType。properties获取用于序列化的架构属性。
unique应该共同唯一的列列表。
方法
- __init__(columns=None, checks=None, dtype=None, coerce=False, strict=False, name=None, ordered=False, unique=None, report_duplicates='all', unique_column_names=False, title=None, description=None, metadata=None)[source]¶
初始化 DataFrameSchema 校验器。
- Parameters:
列 (映射 列名和列架构组件。) – 一个字典,其中键是列名,值是 列对象,指定特定列的数据类型和属性。
检查 (可选[检查列表]) – 整个数据框的检查。
dtype (PySparkDtypeInputTypes) – 数据框的datatype。这会覆盖任何列中指定的数据类型。如果指定了一个字符串,则假定为有效的pyspark字符串值之一:https://spark.apache.org/docs/latest/sql-ref-datatypes.html.
强制转换 (布尔值) – 在验证时是否强制转换所有列。对
dtype=None的列没有影响严格 (StrictType) – 确保架构中定义的所有列仅存在于数据框中。如果设置为‘过滤’,只有架构中的列会被传递到验证的数据框中。如果设置为过滤并且架构中定义的列不在数据框中,将抛出错误。
name (可选[str]) – 模式的名称。
ordered (bool) – 是否验证列的顺序。
report_duplicates (UniqueSettings) – 如何报告唯一的错误 - exclude_first: 报告所有重复项,除了第一次出现的 - exclude_last: 报告所有重复项,除了最后一次出现的 - all: (默认) 报告所有重复项
unique_column_names (bool) – 列名是否必须唯一。
标题 (可选[str]) – 一个可供人类阅读的模式标签。
描述 (可选[str]) – 模式的任意文本描述。
元数据 (可选[字典]) – 一种可选的键值数据。
- Raises:
SchemaInitError – 如果无法从参数构建架构
- Examples:
>>> import pandera.pyspark as psa >>> import pyspark.sql.types as pt >>> >>> schema = psa.DataFrameSchema({ ... "str_column": psa.Column(str), ... "float_column": psa.Column(float), ... "int_column": psa.Column(int), ... "date_column": psa.Column(pt.DateType), ... })
使用pyspark API定义检查,它接受一个函数,签名为:
ps.Dataframe -> Union[bool],其中输出包含布尔值。>>> schema_withchecks = psa.DataFrameSchema({ ... "probability": psa.Column( ... pt.DoubleType(), psa.Check.greater_than(0)), ... ... # check that the "category" column contains a few discrete ... # values, and the majority of the entries are dogs. ... "category": psa.Column( ... pt.StringType(), psa.Check.str_startswith("B"), ... ), ... })
请查看 这里 获取更多使用详情。
- classmethod from_json(source)[source]¶
从json文件创建DataFrameSchema。
- Parameters:
source – str,指向json模式的路径,或序列化的yaml字符串。
- Return type:
- Returns:
数据框架模式。
- classmethod from_yaml(yaml_schema)[source]¶
从yaml文件创建DataFrameSchema。
- Parameters:
yaml_schema – str, yaml架构的路径,或序列化的yaml字符串。
- Return type:
- Returns:
数据框架模式。
- static register_default_backends(check_obj_cls)[source]¶
注册默认后端。
此方法在get_backend方法中被调用,以便在验证时加载适当的验证后端,而不是在模式定义时。
该方法需要由模式子类实现。
- to_ddl()[source]¶
将DataFrameSchema的字段恢复为Pyspark DDL字符串。
- Return type:
- Returns:
当前模式字段的字符串,以紧凑的DDL格式表示。
- to_json(target: None = None, **kwargs) str[source]¶
- to_json(target: PathLike, **kwargs) None
将 DataFrameSchema 写入 json 文件。
- to_script(fp=None)[source]¶
从yaml文件创建DataFrameSchema。
- Parameters:
路径 – str,写入脚本的路径
- Return type:
- Returns:
数据框架模式。
- to_structtype()[source]¶
将DataFrameSchema的字段恢复为Pyspark StructType对象。
- As the output of this method will be used to specify a read schema in Pyspark
(避免自动模式推断),False 可空 属性将被忽略,因为此检查将在数据集读取后由 Pandera 验证执行。
- Return type:
- Returns:
带有当前模式字段的StructType对象。
- validate(check_obj, head=None, tail=None, sample=None, random_state=None, lazy=True, inplace=False)[source]¶
检查数据框中的所有列是否在模式中具有列。
- Parameters:
check_obj (
DataFrame) – DataFrame对象,即要验证的数据框。sample (
Optional[int,None]) – 验证 n% 行的随机样本。值的范围从 0-1,例如可以通过将设置值设为 0.1 来抽取 10% 的行。 请参阅以下文档。 https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html懒惰 (
bool) – 如果为 True,懒惰地对数据框进行所有验证检查,并引发一个SchemaErrors。否则,立即抛出第一个出现的SchemaError。inplace (
bool) – 如果为 True,则对验证对象应用强制转换,否则创建数据的副本。
- Returns:
验证过的
DataFrame- Raises:
SchemaError – 当
DataFrame违反内置或自定义检查时。- Example:
调用
schema.validate返回数据框。>>> import pandera.pyspark as psa >>> from pyspark.sql import SparkSession >>> import pyspark.sql.types as T >>> spark = SparkSession.builder.getOrCreate() >>> >>> data = [("Bread", 9), ("Butter", 15)] >>> spark_schema = T.StructType( ... [ ... T.StructField("product", T.StringType(), False), ... T.StructField("price", T.IntegerType(), False), ... ], ... ) >>> df = spark.createDataFrame(data=data, schema=spark_schema) >>> >>> schema_withchecks = psa.DataFrameSchema( ... columns={ ... "product": psa.Column("str", checks=psa.Check.str_startswith("B")), ... "price": psa.Column("int", checks=psa.Check.gt(5)), ... }, ... name="product_schema", ... description="schema for product info", ... title="ProductSchema", ... ) >>> >>> schema_withchecks.validate(df).take(2) [Row(product='Bread', price=9), Row(product='Butter', price=15)]
- __call__(dataframe, head=None, tail=None, sample=None, random_state=None, lazy=True, inplace=False)[source]¶
表示
DataFrameSchema.validate()方法的别名。- Parameters:
dataframe (
DataFrame) – DataFrame对象,即要验证的dataframe。head (int) – 由于spark没有head或tail的概念,因此未使用。
tail (int) – 由于spark没有head或tail的概念,因此未使用。
示例 (
可选[整型,无]) – 验证n%行的随机样本。值范围从 0-1,例如可以通过将设置值设为0.1来抽取10%的行。请参见以下文档。https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html懒惰 (
bool) – 如果为 True,懒惰地对数据框进行所有验证检查,并引发一个SchemaErrors。否则,立即抛出第一个出现的SchemaError。inplace (
bool) – 如果为 True,则对验证对象应用强制转换,否则创建数据的副本。