Kedro插件¶
Kedro插件允许您为Kedro创建新功能,并向CLI注入额外命令。插件作为独立的Python包开发,存在于任何Kedro项目之外。
概述¶
Kedro的扩展机制基于pluggy构建,这是一个为pytest生态系统创建的可靠插件管理库。pluggy依赖于entry points,这是一种Python机制,允许包提供可通过importlib.metadata被其他包发现的组件。
一个简单插件的示例¶
这是一个简单的插件示例,可以将流水线以JSON格式打印出来:
kedrojson/plugin.py
import click
from kedro.framework.project import pipelines
@click.group(name="JSON")
def commands():
pass
@commands.command(name="to_json")
@click.pass_obj
def to_json(metadata):
"""Display the pipeline in JSON format"""
pipeline = pipelines["__default__"]
print(pipeline.to_json())
从0.18.14版本开始,Kedro将setup.py替换为pyproject.toml。插件需要在任一文件中提供入口点。如果您仍在使用setup.py,请参考0.18.13版本的文档。
要将入口点添加到pyproject.toml中,插件需要提供以下entry_points配置:
[project.entry-points."kedro.project_commands"]
kedrojson = "kedrojson.plugin:commands"
安装插件后,您可以按如下方式运行它:
kedro to_json
使用 click¶
命令必须作为click Groups提供
click Group将被合并到主CLI组中。在此过程中,该组上的选项会丢失,其回调函数中已完成的任何处理也会丢失。
项目背景¶
当插件运行时,它们可以通过创建会话并加载其上下文来请求有关当前项目的信息:
from pathlib import Path
from kedro.framework.session import KedroSession
project_path = Path.cwd()
session = KedroSession.create(project_path=project_path)
context = session.load_context()
初始化¶
如果插件初始化需要在Kedro启动之前完成,它可以声明entry_point键为kedro.init。该入口点必须引用一个当前没有参数的函数,但为了未来兼容性,您应该使用**kwargs来声明它。
global 和 project 命令¶
插件也可以向Kedro CLI添加命令,该CLI支持两种类型的命令:
全局 - 在Kedro项目内外均可使用。全局命令使用
entry_point键kedro.global_commands。project - 仅当在当前目录检测到Kedro项目时可用。项目命令使用
entry_point键kedro.project_commands。
建议的命令规范¶
我们采用以下命令约定:kedro ,其中kedro 作为顶级命令组。这是我们建议的插件结构方式,但您的插件并非必须遵循此结构才能正常工作。
高级:插件命令的延迟加载¶
如果您正在开发一个包含大量CLI命令的插件,或者某些导入缓慢但仅被少数命令使用的大型库,可以考虑对这些命令采用延迟加载。这能显著提升插件性能以及Kedro CLI的整体运行效率。具体实现可参考click文档中关于命令延迟加载的说明。从Kedro 0.19.7版本开始,Kedro命令已声明为延迟加载的命令组,您可以直接参考其实现方式。
考虑之前kedrojson插件的例子。假设该插件有两个命令:kedro to_json pipelines和kedro to_json nodes。其中to_json pipelines命令比to_json nodes命令使用更频繁,且to_json nodes命令需要导入一个大型库。在这种情况下,您可以如下定义延迟加载的命令:
在 kedrojson/plugin.py 文件中:
import click
from kedro.framework.project import pipelines
from kedro.framework.cli.utils import LazyGroup
@click.group()
def commands():
pass
@commands.group(
name="to_json",
cls=LazyGroup,
lazy_subcommands={
"nodes": "kedrojson.plugin.nodes",
"pipelines": "kedrojson.plugin.pipelines"
}
)
def to_json():
"""Convert Kedro nodes and pipelines to JSON"""
pass
@click.command(name="nodes")
def nodes():
"""Convert Kedro nodes to JSON"""
import some_large_library
print("Converting nodes to JSON")
...
@click.command("pipelines")
def pipelines():
"""Convert Kedro pipelines to JSON"""
print("Converting pipelines to JSON")
...
加载单个nodes和pipelines命令以及随后导入大型库的操作将被延迟,直到调用相应的命令时才会执行。
钩子¶
您可以开发钩子实现,并在安装插件时自动将它们注册到项目上下文中。
要为您的自定义插件启用此功能,只需在pyproject.toml中添加以下条目
要使用 pyproject.toml,请指定
[project.entry-points."kedro.hooks"]
plugin_name = "plugin_name.plugin:hooks"
其中 plugin.py 是声明钩子实现的模块:
import logging
from kedro.framework.hooks import hook_impl
class MyHooks:
@hook_impl
def after_catalog_created(self, catalog):
logging.info("Reached after_catalog_created hook")
hooks = MyHooks()
注意
hooks 应该是定义Hooks的类的一个实例。
CLI 钩子¶
您还可以开发Hook实现来扩展Kedro在插件中的CLI行为。要查找可用的CLI Hooks,请访问我们的hooks API文档。要将插件中开发的CLI Hooks注册到Kedro,请在项目的pyproject.toml中添加以下条目:
[project.entry-points."kedro.cli_hooks"]
plugin_name = "plugin_name.plugin:cli_hooks"
(其中 plugin.py 是声明 Hook 实现的模块):
import logging
from kedro.framework.cli.hooks import cli_hook_impl
class MyCLIHooks:
@cli_hook_impl
def before_command_run(self, project_metadata, command_args):
logging.info(
"Command %s will be run for project %s", command_args, project_metadata
)
cli_hooks = MyCLIHooks()
贡献流程¶
当你准备好提交代码时:
使用我们的命名规范为
plugin创建单独的代码仓库(kedro-)选择命令方式:
global和/或project命令:所有
global命令都应作为单个click组提供所有
project命令都应作为另一个click组提供click组通过 entry points mechanism 声明
包含一个
README.md文件,描述您的插件功能以及需要包含的所有依赖项使用GitHub标签功能将您的插件标记为
kedro-plugin,以便我们能够找到它
支持的Kedro插件¶
Kedro-Datasets,Kedro所有数据连接器的集合。这些数据连接器是
AbstractDataset的实现Kedro-Docker,一个用于在容器内打包和分发Kedro项目的工具
Kedro-Airflow,一个将您的Kedro项目转换为Airflow项目的工具
Kedro-Viz,一个用于可视化Kedro管道的工具
社区开发的插件¶
有许多社区开发的插件可供使用,完整的插件列表发布在awesome-kedro GitHub仓库中。以下列表是其中一些正在积极维护的插件的小型快照。
注意
您的插件需要具备Apache 2.0兼容许可证才能被考虑列入此列表。
kedro-mlflow,由Yolan Honoré-Rougé和Takieddine Kadiri开发,实现了MLflow与Kedro项目的无缝集成。其主要功能包括模块化配置、自动参数追踪、数据集版本控制、Kedro流水线打包与服务,以及训练与推理流水线间的自动同步,从而确保机器学习实验的高度可复现性并简化部署流程。相关教程可在kedro-mlflow-tutorial repo中获取,更多详细信息请参阅kedro-mlflow documentation。
kedro-kubeflow, 由 GetInData 开发,允许您使用 Kubeflow Pipelines 在 Kubernetes 集群上运行和调度管道
kedro-airflow-k8s, 由 GetInData 开发,支持在 Kubernetes 集群上使用 Airflow 运行 Kedro 流水线
kedro-vertexai, 由 GetInData 开发,支持使用 Vertex AI Pipelines 服务运行 Kedro 流水线
kedro-azureml, 由 GetInData 开发,支持使用 Azure ML Pipelines 服务运行 Kedro 管道
kedro-sagemaker, 由 GetInData 开发,支持使用亚马逊 SageMaker 服务运行 Kedro 管道
kedro-partitioned, 由 Gabriel Daiha Alves 和 Nickolas da Rocha Machado 开发,扩展了处理分区数据的功能。