Prefect

This page explains how to run your Kedro pipeline using Prefect 2.0, an open-source workflow management system.

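The scope of this documentation is deployment to a self-hosted Prefect Server, an open-source backend that makes it easy to monitor and execute your Prefect flows and automatically extends Prefect 2.0. We will use an Agent that dequeues submitted flow runs from a Work Queue.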

Note

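This deployment has been tested using Kedro 0.18.10 with Prefect version 2.10.17. If you want to deploy with Prefect 1.0, we recommend you review earlier versions of Kedro's Prefect deployment documentation.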

Prerequisites

To use Prefect 2.0 and Prefect Server, make sure Prefect 2.0 is installed on your machine.

Setup

Configure your PREFECT_API_URL to point to your local Prefect instance:

prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"

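For each new Kedro project you create, you need to decide whether to opt into usage analytics. Your decision is recorded in the .telemetry file stored in the project root.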

Important

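When you run a Kedro project locally, you are asked for your usage analytics consent on the first kedro command for the project. In this use case there is no interactive prompt to answer, so the project will hang unless you follow the instructions below.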

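Create a .telemetry file manually, put it in the root of your Kedro project, and add your preference to give or decline consent. To do this, specify either true (to give consent) or false (to decline). The example below accepts Kedro's usage analytics.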

consent: true
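If you provision projects from a script, the same file can be written programmatically. The following is a minimal sketch, not part of Kedro: the helper name write_telemetry_consent is hypothetical, and it only assumes the one-line consent format shown above.

```python
from pathlib import Path


def write_telemetry_consent(project_root: str, consent: bool) -> Path:
    """Write a .telemetry file with the given consent into project_root.

    Hypothetical helper: it reproduces the single `consent: <bool>` line
    from the example above and nothing else.
    """
    telemetry_file = Path(project_root) / ".telemetry"
    value = "true" if consent else "false"
    telemetry_file.write_text(f"consent: {value}\n", encoding="utf-8")
    return telemetry_file
```

For example, write_telemetry_consent(".", False) run from the project root would record that you decline usage analytics.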

Run a Prefect Server instance:

prefect server start
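Before moving on, it can help to confirm that the server answers at the configured API URL. The sketch below uses only the Python standard library. It assumes that the server exposes a health endpoint at <PREFECT_API_URL>/health; treat that path, and the helper name prefect_server_is_up, as assumptions to verify against your Prefect version.

```python
import urllib.error
import urllib.request


def prefect_server_is_up(
    api_url: str = "http://127.0.0.1:4200/api", timeout: float = 2.0
) -> bool:
    """Return True if GET <api_url>/health answers with HTTP 200.

    The /health path is an assumption about the Prefect Server API;
    any connection error, timeout or non-200 answer yields False.
    """
    health_url = api_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(health_url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Covers refused connections, timeouts and HTTP error statuses.
        return False
```

Calling prefect_server_is_up() with no arguments checks the local instance configured above.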

In a separate terminal, create a work pool to organize the work and a work queue for your agent to pull from:

prefect work-pool create --type prefect-agent <work_pool_name>
prefect work-queue create --pool <work_pool_name> <work_queue_name>

Now run a Prefect Agent that subscribes to a work queue inside the work pool you created:

prefect agent start --pool <work_pool_name> --work-queue <work_queue_name>

How to run your Kedro pipeline using Prefect 2.0

Convert your Kedro pipeline to a Prefect 2.0 flow

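To build a Prefect flow for your Kedro pipeline programmatically and register it with the Prefect API, use the following Python script, which should be stored in your project's root directory: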

# <project_root>/register_prefect_flow.py
import click
from pathlib import Path
from typing import Dict, List, Union, Callable

from kedro.framework.hooks.manager import _create_hook_manager
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline.node import Node
from kedro.runner import run_node

from prefect import flow, task, get_run_logger
from prefect.deployments import Deployment


@click.command()
@click.option("-p", "--pipeline", "pipeline_name", default="__default__")
@click.option("--env", "-e", type=str, default="base")
@click.option("--deployment_name", "deployment_name", default="example")
@click.option("--work_pool_name", "work_pool_name", default="default")
@click.option("--work_queue_name", "work_queue_name", default="default")
@click.option("--version", "version", default="1.0")
def prefect_deploy(
    pipeline_name, env, deployment_name, work_pool_name, work_queue_name, version
):
    """Register a Kedro pipeline as a Prefect flow."""

    # Pipeline name to execute
    pipeline_name = pipeline_name or "__default__"

    # Use standard deployment configuration for local execution. If you require a different
    # infrastructure, check the API docs for Deployments at: https://docs.prefect.io/latest/api-ref/prefect/deployments/
    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name=deployment_name,
        path=str(Path.cwd()),
        version=version,
        parameters={
            "pipeline_name": pipeline_name,
            "env": env,
        },
        infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
        work_pool_name=work_pool_name,
        work_queue_name=work_queue_name,
    )

    deployment.apply()


@flow(name="my_flow")
def my_flow(pipeline_name: str, env: str):
    logger = get_run_logger()
    project_path = Path.cwd()

    metadata = bootstrap_project(project_path)
    logger.info("Project name: %s", metadata.project_name)

    logger.info("Initializing Kedro...")
    execution_config = kedro_init(
        pipeline_name=pipeline_name, project_path=project_path, env=env
    )

    logger.info("Building execution layers...")
    execution_layers = init_kedro_tasks_by_execution_layer(
        pipeline_name, execution_config
    )

    for layer in execution_layers:
        logger.info("Running layer...")
        for node_task in layer:
            logger.info("Running node...")
            node_task()


@task()
def kedro_init(
    pipeline_name: str,
    project_path: Path,
    env: str,
):
    """
    Initializes a Kedro session and returns the DataCatalog and
    KedroSession
    """
    # bootstrap project within task / flow scope

    logger = get_run_logger()
    logger.info("Bootstrapping project")
    bootstrap_project(project_path)

    session = KedroSession.create(
        project_path=project_path,
        env=env,
    )
    # Inside a Prefect task, log through the logger returned by get_run_logger().
    logger.info("Session created with ID %s", session.session_id)
    pipeline = pipelines.get(pipeline_name)
    logger.info("Loading context...")
    context = session.load_context()
    catalog = context.catalog
    logger.info("Registering datasets...")
    unregistered_ds = pipeline.datasets() - set(catalog.list())
    for ds_name in unregistered_ds:
        catalog.add(ds_name, MemoryDataset())
    return {"catalog": catalog, "sess_id": session.session_id}


def init_kedro_tasks_by_execution_layer(
    pipeline_name: str,
    execution_config: Union[None, Dict[str, Union[DataCatalog, str]]] = None,
) -> List[List[Callable]]:
    """
    Inits the Kedro tasks ordered topologically in groups, which implies that an earlier group
    is the dependency of later one.

    Args:
        pipeline_name (str): The pipeline name to execute
        execution_config (Union[None, Dict[str, Union[DataCatalog, str]]], optional):
        The required execution config for each node. Defaults to None.

    Returns:
        List[List[Callable]]: A list of topologically ordered task groups
    """

    pipeline = pipelines.get(pipeline_name)

    execution_layers = []

    # Return a list of the pipeline nodes in topologically ordered groups,
    #  i.e. if node A needs to be run before node B, it will appear in an
    #  earlier group.
    for layer in pipeline.grouped_nodes:
        execution_layer = []
        for node in layer:
            # Use a function for task instantiation which avoids duplication of
            # tasks
            # Named node_task so it does not shadow the imported prefect `task`
            node_task = instantiate_task(node, execution_config)
            execution_layer.append(node_task)
        execution_layers.append(execution_layer)

    return execution_layers


def kedro_task(
    node: Node, task_dict: Union[None, Dict[str, Union[DataCatalog, str]]] = None
):
    run_node(
        node,
        task_dict["catalog"],
        _create_hook_manager(),
        task_dict["sess_id"],
    )


def instantiate_task(
    node: Node,
    execution_config: Union[None, Dict[str, Union[DataCatalog, str]]] = None,
) -> Callable:
    """
    Function that wraps a Node inside a task for future execution

    Args:
        node: Kedro node for which a Prefect task is being created.
        execution_config: The configurations required for the node to execute
        that includes catalogs and session id

    Returns: Prefect task for the passed node

    """
    return task(lambda: kedro_task(node, execution_config)).with_options(name=node.name)


if __name__ == "__main__":
    prefect_deploy()

Then, run the deployment script in another terminal:

python register_prefect_flow.py --work_pool_name <work_pool_name> --work_queue_name <work_queue_name>

Note

Be sure that your Prefect Server is up and running. Verify that the deployment script arguments match the work pool and work queue names.
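The deployment script runs the pipeline layer by layer, relying on pipeline.grouped_nodes to return nodes in topologically ordered groups. To illustrate what that ordering means, here is a small standalone sketch. It is not Kedro's implementation: the function group_into_layers and the node names are hypothetical, and dependencies are given as a plain dictionary instead of Kedro nodes.

```python
from typing import Dict, List, Set


def group_into_layers(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into layers so that each node appears after all of its
    dependencies. Every node must be a key of `dependencies`.
    """
    remaining = {node: set(deps) for node, deps in dependencies.items()}
    layers: List[List[str]] = []
    while remaining:
        # Nodes whose dependencies have all been placed in earlier layers.
        ready = sorted(node for node, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("Circular or unknown dependency detected")
        layers.append(ready)
        for node in ready:
            del remaining[node]
        for deps in remaining.values():
            deps.difference_update(ready)
    return layers


example = {
    "preprocess": set(),
    "split": {"preprocess"},
    "train": {"split"},
    "report": {"split"},
}
print(group_into_layers(example))
# [['preprocess'], ['split'], ['report', 'train']]
```

Nodes within one layer do not depend on each other, which is why the flow in the script can iterate over each layer in any order.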

Run Prefect flow

Now that the flow is registered, you can use the Prefect Server UI to orchestrate and monitor it.

Navigate to http://localhost:4200/deployments to see your registered flow.

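Click the flow to open it, then trigger it using the "RUN" > "QUICK RUN" button, leaving the parameters at their default values. If you want to run a specific pipeline, replace the __default__ value with that pipeline's name.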
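As an alternative to the UI, a run can also be created from Python. The sketch below assumes Prefect 2's run_deployment function from prefect.deployments, and that the deployment was registered with the script's defaults (flow name my_flow, deployment name example). The helper names deployment_identifier and trigger_kedro_flow are hypothetical.

```python
def deployment_identifier(
    flow_name: str = "my_flow", deployment_name: str = "example"
) -> str:
    """Build the '<flow name>/<deployment name>' identifier of a deployment."""
    return f"{flow_name}/{deployment_name}"


def trigger_kedro_flow(
    pipeline_name: str = "__default__",
    env: str = "base",
    deployment_name: str = "example",
):
    """Create a flow run for the registered deployment and return it."""
    # Imported lazily so deployment_identifier works without Prefect installed.
    from prefect.deployments import run_deployment

    return run_deployment(
        name=deployment_identifier(deployment_name=deployment_name),
        parameters={"pipeline_name": pipeline_name, "env": env},
        timeout=0,  # return once the run is created instead of waiting for it
    )
```

With the server and agent running, trigger_kedro_flow(pipeline_name="__default__") should create a run that the agent then picks up from the work queue.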

Note

Be sure that both your Prefect Server and Agent are running.
