Prefect
This page explains how to run your Kedro pipeline using Prefect 2.0, an open-source workflow management system.
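The scope of this documentation is deployment to a self-hosted Prefect Server. Prefect Server is an open-source backend that makes it easy to monitor and execute your Prefect flows, and it automatically extends Prefect 2.0. We will use an agent that dequeues submitted flow runs from a work queue.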
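Note
This deployment has been tested using Kedro 0.18.10 with Prefect 2.10.17. If you want to deploy with Prefect 1.0, we recommend you review the Prefect deployment documentation for earlier versions of Kedro.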
Prerequisites
To use Prefect 2.0 and Prefect Server, make sure the following prerequisite is in place:
Prefect 2.0 is installed on your machine
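To confirm this prerequisite before continuing, you can check the installed version from Python. This is a minimal sketch that uses only the standard library's `importlib.metadata`; the helper names `installed_version` and `check_prefect_major` are hypothetical, and the version lookup is injectable so the check can be tried without Prefect installed.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Callable, Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def check_prefect_major(
    lookup: Callable[[str], Optional[str]] = installed_version,
    required_major: int = 2,
) -> str:
    """Hypothetical helper: report whether a Prefect 2.x release is installed."""
    found = lookup("prefect")
    if found is None:
        return "prefect is not installed"
    # Compare only the major version, e.g. "2" in "2.10.17".
    major = int(found.split(".")[0])
    if major != required_major:
        return f"prefect {found} found, but {required_major}.x is required"
    return f"prefect {found} OK"


if __name__ == "__main__":
    print(check_prefect_major())
```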
Setup
Configure your PREFECT_API_URL to point to your local Prefect instance:
prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"
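For each new Kedro project you create, you need to decide whether to opt into usage analytics. Your decision is recorded in the .telemetry file stored in the project root.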
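Important
When you run a Kedro project locally, you are asked for this decision on the first kedro command for the project. In this use case, however, the project will hang unless you follow these instructions.
Manually create a .telemetry file in the root of your Kedro project and add your preference to give or decline consent: specify true to give consent or false to decline. The example below accepts Kedro's usage analytics: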
consent: true
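If your Kedro projects are created by a script or a CI job, you can also write this file programmatically. The following is a minimal sketch, assuming the single `consent:` line shown above is the whole file; `telemetry_content` and `write_telemetry_consent` are hypothetical helper names.

```python
from pathlib import Path


def telemetry_content(consent: bool) -> str:
    """Return the one-line YAML body of .telemetry (lowercase true/false)."""
    return f"consent: {str(consent).lower()}\n"


def write_telemetry_consent(project_root: Path, consent: bool) -> Path:
    """Hypothetical helper: write <project_root>/.telemetry so kedro does not prompt."""
    telemetry_file = Path(project_root) / ".telemetry"
    telemetry_file.write_text(telemetry_content(consent), encoding="utf-8")
    return telemetry_file
```

For example, `write_telemetry_consent(Path("path/to/project"), consent=True)` produces the same file as the example above.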
Run a Prefect Server instance:
prefect server start
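Because the server runs in the foreground, you may want to confirm from another terminal or Python session that it is reachable at the URL configured earlier. This is a minimal standard-library sketch, assuming the server answers a `/health` route under the API prefix (the route the Prefect 2 client uses for its health check; verify it for your version). The helper names `health_url` and `server_is_up` are hypothetical.

```python
import os
import urllib.request

# Value set with `prefect config set PREFECT_API_URL=...` above.
DEFAULT_API_URL = "http://127.0.0.1:4200/api"


def health_url(api_url: str) -> str:
    """Build the health-check URL from a PREFECT_API_URL-style base URL."""
    return api_url.rstrip("/") + "/health"


def server_is_up(api_url: str = "", timeout: float = 2.0) -> bool:
    """Return True if GET <api_url>/health answers with HTTP 200."""
    # Assumption: an exported PREFECT_API_URL environment variable, if present,
    # is the URL to check; otherwise fall back to the local default.
    url = health_url(api_url or os.environ.get("PREFECT_API_URL", DEFAULT_API_URL))
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # URLError, HTTPError and socket timeouts are all OSError subclasses.
        return False
```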
In a separate terminal, create a work pool to organize the work and a work queue for your agent to pull from:
prefect work-pool create --type prefect-agent <work_pool_name>
prefect work-queue create --pool <work_pool_name> <work_queue_name>
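If you set up several environments, these two commands can be scripted. The following minimal sketch builds exactly the CLI calls shown above and can run them in order with `subprocess`; `setup_commands` and `run_setup` are hypothetical names, and no flags beyond those in the commands above are used.

```python
import subprocess
from typing import List


def setup_commands(work_pool_name: str, work_queue_name: str) -> List[List[str]]:
    """Return the work-pool and work-queue creation commands as argv lists."""
    return [
        ["prefect", "work-pool", "create", "--type", "prefect-agent", work_pool_name],
        ["prefect", "work-queue", "create", "--pool", work_pool_name, work_queue_name],
    ]


def run_setup(work_pool_name: str, work_queue_name: str) -> None:
    """Run the commands in order; check=True stops at the first failing command."""
    for argv in setup_commands(work_pool_name, work_queue_name):
        subprocess.run(argv, check=True)
```

The pool must exist before the queue is created inside it, which is why the commands are returned and run in this order.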
Now run a Prefect agent that subscribes to a work queue inside the work pool you created:
prefect agent start --pool <work_pool_name> --work-queue <work_queue_name>
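To make the relationship between these pieces concrete: flow runs are submitted to a queue inside a pool, and the agent subscribed to that queue dequeues them and executes them. The following is a toy, in-memory model of that arrangement in plain Python. It does not use Prefect's API, and all class and method names are hypothetical; a real agent polls repeatedly on an interval rather than once.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, Dict, List


@dataclass
class WorkPool:
    """Toy stand-in for a work pool: a named set of FIFO work queues."""

    name: str
    queues: Dict[str, Deque[Callable[[], str]]] = field(default_factory=dict)

    def create_queue(self, queue_name: str) -> None:
        self.queues[queue_name] = deque()

    def submit(self, queue_name: str, flow_run: Callable[[], str]) -> None:
        self.queues[queue_name].append(flow_run)


@dataclass
class Agent:
    """Toy agent subscribed to one queue in one pool."""

    pool: WorkPool
    queue_name: str

    def poll_once(self) -> List[str]:
        """Dequeue every pending flow run and execute it in submission order."""
        queue = self.pool.queues[self.queue_name]
        results = []
        while queue:
            results.append(queue.popleft()())
        return results
```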
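How to run your Kedro pipeline using Prefect 2.0
Convert your Kedro pipeline to a Prefect 2.0 flow
To build a Prefect flow for your Kedro pipeline programmatically and register it with the Prefect API, use the following Python script, which should be saved in your project's root directory: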
# <project_root>/register_prefect_flow.py
import click
from pathlib import Path
from typing import Dict, List, Union, Callable
from kedro.framework.hooks.manager import _create_hook_manager
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline.node import Node
from kedro.runner import run_node
from prefect import flow, task, get_run_logger
from prefect.deployments import Deployment


@click.command()
@click.option("-p", "--pipeline", "pipeline_name", default="__default__")
@click.option("--env", "-e", type=str, default="base")
@click.option("--deployment_name", "deployment_name", default="example")
@click.option("--work_pool_name", "work_pool_name", default="default")
@click.option("--work_queue_name", "work_queue_name", default="default")
@click.option("--version", "version", default="1.0")
def prefect_deploy(
    pipeline_name, env, deployment_name, work_pool_name, work_queue_name, version
):
    """Register a Kedro pipeline as a Prefect flow."""
    # Pipeline name to execute
    pipeline_name = pipeline_name or "__default__"

    # Use standard deployment configuration for local execution. If you require a different
    # infrastructure, check the API docs for Deployments at: https://docs.prefect.io/latest/api-ref/prefect/deployments/
    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name=deployment_name,
        path=str(Path.cwd()),
        version=version,
        parameters={
            "pipeline_name": pipeline_name,
            "env": env,
        },
        infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
        work_pool_name=work_pool_name,
        work_queue_name=work_queue_name,
    )
    deployment.apply()


@flow(name="my_flow")
def my_flow(pipeline_name: str, env: str):
    logger = get_run_logger()
    project_path = Path.cwd()

    metadata = bootstrap_project(project_path)
    logger.info("Project name: %s", metadata.project_name)

    logger.info("Initializing Kedro...")
    execution_config = kedro_init(
        pipeline_name=pipeline_name, project_path=project_path, env=env
    )

    logger.info("Building execution layers...")
    execution_layers = init_kedro_tasks_by_execution_layer(
        pipeline_name, execution_config
    )

    for layer in execution_layers:
        logger.info("Running layer...")
        for node_task in layer:
            logger.info("Running node...")
            node_task()


@task()
def kedro_init(
    pipeline_name: str,
    project_path: Path,
    env: str,
):
    """
    Initializes a Kedro session and returns the DataCatalog and
    the session ID.
    """
    # Bootstrap the project within the task / flow scope.
    logger = get_run_logger()
    logger.info("Bootstrapping project")
    bootstrap_project(project_path)

    session = KedroSession.create(
        project_path=project_path,
        env=env,
    )
    # Inside a Prefect task, log with the Prefect run logger.
    logger.info("Session created with ID %s", session.session_id)
    pipeline = pipelines.get(pipeline_name)
    logger.info("Loading context...")
    context = session.load_context()
    catalog = context.catalog
    logger.info("Registering datasets...")
    # Datasets used by the pipeline but not defined in the catalog are
    # registered as in-memory datasets.
    unregistered_ds = pipeline.datasets() - set(catalog.list())
    for ds_name in unregistered_ds:
        catalog.add(ds_name, MemoryDataset())
    return {"catalog": catalog, "sess_id": session.session_id}


def init_kedro_tasks_by_execution_layer(
    pipeline_name: str,
    execution_config: Union[None, Dict[str, Union[DataCatalog, str]]] = None,
) -> List[List[Callable]]:
    """
    Creates the Kedro tasks ordered topologically in groups: each group
    depends only on nodes in earlier groups.

    Args:
        pipeline_name (str): The pipeline name to execute
        execution_config (Union[None, Dict[str, Union[DataCatalog, str]]], optional):
            The required execution config for each node. Defaults to None.

    Returns:
        List[List[Callable]]: A list of topologically ordered task groups
    """
    pipeline = pipelines.get(pipeline_name)

    execution_layers = []

    # Return a list of the pipeline nodes in topologically ordered groups,
    # i.e. if node A needs to be run before node B, it will appear in an
    # earlier group.
    for layer in pipeline.grouped_nodes:
        execution_layer = []
        for node in layer:
            # Create each task through a helper function so that the node is
            # bound when the task is created; a lambda written directly in this
            # loop would see only the last node, duplicating a single task.
            node_task = instantiate_task(node, execution_config)
            execution_layer.append(node_task)
        execution_layers.append(execution_layer)

    return execution_layers
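

def kedro_task(
    node: Node, task_dict: Union[None, Dict[str, Union[DataCatalog, str]]] = None
):
    """Run a single Kedro node with the catalog and session ID from kedro_init."""
    run_node(
        node,
        task_dict["catalog"],
        _create_hook_manager(),
        # Pass the session ID by keyword: the fourth positional parameter
        # of run_node is is_async, not session_id.
        session_id=task_dict["sess_id"],
    )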


def instantiate_task(
    node: Node,
    execution_config: Union[None, Dict[str, Union[DataCatalog, str]]] = None,
) -> Callable:
    """
    Function that wraps a Node inside a task for future execution

    Args:
        node: Kedro node for which a Prefect task is being created.
        execution_config: The configurations required for the node to execute
            that includes catalogs and session id

    Returns: Prefect task for the passed node
    """
    return task(lambda: kedro_task(node, execution_config)).with_options(name=node.name)


if __name__ == "__main__":
    prefect_deploy()
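The script relies on Kedro's `pipeline.grouped_nodes` to obtain the nodes in topologically ordered groups, creates one task per node, and then runs the groups one after another in `my_flow`. The following plain-Python sketch illustrates that technique on a hypothetical dependency mapping (node name to the set of node names it depends on); it is not Kedro's implementation. Like `instantiate_task`, it creates each callable through a factory function, so every task is bound to its own node even though all tasks are called only after the loop has finished.

```python
from typing import Callable, Dict, List, Set


def topological_layers(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes so each node lands in a later layer than all its dependencies.

    `deps` maps each node name to the names it depends on; every dependency
    must itself be a key. Layers are sorted only to make output deterministic.
    """
    remaining = {name: set(parents) for name, parents in deps.items()}
    done: Set[str] = set()
    layers: List[List[str]] = []
    while remaining:
        # Nodes whose dependencies have all run can form the next layer.
        ready = sorted(name for name, parents in remaining.items() if parents <= done)
        if not ready:
            raise ValueError("dependency cycle detected")
        layers.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return layers


def make_task(name: str, log: List[str]) -> Callable[[], None]:
    """Factory: `name` is bound per call, so each task runs its own node."""
    return lambda: log.append(name)


def run_by_layer(deps: Dict[str, Set[str]]) -> List[str]:
    """Build all tasks first, then run them layer by layer."""
    log: List[str] = []
    task_layers = [
        [make_task(name, log) for name in layer] for layer in topological_layers(deps)
    ]
    for layer in task_layers:
        for run in layer:
            run()
    return log
```

For example, with `split` and `report` both depending only on `preprocess`, the two land in the same layer and could run independently of each other.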
Then, run the deployment script in another terminal:
python register_prefect_flow.py --work_pool_name <work_pool_name> --work_queue_name <work_queue_name>
Note
Make sure your Prefect Server is up and running, and verify that the deployment script arguments match your work pool and work queue names.
Run the Prefect flow
Now that the flow is registered, you can use the Prefect Server UI to orchestrate and monitor it.
Navigate to http://localhost:4200/deployments to see your registered flow.

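Click the flow to open it, then trigger it with the "Run" > "Quick run" button, leaving the parameters at their default values. To run a specific pipeline, replace the __default__ value.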
Note
Make sure that both your Prefect Server and your agent are running.
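As an alternative to the "Run" > "Quick run" button in the UI, a run can also be triggered from Python. This is a minimal sketch, assuming Prefect 2.x's `prefect.deployments.run_deployment` and the defaults of the registration script above (flow `my_flow`, deployment `example`, parameters `pipeline_name` and `env`). The helper names are hypothetical, and Prefect is imported lazily so that the parameter-building part runs without it.

```python
from typing import Any, Dict


def deployment_parameters(pipeline_name: str = "__default__", env: str = "base") -> Dict[str, Any]:
    """Parameters expected by my_flow in register_prefect_flow.py."""
    return {"pipeline_name": pipeline_name, "env": env}


def trigger_run(
    deployment: str = "my_flow/example",
    pipeline_name: str = "__default__",
    env: str = "base",
):
    """Create a flow run for "<flow name>/<deployment name>" and return it.

    Requires Prefect 2.x and a reachable Prefect Server; an agent on the
    matching work queue picks up the run and executes it.
    """
    from prefect.deployments import run_deployment  # lazy import

    # timeout=0 returns the flow run metadata immediately instead of
    # waiting for the run to finish.
    return run_deployment(
        name=deployment,
        parameters=deployment_parameters(pipeline_name, env),
        timeout=0,
    )
```

For example, `trigger_run(pipeline_name="data_processing")` would request a run of a pipeline registered under the hypothetical name `data_processing`.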
