构建一个更高级的机器学习管道
这一步演示了如何构建一个更高级的机器学习(ML)管道,该管道利用了额外的KFP管道组合特性。
以下ML管道创建一个数据集,作为预处理步骤对数据集进行特征归一化,并使用不同的超参数在数据上训练一个简单的ML模型:
from typing import List
from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output
@dsl.component(packages_to_install=['pandas==1.3.5'])
def create_dataset(iris_dataset: Output[Dataset]):
import pandas as pd
csv_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
col_names = [
'Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Labels'
]
df = pd.read_csv(csv_url, names=col_names)
with open(iris_dataset.path, 'w') as f:
df.to_csv(f)
@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def normalize_dataset(
input_iris_dataset: Input[Dataset],
normalized_iris_dataset: Output[Dataset],
standard_scaler: bool,
min_max_scaler: bool,
):
if standard_scaler is min_max_scaler:
raise ValueError(
'Exactly one of standard_scaler or min_max_scaler must be True.')
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
with open(input_iris_dataset.path) as f:
df = pd.read_csv(f)
labels = df.pop('Labels')
if standard_scaler:
scaler = StandardScaler()
if min_max_scaler:
scaler = MinMaxScaler()
df = pd.DataFrame(scaler.fit_transform(df))
df['Labels'] = labels
with open(normalized_iris_dataset.path, 'w') as f:
df.to_csv(f)
@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def train_model(
normalized_iris_dataset: Input[Dataset],
model: Output[Model],
n_neighbors: int,
):
import pickle
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
with open(normalized_iris_dataset.path) as f:
df = pd.read_csv(f)
y = df.pop('Labels')
X = df
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
clf = KNeighborsClassifier(n_neighbors=n_neighbors)
clf.fit(X_train, y_train)
with open(model.path, 'wb') as f:
pickle.dump(clf, f)
@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline(
standard_scaler: bool,
min_max_scaler: bool,
neighbors: List[int],
):
create_dataset_task = create_dataset()
normalize_dataset_task = normalize_dataset(
input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
standard_scaler=True,
min_max_scaler=False)
with dsl.ParallelFor(neighbors) as n_neighbors:
train_model(
normalized_iris_dataset=normalize_dataset_task
.outputs['normalized_iris_dataset'],
n_neighbors=n_neighbors)
endpoint = '<KFP_UI_URL>'
kfp_client = client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
my_pipeline,
arguments={
'min_max_scaler': True,
'standard_scaler': False,
'neighbors': [3, 6, 9]
},
)
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)
本示例介绍了管道中的以下新功能:
在组件运行时,使用
packages_to_install参数添加一些要安装的 Python 包,如下所示:@dsl.component(packages_to_install=['pandas==1.3.5'])要在安装库后使用它,您必须在组件函数的范围内包含其导入语句,以便在组件运行时导入该库。
输入和输出工件 的类型为
Dataset和Model在组件签名中引入,以描述组件的输入和输出工件。这是通过类型注解泛型Input[]和Output[]分别用于输入和输出工件来完成的。在组件的范围内,工件可以通过
.path属性读取(对于输入)和写入(对于输出)。KFP 后端确保在运行时将 输入 工件文件从远程存储复制 到 正在执行的 pod 的本地文件系统,以便组件函数可以从本地文件系统读取输入工件。相比之下,输出 工件文件在组件完成运行时从 pod 的本地文件系统复制 到 远程存储。通过这种方式,输出工件在 pod 外部保持持久。在这两种情况下,组件作者只需要与本地文件系统交互,以创建持久工件。带有
Output[]注释的参数的参数值不会由管道作者传递给组件。KFP 后端在组件运行时传递此工件,以便组件作者不需要担心输出工件写入的路径。在输出工件写入后,执行组件的后端识别 KFP 工件类型(Dataset或Model),并在仪表板上整理它们。输出工件可以通过源任务的
.outputs属性和输出工件参数名称作为下游组件的输入进行传递,如下所示:create_dataset_task.outputs['iris_dataset']其中一个DSL 控制流特性,
dsl.ParallelFor,被使用。它是一个上下文管理器,让管道作者创建任务。这些任务在循环中并行执行。使用dsl.ParallelFor迭代neighbors管道参数可以让你使用不同的参数执行train_model组件,并在一次管道运行中测试多个超参数。其他控制流特性包括dsl.Condition和dsl.ExitHandler。
恭喜!您现在拥有一个KFP部署,一个端到端的机器学习管道,以及对用户界面的介绍。这只是KFP管道和仪表板功能的开始。