高级:分区与增量数据集

分区数据集

分布式系统在ETL数据管道中扮演着日益重要的角色。它们提高了处理吞吐量,使我们能够处理更大量的输入数据。在某些情况下,您的Kedro节点可能需要从包含大量同类型文件(如JSON或CSV)的目录中读取数据。像PySpark这样的工具及其对应的SparkDataset可以满足这类用例,但并不总是可行。

这就是为什么Kedro提供了PartitionedDataset并具有以下特性:

  • PartitionedDataset 可以从给定位置递归加载/保存所有或特定文件。

  • 它是平台无关的,可以与fsspec支持的任何文件系统实现协同工作,包括本地、S3、GCS等众多选项。

  • 它采用了一种懒加载的方式,在数据处理节点明确请求之前不会尝试加载任何分区数据。

  • 它支持通过使用Callable实现延迟保存。

注意

在本节中,给定位置内的每个单独文件被称为一个分区。

如何使用 PartitionedDataset

你可以在catalog.yml文件中像使用其他常规数据集定义一样使用PartitionedDataset

# conf/base/catalog.yml

my_partitioned_dataset:
  type: partitions.PartitionedDataset
  path: s3://my-bucket-name/path/to/folder  # path to the location of partitions
  dataset: pandas.CSVDataset  # shorthand notation for the dataset which will handle individual partitions
  credentials: my_credentials
  load_args:
    load_arg1: value1
    load_arg2: value2

注意

与其他数据集类似,PartitionedDataset也可以通过Python以编程方式实例化:

from kedro_datasets.pandas import CSVDataset
from kedro_datasets.partitions import PartitionedDataset

my_credentials = {...}  # credentials dictionary

my_partitioned_dataset = PartitionedDataset(
    path="s3://my-bucket-name/path/to/folder",
    dataset=CSVDataset,
    credentials=my_credentials,
    load_args={"load_arg1": "value1", "load_arg2": "value2"},
)

另外,如果您需要对底层数据集进行更细粒度的配置,可以完整提供其定义:

# conf/base/catalog.yml

my_partitioned_dataset:
  type: partitions.PartitionedDataset
  path: s3://my-bucket-name/path/to/folder
  dataset:  # full dataset config notation
    type: pandas.CSVDataset
    load_args:
      delimiter: ","
    save_args:
      index: false
  credentials: my_credentials
  load_args:
    load_arg1: value1
    load_arg2: value2
  filepath_arg: filepath  # the argument of the dataset to pass the filepath to
  filename_suffix: ".csv"

以下是PartitionedDataset支持的所有参数列表:

参数

必填

支持类型

描述

path

str

包含分区数据的文件夹路径。如果路径以协议开头(例如s3://),则将使用相应的fsspec具体文件系统实现。如果未指定协议,则将使用本地文件系统

dataset

str, Type[AbstractDataset], Dict[str, Any]

底层数据集定义,更多详情请参阅下方章节

credentials

Dict[str, Any]

将传递给fsspec.filesystemcall的协议特定选项,更多详情请参阅下方章节

load_args

Dict[str, Any]

要传递给对应文件系统实现中find()方法的关键字参数

filepath_arg

str (默认为 filepath)

底层数据集初始化器的参数名称,该参数将包含指向单个分区的路径

filename_suffix

str (默认为空字符串)

如果指定,不以该字符串结尾的分区将被忽略

数据集定义

数据集定义应传入PartitionedDatasetdataset参数。该数据集定义用于为每个独立分区实例化新的数据集对象,并使用该数据集对象进行加载和保存操作。数据集定义支持简写和完整两种表示方式。

简写符号

仅需指定底层数据集的类,可以是字符串形式(例如pandas.CSVDataset或完整类路径如kedro_datasets.pandas.CSVDataset),或是作为AbstractDataset子类的类对象。

完整符号表示法

完整表示法允许您指定一个包含完整底层数据集定义的字典,不包括以下参数:

  • 接收分区路径的参数(默认为filepath)——如果指定该参数,将发出UserWarning警告,提示该值将被各个分区路径覆盖

  • credentials 键 - 指定它将导致抛出 DatasetError 错误;数据集凭证应传递给 PartitionedDatasetcredentials 参数,而不是底层数据集定义 - 详情请参阅下面关于 partitioned dataset credentials 的部分

  • versioned 标志 - 指定它会导致抛出 DatasetError;无法为底层数据集启用版本控制

分区数据集凭证

注意

PartitionedDataset凭证中对dataset_credentials键的支持现已弃用。数据集凭证应明确在数据集配置中指定。

PartitionedDataset的凭证管理较为特殊,因为它可能同时包含PartitionedDataset本身以及用于分区加载和保存的底层数据集的凭证。顶级凭证会传递到底层数据集配置中(除非该配置已设置凭证),但反之则不行——数据集凭证永远不会传播到文件系统。

以下是所有可能场景的完整列表:

顶级凭证

底层数据集凭据

示例 PartitionedDataset 定义

描述

未定义

未定义

PartitionedDataset(path="s3://bucket-name/path/to/folder", dataset="pandas.CSVDataset")

凭证不会传递给底层数据集或文件系统

未定义

已指定

PartitionedDataset(path="s3://bucket-name/path/to/folder", dataset={"type": "pandas.CSVDataset", "credentials": {"secret": True}})

底层数据集凭证被传递给CSVDataset构造函数,文件系统实例化时不使用凭证

已指定

未定义

PartitionedDataset(path="s3://bucket-name/path/to/folder", dataset="pandas.CSVDataset", credentials={"secret": True})

顶级凭证会被传递给底层的CSVDataset构造函数和文件系统

已指定

None

PartitionedDataset(path="s3://bucket-name/path/to/folder", dataset={"type": "pandas.CSVDataset", "credentials": None}, credentials={"dataset_secret": True})

顶级凭据会被传递给文件系统,CSVDataset实例化时不使用凭据 - 这样您可以阻止顶级凭据传播到数据集配置中

已指定

已指定

PartitionedDataset(path="s3://bucket-name/path/to/folder", dataset={"type": "pandas.CSVDataset", "credentials": {"dataset_secret": True}}, credentials={"secret": True})

顶级凭据会传递给文件系统,底层数据集凭据则传递给CSVDataset构造函数

分区数据集加载

假设您正在使用的Kedro管道包含以下定义的节点:

from kedro.pipeline import node

node(concat_partitions, inputs="my_partitioned_dataset", outputs="concatenated_result")

底层节点函数 concat_partitions 可能如下所示:

from typing import Any, Callable, Dict
import pandas as pd


def concat_partitions(partitioned_input: Dict[str, Callable[[], Any]]) -> pd.DataFrame:
    """Concatenate input partitions into one pandas DataFrame.

    Args:
        partitioned_input: A dictionary with partition ids as keys and load functions as values.

    Returns:
        Pandas DataFrame representing a concatenation of all loaded partitions.
    """
    result = pd.DataFrame()

    for partition_key, partition_load_func in sorted(partitioned_input.items()):
        partition_data = partition_load_func()  # load the actual partition data
        # concat with existing result
        result = pd.concat([result, partition_data], ignore_index=True, sort=True)

    return result

从上面的例子可以看出,加载时PartitionedDataset不会自动从定位的分区加载数据。相反,PartitionedDataset返回一个字典,其中分区ID作为键,对应的加载函数作为值。这使得使用PartitionedDataset的节点能够实现定义需要加载哪些分区以及如何处理这些数据的逻辑。

分区ID 并不代表整个分区路径,而仅代表给定分区文件名后缀中唯一的部分:

  • 示例1:如果 path=s3://my-bucket-name/folder 且分区存储在 s3://my-bucket-name/folder/2019-12-04/data.csv,那么它的分区ID是 2019-12-04/data.csv

  • 示例2:如果path=s3://my-bucket-name/folderfilename_suffix=".csv",分区存储在s3://my-bucket-name/folder/2019-12-04/data.csv中,那么它的分区ID是2019-12-04/data

PartitionedDataset 实现了加载操作时的缓存机制,这意味着如果多个节点使用同一个 PartitionedDataset,即使首次加载完成后文件夹中新增了分区,所有节点仍会收到相同的分区字典。这种设计是为了确保节点间加载操作的一致性,避免竞态条件。如需重置缓存,请调用分区数据集对象的 release() 方法。

分区数据集保存

PartitionedDataset 同样支持保存操作。假设有以下配置:

# conf/base/catalog.yml

new_partitioned_dataset:
  type: partitions.PartitionedDataset
  path: s3://my-bucket-name
  dataset: pandas.CSVDataset
  filename_suffix: ".csv"
  save_lazily: True

以下是节点定义:

from kedro.pipeline import node

node(create_partitions, inputs=None, outputs="new_partitioned_dataset")

底层节点函数如下,位于create_partitions中:

from typing import Any, Dict
import pandas as pd


def create_partitions() -> Dict[str, Any]:
    """Create new partitions and save using PartitionedDataset.

    Returns:
        Dictionary with the partitions to create.
    """
    return {
        # create a file "s3://my-bucket-name/part/foo.csv"
        "part/foo": pd.DataFrame({"data": [1, 2]}),
        # create a file "s3://my-bucket-name/part/bar.csv.csv"
        "part/bar.csv": pd.DataFrame({"data": [3, 4]}),
    }

注意

写入现有分区可能导致其数据被覆盖,如果底层数据集实现未专门处理这种情况。您应自行实现检查机制,以确保在写入PartitionedDataset时不会丢失现有数据。最简单的安全机制可以是使用具有高唯一性概率的分区ID:例如当前时间戳。

分区数据集的延迟保存

PartitionedDataset 还支持延迟保存功能,在真正执行写入操作时才会具体化分区的数据。

要使用此功能,只需在字典中返回Callable类型:

from typing import Any, Dict, Callable
import pandas as pd


def create_partitions() -> Dict[str, Callable[[], Any]]:
    """Create new partitions and save using PartitionedDataset.

    Returns:
        Dictionary of the partitions to create to a function that creates them.
    """
    return {
        # create a file "s3://my-bucket-name/part/foo.csv"
        "part/foo": lambda: pd.DataFrame({"data": [1, 2]}),
        # create a file "s3://my-bucket-name/part/bar.csv"
        "part/bar": lambda: pd.DataFrame({"data": [3, 4]}),
    }

注意

使用延迟保存时,数据集将在after_node_run hook之后写入。

注意

延迟保存是默认行为,这意味着如果提供了Callable类型,数据集将在after_node_run钩子执行之后被写入。

在某些情况下,禁用延迟保存可能很有用,例如当您的对象已经是Callable(如TensorFlow模型)且您不打算延迟保存时。 要禁用延迟保存,请将save_lazily参数设置为False

# conf/base/catalog.yml

new_partitioned_dataset:
  type: partitions.PartitionedDataset
  path: s3://my-bucket-name
  dataset: pandas.CSVDataset
  filename_suffix: ".csv"
  save_lazily: False

增量数据集

IncrementalDatasetPartitionedDataset 的一个子类,它在所谓的 checkpoint 中存储关于最后处理的分区信息。IncrementalDataset 解决了需要增量处理分区的使用场景,即每个后续的管道运行应该只处理之前运行未处理过的分区。

默认情况下,该检查点会持久化到数据分区的位置。例如,对于使用路径s3://my-bucket-name/path/to/folder实例化的IncrementalDataset,检查点将被保存到s3://my-bucket-name/path/to/folder/CHECKPOINT,除非显式覆盖检查点配置

检查点文件仅在分区数据集被明确确认之后创建。

增量数据集加载

加载 IncrementalDataset 的工作方式与 PartitionedDataset 类似,但有几点不同:

  1. IncrementalDataset立即加载数据,因此返回字典中的值代表对应分区中存储的实际数据,而非指向加载函数的指针。当给定检查点值时,IncrementalDataset 会通过比较函数判断分区是否与当前处理相关。

  2. IncrementalDataset 不会在加载时发现没有分区可返回时抛出DatasetError错误 - 而是返回一个空字典。对于IncrementalDataset来说,可用分区列表为空是正常工作流程的一部分。

增量数据集保存

IncrementalDataset的保存操作与PartitionedDataset的保存操作完全相同。

增量数据集确认

注意

当成功加载或保存一组新的分区时,检查点值不会自动更新。

分区数据集检查点的更新由下游某个节点中的显式confirms指令触发。这个节点可以是处理该分区数据集的同一个节点:

from kedro.pipeline import node

# process and then confirm `IncrementalDataset` within the same node
node(
    process_partitions,
    inputs="my_partitioned_dataset",
    outputs="my_processed_dataset",
    confirms="my_partitioned_dataset",
)

或者,可以将确认操作推迟到下游的某个节点执行,这样在加载的分区被认定为成功处理之前,您可以实现额外的验证:

from kedro.pipeline import node, pipeline

pipeline(
    [
        node(
            func=process_partitions,
            inputs="my_partitioned_dataset",
            outputs="my_processed_dataset",
        ),
        # do something else
        node(
            func=confirm_partitions,
            # note that the node may not require 'my_partitioned_dataset' as an input
            inputs="my_processed_dataset",
            outputs=None,
            confirms="my_partitioned_dataset",
        ),
        # ...
        node(
            func=do_something_else_with_partitions,
            # will return the same partitions even though they were already confirmed
            inputs=["my_partitioned_dataset", "my_processed_dataset"],
            outputs=None,
        ),
    ]
)

关于确认操作的重要说明:

  • 确认分区数据集不会影响同一运行中的后续加载。所有输入相同分区数据集的下游节点都将接收相同的分区。在运行期间外部创建的分区也不会影响数据集加载,并且在下次运行之前或对数据集对象调用release()方法之前,这些分区不会出现在已加载分区列表中。

  • 一个管道不能包含多个确认同一数据集的节点。

检查点配置

IncrementalDataset 不需要显式配置检查点,除非需要偏离默认设置。要更新检查点配置,可添加包含有效数据集配置的 checkpoint 键。例如,当管道对分区位置仅有读取权限时(或出于其他原因不希望进行写入操作),就可能需要此配置。在这种情况下,可以配置 IncrementalDataset 将检查点保存到其他位置。checkpoint 键还支持部分配置更新,即仅覆盖某些检查点属性,同时保留其余属性的默认值:

my_partitioned_dataset:
  type: partitions.IncrementalDataset
  path: s3://my-bucket-name/path/to/folder
  dataset: pandas.CSVDataset
  checkpoint:
    # update the filepath and load_args, but keep the dataset type unchanged
    filepath: gcs://other-bucket/CHECKPOINT
    load_args:
      k1: v1

特殊检查点配置键

除了标准数据集属性外,checkpoint配置还接受两个特殊的可选键:

  • comparison_func (默认为 operator.gt) - 一个完全限定的函数导入路径,该函数用于比较分区ID与检查点值,以确定是否应处理该分区。此类函数必须接受两个位置字符串参数 - 分区ID和检查点值 - 如果该分区被视为已通过检查点,则返回 True。如果需要自定义检查点过滤机制,指定自己的 comparison_func 可能会很有用 - 例如,您可能希望实现窗口加载,其中总是希望加载代表上个月历月的分区。请参阅指定自定义比较函数的示例配置:

my_partitioned_dataset:
  type: partitions.IncrementalDataset
  path: s3://my-bucket-name/path/to/folder
  dataset: pandas.CSVDataset
  checkpoint:
    comparison_func: my_module.path.to.custom_comparison_function  # the path must be importable
  • force_checkpoint - 如果设置此参数,分区数据集将使用该值作为检查点,而不是加载对应的检查点文件。这在需要回滚处理步骤并重新处理部分(或全部)可用分区时可能很有用。查看以下强制设置检查点值的配置示例:

my_partitioned_dataset:
  type: partitions.IncrementalDataset
  path: s3://my-bucket-name/path/to/folder
  dataset: pandas.CSVDataset
  checkpoint:
    force_checkpoint: 2020-01-01/data.csv

注意

也可以通过以下简写形式指定force_checkpoint

my_partitioned_dataset:
  type: partitions.IncrementalDataset
  path: s3://my-bucket-name/path/to/folder
  dataset: pandas.CSVDataset
  checkpoint: 2020-01-01/data.csv

注意

如果需要强制分区数据集加载所有可用分区,请将checkpoint设置为空字符串:

my_partitioned_dataset:
  type: partitions.IncrementalDataset
  path: s3://my-bucket-name/path/to/folder
  dataset: pandas.CSVDataset
  checkpoint: ""