将SDK连接到API

学习如何将Kubeflow Pipelines SDK连接到API。

概述

Kubeflow Pipelines SDK 提供了一个 Python 接口,用于与 Kubeflow Pipelines API 进行交互。 本指南将向您展示如何在各种场景中将 SDK 连接到 Pipelines API。

Kubeflow平台

当在多用户 Kubeflow Platform 中运行 Kubeflow Pipelines 时,您如何验证 Pipelines SDK 取决于您是在集群内部 还是外部 运行代码。

Kubeflow 平台 - 集群内部

Click to expand

一个 ServiceAccount token volume 可以挂载到与 Kubeflow Pipelines 运行在同一集群中的 Pod。Kubeflow Pipelines SDK 可以使用此令牌与 Kubeflow Pipelines API 进行身份验证。

以下Python代码将使用ServiceAccount令牌进行身份验证创建一个 kfp.Client()

import kfp

# by default, when run from inside a Kubernetes cluster:
#  - the token is read from the `KF_PIPELINES_SA_TOKEN_PATH` path
#  - the host is set to `http://ml-pipeline-ui.kubeflow.svc.cluster.local`
kfp_client = kfp.Client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)

服务账户令牌卷

要使用前面的代码,您需要从一个挂载了ServiceAccount令牌卷的Pod中运行它。您可以手动向您的PodSpec添加一个 volumevolumeMount,或者使用Kubeflow的 PodDefaults 来注入所需的卷。

选项 1 - 手动向您的 PodSpec 添加一个卷:

apiVersion: v1
kind: Pod
metadata:
  name: access-kfp-example
spec:
  containers:
  - image: hello-world:latest
    name: hello-world
    env:
      - ## this environment variable is automatically read by `kfp.Client()`
        ## this is the default value, but we show it here for clarity
        name: KF_PIPELINES_SA_TOKEN_PATH
        value: /var/run/secrets/kubeflow/pipelines/token
    volumeMounts:
      - mountPath: /var/run/secrets/kubeflow/pipelines
        name: volume-kf-pipeline-token
        readOnly: true
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org      

选项 2 - 使用 PodDefault 来注入卷:

apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-ml-pipeline
  namespace: "<YOUR_USER_PROFILE_NAMESPACE>"
spec:
  desc: Allow access to Kubeflow Pipelines
  selector:
    matchLabels:
      access-ml-pipeline: "true"
  env:
    - ## this environment variable is automatically read by `kfp.Client()`
      ## this is the default value, but we show it here for clarity
      name: KF_PIPELINES_SA_TOKEN_PATH
      value: /var/run/secrets/kubeflow/pipelines/token
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org      
  volumeMounts:
    - mountPath: /var/run/secrets/kubeflow/pipelines
      name: volume-kf-pipeline-token
      readOnly: true

RBAC 授权

Kubeflow Pipelines API 尊重 Kubernetes RBAC,并将在允许服务账户进行 Pipelines API 操作之前检查分配给服务账户的 RoleBindings。

例如,该 RoleBinding 允许在 namespace-2 中具有 default-editor ServiceAccount 的 Pods 管理 namespace-1 中的 Kubeflow Pipelines:

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: allow-namespace-2-kubeflow-edit
  ## this RoleBinding is in `namespace-1`, because it grants access to `namespace-1`
  namespace: namespace-1
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeflow-edit
subjects:
  - kind: ServiceAccount
    name: default-editor
    ## the ServiceAccount lives in `namespace-2`
    namespace: namespace-2

Kubeflow 平台 - 集群外部

Click to expand

集群外部进行身份验证的具体方法将取决于您是如何部署 Kubeflow 平台的。因为大多数发行版使用Dex作为身份提供者,所以这个例子将向您展示如何使用Python脚本通过Dex进行身份验证。

您需要使Kubeflow Pipelines API在远程机器上可访问。 如果您的Kubeflow Istio网关已经暴露,请跳过此步骤并直接使用该URL。

以下命令将把 istio-ingressgateway 服务暴露在 localhost:8080

# TIP: svc/istio-ingressgateway may be called something else, 
#      or use different ports in your distribution
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80

以下Python代码定义了一个 KFPClientManager() 类,通过与Dex交互创建一个经过身份验证的 kfp.Client()

import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KfpClient

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # disable SSL verification, if requested
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_default_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex
        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError(f"Failed to get Dex session cookies") from ex

        # monkey patch the kfp.Client to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        original_load_config = kfp.Client._load_config

        def patched_load_config(client_self, *args, **kwargs):
            config = original_load_config(client_self, *args, **kwargs)
            config.verify_ssl = not self._skip_tls_verify
            return config

        patched_kfp_client = kfp.Client
        patched_kfp_client._load_config = patched_load_config

        return patched_kfp_client(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()

以下Python代码演示了如何使用KFPClientManager()类来创建kfp.Client()

# initialize a KFPClientManager
kfp_client_manager = KFPClientManager(
    api_url="http://localhost:8080/pipeline",
    skip_tls_verify=True,

    dex_username="user@example.com",
    dex_password="12341234",

    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)

独立的 Kubeflow Pipelines

当在独立模式下运行Kubeflow Pipelines时,将没有多用户身份验证或RBAC的概念。具体步骤将取决于您是在集群内部还是外部运行代码。

独立 KFP - 在集群内

Click to expand

当在Kubernetes集群内部运行时,您可以通过集群内部服务DNS解析直接将Pipelines SDK连接到ml-pipeline-ui服务。

当在与 Kubeflow 相同的命名空间 中运行时:

import kfp

client = kfp.Client(host="http://ml-pipeline-ui:80")

print(client.list_experiments())

当在与 Kubeflow 不同的 命名空间 中运行时:

import kfp

# the namespace in which you deployed Kubeflow Pipelines
namespace = "kubeflow" 

client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}")

print(client.list_experiments())

独立的 KFP - 在集群外

Click to expand

当在Kubernetes集群外运行时,您可以通过使用 kubectl port-forwarding 将Pipelines SDK连接到 ml-pipeline-ui 服务。

步骤 1: 在您的外部系统上运行以下命令以启动端口转发:

# change `--namespace` if you deployed Kubeflow Pipelines into a different namespace
kubectl port-forward --namespace kubeflow svc/ml-pipeline-ui 3000:80

步骤 2:以下代码将针对您的端口转发的 ml-pipeline-ui 服务创建一个 kfp.Client()

import kfp

client = kfp.Client(host="http://localhost:3000")

print(client.list_experiments())

反馈

此页面有帮助吗?