Argo工作流(需审核的过时文档)

重要

本页面包含未经最新Kedro版本测试的过时文档。如果您成功在最新版Kedro中使用Argo Workflows,请考虑通过SlackGitHub告知我们您的操作步骤。

This page explains how to convert your Kedro pipeline to use Argo Workflows, an open-source container-native workflow engine for orchestrating parallel jobs on Kubernetes.

为什么你会使用Argo Workflows?

我们对Argo Workflows很感兴趣,它是Argo项目的四大组件之一。Argo Workflows是一个开源的容器原生工作流引擎,用于在Kubernetes上编排并行作业。

以下是使用Argo Workflows的主要理由:

  • 它与云平台无关,可以在任何Kubernetes集群上运行

  • 它允许您在Kubernetes上轻松并行运行和编排计算密集型任务

  • 它使用有向无环图(DAG)来管理任务之间的依赖关系

先决条件

要使用Argo Workflows,请确保满足以下先决条件:

注意

每个节点将在其自己的容器中运行。

如何使用Argo Workflows运行你的Kedro管道

容器化您的Kedro项目

首先,您需要使用任何首选的容器化解决方案(例如Docker)将Kedro项目容器化,以构建用于Argo Workflows的镜像。

在本教程中,我们将采用Docker工作流程。我们推荐使用Kedro-Docker插件来简化流程。Kedro-Docker的使用说明请参阅该插件的README.md文件

在本地为项目构建Docker镜像后,将镜像传输到容器注册表

创建Argo工作流规范

为了以编程方式为您的Kedro流水线构建Argo Workflows规范,您可以使用以下Python脚本,该脚本应存储在项目的根目录中:

# <project_root>/build_argo_spec.py
import re
from pathlib import Path

import click
from jinja2 import Environment, FileSystemLoader

from kedro.framework.project import pipelines
from kedro.framework.startup import bootstrap_project

TEMPLATE_FILE = "argo_spec.tmpl"
SEARCH_PATH = Path("templates")


@click.command()
@click.argument("image", required=True)
@click.option("-p", "--pipeline", "pipeline_name", default=None)
@click.option("--env", "-e", type=str, default=None)
def generate_argo_config(image, pipeline_name, env):
    loader = FileSystemLoader(searchpath=SEARCH_PATH)
    template_env = Environment(loader=loader, trim_blocks=True, lstrip_blocks=True)
    template = template_env.get_template(TEMPLATE_FILE)

    project_path = Path.cwd()
    metadata = bootstrap_project(project_path)
    package_name = metadata.package_name

    pipeline_name = pipeline_name or "__default__"
    pipeline = pipelines.get(pipeline_name)

    tasks = get_dependencies(pipeline.node_dependencies)

    output = template.render(image=image, package_name=package_name, tasks=tasks)

    (SEARCH_PATH / f"argo-{package_name}.yml").write_text(output)


def get_dependencies(dependencies):
    deps_dict = [
        {
            "node": node.name,
            "name": clean_name(node.name),
            "deps": [clean_name(val.name) for val in parent_nodes],
        }
        for node, parent_nodes in dependencies.items()
    ]
    return deps_dict


def clean_name(name):
    return re.sub(r"[\W_]+", "-", name).strip("-")


if __name__ == "__main__":
    generate_argo_config()

该脚本接受一个必需的参数:

  • image: 传输到容器注册表的镜像

您还可以指定两个可选参数:

  • --pipeline: 你想为其构建Argo工作流规范的pipeline名称

  • --env: Kedro配置环境名称,默认为local

将以下Argo Workflows规范模板添加到/templates/argo_spec.tmpl中:

{# <project_root>/templates/argo_spec.tmpl #}
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: {{ package_name }}-
spec:
  entrypoint: dag
  templates:
  - name: kedro
    metadata:
      labels:
        {# Add label to have an ability to remove Kedro Pods easily #}
        app: kedro-argo
    retryStrategy:
      limit: 1
    inputs:
      parameters:
      - name: kedro_node
    container:
      imagePullPolicy: Always
      image: {{ image }}
      env:
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              {# Secrets name #}
              name: aws-secrets
              key: access_key_id
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: aws-secrets
              key: secret_access_key
      command: [kedro]{% raw %}
      args: ["run", "-n",  "{{inputs.parameters.kedro_node}}"]
      {% endraw %}
  - name: dag
    dag:
      tasks:
      {% for task in tasks %}
      - name: {{ task.name }}
        template: kedro
        {% if task.deps %}
        dependencies:
        {% for dep in task.deps %}
          - {{ dep }}
        {% endfor %}
        {% endif %}
        arguments:
          parameters:
          - name: kedro_node
            value: {{ task.node }}
      {% endfor %}

注意

Argo Workflows 被定义为使用有向无环图(DAG)来表示任务之间的依赖关系。

在本教程中,我们将使用AWS S3存储桶来存放数据集;因此必须设置AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY环境变量以便与S3进行通信。AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY的值应存储在Kubernetes Secrets中(下方提供了一个Kubernetes Secrets规范示例)。

规范模板使用Jinja模板语言编写,因此您必须安装Jinja Python包:

$ pip install Jinja2

最后,从项目目录运行辅助脚本以构建Argo Workflows规范(该规范将保存到/templates/argo-.yml文件中)。

$ cd <project_root>
$ python build_argo_spec.py <project_image>

向Kubernetes提交Argo工作流规范

在提交Argo Workflows规范之前,您需要先部署Kubernetes Secrets。

以下是一个Secrets规范的示例:

# secret.yml
apiVersion: v1
kind: Secret
metadata:
  name: aws-secrets
data:
  access_key_id: <AWS_ACCESS_KEY_ID value encoded with base64>
  secret_access_key: <AWS_SECRET_ACCESS_KEY value encoded with base64>
type: Opaque

您可以使用以下命令将AWS密钥编码为base64格式:

$ echo -n <original_key> | base64

运行以下命令将Kubernetes密钥部署到default命名空间,并检查是否创建成功:

$ kubectl create -f secret.yml
$ kubectl get secrets aws-secrets

现在,您可以按照以下方式提交Argo Workflows规范:

$ cd <project_root>
$ argo submit --watch templates/argo-<package_name>.yml

注意

Argo Workflows应提交到与Kubernetes Secrets相同的命名空间。有关使用方法的更多详情,请参阅Argo CLI帮助文档。

要清理您的Kubernetes集群,可以使用以下命令:

$ kubectl delete pods --selector app=kedro-argo
$ kubectl delete -f secret.yml

Kedro-Argo插件

作为替代方案,您可以使用Kedro-Argo插件将Kedro项目转换为Argo工作流。

警告

该插件不受Kedro团队支持,我们无法保证其可用性。