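Connect the SDK to the API
Overview
The Kubeflow Pipelines SDK provides a Python interface for interacting with the Kubeflow Pipelines API. This guide shows you how to connect the SDK to the Pipelines API in various scenarios.
Kubeflow Platform
When running Kubeflow Pipelines as part of a multi-user Kubeflow Platform, how you authenticate the Pipelines SDK depends on whether you are running your code inside or outside the cluster.
Kubeflow Platform - Inside the Cluster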
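A ServiceAccount token volume can be mounted to a Pod running in the same cluster as Kubeflow Pipelines. The Kubeflow Pipelines SDK can use this token to authenticate itself with the Kubeflow Pipelines API.
The following Python code creates a kfp.Client() that authenticates with the ServiceAccount token: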
```python
import kfp

# by default, when run from inside a Kubernetes cluster:
#  - the token is read from the `KF_PIPELINES_SA_TOKEN_PATH` path
#  - the host is set to `http://ml-pipeline-ui.kubeflow.svc.cluster.local`
kfp_client = kfp.Client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)
```
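A missing or empty token file is a common reason for in-cluster authentication to fail, so it can help to check for it before creating the client. The following is a minimal sketch: `read_sa_token()` is a hypothetical helper (not part of the SDK), and it assumes the token path comes from `KF_PIPELINES_SA_TOKEN_PATH`, falling back to the default path used throughout this guide.

```python
import os
from pathlib import Path

# default token path used in this guide's examples (assumption: your Pod mounts the token here)
DEFAULT_TOKEN_PATH = "/var/run/secrets/kubeflow/pipelines/token"


def read_sa_token() -> str:
    """Hypothetical helper: return the projected ServiceAccount token, or raise a clear error."""
    token_path = Path(os.environ.get("KF_PIPELINES_SA_TOKEN_PATH", DEFAULT_TOKEN_PATH))
    if not token_path.is_file():
        raise FileNotFoundError(
            f"ServiceAccount token not found at '{token_path}'; "
            "is the projected token volume mounted into this Pod?"
        )
    # the token is a single line; strip any trailing newline
    token = token_path.read_text().strip()
    if not token:
        raise ValueError(f"ServiceAccount token at '{token_path}' is empty")
    return token
```

Calling `read_sa_token()` once at startup surfaces a misconfigured Pod immediately, instead of as an authentication error on the first API call.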
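ServiceAccount Token Volume
To use the preceding code, you need to run it from a Pod that has a ServiceAccount token volume mounted. You can manually add a volume and volumeMount to your PodSpec, or use Kubeflow's PodDefaults to inject the required volume.
Option 1 - manually add a volume to your PodSpec: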
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: access-kfp-example
spec:
  containers:
    - image: hello-world:latest
      name: hello-world
      env:
        - ## this environment variable is automatically read by `kfp.Client()`
          ## this is the default value, but we show it here for clarity
          name: KF_PIPELINES_SA_TOKEN_PATH
          value: /var/run/secrets/kubeflow/pipelines/token
      volumeMounts:
        - mountPath: /var/run/secrets/kubeflow/pipelines
          name: volume-kf-pipeline-token
          readOnly: true
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org
```
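If you generate PodSpecs from Python rather than writing YAML by hand, the same volume and volumeMount can be expressed as plain dicts. This is a minimal sketch; `kfp_token_volume()` is a hypothetical helper that only mirrors the YAML above, and the `audience` must still match `TOKEN_REVIEW_AUDIENCE` on the `ml-pipeline` deployment.

```python
from typing import Dict, Tuple


def kfp_token_volume(
    audience: str = "pipelines.kubeflow.org",
    expiration_seconds: int = 7200,
    volume_name: str = "volume-kf-pipeline-token",
) -> Tuple[Dict, Dict]:
    """Hypothetical helper: return (volume, volumeMount) dicts mirroring the PodSpec YAML."""
    volume = {
        "name": volume_name,
        "projected": {
            "sources": [
                {
                    "serviceAccountToken": {
                        # file name inside the mount directory
                        "path": "token",
                        "expirationSeconds": expiration_seconds,
                        "audience": audience,
                    }
                }
            ]
        },
    }
    volume_mount = {
        # mountPath + "/" + path must equal KF_PIPELINES_SA_TOKEN_PATH
        "mountPath": "/var/run/secrets/kubeflow/pipelines",
        "name": volume_name,
        "readOnly": True,
    }
    return volume, volume_mount
```

Keeping both dicts in one helper guarantees that the volume name and the mount name always agree.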
Option 2 - use a PodDefault to inject the volume:
```yaml
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-ml-pipeline
  namespace: "<YOUR_USER_PROFILE_NAMESPACE>"
spec:
  desc: Allow access to Kubeflow Pipelines
  selector:
    matchLabels:
      access-ml-pipeline: "true"
  env:
    - ## this environment variable is automatically read by `kfp.Client()`
      ## this is the default value, but we show it here for clarity
      name: KF_PIPELINES_SA_TOKEN_PATH
      value: /var/run/secrets/kubeflow/pipelines/token
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org
  volumeMounts:
    - mountPath: /var/run/secrets/kubeflow/pipelines
      name: volume-kf-pipeline-token
      readOnly: true
```
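Tip
- PodDefaults are namespaced resources, so you need to create one inside each of your Kubeflow Profile namespaces.
- The Notebook Spawner UI will be aware of any PodDefaults in the user's namespace (they are selectable under the "configurations" section).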
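A PodDefault only applies to Pods whose labels match its selector, so a Pod must carry the label `access-ml-pipeline: "true"` for the PodDefault above to inject the volume. The sketch below illustrates the Kubernetes `matchLabels` rule (every listed key must be present with exactly the same value); `matches_labels()` is a hypothetical helper written only to demonstrate the semantics, not part of any Kubeflow library.

```python
from typing import Mapping


def matches_labels(pod_labels: Mapping[str, str], match_labels: Mapping[str, str]) -> bool:
    """Hypothetical helper: `matchLabels` semantics, a logical AND over key=value pairs."""
    return all(pod_labels.get(key) == value for key, value in match_labels.items())


# the selector from the PodDefault above; label values are strings, hence the quoted "true"
SELECTOR = {"access-ml-pipeline": "true"}

print(matches_labels({"app": "my-notebook", "access-ml-pipeline": "true"}, SELECTOR))  # True
print(matches_labels({"app": "my-notebook"}, SELECTOR))  # False
```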
RBAC Authorization
The Kubeflow Pipelines API respects Kubernetes RBAC, and checks the RoleBindings assigned to a ServiceAccount before allowing it to perform Pipelines API actions.
For example, this RoleBinding allows Pods with the default-editor ServiceAccount in namespace-2 to manage Kubeflow Pipelines in namespace-1:
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: allow-namespace-2-kubeflow-edit
  ## this RoleBinding is in `namespace-1`, because it grants access to `namespace-1`
  namespace: namespace-1
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeflow-edit
subjects:
  - kind: ServiceAccount
    name: default-editor
    ## the ServiceAccount lives in `namespace-2`
    namespace: namespace-2
```
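When you need this binding for many namespace pairs, a small generator keeps the two namespaces from being swapped by mistake. This is a minimal sketch; `kfp_edit_rolebinding()` is a hypothetical helper, and its `allow-<namespace>-kubeflow-edit` naming simply follows the example above.

```python
def kfp_edit_rolebinding(
    target_namespace: str,
    sa_namespace: str,
    sa_name: str = "default-editor",
) -> dict:
    """Hypothetical helper: grant `sa_namespace/sa_name` the `kubeflow-edit` ClusterRole in `target_namespace`."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {
            "name": f"allow-{sa_namespace}-kubeflow-edit",
            # the RoleBinding lives in the namespace it grants access to
            "namespace": target_namespace,
        },
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": "kubeflow-edit",
        },
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": sa_name,
                # the ServiceAccount lives in its own namespace
                "namespace": sa_namespace,
            }
        ],
    }
```

`kfp_edit_rolebinding("namespace-1", "namespace-2")` reproduces the YAML example above.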
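Tip
- Review the ClusterRole called aggregate-to-kubeflow-pipelines-edit for a list of some important pipelines.kubeflow.org RBAC verbs.
- Kubeflow Notebooks pods run as the default-editor ServiceAccount by default, so the RoleBindings for default-editor apply to them and give them permission to submit pipelines in their own namespace.
- For more information about profiles, see the Manage Profile Contributors guide.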
Kubeflow Platform - Outside the Cluster
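Kubeflow Notebooks
Because Kubeflow Notebooks run on Pods inside the cluster, they can NOT use the following method to authenticate the Pipelines SDK; use the inside-the-cluster method instead.
The precise method to authenticate from outside the cluster depends on how you deployed Kubeflow Platform. Because most distributions use Dex as their identity provider, this example shows how to authenticate with Dex from a Python script.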
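You will need to make the Kubeflow Pipelines API accessible on the remote machine. If your Kubeflow Istio gateway is already exposed, skip this step and use that URL directly.
The following command exposes the istio-ingressgateway service on localhost:8080: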
```shell
# TIP: svc/istio-ingressgateway may be called something else,
#      or use different ports in your distribution
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80
```
The following Python code defines a KFPClientManager() class that creates an authenticated kfp.Client() by interacting with Dex:
```python
import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KFPClientManager

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # suppress urllib3's insecure-request warnings, if TLS verification is disabled
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex
        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {dex_approval_url}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError("Failed to get Dex session cookies") from ex

        # patch `_load_config` to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        # a local subclass is used, so the global `kfp.Client` class is left unmodified
        skip_tls_verify = self._skip_tls_verify

        class PatchedKFPClient(kfp.Client):
            def _load_config(self, *args, **kwargs):
                config = super()._load_config(*args, **kwargs)
                config.verify_ssl = not skip_tls_verify
                return config

        return PatchedKFPClient(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()
```
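The two URL rewrites inside `_get_session_cookies()` are easy to misread, and they are the first thing to check when authentication fails on a new deployment. The sketch below pulls them out into standalone functions so you can test them against the redirect URLs your own gateway produces; the function names and the sample URLs are illustrative, and the real paths depend on how Dex and oauth2-proxy are configured in your distribution.

```python
import re
from urllib.parse import urlencode, urlsplit


def oauth2_start_url(url: str) -> str:
    """Same rewrite as the 403 branch: start sign-in at `/oauth2/start?rd=<original path>`."""
    url_obj = urlsplit(url)
    url_obj = url_obj._replace(path="/oauth2/start", query=urlencode({"rd": url_obj.path}))
    return url_obj.geturl()


def dex_auth_type_url(url: str, dex_auth_type: str = "local") -> str:
    """Same rewrite as the `../auth` branch: append the auth type; other paths are unchanged."""
    url_obj = urlsplit(url)
    if re.search(r"/auth$", url_obj.path):
        url_obj = url_obj._replace(path=re.sub(r"/auth$", f"/auth/{dex_auth_type}", url_obj.path))
    return url_obj.geturl()


print(oauth2_start_url("http://localhost:8080/pipeline"))
# http://localhost:8080/oauth2/start?rd=%2Fpipeline
print(dex_auth_type_url("http://localhost:8080/dex/auth?client_id=example"))
# http://localhost:8080/dex/auth/local?client_id=example
```

Note that the query string is preserved by the second rewrite, which matters because Dex carries its OAuth state in the query.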
The following Python code shows how to use the KFPClientManager() class to create a kfp.Client():
```python
# initialize a KFPClientManager
kfp_client_manager = KFPClientManager(
    api_url="http://localhost:8080/pipeline",
    skip_tls_verify=True,

    dex_username="user@example.com",
    dex_password="12341234",

    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)
```
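As the TIP in the code notes, the Dex session behind a client eventually expires. One way to handle this in a long-running process is to rebuild the client once it reaches a fixed age. This is a minimal sketch: `RefreshingClient` is a hypothetical wrapper around any zero-argument factory, and the maximum age is an assumption you must choose yourself based on the session lifetime configured in your Dex or oauth2-proxy deployment.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class RefreshingClient(Generic[T]):
    """Hypothetical wrapper: cache a client and rebuild it once it is older than `max_age_seconds`."""

    def __init__(
        self,
        factory: Callable[[], T],
        max_age_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._factory = factory
        self._max_age = max_age_seconds
        self._clock = clock
        self._client: Optional[T] = None
        self._created_at = 0.0

    def get(self) -> T:
        now = self._clock()
        if self._client is None or now - self._created_at >= self._max_age:
            # re-authenticate by calling the factory again
            self._client = self._factory()
            self._created_at = now
        return self._client

    def invalidate(self) -> None:
        """Force a fresh client on the next `get()`, e.g. after an authentication error."""
        self._client = None
```

For example, `RefreshingClient(kfp_client_manager.create_kfp_client, max_age_seconds=3600).get()` returns the cached client for up to an hour (a hypothetical value) before logging in to Dex again.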
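Standalone Kubeflow Pipelines
When running Kubeflow Pipelines in standalone mode, there is no concept of multi-user authentication or RBAC. The specific steps depend on whether you are running your code inside or outside the cluster.
Standalone KFP - Inside the Cluster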
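When running inside the Kubernetes cluster, you can connect the Pipelines SDK directly to the ml-pipeline-ui service through cluster-internal service DNS resolution.
When running in the same namespace as Kubeflow: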
```python
import kfp

client = kfp.Client(host="http://ml-pipeline-ui:80")

print(client.list_experiments())
```
When running in a different namespace from Kubeflow:
```python
import kfp

# the namespace in which you deployed Kubeflow Pipelines
namespace = "kubeflow"

client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}")

print(client.list_experiments())
```
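If the same code may run either in the Kubeflow Pipelines namespace or elsewhere, the choice between the two host forms above can be made at runtime. This is a minimal sketch: `current_namespace()` and `ml_pipeline_ui_host()` are hypothetical helpers, and the sketch assumes the Pod automounts its ServiceAccount token (the Kubernetes default), which places the Pod's namespace in `/var/run/secrets/kubernetes.io/serviceaccount/namespace`. When that file is missing, it falls back to the namespace-qualified form, which resolves from any namespace.

```python
from pathlib import Path
from typing import Optional

# present in Pods that automount their ServiceAccount token
NAMESPACE_FILE = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace")


def current_namespace() -> Optional[str]:
    """Hypothetical helper: return this Pod's namespace, or None if it cannot be determined."""
    try:
        return NAMESPACE_FILE.read_text().strip() or None
    except OSError:
        return None


def ml_pipeline_ui_host(kfp_namespace: str = "kubeflow", pod_namespace: Optional[str] = None) -> str:
    """Hypothetical helper: choose between the short and the namespace-qualified service name."""
    if pod_namespace is None:
        pod_namespace = current_namespace()
    if pod_namespace == kfp_namespace:
        # same namespace: the short service name resolves directly
        return "http://ml-pipeline-ui:80"
    # different (or unknown) namespace: qualify the service name with its namespace
    return f"http://ml-pipeline-ui.{kfp_namespace}"
```

Usage: `client = kfp.Client(host=ml_pipeline_ui_host("kubeflow"))`.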
Standalone KFP - Outside the Cluster
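When running outside the Kubernetes cluster, you can connect the Pipelines SDK to the ml-pipeline-ui service by using kubectl port-forward.
Step 1: run the following command on your external system to start port forwarding: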
```shell
# change `--namespace` if you deployed Kubeflow Pipelines into a different namespace
kubectl port-forward --namespace kubeflow svc/ml-pipeline-ui 3000:80
```
Step 2: the following code creates a kfp.Client() against your port-forwarded ml-pipeline-ui service:
```python
import kfp

client = kfp.Client(host="http://localhost:3000")

print(client.list_experiments())
```
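Because kubectl port-forward runs as a separate process, it may still be starting (or may have exited) when your script runs. A quick TCP check before creating the client gives a clearer error than a failed API call. This is a minimal sketch; `wait_for_port()` is a hypothetical helper, and a successful connection only shows that the forwarded port is listening, not that the Pipelines API is healthy.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.2) -> None:
    """Hypothetical helper: block until host:port accepts a TCP connection, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # open and immediately close a connection to test reachability
            with socket.create_connection((host, port), timeout=interval):
                return
        except OSError as ex:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"{host}:{port} not reachable after {timeout}s") from ex
            time.sleep(interval)
```

For example, call `wait_for_port("localhost", 3000)` before `kfp.Client(host="http://localhost:3000")`.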