跳至内容

执行器插件

v3.3 及更高版本

配置

插件默认处于禁用状态。如需启用,请使用ARGO_EXECUTOR_PLUGINS=true启动控制器,例如:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-controller
spec:
  template:
    spec:
      containers:
        - name: workflow-controller
          env:
            - name: ARGO_EXECUTOR_PLUGINS
              value: "true"

使用Helm chart时,请将此添加到您的values.yaml中:

controller:
  extraEnv:
    - name: ARGO_EXECUTOR_PLUGINS
      value: "true"

模板执行器

这是一个运行自定义"插件"模板的插件,例如用于非Pod任务,如Tekton构建、Spark作业、发送Slack通知等。

一个简单的Python插件

让我们创建一个Python插件,每次操作工作流时打印"hello"。

我们需要以下内容:

  1. 已启用插件(见上文)。
  2. 一个HTTP服务器,它将作为主容器的边车运行,并通过此API契约响应来自执行器的RPC HTTP请求。
  3. 一个plugin.yaml配置文件,该文件会被转换为配置映射(config map),以便控制器能够发现该插件。

模板执行器插件在 /api/v1/template.execute 上提供HTTP POST请求服务:

curl http://localhost:4355/api/v1/template.execute -d \
'{
  "workflow": {
    "metadata": {
      "name": "my-wf"
    }
  },
  "template": {
    "name": "my-tmpl",
    "inputs": {},
    "outputs": {},
    "plugin": {
      "hello": {}
    }
  }
}'
# ...
HTTP/1.1 200 OK
{
  "node": {
    "phase": "Succeeded",
    "message": "Hello template!"
  }
}

提示:端口号可以是任意数字,但不能与其他插件冲突。不要使用常见端口如80、443、8080、8081、8443。如果您计划发布插件,请选择10,000以下的随机端口号并提交PR来添加您的插件。如果不发布,请使用大于10,000的端口号。

我们需要创建一个启动HTTP服务器的脚本。将其保存为server.py

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

with open("/var/run/argo/token") as f:
    token = f.read().strip()


class Plugin(BaseHTTPRequestHandler):

    def args(self):
        return json.loads(self.rfile.read(int(self.headers.get('Content-Length'))))

    def reply(self, reply):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(json.dumps(reply).encode("UTF-8"))

    def forbidden(self):
        self.send_response(403)
        self.end_headers()

    def unsupported(self):
        self.send_response(404)
        self.end_headers()

    def do_POST(self):
        if self.headers.get("Authorization") != "Bearer " + token:
            self.forbidden()
        elif self.path == '/api/v1/template.execute':
            args = self.args()
            if 'hello' in args['template'].get('plugin', {}):
                self.reply(
                    {'node': {'phase': 'Succeeded', 'message': 'Hello template!',
                              'outputs': {'parameters': [{'name': 'foo', 'value': 'bar'}]}}})
            else:
                self.reply({})
        else:
            self.unsupported()


if __name__ == '__main__':
    httpd = HTTPServer(('', 4355), Plugin)
    httpd.serve_forever()

提示: 插件可以用任何能作为容器运行的语言编写。Python很方便,因为您可以将脚本嵌入到容器中。

这里需要注意以下几点:

  • 您只需实现所需的调用。返回404状态码后,该调用将不再被执行。
  • 路径即为RPC方法名称。
  • 您应检查Authorization请求头是否包含与/var/run/argo/token相同的值。若不一致则返回403状态码
  • 请求体包含模板的输入参数。
  • 响应体可能包含节点的结果,包括阶段(例如"Succeeded"或"Failed")和消息。
  • 如果响应是{},则表示插件无法执行该插件模板,例如它是一个Slack插件,但模板是一个Tekton作业。
  • 如果状态码为404,则不会再次调用该插件。
  • 如果将文件保存为server.*,它将被复制到sidecar容器的args字段中。这对于用Python或Node.JS等脚本语言构建自包含插件非常有用。

接下来,创建一个名为 plugin.yaml 的清单文件:

apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
  name: hello
spec:
  sidecar:
    container:
      command:
        - python
        - -u # disables output buffering
        - -c
      image: python:alpine3.6
      name: hello-executor-plugin
      ports:
        - containerPort: 4355
      securityContext:
        runAsNonRoot: true
        runAsUser: 65534 # nobody
      resources:
        requests:
          memory: "64Mi"
          cpu: "250m"
        limits:
          memory: "128Mi"
          cpu: "500m"

按以下步骤构建和安装:

argo executor-plugin build .
kubectl -n argo apply -f hello-executor-plugin-configmap.yaml

检查您的控制器日志:

level=info msg="Executor plugin added" name=hello-controller-plugin

运行此工作流。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hello-
spec:
  entrypoint: main
  templates:
    - name: main
      plugin:
        hello: { }

您将看到工作流成功完成。

发现

当工作流运行时,插件将从以下位置加载:

  • 工作流的命名空间。
  • Argo的安装命名空间(通常为argo)。

如果两个插件名称相同,则仅加载工作流命名空间中的那个。

密钥

如果您需要与第三方系统交互,您将需要访问密钥。不要将它们放在plugin.yaml中。请使用密钥:

spec:
  sidecar:
    container:
      env:
        - name: URL
          valueFrom:
            secretKeyRef:
              name: slack-executor-plugin
              key: URL

请参考Kubernetes Secret文档了解密钥管理的最佳实践与安全注意事项。

资源与安全上下文

我们将这些设为必选项,这样没人能创建一个占用不合理内存量的插件,或以root身份运行,除非他们有意这么做:

spec:
  sidecar:
    container:
      resources:
        requests:
          cpu: 100m
          memory: 32Mi
        limits:
          cpu: 200m
          memory: 64Mi
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000

失败

插件可能会以下列方式失败:

  • 连接/套接字错误 - 视为暂时性问题。
  • 超时 - 被视为暂时性问题。
  • 404错误 - 插件不支持该方法,因此该方法(在同一工作流中)将不会被再次调用。
  • 503错误 - 视为暂时性错误。
  • 其他4xx/5xx错误 - 视为致命错误。

临时性错误会自动重试,其他所有错误均视为致命错误。

致命错误将导致步骤失败。

重新排队

有时插件可能无法立即完成任务。例如,它启动了一个长时间运行的任务。当这种情况发生时,您需要返回"Pending"或"Running"状态以及重新排队的时间:

{
  "node": {
    "phase": "Running",
    "message": "Long-running task started"
  },
  "requeue": "2m"
}

在这个示例中,任务将被重新排队,template.execute将在2分钟后再次被调用。

调试

您可以在智能体Pod的边车容器中找到该插件的日志,例如:

kubectl -n argo logs ${agentPodName} -c hello-executor-plugin

插件列表

由于插件只是配置映射,您可以使用kubectl列出它们:

kubectl get cm -l workflows.argoproj.io/configmap-type=ExecutorPlugin

示例与社区贡献插件

插件目录

发布您的插件

如果您希望发布并分享您的插件(我们非常鼓励这样做!),请提交一个拉取请求将其添加到上述目录中。


有问题吗?

Search on GitHub Discussions and Slack.