使用KedroSession进行生命周期管理

概述

KedroSession 允许您:

  • 管理Kedro运行的生命周期

  • 持久化运行时参数及其对应的会话ID

  • 追踪运行时参数,例如CLI命令标志和环境变量

KedroSession 解耦了由 KedroContext 管理的Kedro库组件与任何会话数据(包括静态和动态数据)。因此,Kedro组件和插件可以访问会话数据,而无需导入 KedroContext 对象和库组件。

KedroSession的主要方法和属性包括:

  • create(): 创建一个包含会话数据的新KedroSession实例

  • load_context(): 实例化 KedroContext 对象

  • close(): 关闭当前会话 —— 不过我们建议您将会话对象作为上下文管理器使用,这样会自动调用close(),而不是显式调用该方法

  • run(): 使用提供的参数运行管道;详情请参阅 Running pipelines

创建会话

以下代码创建一个KedroSession对象作为上下文管理器,并在该上下文中运行带有会话数据的管道。此脚本可以从Kedro项目中的任何位置调用,因为根文件夹会自动定位。会话在退出后会自动关闭:

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.framework.project import configure_project
from kedro.utils import _find_kedro_project

# Get project root
current_dir = Path(__file__).resolve().parent
project_root = _find_kedro_project(current_dir)
bootstrap_project(Path(project_root))

# Create and use the session
with KedroSession.create(project_path=project_root) as session:
    session.run()

你可以在KedroSession.create()中提供以下可选参数:

  • project_path: 项目根目录的路径

  • save_on_close: 布尔值,用于指示关闭会话时是否将其保存到磁盘

  • env: KedroContext 的运行环境

  • extra_params: 可选字典,包含用于底层KedroContext的额外项目参数;如果指定,这将更新(并因此优先于)从项目配置中检索到的参数

bootstrap_projectconfigure_project

General overview diagram for KedroSession creation

bootstrap_projectconfigure_project 都负责处理Kedro项目的设置,但存在细微差别:bootstrap_project 用于项目模式,而 configure_project 用于打包模式。

Kedro的CLI在启动时会运行这些函数作为kedro run的一部分,因此在大多数情况下您不需要手动调用这些函数。如果您想在交互式会话(如Notebook)中以编程方式与Kedro项目交互,可以在Jupyter或IPython中使用%reload_kedro行魔法命令。

bootstrap_project

该函数使用configure_project,并额外从pyproject.toml读取元数据,同时将项目根目录添加到sys.path中,使得该项目可以作为Python包被导入。它通常用于直接操作Kedro项目的源代码。

configure_project

该函数读取settings.pypipeline_registry.py文件,并在Kedro运行开始前注册配置。如果您有一个打包好的Kedro项目,只需在执行管道前运行configure_project即可。

ValueError: 未找到包名称

ValueError: 未找到包名称。请确保您已使用'bootstrap_project'配置项目。如果您正在使用Kedro命令行界面,这应该会自动完成。

如果您正在使用multiprocessing,需要注意这一点。根据您的操作系统,可能会有不同的默认设置。如果进程是spawn方式创建的,Python会在每个进程中重新导入所有模块,因此您需要在新进程开始时再次运行configure_project。例如,这就是Kedro在ParallelRunner中处理这种情况的方式:

if multiprocessing.get_start_method() == "spawn" and package_name:
        _bootstrap_subprocess(package_name, logging_config)