跳至内容

异步任务模式

简介

如果从Argo触发一个不在容器中运行完成的外部作业(例如Amazon EMR作业),有两种选择:

  • 创建一个轮询外部作业完成状态的容器
  • 将触发步骤(启动任务)与suspend步骤(当外部任务完成时通过API调用Argo恢复)相结合。

本文档更详细地描述了第二种选项。

模式

该模式包含两个步骤 - 第一步是一个短期运行的步骤,用于触发Argo外部的一个长期运行作业(例如HTTP提交),第二步是一个suspend步骤,该步骤会暂停工作流执行,并最终通过调用Argo API在外部作业成功或失败时恢复或停止(即失败)。

当实现为WorkflowTemplate时,可能看起来像这样:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: external-job-template
spec:
  entrypoint: run-external-job
  arguments:
    parameters:
      - name: "job-cmd"
  templates:
    - name: run-external-job
      inputs:
        parameters:
          - name: "job-cmd"
            value: "{{workflow.parameters.job-cmd}}"
      steps:
        - - name: trigger-job
            template: trigger-job
            arguments:
              parameters:
                - name: "job-cmd"
                  value: "{{inputs.parameters.job-cmd}}"
        - - name: wait-completion
            template: wait-completion
            arguments:
              parameters:
                - name: uuid
                  value: "{{steps.trigger-job.outputs.result}}"

    - name: trigger-job
      inputs:
        parameters:
          - name: "job-cmd"
      container:
        image: appropriate/curl:latest
        command: [ "/bin/sh", "-c" ]
        args: [ "{{inputs.parameters.job-cmd}}" ]

    - name: wait-completion
      inputs:
        parameters:
          - name: uuid
      suspend: { }

在这种情况下,job-cmd参数可以是一个通过curl向返回作业UUID的端点发起HTTP调用的命令。更复杂的提交和提交输出解析可以通过类似Python脚本步骤的方式完成。

任务完成后,外部作业需要根据成功与否调用resume:

你可能需要一个access token

curl --request PUT \
  --url https://localhost:2746/api/v1/workflows/<NAMESPACE>/<WORKFLOWNAME>/resume
  --header 'content-type: application/json' \
  --header "Authorization: $ARGO_TOKEN" \
  --data '{
      "namespace": "<NAMESPACE>",
      "name": "<WORKFLOWNAME>",
      "nodeFieldSelector": "inputs.parameters.uuid.value=<UUID>"
    }'

如果失败则停止:

curl --request PUT \
  --url https://localhost:2746/api/v1/workflows/<NAMESPACE>/<WORKFLOWNAME>/stop
  --header 'content-type: application/json' \
  --header "Authorization: $ARGO_TOKEN" \
  --data '{
      "namespace": "<NAMESPACE>",
      "name": "<WORKFLOWNAME>",
      "nodeFieldSelector": "inputs.parameters.uuid.value=<UUID>",
      "message": "<FAILURE-MESSAGE>"
    }'

重试失败的任务

对遵循此模式的失败作业使用argo retry命令时,Argo会重新尝试suspend步骤而不会重新触发整个作业。

相反,您需要使用 --restart-successful 选项,例如使用上面的模板时:

argo retry <WORKFLOWNAME> --restart-successful --node-field-selector templateRef.template=run-external-job,phase=Failed

有问题吗?

Search on GitHub Discussions and Slack.