构建一个更高级的机器学习管道

创建一个更高级的管道,利用额外的KFP功能。

这一步演示了如何构建一个更高级的机器学习(ML)管道,该管道利用了额外的KFP管道组合特性。

以下ML管道创建一个数据集,作为预处理步骤对数据集进行特征归一化,并使用不同的超参数在数据上训练一个简单的ML模型:

from typing import List

from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output


@dsl.component(packages_to_install=['pandas==1.3.5'])
def create_dataset(iris_dataset: Output[Dataset]):
    import pandas as pd

    csv_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
    col_names = [
        'Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Labels'
    ]
    df = pd.read_csv(csv_url, names=col_names)

    with open(iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool,
    min_max_scaler: bool,
):
    if standard_scaler is min_max_scaler:
        raise ValueError(
            'Exactly one of standard_scaler or min_max_scaler must be True.')

    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler

    with open(input_iris_dataset.path) as f:
        df = pd.read_csv(f)
    labels = df.pop('Labels')

    if standard_scaler:
        scaler = StandardScaler()
    if min_max_scaler:
        scaler = MinMaxScaler()

    df = pd.DataFrame(scaler.fit_transform(df))
    df['Labels'] = labels
    with open(normalized_iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def train_model(
    normalized_iris_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier

    with open(normalized_iris_dataset.path) as f:
        df = pd.read_csv(f)

    y = df.pop('Labels')
    X = df

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    clf = KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(X_train, y_train)
    with open(model.path, 'wb') as f:
        pickle.dump(clf, f)


@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline(
    standard_scaler: bool,
    min_max_scaler: bool,
    neighbors: List[int],
):
    create_dataset_task = create_dataset()

    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
        standard_scaler=True,
        min_max_scaler=False)

    with dsl.ParallelFor(neighbors) as n_neighbors:
        train_model(
            normalized_iris_dataset=normalize_dataset_task
            .outputs['normalized_iris_dataset'],
            n_neighbors=n_neighbors)


endpoint = '<KFP_UI_URL>'
kfp_client = client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'min_max_scaler': True,
        'standard_scaler': False,
        'neighbors': [3, 6, 9]
    },
)
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)

本示例介绍了管道中的以下新功能:

  • 在组件运行时,使用 packages_to_install 参数添加一些要安装的 Python ,如下所示:

    @dsl.component(packages_to_install=['pandas==1.3.5'])

    要在安装库后使用它,您必须在组件函数的范围内包含其导入语句,以便在组件运行时导入该库。

  • 输入和输出工件 的类型为 DatasetModel 在组件签名中引入,以描述组件的输入和输出工件。这是通过类型注解泛型 Input[]Output[] 分别用于输入和输出工件来完成的。

    在组件的范围内,工件可以通过 .path 属性读取(对于输入)和写入(对于输出)。KFP 后端确保在运行时将 输入 工件文件从远程存储复制 正在执行的 pod 的本地文件系统,以便组件函数可以从本地文件系统读取输入工件。相比之下,输出 工件文件在组件完成运行时从 pod 的本地文件系统复制 远程存储。通过这种方式,输出工件在 pod 外部保持持久。在这两种情况下,组件作者只需要与本地文件系统交互,以创建持久工件。

    带有 Output[] 注释的参数的参数值不会由管道作者传递给组件。KFP 后端在组件运行时传递此工件,以便组件作者不需要担心输出工件写入的路径。在输出工件写入后,执行组件的后端识别 KFP 工件类型(DatasetModel),并在仪表板上整理它们。

    输出工件可以通过源任务的 .outputs 属性和输出工件参数名称作为下游组件的输入进行传递,如下所示:

    create_dataset_task.outputs['iris_dataset']

  • 其中一个DSL 控制流特性dsl.ParallelFor,被使用。它是一个上下文管理器,让管道作者创建任务。这些任务在循环中并行执行。使用dsl.ParallelFor迭代neighbors管道参数可以让你使用不同的参数执行train_model组件,并在一次管道运行中测试多个超参数。其他控制流特性包括dsl.Conditiondsl.ExitHandler

恭喜!您现在拥有一个KFP部署,一个端到端的机器学习管道,以及对用户界面的介绍。这只是KFP管道和仪表板功能的开始。

反馈

此页面有帮助吗?