
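Hyperparameter tuning: SynapseML with Hyperopt

SynapseML is an open-source library that simplifies the creation of massively scalable machine learning (ML) pipelines. It provides simple, composable, and distributed APIs for a wide variety of machine learning tasks, such as text analytics, vision, and anomaly detection.

Hyperopt is a Python library for serial and parallel optimization over complex search spaces, including real-valued, discrete, and conditional dimensions.

This guide shows how to tune a distributed algorithm on Spark with SynapseML and Hyperopt.

The use case is distributed machine learning in Python that requires hyperparameter tuning. The guide demonstrates how to tune hyperparameters for a machine learning workflow in SynapseML, and it can serve as a reference for tuning other distributed machine learning algorithms from Spark MLlib or other libraries.

The guide has two parts:

  • Running distributed training with SynapseML without hyperparameter tuning.
  • Using Hyperopt to tune hyperparameters in the distributed training workflow.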

Prerequisites

Requirements

  • Install Hyperopt and MLflow
%pip install hyperopt mlflow

MLflow autologging

To track model training and tuning with MLflow, you can enable MLflow autologging by running mlflow.pyspark.ml.autolog().

from synapse.ml.core.platform import *

if running_on_synapse_internal():
    experiment_name = "hyperopt-synapseml"
elif running_on_synapse():
    experiment_name = "hyperopt-synapseml"
else:
    experiment_name = "/Shared/hyperopt-synapseml"
import mlflow

mlflow.__version__
# Set pyspark autologging logModelAllowlist to include SynapseML models
spark.sparkContext._conf.set(
    "spark.mlflow.pysparkml.autolog.logModelAllowlistFile",
    "https://mmlspark.blob.core.windows.net/publicwasb/log_model_allowlist.txt",
)
# enable autologging
mlflow.pyspark.ml.autolog()

Set the experiment name for tracking

# Set MLflow experiment.

if running_on_synapse():
    from notebookutils.mssparkutils import azureML

    linked_service = "AzureMLService1"  # use your linked service name
    ws = azureML.getWorkspace(linked_service)
    mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
mlflow.set_experiment(experiment_name)

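Part 1: Distributed training with MLlib

This section shows a simple example of distributed training with SynapseML. For more information and examples, visit the official website.

Prepare the dataset

We use the California Housing dataset. The data was derived from the 1990 U.S. census. It contains 20,640 entries and 8 features. We download it with the sklearn.datasets module and then split it 75/25 into training and test sets.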
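
The code later in this guide splits the data 75/25 into training and test sets, and then splits the training portion 85/15 into fitting and validation subsets. The following is a minimal sketch of the expected subset sizes under those fractions. The helper name expected_split_sizes is hypothetical, and because Spark's randomSplit assigns rows at random, the actual counts will only be close to these values.

```python
# Expected (not exact) subset sizes for the splits used in this guide.
# randomSplit assigns rows at random, so real counts differ slightly.
TOTAL_ROWS = 20640  # size of the California Housing dataset


def expected_split_sizes(total, train_frac=0.75, fit_frac=0.85):
    """Return the expected (fit, validation, test) row counts."""
    train_total = total * train_frac  # 75% kept for training and tuning
    test = total - train_total        # 25% held out for the final test
    fit = train_total * fit_frac      # 85% of the training rows are used to fit
    validation = train_total - fit    # 15% of the training rows are used to validate
    return round(fit), round(validation), round(test)


fit_rows, validation_rows, test_rows = expected_split_sizes(TOTAL_ROWS)
print(fit_rows, validation_rows, test_rows)
```

In other words, roughly 64% of the rows are used for fitting, 11% for validation during tuning, and 25% for the final test.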

import numpy as np
import pandas as pd
from sklearn.datasets import fetch_california_housing
import time

try:
    california = fetch_california_housing()
except EOFError:
    print("Encountered EOFError while downloading, retrying once...")
    time.sleep(5)
    california = fetch_california_housing()

feature_cols = ["f" + str(i) for i in range(california.data.shape[1])]
header = ["target"] + feature_cols
df = spark.createDataFrame(
    pd.DataFrame(
        data=np.column_stack((california.target, california.data)), columns=header
    )
).repartition(1)

print("Dataframe has {} rows".format(df.count()))
display(df)

Here is a summary of the dataset.

display(df.summary().toPandas())

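Create a function to train a model

In this section, you define a function that trains a gradient boosting model with the SynapseML LightGBMRegressor. Wrapping the training code in a function is important because the function is later passed to Hyperopt for tuning.

We evaluate the predictions with synapse.ml.train.ComputeModelStatistics, which returns four metrics: mean squared error (MSE), root mean squared error (RMSE), R^2, and mean absolute error (MAE).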
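
To make the four regression metrics concrete, here is a minimal pure-Python sketch of their textbook definitions. It is not the ComputeModelStatistics implementation; the function name regression_metrics and the sample values are hypothetical, and the dictionary keys simply mirror the ones the training code in this guide reads from the metrics row.

```python
import math


def regression_metrics(labels, predictions):
    """Compute MSE, RMSE, R^2, and MAE for paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    # Total sum of squares: variation of the labels around their mean.
    total_ss = sum((y - mean_label) ** 2 for y in labels)
    # R^2 = 1 - (residual sum of squares / total sum of squares).
    r2 = 1.0 - (mse * n) / total_ss
    return {
        "mean_squared_error": mse,
        "root_mean_squared_error": math.sqrt(mse),
        "R^2": r2,
        "mean_absolute_error": mae,
    }


metrics = regression_metrics([1.0, 2.0, 3.0, 4.0], [1.5, 2.0, 2.5, 4.0])
print(metrics)
```

MSE, RMSE, and MAE are errors (lower is better), while R^2 is a goodness-of-fit score (higher is better), which matters when a metric is later turned into a loss for Hyperopt.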

from pyspark.ml.feature import VectorAssembler

# Convert features into a single vector column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = featurizer.transform(df)["target", "features"]

train_data, test_data = data.randomSplit([0.75, 0.25], seed=42)
train_data, validation_data = train_data.randomSplit([0.85, 0.15], seed=42)

display(train_data)

# Using one partition since the training dataset is very small
repartitioned_data = train_data.repartition(1).cache()
from synapse.ml.lightgbm import LightGBMRegressor
from synapse.ml.train import ComputeModelStatistics


def train_tree(alpha, learningRate, numLeaves, numIterations):
    """
    This train_tree() function:
     - takes hyperparameters as inputs (for tuning later)
     - returns the trained model and its R^2 score on the validation dataset

    Wrapping code as a function makes it easier to reuse the code later with Hyperopt.
    """
    # Use MLflow to track training.
    # Specify "nested=True" since this single model will be logged as a child run of Hyperopt's run.
    with mlflow.start_run(nested=True):

        lgr = LightGBMRegressor(
            objective="quantile",
            alpha=alpha,
            learningRate=learningRate,
            numLeaves=numLeaves,
            labelCol="target",
            numIterations=numIterations,
        )

        model = lgr.fit(repartitioned_data)

        cms = ComputeModelStatistics(
            evaluationMetric="regression", labelCol="target", scoresCol="prediction"
        )

        # Evaluate the model on the validation dataset, keeping the test dataset
        # untouched for the final comparison.
        predictions = model.transform(validation_data)
        metrics = cms.transform(predictions).collect()[0].asDict()

        # log metrics with mlflow
        mlflow.log_metric("MSE", metrics["mean_squared_error"])
        mlflow.log_metric("RMSE", metrics["root_mean_squared_error"])
        mlflow.log_metric("R^2", metrics["R^2"])
        mlflow.log_metric("MAE", metrics["mean_absolute_error"])

    return model, metrics["R^2"]

Run the training function to make sure it works. It is a good idea to confirm that the training code runs before adding tuning.

initial_model, val_metric = train_tree(
    alpha=0.2, learningRate=0.3, numLeaves=31, numIterations=50
)
print(
    f"The trained model achieved an R^2 of {val_metric} on the validation data"
)

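Part 2: Tuning hyperparameters with Hyperopt

In the second part, the Hyperopt workflow is created by:

  • Defining a function to minimize
  • Defining a search space over the hyperparameters
  • Specifying the search algorithm and using fmin() to tune the model.

For more information about the Hyperopt APIs, see the Hyperopt documentation.

Define a function to minimize

  • Input: hyperparameters
  • Internally: reuse the training function defined above.
  • Output: loss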
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK


def train_with_hyperopt(params):
    """
    An example train method that calls into MLlib.
    This method is passed to hyperopt.fmin().

    :param params: hyperparameters as a dict. Its structure is consistent with how search space is defined. See below.
    :return: dict with fields 'loss' (scalar loss) and 'status' (success/failure status of run)
    """
    # For integer parameters, make sure to convert them to int type if Hyperopt is searching over a continuous range of values.
    alpha = params["alpha"]
    learningRate = params["learningRate"]
    numLeaves = int(params["numLeaves"])
    numIterations = int(params["numIterations"])

    model, r_squared = train_tree(alpha, learningRate, numLeaves, numIterations)

    # Hyperopt expects you to return a loss (for which lower is better), so take the negative of the R^2 (for which higher is better).
    loss = -r_squared

    return {"loss": loss, "status": STATUS_OK}
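
The sign flip in the objective above generalizes to any metric: scores where higher is better are negated, while error metrics can be returned unchanged. The following is a minimal sketch of that convention without depending on Hyperopt. The helper to_hyperopt_result is hypothetical, and the local STATUS_OK constant is an assumption standing in for hyperopt.STATUS_OK.

```python
# Local stand-in; assumed to match the value of hyperopt.STATUS_OK.
STATUS_OK = "ok"


def to_hyperopt_result(score, higher_is_better=True):
    """Turn an evaluation metric into the result dict that the objective returns."""
    # A minimizer needs "lower is better", so negate scores such as R^2.
    loss = -score if higher_is_better else score
    return {"loss": loss, "status": STATUS_OK}


r2_result = to_hyperopt_result(0.82)                            # R^2: higher is better
rmse_result = to_hyperopt_result(0.45, higher_is_better=False)  # RMSE: lower is better
print(r2_result, rmse_result)
```

Minimizing the negated R^2 is equivalent to maximizing R^2, so the best trial is the one with the most negative loss.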

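Define the search space over hyperparameters

This example tunes four hyperparameters: alpha, learningRate, numLeaves, and numIterations. For details on defining a search space and parameter expressions, see the Hyperopt documentation.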

space = {
    "alpha": hp.uniform("alpha", 0, 1),
    "learningRate": hp.uniform("learningRate", 0, 1),
    "numLeaves": hp.uniformint("numLeaves", 30, 50),
    "numIterations": hp.uniformint("numIterations", 20, 100),
}

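Tune the model with Hyperopt fmin()

To tune the model with Hyperopt's fmin(), take the following steps:

  • Set max_evals to the maximum number of points in the hyperparameter space to test.
  • Specify the search algorithm, either hyperopt.tpe.suggest or hyperopt.rand.suggest.
    • hyperopt.tpe.suggest: Tree of Parzen Estimators, a Bayesian approach that iteratively and adaptively selects new hyperparameter settings to explore based on previous results
    • hyperopt.rand.suggest: Random search, a non-adaptive approach that randomly samples the search space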
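
To illustrate what a non-adaptive search does with max_evals, here is a minimal pure-Python sketch of random search over a space shaped like the one in this guide. It does not use Hyperopt and is not how hyperopt.rand.suggest is implemented. The functions sample_point, toy_loss, and random_search are hypothetical, toy_loss is a stand-in for a real training run, and the integer ranges are assumed to be inclusive.

```python
import random


def sample_point(rng):
    """Draw one point from a space shaped like the one in this guide."""
    return {
        "alpha": rng.uniform(0, 1),
        "learningRate": rng.uniform(0, 1),
        "numLeaves": rng.randint(30, 50),  # inclusive integer range
        "numIterations": rng.randint(20, 100),
    }


def toy_loss(params):
    """Hypothetical stand-in for a real training run; lower is better."""
    return (params["learningRate"] - 0.3) ** 2 + (params["alpha"] - 0.5) ** 2


def random_search(loss_fn, max_evals, seed=0):
    """Evaluate max_evals random points one at a time and keep the best."""
    rng = random.Random(seed)
    best_params, best_loss = None, float("inf")
    for _ in range(max_evals):
        params = sample_point(rng)  # each draw ignores all earlier results
        loss = loss_fn(params)
        if loss < best_loss:
            best_params, best_loss = params, loss
    return best_params, best_loss


best_params, best_loss = random_search(toy_loss, max_evals=8)
print(best_params, best_loss)
```

An adaptive method such as TPE differs in the sampling step: instead of drawing each point independently, it uses the losses observed so far to decide where to sample next.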

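Important:
When using Hyperopt with SynapseML and other distributed training algorithms, do not pass a trials argument to fmin(). When you omit the trials argument, Hyperopt uses the default Trials class, which runs on the cluster driver. Hyperopt needs to evaluate each trial on the driver node so that each trial can launch a distributed training job.

Do not use the SparkTrials class with SynapseML. SparkTrials is designed to distribute trials for algorithms that are not themselves distributed. SynapseML already uses distributed computing and is not compatible with SparkTrials.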

algo = tpe.suggest

with mlflow.start_run():
    best_params = fmin(fn=train_with_hyperopt, space=space, algo=algo, max_evals=8)
# Print out the parameters that produced the best model
best_params

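Retrain the model on the full training dataset

For tuning, this workflow split the training dataset into training and validation subsets. Now retrain the model with the "best" hyperparameters. Note that train_tree, as defined above, fits on repartitioned_data (the training subset); to fit on the full training dataset, it would need to be given the combined training and validation data.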

best_alpha = best_params["alpha"]
best_learningRate = best_params["learningRate"]
best_numIterations = int(best_params["numIterations"])
best_numLeaves = int(best_params["numLeaves"])

# Pass by keyword so numLeaves and numIterations cannot be swapped.
final_model, val_r_squared = train_tree(
    alpha=best_alpha,
    learningRate=best_learningRate,
    numLeaves=best_numLeaves,
    numIterations=best_numIterations,
)

Use the test dataset to compare the evaluation metrics of the initial model and the "best" model.

# Define an evaluation metric and evaluate the model on the test dataset.
cms = ComputeModelStatistics(
    evaluationMetric="regression", labelCol="target", scoresCol="prediction"
)

initial_model_predictions = initial_model.transform(test_data)
initial_model_test_metric = (
    cms.transform(initial_model_predictions).collect()[0].asDict()["R^2"]
)

final_model_predictions = final_model.transform(test_data)
final_model_test_metric = (
    cms.transform(final_model_predictions).collect()[0].asDict()["R^2"]
)

print(
    f"On the test data, the initial (untuned) model achieved R^2 {initial_model_test_metric}, and the final (tuned) model achieved {final_model_test_metric}."
)