执行器插件¶
v3.3 及更高版本
配置¶
插件默认处于禁用状态。如需启用,请使用ARGO_EXECUTOR_PLUGINS=true启动控制器,例如:
apiVersion: apps/v1
kind: Deployment
metadata:
name: workflow-controller
spec:
template:
spec:
containers:
- name: workflow-controller
env:
- name: ARGO_EXECUTOR_PLUGINS
value: "true"
使用Helm chart时,请将此添加到您的values.yaml中:
controller:
extraEnv:
- name: ARGO_EXECUTOR_PLUGINS
value: "true"
模板执行器¶
这是一个运行自定义"插件"模板的插件,例如用于非Pod任务,如Tekton构建、Spark作业、发送Slack通知等。
一个简单的Python插件¶
让我们创建一个Python插件,每次操作工作流时打印"hello"。
我们需要以下内容:
- 已启用插件(见上文)。
- 一个HTTP服务器,它将作为主容器的边车运行,并通过此API契约响应来自执行器的RPC HTTP请求。
- 一个
plugin.yaml配置文件,该文件会被转换为配置映射(config map),以便控制器能够发现该插件。
模板执行器插件在 /api/v1/template.execute 上提供HTTP POST请求服务:
curl http://localhost:4355/api/v1/template.execute -d \
'{
"workflow": {
"metadata": {
"name": "my-wf"
}
},
"template": {
"name": "my-tmpl",
"inputs": {},
"outputs": {},
"plugin": {
"hello": {}
}
}
}'
# ...
HTTP/1.1 200 OK
{
"node": {
"phase": "Succeeded",
"message": "Hello template!"
}
}
提示:端口号可以是任意数字,但不能与其他插件冲突。不要使用常见端口如80、443、8080、8081、8443。如果您计划发布插件,请选择10,000以下的随机端口号并提交PR来添加您的插件。如果不发布,请使用大于10,000的端口号。
我们需要创建一个启动HTTP服务器的脚本。将其保存为server.py:
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
with open("/var/run/argo/token") as f:
token = f.read().strip()
class Plugin(BaseHTTPRequestHandler):
def args(self):
return json.loads(self.rfile.read(int(self.headers.get('Content-Length'))))
def reply(self, reply):
self.send_response(200)
self.end_headers()
self.wfile.write(json.dumps(reply).encode("UTF-8"))
def forbidden(self):
self.send_response(403)
self.end_headers()
def unsupported(self):
self.send_response(404)
self.end_headers()
def do_POST(self):
if self.headers.get("Authorization") != "Bearer " + token:
self.forbidden()
elif self.path == '/api/v1/template.execute':
args = self.args()
if 'hello' in args['template'].get('plugin', {}):
self.reply(
{'node': {'phase': 'Succeeded', 'message': 'Hello template!',
'outputs': {'parameters': [{'name': 'foo', 'value': 'bar'}]}}})
else:
self.reply({})
else:
self.unsupported()
if __name__ == '__main__':
httpd = HTTPServer(('', 4355), Plugin)
httpd.serve_forever()
提示: 插件可以用任何能作为容器运行的语言编写。Python很方便,因为您可以将脚本嵌入到容器中。
这里需要注意以下几点:
- 您只需实现所需的调用。返回404状态码后,该调用将不再被执行。
- 路径即为RPC方法名称。
- 您应检查
Authorization请求头是否包含与/var/run/argo/token相同的值。若不一致则返回403状态码 - 请求体包含模板的输入参数。
- 响应体可能包含节点的结果,包括阶段(例如"Succeeded"或"Failed")和消息。
- 如果响应是
{},则表示插件无法执行该插件模板,例如它是一个Slack插件,但模板是一个Tekton作业。 - 如果状态码为404,则不会再次调用该插件。
- 如果将文件保存为
server.*,它将被复制到sidecar容器的args字段中。这对于用Python或Node.JS等脚本语言构建自包含插件非常有用。
接下来,创建一个名为 plugin.yaml 的清单文件:
apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
name: hello
spec:
sidecar:
container:
command:
- python
- -u # disables output buffering
- -c
image: python:alpine3.6
name: hello-executor-plugin
ports:
- containerPort: 4355
securityContext:
runAsNonRoot: true
runAsUser: 65534 # nobody
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
按以下步骤构建和安装:
argo executor-plugin build .
kubectl -n argo apply -f hello-executor-plugin-configmap.yaml
检查您的控制器日志:
level=info msg="Executor plugin added" name=hello-controller-plugin
运行此工作流。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-
spec:
entrypoint: main
templates:
- name: main
plugin:
hello: { }
您将看到工作流成功完成。
发现¶
当工作流运行时,插件将从以下位置加载:
- 工作流的命名空间。
- Argo的安装命名空间(通常为
argo)。
如果两个插件名称相同,则仅加载工作流命名空间中的那个。
密钥¶
如果您需要与第三方系统交互,您将需要访问密钥。不要将它们放在plugin.yaml中。请使用密钥:
spec:
sidecar:
container:
env:
- name: URL
valueFrom:
secretKeyRef:
name: slack-executor-plugin
key: URL
请参考Kubernetes Secret文档了解密钥管理的最佳实践与安全注意事项。
资源与安全上下文¶
我们将这些设为必选项,这样没人能创建一个占用不合理内存量的插件,或以root身份运行,除非他们有意这么做:
spec:
sidecar:
container:
resources:
requests:
cpu: 100m
memory: 32Mi
limits:
cpu: 200m
memory: 64Mi
securityContext:
runAsNonRoot: true
runAsUser: 1000
失败¶
插件可能会以下列方式失败:
- 连接/套接字错误 - 视为暂时性问题。
- 超时 - 被视为暂时性问题。
- 404错误 - 插件不支持该方法,因此该方法(在同一工作流中)将不会被再次调用。
- 503错误 - 视为暂时性错误。
- 其他4xx/5xx错误 - 视为致命错误。
临时性错误会自动重试,其他所有错误均视为致命错误。
致命错误将导致步骤失败。
重新排队¶
有时插件可能无法立即完成任务。例如,它启动了一个长时间运行的任务。当这种情况发生时,您需要返回"Pending"或"Running"状态以及重新排队的时间:
{
"node": {
"phase": "Running",
"message": "Long-running task started"
},
"requeue": "2m"
}
在这个示例中,任务将被重新排队,template.execute将在2分钟后再次被调用。
调试¶
您可以在智能体Pod的边车容器中找到该插件的日志,例如:
kubectl -n argo logs ${agentPodName} -c hello-executor-plugin
插件列表¶
由于插件只是配置映射,您可以使用kubectl列出它们:
kubectl get cm -l workflows.argoproj.io/configmap-type=ExecutorPlugin
示例与社区贡献插件¶
发布您的插件¶
如果您希望发布并分享您的插件(我们非常鼓励这样做!),请提交一个拉取请求将其添加到上述目录中。