Argo工作流(需审核的过时文档)¶
为什么你会使用Argo Workflows?¶
我们对Argo Workflows很感兴趣,它是Argo项目的四大组件之一。Argo Workflows是一个开源的容器原生工作流引擎,用于在Kubernetes上编排并行作业。
以下是使用Argo Workflows的主要理由:
它与云平台无关,可以在任何Kubernetes集群上运行
它允许您在Kubernetes上轻松并行运行和编排计算密集型任务
它使用有向无环图(DAG)来管理任务之间的依赖关系
先决条件¶
要使用Argo Workflows,请确保满足以下先决条件:
Argo Workflows 已安装 在你的 Kubernetes 集群上
Argo CLI已安装在您的机器上
为每个Kedro
node设置了name属性,因为它用于构建DAG所有节点的输入/输出数据集必须在
catalog.yml中配置并引用外部位置(如AWS S3);您不能在流程中使用MemoryDataset
注意
每个节点将在其自己的容器中运行。
如何使用Argo Workflows运行你的Kedro管道¶
容器化您的Kedro项目¶
首先,您需要使用任何首选的容器化解决方案(例如Docker)将Kedro项目容器化,以构建用于Argo Workflows的镜像。
在本教程中,我们将采用Docker工作流程。我们推荐使用Kedro-Docker插件来简化流程。Kedro-Docker的使用说明请参阅该插件的README.md文件。
在本地为项目构建Docker镜像后,将镜像传输到容器注册表。
创建Argo工作流规范¶
为了以编程方式为您的Kedro流水线构建Argo Workflows规范,您可以使用以下Python脚本,该脚本应存储在项目的根目录中:
# <project_root>/build_argo_spec.py
import re
from pathlib import Path
import click
from jinja2 import Environment, FileSystemLoader
from kedro.framework.project import pipelines
from kedro.framework.startup import bootstrap_project
TEMPLATE_FILE = "argo_spec.tmpl"
SEARCH_PATH = Path("templates")
@click.command()
@click.argument("image", required=True)
@click.option("-p", "--pipeline", "pipeline_name", default=None)
@click.option("--env", "-e", type=str, default=None)
def generate_argo_config(image, pipeline_name, env):
loader = FileSystemLoader(searchpath=SEARCH_PATH)
template_env = Environment(loader=loader, trim_blocks=True, lstrip_blocks=True)
template = template_env.get_template(TEMPLATE_FILE)
project_path = Path.cwd()
metadata = bootstrap_project(project_path)
package_name = metadata.package_name
pipeline_name = pipeline_name or "__default__"
pipeline = pipelines.get(pipeline_name)
tasks = get_dependencies(pipeline.node_dependencies)
output = template.render(image=image, package_name=package_name, tasks=tasks)
(SEARCH_PATH / f"argo-{package_name}.yml").write_text(output)
def get_dependencies(dependencies):
deps_dict = [
{
"node": node.name,
"name": clean_name(node.name),
"deps": [clean_name(val.name) for val in parent_nodes],
}
for node, parent_nodes in dependencies.items()
]
return deps_dict
def clean_name(name):
return re.sub(r"[\W_]+", "-", name).strip("-")
if __name__ == "__main__":
generate_argo_config()
该脚本接受一个必需的参数:
image: 传输到容器注册表的镜像
您还可以指定两个可选参数:
--pipeline: 你想为其构建Argo工作流规范的pipeline名称--env: Kedro配置环境名称,默认为local
将以下Argo Workflows规范模板添加到中:
{# <project_root>/templates/argo_spec.tmpl #}
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: {{ package_name }}-
spec:
entrypoint: dag
templates:
- name: kedro
metadata:
labels:
{# Add label to have an ability to remove Kedro Pods easily #}
app: kedro-argo
retryStrategy:
limit: 1
inputs:
parameters:
- name: kedro_node
container:
imagePullPolicy: Always
image: {{ image }}
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
{# Secrets name #}
name: aws-secrets
key: access_key_id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-secrets
key: secret_access_key
command: [kedro]{% raw %}
args: ["run", "-n", "{{inputs.parameters.kedro_node}}"]
{% endraw %}
- name: dag
dag:
tasks:
{% for task in tasks %}
- name: {{ task.name }}
template: kedro
{% if task.deps %}
dependencies:
{% for dep in task.deps %}
- {{ dep }}
{% endfor %}
{% endif %}
arguments:
parameters:
- name: kedro_node
value: {{ task.node }}
{% endfor %}
注意
Argo Workflows 被定义为使用有向无环图(DAG)来表示任务之间的依赖关系。
在本教程中,我们将使用AWS S3存储桶来存放数据集;因此必须设置AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY环境变量以便与S3进行通信。AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY的值应存储在Kubernetes Secrets中(下方提供了一个Kubernetes Secrets规范示例)。
规范模板使用Jinja模板语言编写,因此您必须安装Jinja Python包:
$ pip install Jinja2
最后,从项目目录运行辅助脚本以构建Argo Workflows规范(该规范将保存到文件中)。
$ cd <project_root>
$ python build_argo_spec.py <project_image>
向Kubernetes提交Argo工作流规范¶
在提交Argo Workflows规范之前,您需要先部署Kubernetes Secrets。
以下是一个Secrets规范的示例:
# secret.yml
apiVersion: v1
kind: Secret
metadata:
name: aws-secrets
data:
access_key_id: <AWS_ACCESS_KEY_ID value encoded with base64>
secret_access_key: <AWS_SECRET_ACCESS_KEY value encoded with base64>
type: Opaque
您可以使用以下命令将AWS密钥编码为base64格式:
$ echo -n <original_key> | base64
运行以下命令将Kubernetes密钥部署到default命名空间,并检查是否创建成功:
$ kubectl create -f secret.yml
$ kubectl get secrets aws-secrets
现在,您可以按照以下方式提交Argo Workflows规范:
$ cd <project_root>
$ argo submit --watch templates/argo-<package_name>.yml
注意
Argo Workflows应提交到与Kubernetes Secrets相同的命名空间。有关使用方法的更多详情,请参阅Argo CLI帮助文档。
要清理您的Kubernetes集群,可以使用以下命令:
$ kubectl delete pods --selector app=kedro-argo
$ kubectl delete -f secret.yml
Kedro-Argo插件¶
作为替代方案,您可以使用Kedro-Argo插件将Kedro项目转换为Argo工作流。
警告
该插件不受Kedro团队支持,我们无法保证其可用性。