分类与回归

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

本页面介绍分类和回归的算法。 还包括讨论特定类别算法的部分,例如线性方法、树和集成。

目录

分类

逻辑回归

逻辑回归是一种常用的方法,用于预测分类响应。它是 广义线性模型 的一个特例,预测结果的概率。在 spark.ml 中,逻辑回归可以通过使用二项逻辑回归来预测二元结果,或通过使用多项逻辑回归来预测多类结果。使用 family 参数在这两种算法之间进行选择,或者不设置该参数,Spark将推断出正确的变体。

多项式逻辑回归可以通过将 family 参数设置为“multinomial”来用于二分类。它将生成两组系数和两个截距。

在对具有恒定非零列的数据集拟合 LogisticRegressionModel(不带截距)时,Spark MLlib 对恒定非零列输出零系数。这种行为与 R glmnet 相同,但与 LIBSVM 不同。

二项逻辑回归

有关二项逻辑回归的更多背景和实施细节,请参阅 spark.mllib 中的逻辑回归 的文档。

示例

以下示例展示了如何使用弹性网正则化训练用于二分类的二项和多项逻辑回归模型。 elasticNetParam 对应于 $\alpha$,而 regParam 对应于 $\lambda$。

有关参数的更多详细信息,请参阅 Python API文档

from pyspark.ml.classification import LogisticRegression
# 加载训练数据
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# 拟合模型
lrModel = lr.fit(training)
# 打印逻辑回归的系数和截距
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
# 我们还可以使用多项式家族进行二分类
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
# 拟合模型
mlrModel = mlr.fit(training)
# 打印多项式家族逻辑回归的系数和截距
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))
Find full example code at "examples/src/main/python/ml/logistic_regression_with_elastic_net.py" in the Spark repo.

有关参数的更多细节可以在 Scala API 文档 中找到。

import org.apache.spark.ml.classification.LogisticRegression
// 加载训练数据
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// 拟合模型
val lrModel = lr.fit(training)
// 输出逻辑回归的系数和截距
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// 我们还可以使用多项式家族进行二元分类
val mlr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setFamily("multinomial")
val mlrModel = mlr.fit(training)
// 输出具有多项式家族的逻辑回归的系数和截距
println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}")
println(s"Multinomial intercepts: ${mlrModel.interceptVector}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionWithElasticNetExample.scala" in the Spark repo.

有关参数的更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载训练数据
Dataset<Row> training = spark.read().format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8);
// 拟合模型
LogisticRegressionModel lrModel = lr.fit(training);
// 打印逻辑回归的系数和截距
System.out.println("系数: "
+ lrModel.coefficients() + " 截距: " + lrModel.intercept());
// 我们也可以使用多项式族进行二元分类
LogisticRegression mlr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setFamily("multinomial");
// 拟合模型
LogisticRegressionModel mlrModel = mlr.fit(training);
// 打印多项式族逻辑回归的系数和截距
System.out.println("多项式系数: " + lrModel.coefficientMatrix()
+ "\n多项式截距: " + mlrModel.interceptVector());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionWithElasticNetExample.java" in the Spark repo.

更多关于参数的详细信息可以在 R API 文档 中找到。

# 加载训练数据
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.logit 拟合二项式逻辑回归模型
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/logit.R" in the Spark repo.

对于逻辑回归的 spark.ml 实现,支持提取训练集上模型的摘要。请注意,存储在 LogisticRegressionSummary 中的预测和指标作为 DataFrame 被标记为 @transient ,因此只能在驱动程序上使用。

LogisticRegressionTrainingSummary 提供了一个 LogisticRegressionModel 的汇总。 对于二元分类,某些额外的指标是 可用的,例如 ROC 曲线。请参见 BinaryLogisticRegressionTrainingSummary

继续前面的示例:

from pyspark.ml.classification import LogisticRegression
# 从返回的 LogisticRegressionModel 实例提取总结,该实例在之前的示例中进行了训练
trainingSummary = lrModel.summary
# 获取每次迭代的目标
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
print(objective)
# 获取接收者操作特征作为数据框和 areaUnderROC。
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))
# 设置模型阈值以最大化 F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)
Find full example code at "examples/src/main/python/ml/logistic_regression_summary_example.py" in the Spark repo.

LogisticRegressionTrainingSummary 提供了一个 LogisticRegressionModel 的摘要。 在二分类的情况下,某些额外的指标是可用的,例如 ROC 曲线。可以通过 binarySummary 方法访问二分类摘要。请参见 BinaryLogisticRegressionTrainingSummary

继续之前的例子:

import org.apache.spark.ml.classification.LogisticRegression
// 从之前训练的 LogisticRegressionModel 实例中提取摘要
// 示例
val trainingSummary = lrModel.binarySummary
// 获取每次迭代的目标值。
val objectiveHistory = trainingSummary.objectiveHistory
println("objectiveHistory:")
objectiveHistory.foreach(loss => println(loss))
// 将接收者操作特征曲线作为数据框和 areaUnderROC。 
val roc = trainingSummary.roc
roc.show()
println(s"areaUnderROC: ${trainingSummary.areaUnderROC}")
// 设置模型阈值以最大化 F-Measure
val fMeasure = trainingSummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
.select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionSummaryExample.scala" in the Spark repo.

LogisticRegressionTrainingSummary 提供一个 LogisticRegressionModel 的总结。 在二分类的情况下,可以使用某些额外的指标,例如ROC曲线。可以通过 binarySummary 方法访问二分类总结。请参见 BinaryLogisticRegressionTrainingSummary

继续前面的示例:

import org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
// 从之前训练的LogisticRegressionModel实例提取摘要
// 示例
BinaryLogisticRegressionTrainingSummary trainingSummary = lrModel.binarySummary();
// 获取每次迭代的损失。
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
System.out.println(lossPerIteration);
}
// 获取接收者操作特征作为数据框和 areaUnderROC。
Dataset<Row> roc = trainingSummary.roc();
roc.show();
roc.select("FPR").show();
System.out.println(trainingSummary.areaUnderROC());
// 获取与最大F-Measure相对应的阈值,并使用此选定的阈值重新运行LogisticRegression。
Dataset<Row> fMeasure = trainingSummary.fMeasureByThreshold();
double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);
double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure))
.select("threshold").head().getDouble(0);
lrModel.setThreshold(bestThreshold);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java" in the Spark repo.

多项式逻辑回归

多类分类通过多项逻辑(softmax)回归得到支持。在多项逻辑回归中,算法生成 $K$ 组系数,或者维度为 $K \times J$ 的矩阵,其中 $K$ 是结果类别的数量,$J$ 是特征的数量。如果算法包括一个截距项,那么将会得到一个长度为 $K$ 的截距向量。

多项式系数可作为 coefficientMatrix 使用,截距可作为 interceptVector 使用。

coefficients intercept 方法在使用多项式族训练的逻辑回归模型上不受支持。请使用 coefficientMatrix interceptVector

结果类别 $k \in {1, 2, …, K}$ 的条件概率使用softmax函数进行建模。

\[ P(Y=k|\mathbf{X}, \boldsymbol{\beta}_k, \beta_{0k}) = \frac{e^{\boldsymbol{\beta}_k \cdot \mathbf{X} + \beta_{0k}}}{\sum_{k'=0}^{K-1} e^{\boldsymbol{\beta}_{k'} \cdot \mathbf{X} + \beta_{0k'}}} \]

我们最小化加权的负对数似然,使用多项式响应模型,并采用弹性网惩罚以控制过拟合。

\[ \min_{\beta, \beta_0} -\left[\sum_{i=1}^L w_i \cdot \log P(Y = y_i|\mathbf{x}_i)\right] + \lambda \left[\frac{1}{2}\left(1 - \alpha\right)||\boldsymbol{\beta}||_2^2 + \alpha ||\boldsymbol{\beta}||_1\right] \]

有关详细推导,请参见 此处

示例

下面的示例展示了如何训练一个具有弹性网正则化的多类逻辑回归模型,以及提取多类训练摘要以评估模型。

from pyspark.ml.classification import LogisticRegression
# 加载训练数据
training = spark \
    .read \
    .format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# 拟合模型
lrModel = lr.fit(training)
# 打印多项式逻辑回归的系数和截距
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))
trainingSummary = lrModel.summary
# 获取每次迭代的目标
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
print(objective)
# 对于多类,我们可以逐个标签检查指标
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
print("label %d: %s" % (i, rate))
print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
print("label %d: %s" % (i, rate))
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
print("label %d: %s" % (i, prec))
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
print("label %d: %s" % (i, rec))
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
print("label %d: %s" % (i, f))
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
% (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
Find full example code at "examples/src/main/python/ml/multiclass_logistic_regression_with_elastic_net.py" in the Spark repo.
import org.apache.spark.ml.classification.LogisticRegression
// 加载训练数据
val training = spark
.read
.format("libsvm")
.load("data/mllib/sample_multiclass_classification_data.txt")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// 拟合模型
val lrModel = lr.fit(training)
// 打印多项式逻辑回归的系数和截距
println(s"系数: \n${lrModel.coefficientMatrix}")
println(s"截距: \n${lrModel.interceptVector}")
val trainingSummary = lrModel.summary
// 获取每次迭代的目标值
val objectiveHistory = trainingSummary.objectiveHistory
println("目标历史:")
objectiveHistory.foreach(println)
// 对于多类情况,我们可以按标签检查指标
println("按标签的假阳性率:")
trainingSummary.falsePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
println(s"标签 $label: $rate")
}
println("按标签的真正率:")
trainingSummary.truePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
println(s"标签 $label: $rate")
}
println("按标签的精准率:")
trainingSummary.precisionByLabel.zipWithIndex.foreach { case (prec, label) =>
println(s"标签 $label: $prec")
}
println("按标签的召回率:")
trainingSummary.recallByLabel.zipWithIndex.foreach { case (rec, label) =>
println(s"标签 $label: $rec")
}
println("按标签的F-值:")
trainingSummary.fMeasureByLabel.zipWithIndex.foreach { case (f, label) =>
println(s"标签 $label: $f")
}
val accuracy = trainingSummary.accuracy
val falsePositiveRate = trainingSummary.weightedFalsePositiveRate
val truePositiveRate = trainingSummary.weightedTruePositiveRate
val fMeasure = trainingSummary.weightedFMeasure
val precision = trainingSummary.weightedPrecision
val recall = trainingSummary.weightedRecall
println(s"准确率: $accuracy\n假阳性率: $falsePositiveRate\n真正率: $truePositiveRate\n" +
s"F-值: $fMeasure\n精准率: $precision\n召回率: $recall")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/MulticlassLogisticRegressionWithElasticNetExample.scala" in the Spark repo.
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载训练数据
Dataset<Row> training = spark.read().format("libsvm")
.load("data/mllib/sample_multiclass_classification_data.txt");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8);
// 拟合模型
LogisticRegressionModel lrModel = lr.fit(training);
// 打印多项式逻辑回归的系数和截距
System.out.println("Coefficients: \n"
+ lrModel.coefficientMatrix() + " \nIntercept: " + lrModel.interceptVector());
LogisticRegressionTrainingSummary trainingSummary = lrModel.summary();
// 获取每次迭代的损失
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
System.out.println(lossPerIteration);
}
// 对于多类,我们可以按标签检查指标
System.out.println("False positive rate by label:");
int i = 0;
double[] fprLabel = trainingSummary.falsePositiveRateByLabel();
for (double fpr : fprLabel) {
System.out.println("label " + i + ": " + fpr);
i++;
}
System.out.println("True positive rate by label:");
i = 0;
double[] tprLabel = trainingSummary.truePositiveRateByLabel();
for (double tpr : tprLabel) {
System.out.println("label " + i + ": " + tpr);
i++;
}
System.out.println("Precision by label:");
i = 0;
double[] precLabel = trainingSummary.precisionByLabel();
for (double prec : precLabel) {
System.out.println("label " + i + ": " + prec);
i++;
}
System.out.println("Recall by label:");
i = 0;
double[] recLabel = trainingSummary.recallByLabel();
for (double rec : recLabel) {
System.out.println("label " + i + ": " + rec);
i++;
}
System.out.println("F-measure by label:");
i = 0;
double[] fLabel = trainingSummary.fMeasureByLabel();
for (double f : fLabel) {
System.out.println("label " + i + ": " + f);
i++;
}
double accuracy = trainingSummary.accuracy();
double falsePositiveRate = trainingSummary.weightedFalsePositiveRate();
double truePositiveRate = trainingSummary.weightedTruePositiveRate();
double fMeasure = trainingSummary.weightedFMeasure();
double precision = trainingSummary.weightedPrecision();
double recall = trainingSummary.weightedRecall();
System.out.println("Accuracy: " + accuracy);
System.out.println("FPR: " + falsePositiveRate);
System.out.println("TPR: " + truePositiveRate);
System.out.println("F-measure: " + fMeasure);
System.out.println("Precision: " + precision);
System.out.println("Recall: " + recall);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaMulticlassLogisticRegressionWithElasticNetExample.java" in the Spark repo.

更多关于参数的详细信息可以在 R API 文档 中找到。

# 加载训练数据
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.logit 拟合多项逻辑回归模型
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/logit.R" in the Spark repo.

决策树分类器

决策树是一类流行的分类和回归方法。有关 spark.ml 实现的更多信息可以在 决策树部分 找到。

示例

以下示例加载一个 LibSVM 格式的数据集,将其拆分为训练集和测试集,首先在第一个数据集上进行训练,然后在保留的测试集上进行评估。 我们使用两个特征转换器来准备数据;这些有助于对标签和分类特征进行索引,为 DataFrame 添加元数据,决策树算法可以识别这些元数据。

有关参数的更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 加载以 LIBSVM 格式存储的数据作为 DataFrame。
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# 索引标签,为标签列添加元数据。
# 在整个数据集上拟合以包含所有标签在索引中。
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# 自动识别分类特征,并对其进行索引。
# 我们指定 maxCategories,这样具有 > 4 个不同值的特征将被视为连续特征。
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# 将数据分为训练集和测试集(30% 用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练一个决策树模型。
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# 在管道中链接索引器和树
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
# 训练模型。这也运行索引器。
model = pipeline.fit(trainingData)
# 进行预测。
predictions = model.transform(testData)
# 选择示例行进行显示。
predictions.select("prediction", "indexedLabel", "features").show(5)
# 选择 (预测, 真实标签) 并计算测试错误
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("测试错误 = %g " % (1.0 - accuracy))
treeModel = model.stages[2]
# 仅摘要
print(treeModel)
Find full example code at "examples/src/main/python/ml/decision_tree_classification_example.py" in the Spark repo.

有关参数的更多细节可以在 Scala API 文档 中找到。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// 加载以 LIBSVM 格式存储的数据作为 DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 索引标签,向标签列添加元数据。
// 在整个数据集上进行拟合,以包含索引中的所有标签。
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data)
// 自动识别类别特征,并对其进行索引。
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4) // 特征值大于 4 的特征将被视为连续值。
.fit(data)
// 将数据拆分为训练集和测试集(30% 保留用于测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// 训练一个决策树模型。
val dt = new DecisionTreeClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures")
// 将索引标签转换回原始标签。
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labelsArray(0))
// 在管道中链接索引器和树。
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
// 训练模型。这也会运行索引器。
val model = pipeline.fit(trainingData)
// 进行预测。
val predictions = model.transform(testData)
// 选择示例行以展示。
predictions.select("predictedLabel", "label", "features").show(5)
// 选择(预测,真实标签)并计算测试错误。
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"测试错误 = ${(1.0 - accuracy)}")
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println(s"学习到的分类树模型:\n ${treeModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeClassificationExample.scala" in the Spark repo.

关于参数的更多细节可以在 Java API文档 中找到。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载以 LIBSVM 格式存储的数据为 DataFrame。
Dataset<Row> data = spark
.read()
.format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
// 对标签进行索引,为标签列添加元数据。
// 在整个数据集上拟合以包括所有标签在索引中。
StringIndexerModel labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data);
// 自动识别类别特征,并对其进行索引。
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4) // 特征有 > 4 个不同值被视为连续。
.fit(data);
// 将数据分割为训练集和测试集(30% 用于测试)。
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 训练一个决策树模型。
DecisionTreeClassifier dt = new DecisionTreeClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures");
// 将索引标签转换回原始标签。
IndexToString labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labelsArray()[0]);
// 在管道中链接索引器和树。
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter});
// 训练模型。这也会运行索引器。
PipelineModel model = pipeline.fit(trainingData);
// 进行预测。
Dataset<Row> predictions = model.transform(testData);
// 选择示例行进行显示。
predictions.select("predictedLabel", "label", "features").show(5);
// 选择(预测,真实标签)并计算测试错误。
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("测试错误 = " + (1.0 - accuracy));
DecisionTreeClassificationModel treeModel =
(DecisionTreeClassificationModel) (model.stages()[2]);
System.out.println("学习到的分类树模型:\n" + treeModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java" in the Spark repo.

有关更多详细信息,请参考 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# 使用spark.decisionTree拟合决策树分类模型
model <- spark.decisionTree(training, label ~ features, "classification")
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/decisionTree.R" in the Spark repo.

随机森林分类器

随机森林是一种流行的分类和回归方法。关于 spark.ml 实现的更多信息可以在 随机森林章节 中找到。

示例

以下示例加载了一个LibSVM格式的数据集,将其分成训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。 我们使用两个特征变换器来准备数据;这些帮助为标签和类别特征建立索引,为 DataFrame 添加元数据,以便树基算法可以识别。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 加载和解析数据文件,将其转换为DataFrame。
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# 索引标签,向标签列添加元数据。
# 在整个数据集上拟合以包含索引中的所有标签。
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# 自动识别分类特征,并对其进行索引。
# 设置maxCategories,以便具有超过4个不同值的特征被视为连续特征。
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# 将数据分为训练集和测试集(30%用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练随机森林模型。
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# 将索引标签转换回原始标签。
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
labels=labelIndexer.labels)
# 在Pipeline中链接索引器和森林
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
# 训练模型。这也运行索引器。
model = pipeline.fit(trainingData)
# 进行预测。
predictions = model.transform(testData)
# 选择示例行进行显示。
predictions.select("predictedLabel", "label", "features").show(5)
# 选择(预测,真实标签)并计算测试错误
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("测试错误 = %g" % (1.0 - accuracy))
rfModel = model.stages[2]
print(rfModel) # 仅摘要
Find full example code at "examples/src/main/python/ml/random_forest_classifier_example.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// 加载并解析数据文件,将其转换为DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 索引标签,为标签列添加元数据。
// 在整个数据集上拟合,以包括索引中的所有标签。
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data)
// 自动识别分类特征,并对其进行索引。
// 设置maxCategories,以便具有> 4个不同值的特征被视为连续特征。
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data)
// 将数据分为训练集和测试集(30%保留用于测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// 训练随机森林模型。
val rf = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures")
.setNumTrees(10)
// 将索引标签转换回原始标签。
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labelsArray(0))
// 在Pipeline中链接索引器和森林。
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
// 训练模型。这也会运行索引器。
val model = pipeline.fit(trainingData)
// 进行预测。
val predictions = model.transform(testData)
// 选择示例行进行显示。
predictions.select("predictedLabel", "label", "features").show(5)
// 选择(预测,真实标签)并计算测试错误。
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"测试错误 = ${(1.0 - accuracy)}")
val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"学习到的分类森林模型:\n ${rfModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestClassifierExample.scala" in the Spark repo.

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载和解析数据文件,将其转换为 DataFrame。
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// 为标签建立索引,向标签列添加元数据。
// 在整个数据集上拟合,以包含所有标签的索引。
StringIndexerModel labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data);
// 自动识别分类特征并进行索引。
// 设置 maxCategories,以便具有 > 4 个不同值的特征被视为连续的。
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data);
// 将数据拆分为训练集和测试集(30% 用于测试)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 训练一个随机森林模型。
RandomForestClassifier rf = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures");
// 将索引标签转换回原始标签。
IndexToString labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labelsArray()[0]);
// 在管道中链式连接索引器和森林
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {labelIndexer, featureIndexer, rf, labelConverter});
// 训练模型。这也会运行索引器。
PipelineModel model = pipeline.fit(trainingData);
// 进行预测。
Dataset<Row> predictions = model.transform(testData);
// 选择示例行进行显示。
predictions.select("predictedLabel", "label", "features").show(5);
// 选择(预测,真实标签)并计算测试误差
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("测试误差 = " + (1.0 - accuracy));
RandomForestClassificationModel rfModel = (RandomForestClassificationModel)(model.stages()[2]);
System.out.println("已学得的分类森林模型:\n" + rfModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestClassifierExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.randomForest 拟合随机森林分类模型
model <- spark.randomForest(training, label ~ features, "classification", numTrees = 10)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/randomForest.R" in the Spark repo.

梯度提升树分类器

梯度提升树 (GBT) 是一种流行的分类和回归方法,使用决策树的集成。 有关 spark.ml 实现的更多信息,请参见 GBT部分

示例

以下示例加载了一个LibSVM格式的数据集,将其分成训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。 我们使用两个特征变换器来准备数据;这些帮助为标签和类别特征建立索引,为 DataFrame 添加元数据,以便树基算法可以识别。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 加载并解析数据文件,将其转换为 DataFrame。
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# 索引标签,将元数据添加到标签列。
# 在整个数据集上拟合以包括索引中的所有标签。
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# 自动识别分类特征,并对其进行索引。
# 设置最大类别,以便有超过 4 个不同值的特征被视为连续特征。
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# 将数据分割为训练集和测试集(30% 用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练 GBT 模型。
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
# 在管道中链接索引器和 GBT
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
# 训练模型。这也运行索引器。
model = pipeline.fit(trainingData)
# 进行预测。
predictions = model.transform(testData)
# 选择示例行进行显示。
predictions.select("prediction", "indexedLabel", "features").show(5)
# 选择(预测,真实标签)并计算测试错误
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
gbtModel = model.stages[2]
print(gbtModel) # 仅摘要
Find full example code at "examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// 加载并解析数据文件,将其转换为 DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 索引标签,为标签列添加元数据。
// 在整个数据集上拟合以包含所有标签的索引。
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data)
// 自动识别分类特征并将其索引化。
// 设置 maxCategories,以便具有 > 4 个不同值的特征被视为连续特征。
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data)
// 将数据分为训练集和测试集(30% 用于测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// 训练 GBT 模型。
val gbt = new GBTClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10)
.setFeatureSubsetStrategy("auto")
// 将索引标签转换回原始标签。
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labelsArray(0))
// 将索引器与 GBT 链接到管道中。
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))
// 训练模型。这还会运行索引器。
val model = pipeline.fit(trainingData)
// 进行预测。
val predictions = model.transform(testData)
// 选择示例行进行显示。
predictions.select("predictedLabel", "label", "features").show(5)
// 选择(预测,真实标签)并计算测试误差。
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"测试误差 = ${1.0 - accuracy}")
val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
println(s"学习到的分类 GBT 模型:\n ${gbtModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeClassifierExample.scala" in the Spark repo.

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassificationModel;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载并解析数据文件,将其转换为 DataFrame。
Dataset<Row> data = spark
.read()
.format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
// 为标签添加索引,向标签列添加元数据。
// 在整个数据集上拟合,以包括所有标签在索引中。
StringIndexerModel labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data);
// 自动识别分类特征,并为其添加索引。
// 设置 maxCategories,以便具有 > 4 个不同值的特征被视为连续特征。
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data);
// 将数据拆分为训练集和测试集(30% 保留用于测试)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 训练 GBT 模型。
GBTClassifier gbt = new GBTClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10);
// 将索引标签转换回原始标签。
IndexToString labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labelsArray()[0]);
// 在管道中链接索引器和 GBT。
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});
// 训练模型。这也会运行索引器。
PipelineModel model = pipeline.fit(trainingData);
// 进行预测。
Dataset<Row> predictions = model.transform(testData);
// 选择示例行以显示。
predictions.select("predictedLabel", "label", "features").show(5);
// 选择(预测,真实标签)并计算测试错误。
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("测试错误 = " + (1.0 - accuracy));
GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
System.out.println("学习到的分类 GBT 模型:\n" + gbtModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeClassifierExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.gbt 拟合 GBT 分类模型
model <- spark.gbt(training, label ~ features, "classification", maxIter = 10)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/gbt.R" in the Spark repo.

多层感知器分类器

多层感知器分类器(MLPC)是一种基于 前馈人工神经网络 的分类器。 MLPC由多个节点层组成。 每一层与网络中的下一层完全连接。 输入层的节点代表输入数据。 所有其他节点通过对输入和节点的权重 $\wv$ 以及偏置 $\bv$ 的线性组合, 并应用激活函数来将输入映射到输出。 对于具有 $K+1$ 层的MLPC,这可以用矩阵形式写成如下: \[ \mathrm{y}(\x) = \mathrm{f_K}(...\mathrm{f_2}(\wv_2^T\mathrm{f_1}(\wv_1^T \x+b_1)+b_2)...+b_K) \] 中间层的节点使用sigmoid(逻辑)函数: \[ \mathrm{f}(z_i) = \frac{1}{1 + e^{-z_i}} \] 输出层的节点使用softmax函数: \[ \mathrm{f}(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}} \] 输出层中节点的数量 $N$ 与类别的数量对应。

MLPC 使用反向传播来学习模型。我们使用逻辑损失函数进行优化,并使用 L-BFGS 作为优化例程。

示例

有关更多细节,请参考 Python API 文档

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 加载训练数据
data = spark.read.format("libsvm")\
    .load("data/mllib/sample_multiclass_classification_data.txt")
# 将数据分为训练集和测试集
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# 为神经网络指定层:
# 输入层大小为4(特征),两个中间层大小为5和4
# 输出层大小为3(类别)
layers = [4, 5, 4, 3]
# 创建训练器并设置其参数
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# 训练模型
model = trainer.fit(train)
# 计算测试集的准确性
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("测试集准确性 = " + str(evaluator.evaluate(predictionAndLabels)))
Find full example code at "examples/src/main/python/ml/multilayer_perceptron_classification.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// 加载以 LIBSVM 格式存储的数据作为 DataFrame。
val data = spark.read.format("libsvm")
.load("data/mllib/sample_multiclass_classification_data.txt")
// 将数据分为训练和测试
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)
// 指定神经网络的层:
// 输入层大小为 4(特征),两个中间层大小为 5 和 4
// 输出层大小为 3(类别)
val layers = Array[Int](4, 5, 4, 3)
// 创建训练器并设置其参数
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100)
// 训练模型
val model = trainer.fit(train)
// 计算测试集上的准确率
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
.setMetricName("accuracy")
println(s"测试集准确率 = ${evaluator.evaluate(predictionAndLabels)}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/MultilayerPerceptronClassifierExample.scala" in the Spark repo.

请参阅 Java API 文档 以获取更多详情。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel;
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
// 加载训练数据
String path = "data/mllib/sample_multiclass_classification_data.txt";
Dataset<Row> dataFrame = spark.read().format("libsvm").load(path);
// 将数据分为训练集和测试集
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];
// 指定神经网络的层: 
// 输入层大小为 4(特征),两个中间层大小为 5 和 4 
// 输出层大小为 3(类别)
int[] layers = new int[] {4, 5, 4, 3};
// 创建训练器并设置其参数
MultilayerPerceptronClassifier trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100);
// 训练模型
MultilayerPerceptronClassificationModel model = trainer.fit(train);
// 计算测试集的准确率
Dataset<Row> result = model.transform(test);
Dataset<Row> predictionAndLabels = result.select("prediction", "label");
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setMetricName("accuracy");
System.out.println("测试集准确率 = " + evaluator.evaluate(predictionAndLabels));
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaMultilayerPerceptronClassifierExample.java" in the Spark repo.

有关更多细节,请参见 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df
# 指定神经网络的层:
# 输入层大小为4(特征),两个中间层大小为5和4
# 输出层大小为3(类别)
layers = c(4, 5, 4, 3)
# 使用spark.mlp拟合一个多层感知器神经网络模型
model <- spark.mlp(training, label ~ features, maxIter = 100,
layers = layers, blockSize = 128, seed = 1234)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/mlp.R" in the Spark repo.

线性支持向量机

A 支持向量机 在高维或无限维空间中构建超平面或一组超平面,这可以用于分类、回归或其他任务。直观地说,良好的分离是通过与任何类别的最近训练数据点具有最大距离的超平面来实现的(所谓的功能间隔),因为一般来说,间隔越大,分类器的泛化错误率越低。Spark ML 中的 LinearSVC 支持使用线性 SVM 的二元分类。内部,它使用 OWLQN 优化器优化 铰链损失

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.classification import LinearSVC
# 加载训练数据
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
lsvc = LinearSVC(maxIter=10, regParam=0.1)
# 拟合模型
lsvcModel = lsvc.fit(training)
# 打印线性SVC的系数和截距
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))
Find full example code at "examples/src/main/python/ml/linearsvc.py" in the Spark repo.

有关更多细节,请参考 Scala API 文档

import org.apache.spark.ml.classification.LinearSVC
// 加载训练数据
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val lsvc = new LinearSVC()
.setMaxIter(10)
.setRegParam(0.1)
// 拟合模型
val lsvcModel = lsvc.fit(training)
// 打印线性支持向量机的系数和截距
println(s"Coefficients: ${lsvcModel.coefficients} Intercept: ${lsvcModel.intercept}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LinearSVCExample.scala" in the Spark repo.

有关更多详细信息,请参阅 Java API 文档 .

import org.apache.spark.ml.classification.LinearSVC;
import org.apache.spark.ml.classification.LinearSVCModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载训练数据
Dataset<Row> training = spark.read().format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
LinearSVC lsvc = new LinearSVC()
.setMaxIter(10)
.setRegParam(0.1);
// 拟合模型
LinearSVCModel lsvcModel = lsvc.fit(training);
// 打印 LinearSVC 的系数和截距
System.out.println("Coefficients: "
+ lsvcModel.coefficients() + " Intercept: " + lsvcModel.intercept());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLinearSVCExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

# 加载训练数据
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# 拟合线性SVM模型
model <- spark.svmLinear(training, Survived ~ ., regParam = 0.01, maxIter = 10)
# 模型摘要
summary(model)
# 预测
prediction <- predict(model, training)
showDF(prediction)
Find full example code at "examples/src/main/r/ml/svmLinear.R" in the Spark repo.

一对其余分类器(即一对所有)

OneVsRest 是一个机器学习减法的例子,用于在给定能够有效进行二分类的基础分类器的情况下执行多类别分类。它也被称为“one-vs-all”。

OneVsRest 实现为一个 Estimator 。对于基础分类器,它接收 Classifier 的实例,并为每个 k 类创建一个二分类问题。类 i 的分类器被训练来预测标签是否为 i,区分类 i 和其他所有类。

通过评估每个二元分类器来进行预测,输出最有信心分类器的索引作为标签。

示例

下面的例子演示了如何加载 鸢尾花数据集 ,将其解析为DataFrame,并使用 OneVsRest 进行多分类。测试错误被计算出来以测量算法的准确性。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 加载数据文件。
inputData = spark.read.format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")
# 生成训练/测试划分。
(train, test) = inputData.randomSplit([0.8, 0.2])
# 实例化基础分类器。
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)
# 实例化一对多分类器。
ovr = OneVsRest(classifier=lr)
# 训练多类别模型。
ovrModel = ovr.fit(train)
# 在测试数据上评分模型。
predictions = ovrModel.transform(test)
# 获取评估器。
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
# 计算测试数据上的分类错误。
accuracy = evaluator.evaluate(predictions)
print("测试错误 = %g" % (1.0 - accuracy))
Find full example code at "examples/src/main/python/ml/one_vs_rest_example.py" in the Spark repo.

有关更多细节,请参阅 Scala API 文档

import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// 加载数据文件。
val inputData = spark.read.format("libsvm")
.load("data/mllib/sample_multiclass_classification_data.txt")
// 生成训练/测试分割。
val Array(train, test) = inputData.randomSplit(Array(0.8, 0.2))
// 实例化基本分类器
val classifier = new LogisticRegression()
.setMaxIter(10)
.setTol(1E-6)
.setFitIntercept(true)
// 实例化一对多分类器。
val ovr = new OneVsRest().setClassifier(classifier)
// 训练多类模型。
val ovrModel = ovr.fit(train)
// 在测试数据上评分模型。
val predictions = ovrModel.transform(test)
// 获取评估器。
val evaluator = new MulticlassClassificationEvaluator()
.setMetricName("accuracy")
// 计算测试数据上的分类错误。
val accuracy = evaluator.evaluate(predictions)
println(s"测试错误 = ${1 - accuracy}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala" in the Spark repo.

有关更多详细信息,请参考 Java API 文档

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.OneVsRest;
import org.apache.spark.ml.classification.OneVsRestModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载数据文件。
Dataset<Row> inputData = spark.read().format("libsvm")
.load("data/mllib/sample_multiclass_classification_data.txt");
// 生成训练/测试拆分。
Dataset<Row>[] tmp = inputData.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> train = tmp[0];
Dataset<Row> test = tmp[1];
// 配置基础分类器。
LogisticRegression classifier = new LogisticRegression()
.setMaxIter(10)
.setTol(1E-6)
.setFitIntercept(true);
// 实例化一对多分类器。
OneVsRest ovr = new OneVsRest().setClassifier(classifier);
// 训练多类模型。
OneVsRestModel ovrModel = ovr.fit(train);
// 在测试数据上评分模型。
Dataset<Row> predictions = ovrModel.transform(test)
.select("prediction", "label");
// 获取评估器。
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setMetricName("accuracy");
// 计算测试数据上的分类错误。
double accuracy = evaluator.evaluate(predictions);
System.out.println("测试错误 = " + (1 - accuracy));
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java" in the Spark repo.

朴素贝叶斯

Naive Bayes classifiers 是一类简单的 概率多类分类器,基于将贝叶斯定理应用于每对特征之间具有强(天真)独立假设。

朴素贝叶斯可以非常高效地进行训练。通过对训练数据的单次遍历,它计算每个特征在给定每个标签下的条件概率分布。对于预测,它应用贝叶斯定理来计算在给定观察结果下每个标签的条件概率分布。

MLlib 支持 多项式朴素贝叶斯 , 补充朴素贝叶斯 , 伯努利朴素贝叶斯 高斯朴素贝叶斯

输入数据 : 这些多项式、补充和伯努利模型通常用于 文档分类 。 在这个上下文中,每个观察值都是一个文档,每个特征代表一个术语。 特征的值是术语的频率(在多项式或补充朴素贝叶斯中)或 一个零或一,表示文档中是否找到该术语(在伯努利朴素贝叶斯中)。 多项式和伯努利模型的特征值必须是 非负 的。模型类型通过一个可选参数选择, 可以是“multinomial”、“complement”、“bernoulli”或“gaussian”,默认值为“multinomial”。 对于文档分类,输入特征向量通常应为稀疏向量。 由于训练数据仅使用一次,因此不需要对其进行缓存。

加性平滑 可以通过设置参数 $\lambda$ 来使用(默认为 $1.0$)。

示例

有关更多详细信息,请参考 Python API 文档

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 加载训练数据
data = spark.read.format("libsvm") \
    .load("data/mllib/sample_libsvm_data.txt")
# 将数据分为训练集和测试集
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# 创建训练器并设置其参数
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
# 训练模型
model = nb.fit(train)
# 选择示例行进行显示。
predictions = model.transform(test)
predictions.show()
# 计算测试集的准确性
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("测试集准确率 = " + str(accuracy))
Find full example code at "examples/src/main/python/ml/naive_bayes_example.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// 加载以 LIBSVM 格式存储的数据作为 DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 将数据拆分为训练集和测试集(30% 用于测试)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)
// 训练一个 NaiveBayes 模型。
val model = new NaiveBayes()
.fit(trainingData)
// 选择示例行进行显示。
val predictions = model.transform(testData)
predictions.show()
// 选择(预测,真实标签)并计算测试误差
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"测试集准确率 = $accuracy")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala" in the Spark repo.

有关更多详细信息,请参考 Java API 文档

import org.apache.spark.ml.classification.NaiveBayes;
import org.apache.spark.ml.classification.NaiveBayesModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载训练数据
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// 将数据分为训练集和测试集
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];
// 创建训练器并设置其参数
NaiveBayes nb = new NaiveBayes();
// 训练模型
NaiveBayesModel model = nb.fit(train);
// 选择示例行进行显示。
Dataset<Row> predictions = model.transform(test);
predictions.show();
// 计算测试集上的准确率
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("测试集准确率 = " + accuracy);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaNaiveBayesExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

# 使用 spark.naiveBayes 拟合一个伯努利朴素贝叶斯模型
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
nbDF <- titanicDF
nbTestDF <- titanicDF
nbModel <- spark.naiveBayes(nbDF, Survived ~ Class + Sex + Age)
# 模型总结
summary(nbModel)
# 预测
nbPredictions <- predict(nbModel, nbTestDF)
head(nbPredictions)
Find full example code at "examples/src/main/r/ml/naiveBayes.R" in the Spark repo.

分解机分类器

有关因子分解机的更多背景和详细信息,请参考 因子分解机部分

示例

以下示例加载LibSVM格式的数据集,将其拆分为训练集和测试集, 在第一个数据集上进行训练,然后在保留的测试集上进行评估。 我们将特征缩放到0与1之间,以防止梯度爆炸问题。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 加载并解析数据文件,将其转换为DataFrame。
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# 索引标签,为标签列添加元数据。
# 在整个数据集上拟合以包括索引中的所有标签。
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# 标准化特征。
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)
# 将数据拆分为训练集和测试集(30%用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练FM模型。
fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)
# 创建一个管道。
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])
# 训练模型。
model = pipeline.fit(trainingData)
# 进行预测。
predictions = model.transform(testData)
# 选择示例行以进行显示。
predictions.select("prediction", "indexedLabel", "features").show(5)
# 选择(预测,真实标签)并计算测试准确性
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("测试集准确率 = %g" % accuracy)
fmModel = model.stages[2]
print("因素: " + str(fmModel.factors)) # type: ignore
print("线性: " + str(fmModel.linear)) # type: ignore
print("截距: " + str(fmModel.intercept)) # type: ignore
Find full example code at "examples/src/main/python/ml/fm_classifier_example.py" in the Spark repo.

有关更多细节,请参考 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{FMClassificationModel, FMClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, MinMaxScaler, StringIndexer}
// 加载并解析数据文件,将其转换为 DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 索引标签,向标签列添加元数据。
// 在整个数据集上拟合,以包括索引中的所有标签。
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data)
// 缩放特征。
val featureScaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.fit(data)
// 将数据拆分为训练集和测试集 (30% 用于测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// 训练一个 FM 模型。
val fm = new FMClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("scaledFeatures")
.setStepSize(0.001)
// 将索引标签转换回原始标签。
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labelsArray(0))
// 创建一个管道。
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureScaler, fm, labelConverter))
// 训练模型。
val model = pipeline.fit(trainingData)
// 进行预测。
val predictions = model.transform(testData)
// 选择示例行以进行显示。
predictions.select("predictedLabel", "label", "features").show(5)
// 选择 (预测, 真实标签) 并计算测试准确性。
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"测试集准确率 = $accuracy")
val fmModel = model.stages(2).asInstanceOf[FMClassificationModel]
println(s"因子: ${fmModel.factors} 线性: ${fmModel.linear} " +
s"截距: ${fmModel.intercept}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/FMClassifierExample.scala" in the Spark repo.

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.FMClassificationModel;
import org.apache.spark.ml.classification.FMClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载并解析数据文件,将其转换为 DataFrame。
Dataset<Row> data = spark
.read()
.format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
// 索引标签,为标签列添加元数据。
// 在整个数据集上拟合,以包含所有标签的索引。
StringIndexerModel labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data);
// 规模特征。
MinMaxScalerModel featureScaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.fit(data);
// 将数据拆分为训练集和测试集(30%用于测试)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 训练 FM 模型。
FMClassifier fm = new FMClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("scaledFeatures")
.setStepSize(0.001);
// 将索引标签转换回原始标签。
IndexToString labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labelsArray()[0]);
// 创建一个 Pipeline。
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {labelIndexer, featureScaler, fm, labelConverter});
// 训练模型。
PipelineModel model = pipeline.fit(trainingData);
// 进行预测。
Dataset<Row> predictions = model.transform(testData);
// 选择示例行进行显示。
predictions.select("predictedLabel", "label", "features").show(5);
// 选择(预测,真实标签)并计算测试准确性。
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("测试准确性 = " + accuracy);
FMClassificationModel fmModel = (FMClassificationModel)(model.stages()[2]);
System.out.println("因素: " + fmModel.factors());
System.out.println("线性: " + fmModel.linear());
System.out.println("截距: " + fmModel.intercept());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaFMClassifierExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

注意:目前 SparkR 不支持特征缩放。

# 加载训练数据
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# 拟合 FM 分类模型
model <- spark.fmClassifier(training, label ~ features)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/fmClassifier.R" in the Spark repo.

回归

线性回归

线性回归模型和模型摘要的接口与逻辑回归案例类似。

当在带有常量非零列的数据集上使用“l-bfgs”求解器拟合没有截距的LinearRegressionModel时,Spark MLlib对常量非零列输出零系数。这种行为与R glmnet相同,但与LIBSVM不同。

示例

以下示例演示了训练一个弹性网正则化线性回归模型并提取模型摘要统计数据。

有关参数的更多细节,请参见 Python API 文档

from pyspark.ml.regression import LinearRegression
# 加载训练数据
training = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# 拟合模型
lrModel = lr.fit(training)
# 打印线性回归的系数和截距
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
# 总结模型并打印一些指标
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
Find full example code at "examples/src/main/python/ml/linear_regression_with_elastic_net.py" in the Spark repo.

有关参数的更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.regression.LinearRegression
// 加载训练数据
val training = spark.read.format("libsvm")
.load("data/mllib/sample_linear_regression_data.txt")
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// 适配模型
val lrModel = lr.fit(training)
// 打印线性回归的系数和截距
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// 总结训练集上的模型并打印一些指标
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionWithElasticNetExample.scala" in the Spark repo.

有关参数的更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载训练数据。
Dataset<Row> training = spark.read().format("libsvm")
.load("data/mllib/sample_linear_regression_data.txt");
LinearRegression lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8);
// 拟合模型。
LinearRegressionModel lrModel = lr.fit(training);
// 打印线性回归的系数和截距。
System.out.println("Coefficients: "
+ lrModel.coefficients() + " Intercept: " + lrModel.intercept());
// 在训练集上总结模型并打印一些指标。
LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
System.out.println("numIterations: " + trainingSummary.totalIterations());
System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));
trainingSummary.residuals().show();
System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
System.out.println("r2: " + trainingSummary.r2());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java" in the Spark repo.

有关参数的更多细节可以在 R API 文档 中找到。

# 加载训练数据
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df
# 拟合线性回归模型
model <- spark.lm(training, label ~ features, regParam = 0.3, elasticNetParam = 0.8)
# 预测
predictions <- predict(model, test)
head(predictions)
# 总结
summary(model)
Find full example code at "examples/src/main/r/ml/lm_with_elastic_net.R" in the Spark repo.

广义线性回归

与线性回归假设输出遵循高斯分布的情况相比, 广义线性模型 (GLMs) 是线性模型的一个规范,其中响应变量 $Y_i$ 遵循某种来自 指数分布族 的分布。Spark 的 GeneralizedLinearRegression 接口允许灵活指定 GLMs,可以用于各种类型的预测问题,包括线性回归、泊松回归、逻辑回归等。目前在 spark.ml 中,仅支持指数分布族的一部分分布,具体如下列出

注意 : Spark 当前仅支持通过其 GeneralizedLinearRegression 接口最多 4096 个特征,如果超过此限制将抛出异常。有关更多详细信息,请参见 高级部分 。 尽管如此,对于线性和逻辑回归,可以使用 LinearRegression LogisticRegression 估计器训练特征数量增加的模型。

广义线性模型(GLMs)需要可以以其“标准”或“自然”形式书写的指数族分布,即 自然指数族分布 。自然指数族分布的形式如下所示:

\[f_Y(y|\theta, \tau) = h(y, \tau)\exp{\left( \frac{\theta \cdot y - A(\theta)}{d(\tau)} \right)}\]

其中 $\theta$ 是感兴趣的参数,$\tau$ 是一个分散参数。在广义线性模型中,响应变量 $Y_i$ 假设是来自自然指数族分布:

\[Y_i \sim f\left(\cdot|\theta_i, \tau \right)\]

其中感兴趣的参数 $\theta_i$ 与响应变量的期望值 $\mu_i$ 相关联,如下所示

\[\mu_i = A'(\theta_i)\]

在这里,$A’(\theta_i)$ 由所选分布的形式定义。广义线性模型(GLMs)还允许指定一个连接函数,该函数定义了响应变量 $\mu_i$ 的期望值与所谓的 线性预测变量 $\eta_i$ 之间的关系:

\[g(\mu_i) = \eta_i = \vec{x_i}^T \cdot \vec{\beta}\]

通常,选择连接函数是使得 $A’ = g^{-1}$,这产生了参数 $\theta$ 和线性预测器 $\eta$ 之间的简化关系。在这种情况下,连接函数 $g(\mu)$ 被称为“典范”连接函数。

\[\theta_i = A'^{-1}(\mu_i) = g(g^{-1}(\eta_i)) = \eta_i\]

广义线性模型(GLM)寻找最大化似然函数的回归系数 $\vec{\beta}$。

\[\max_{\vec{\beta}} \mathcal{L}(\vec{\theta}|\vec{y},X) = \prod_{i=1}^{N} h(y_i, \tau) \exp{\left(\frac{y_i\theta_i - A(\theta_i)}{d(\tau)}\right)}\]

其中感兴趣的参数 $\theta_i$ 与回归系数 $\vec{\beta}$ 相关,通过

\[\theta_i = A'^{-1}(g^{-1}(\vec{x_i} \cdot \vec{\beta}))\]

Spark的广义线性回归接口还提供了用于诊断GLM模型拟合的摘要统计信息,包括残差、p值、偏差、赤池信息量准则等。

See here 查看关于广义线性模型(GLMs)及其应用的更全面的综述。

可用的家族

家族 响应类型 支持的链接
高斯 连续 恒等*, 对数, 反向
二项 二元 对数几率*, 正态, CLogLog
泊松 计数 对数*, 恒等, 平方根
伽马 连续 反向*, 恒等, 对数
特维迪 零膨胀连续 幂链接函数
* 典型链接

示例

以下示例演示了如何使用高斯响应和恒等链接函数训练GLM,并提取模型摘要统计信息。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.regression import GeneralizedLinearRegression
# 加载训练数据
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)
# 拟合模型
model = glr.fit(dataset)
# 打印广义线性回归模型的系数和截距
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
# 对训练集中的模型进行概述并打印一些指标
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()
Find full example code at "examples/src/main/python/ml/generalized_linear_regression_example.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.regression.GeneralizedLinearRegression
// 加载训练数据
val dataset = spark.read.format("libsvm")
.load("data/mllib/sample_linear_regression_data.txt")
val glr = new GeneralizedLinearRegression()
.setFamily("gaussian")
.setLink("identity")
.setMaxIter(10)
.setRegParam(0.3)
// 拟合模型
val model = glr.fit(dataset)
// 打印广义线性回归模型的系数和截距
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")
// 在训练集上总结模型并打印一些指标
val summary = model.summary
println(s"Coefficient Standard Errors: ${summary.coefficientStandardErrors.mkString(",")}")
println(s"T Values: ${summary.tValues.mkString(",")}")
println(s"P Values: ${summary.pValues.mkString(",")}")
println(s"Dispersion: ${summary.dispersion}")
println(s"Null Deviance: ${summary.nullDeviance}")
println(s"Residual Degree Of Freedom Null: ${summary.residualDegreeOfFreedomNull}")
println(s"Deviance: ${summary.deviance}")
println(s"Residual Degree Of Freedom: ${summary.residualDegreeOfFreedom}")
println(s"AIC: ${summary.aic}")
println("Deviance Residuals: ")
summary.residuals().show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GeneralizedLinearRegressionExample.scala" in the Spark repo.

有关更多细节,请参阅 Java API 文档

import java.util.Arrays;
import org.apache.spark.ml.regression.GeneralizedLinearRegression;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionModel;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载训练数据
Dataset<Row> dataset = spark.read().format("libsvm")
.load("data/mllib/sample_linear_regression_data.txt");
GeneralizedLinearRegression glr = new GeneralizedLinearRegression()
.setFamily("gaussian")
.setLink("identity")
.setMaxIter(10)
.setRegParam(0.3);
// 拟合模型
GeneralizedLinearRegressionModel model = glr.fit(dataset);
// 打印广义线性回归模型的系数和截距
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());
// 在训练集上总结模型并打印一些指标
GeneralizedLinearRegressionTrainingSummary summary = model.summary();
System.out.println("Coefficient Standard Errors: "
+ Arrays.toString(summary.coefficientStandardErrors()));
System.out.println("T Values: " + Arrays.toString(summary.tValues()));
System.out.println("P Values: " + Arrays.toString(summary.pValues()));
System.out.println("Dispersion: " + summary.dispersion());
System.out.println("Null Deviance: " + summary.nullDeviance());
System.out.println("Residual Degree Of Freedom Null: " + summary.residualDegreeOfFreedomNull());
System.out.println("Deviance: " + summary.deviance());
System.out.println("Residual Degree Of Freedom: " + summary.residualDegreeOfFreedom());
System.out.println("AIC: " + summary.aic());
System.out.println("Deviance Residuals: ");
summary.residuals().show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGeneralizedLinearRegressionExample.java" in the Spark repo.

有关更多详细信息,参见 R API 文档

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# 使用 spark.glm 拟合一个"高斯"家庭的广义线性模型
df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# 模型摘要
summary(gaussianGLM)
# 预测
gaussianPredictions <- predict(gaussianGLM, gaussianTestDF)
head(gaussianPredictions)
# 使用 glm (R 兼容) 拟合一个广义线性模型
gaussianGLM2 <- glm(label ~ features, gaussianDF, family = "gaussian")
summary(gaussianGLM2)
# 使用 spark.glm 拟合一个"二项"家庭的广义线性模型
training2 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training2 <- transform(training2, label = cast(training2$label > 1, "integer"))
df_list2 <- randomSplit(training2, c(7, 3), 2)
binomialDF <- df_list2[[1]]
binomialTestDF <- df_list2[[2]]
binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial")
# 模型摘要
summary(binomialGLM)
# 预测
binomialPredictions <- predict(binomialGLM, binomialTestDF)
head(binomialPredictions)
# 使用 spark.glm 拟合一个"tweedie"家庭的广义线性模型
training3 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
tweedieDF <- transform(training3, label = training3$label * exp(randn(10)))
tweedieGLM <- spark.glm(tweedieDF, label ~ features, family = "tweedie",
var.power = 1.2, link.power = 0)
# 模型摘要
summary(tweedieGLM)
Find full example code at "examples/src/main/r/ml/glm.R" in the Spark repo.

决策树回归

决策树是一类流行的分类和回归方法。有关 spark.ml 实现的更多信息可以在 决策树部分 找到。

示例

以下示例加载一个LibSVM格式的数据集,将其拆分为训练集和测试集,在第一个数据集上进行训练,然后在保留下来的测试集上进行评估。 我们使用特征转换器对分类特征进行索引,为 DataFrame 添加元数据,以便决策树算法可以识别。

有关参数的更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# 将以LIBSVM格式存储的数据加载为DataFrame。
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# 自动识别类别特征,并对其进行索引。
# 我们指定maxCategories,以便具有> 4个不同值的特征被视为连续特征。
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# 将数据拆分为训练集和测试集(30%用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练一个决策树模型。
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")
# 在管道中链接索引器和树
pipeline = Pipeline(stages=[featureIndexer, dt])
# 训练模型。这也运行索引器。
model = pipeline.fit(trainingData)
# 进行预测。
predictions = model.transform(testData)
# 选择示例行进行显示。
predictions.select("prediction", "label", "features").show(5)
# 选择(预测,真实标签)并计算测试误差
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("测试数据上的均方根误差(RMSE)= %g" % rmse)
treeModel = model.stages[1]
# 仅摘要
print(treeModel)
Find full example code at "examples/src/main/python/ml/decision_tree_regression_example.py" in the Spark repo.

有关参数的更多详细信息,请参见 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
// 加载以 LIBSVM 格式存储的数据作为 DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 自动识别类别特征,并为其建立索引。
// 在这里,我们将具有 > 4 个不同值的特征视为连续的。
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data)
// 将数据拆分为训练集和测试集(30% 用于测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// 训练一个决策树模型。
val dt = new DecisionTreeRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
// 在管道中链接索引器和树。
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, dt))
// 训练模型。这也会运行索引器。
val model = pipeline.fit(trainingData)
// 进行预测。
val predictions = model.transform(testData)
// 选择示例行以显示。
predictions.select("prediction", "label", "features").show(5)
// 选择(预测,真实标签)并计算测试误差。
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"测试数据的均方根误差 (RMSE) = $rmse")
val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
println(s"学习的回归树模型:\n ${treeModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeRegressionExample.scala" in the Spark repo.

参数的更多详细信息可以在 Java API 文档 中找到。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 将存储在LIBSVM格式的数据加载为DataFrame。
Dataset<Row> data = spark.read().format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
// 自动识别分类特征,并对其进行索引。
// 设置maxCategories,使具有> 4个不同值的特征被视为连续特征。
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data);
// 将数据拆分为训练集和测试集(30%用于测试)。
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 训练决策树模型。
DecisionTreeRegressor dt = new DecisionTreeRegressor()
.setFeaturesCol("indexedFeatures");
// 在Pipeline中链接索引器和树。
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[]{featureIndexer, dt});
// 训练模型。这也执行索引器。
PipelineModel model = pipeline.fit(trainingData);
// 进行预测。
Dataset<Row> predictions = model.transform(testData);
// 选择示例行以显示。
predictions.select("label", "features").show(5);
// 选择(预测,真实标签)并计算测试误差。
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("测试数据上的均方根误差(RMSE)= " + rmse);
DecisionTreeRegressionModel treeModel =
(DecisionTreeRegressionModel) (model.stages()[1]);
System.out.println("学习到的回归树模型:\n" + treeModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeRegressionExample.java" in the Spark repo.

有关更多详细信息,请参考 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.decisionTree 拟合决策树回归模型
model <- spark.decisionTree(training, label ~ features, "regression")
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/decisionTree.R" in the Spark repo.

随机森林回归

随机森林是一种流行的分类和回归方法。关于 spark.ml 实现的更多信息可以在 随机森林章节 中找到。

示例

以下示例加载一个LibSVM格式的数据集,将其拆分为训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。 我们使用特征转换器来索引分类特征,为 DataFrame 添加元数据,树基算法可以识别这些元数据。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# 加载并解析数据文件,将其转换为 DataFrame。
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# 自动识别分类特征,并对其进行索引。
# 设置 maxCategories,使得具有 > 4 个不同值的特征被视为连续特征。
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# 将数据拆分为训练集和测试集(30% 用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练随机森林模型。
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# 在管道中串联索引器和森林
pipeline = Pipeline(stages=[featureIndexer, rf])
# 训练模型。这也会运行索引器。
model = pipeline.fit(trainingData)
# 进行预测。
predictions = model.transform(testData)
# 选择示例行进行显示。
predictions.select("prediction", "label", "features").show(5)
# 选择(预测值,真实标签)并计算测试误差
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("测试数据的均方根误差 (RMSE) = %g" % rmse)
rfModel = model.stages[1]
print(rfModel) # 仅摘要
Find full example code at "examples/src/main/python/ml/random_forest_regressor_example.py" in the Spark repo.

请参阅 Scala API 文档 以获取更多详细信息。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
// 加载并解析数据文件,将其转换为 DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 自动识别分类特征,并对其进行索引。
// 设置 maxCategories,以便具有 > 4 个不同值的特征作为连续特征处理。
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data)
// 将数据划分为训练集和测试集(30% 用于测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// 训练随机森林模型。
val rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
// 在管道中链式连接索引器和森林。
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, rf))
// 训练模型。这也会运行索引器。
val model = pipeline.fit(trainingData)
// 进行预测。
val predictions = model.transform(testData)
// 选择示例行进行显示。
predictions.select("prediction", "label", "features").show(5)
// 选择(预测,真实标签)并计算测试误差。
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"测试数据上的均方根误差 (RMSE) = $rmse")
val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"学习的回归森林模型:\n ${rfModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestRegressorExample.scala" in the Spark repo.

有关更多细节,请参考 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.RandomForestRegressionModel;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载并解析数据文件,将其转换为 DataFrame。
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// 自动识别类别特征,并为其建立索引。
// 设置 maxCategories,以便具有 > 4 个不同值的特征被视为连续特征。
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data);
// 将数据分为训练集和测试集(30% 用于测试)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 训练一个随机森林模型。
RandomForestRegressor rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures");
// 在管道中链接索引器和森林
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {featureIndexer, rf});
// 训练模型。这也会运行索引器。
PipelineModel model = pipeline.fit(trainingData);
// 进行预测。
Dataset<Row> predictions = model.transform(testData);
// 选择示例行进行展示。
predictions.select("prediction", "label", "features").show(5);
// 选择 (预测, 真正标签) 并计算测试误差
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("测试数据上的均方根误差 (RMSE) = " + rmse);
RandomForestRegressionModel rfModel = (RandomForestRegressionModel)(model.stages()[1]);
System.out.println("学习到的回归森林模型:\n" + rfModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.randomForest 拟合随机森林回归模型
model <- spark.randomForest(training, label ~ features, "regression", numTrees = 10)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/randomForest.R" in the Spark repo.

梯度提升树回归

梯度提升树(GBTs)是一种流行的回归方法,它使用决策树的集成。有关 spark.ml 实现的更多信息可以在 GBT部分 中找到。

示例

注意:对于这个示例数据集, GBTRegressor 实际上只需要 1 次迭代,但这在一般情况下并不成立。

请参阅 Python API 文档 以获得更多详细信息。

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# 加载和解析数据文件,转换为DataFrame。
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# 自动识别分类特征,并对其进行索引。
# 设置maxCategories,以便将具有> 4个不同值的特征视为连续特征。
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# 将数据分为训练集和测试集(30%用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练一个GBT模型。
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)
# 在管道中串联索引器和GBT
pipeline = Pipeline(stages=[featureIndexer, gbt])
# 训练模型。这也会运行索引器。
model = pipeline.fit(trainingData)
# 进行预测。
predictions = model.transform(testData)
# 选择示例行进行显示。
predictions.select("prediction", "label", "features").show(5)
# 选择(预测,真实标签)并计算测试误差
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("测试数据上的均方根误差(RMSE)= %g" % rmse)
gbtModel = model.stages[1]
print(gbtModel) # 仅显示摘要
Find full example code at "examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
// 加载并解析数据文件,将其转换为DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 自动识别分类特征,并对其进行索引。
// 设置maxCategories,使得具有> 4个不同值的特征被视为连续的。
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data)
// 将数据分为训练集和测试集(30%留作测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// 训练GBT模型。
val gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10)
// 在管道中链式索引器和GBT。
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, gbt))
// 训练模型。这也会运行索引器。
val model = pipeline.fit(trainingData)
// 进行预测。
val predictions = model.transform(testData)
// 选择示例行以显示。
predictions.select("prediction", "label", "features").show(5)
// 选择(预测,真实标签)并计算测试误差。
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"测试数据的均方根误差(RMSE)= $rmse")
val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println(s"学习到的回归GBT模型:\n ${gbtModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeRegressorExample.scala" in the Spark repo.

有关更多详细信息,参考 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载并解析数据文件,将其转换为DataFrame。
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// 自动识别分类特征,并对其进行索引。
// 设置maxCategories,使得具有> 4个不同值的特征被视为连续特征。
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data);
// 将数据拆分为训练集和测试集(30%用于测试)。
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 训练GBT模型。
GBTRegressor gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10);
// 在一个管道中串联索引器和GBT。
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});
// 训练模型。这也运行了索引器。
PipelineModel model = pipeline.fit(trainingData);
// 进行预测。
Dataset<Row> predictions = model.transform(testData);
// 选择示例行以显示。
predictions.select("prediction", "label", "features").show(5);
// 选择(预测,真实标签)并计算测试误差。
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("测试数据的均方根误差(RMSE)= " + rmse);
GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("学习到的回归GBT模型:\n" + gbtModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeRegressorExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df
# 使用spark.gbt拟合GBT回归模型
model <- spark.gbt(training, label ~ features, "regression", maxIter = 10)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/gbt.R" in the Spark repo.

生存回归

spark.ml 中,我们实现了 加速失效时间 (AFT) 模型,这是一种用于左删失数据的参数生存回归模型。 它描述了生存时间的对数模型,因此通常被称为 生存分析的对数线性模型。与为同一目的设计的 比例风险 模型不同,AFT 模型更容易并行化 因为每个实例独立地贡献于目标函数。

给定协变量 $x^{‘}$ 的值,对于受试者 i = 1, …, n 的随机寿命 $t_{i}$,可能存在右删失,AFT 模型下的似然函数为: \[ L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} \] 其中 $\delta_{i}$ 是事件发生的指示符,即是否未删失。使用 $\epsilon_{i}=\frac{\log{t_{i}}-x^{‘}\beta}{\sigma}$,对数似然函数的形式如下: \[ \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] \] 其中 $S_{0}(\epsilon_{i})$ 是基线生存函数,而 $f_{0}(\epsilon_{i})$ 是相应的密度函数。

最常用的AFT模型基于生存时间的韦布尔分布。生存时间的韦布尔分布对应于生存时间对数的极值分布,$S_{0}(\epsilon)$函数为: \[ S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) \] $f_{0}(\epsilon_{i})$函数为: \[ f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) \] 具有韦布尔分布的AFT模型的对数似然函数为: \[ \iota(\beta,\sigma)= -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] \] 由于最小化负对数似然等同于最大后验概率,我们用来优化的损失函数是$-\iota(\beta,\sigma)$。 对于$\beta$和$\log\sigma$的梯度函数分别为: \[ \frac{\partial (-\iota)}{\partial \beta}=\sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \] \[ \frac{\partial (-\iota)}{\partial (\log\sigma)}=\sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] \]

AFT模型可以被表述为一个凸优化问题,即寻找一个凸函数$-\iota(\beta,\sigma)$的最小值,其依赖于系数向量$\beta$和尺度参数的对数$\log\sigma$。实现中所依据的优化算法是L-BFGS。该实现的结果与R的生存函数 survreg 相匹配。

在没有截距的情况下拟合AFTSurvivalRegressionModel于具有常量非零列的数据集时,Spark MLlib 对于常量非零列输出零系数。这种行为与R的survival::survreg不同。

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors
训练 = spark.createDataFrame([
(1.218, 1.0, Vectors.dense(1.560, -0.605)),
(2.949, 0.0, Vectors.dense(0.346, 2.158)),
(3.627, 0.0, Vectors.dense(1.380, 0.231)),
(0.273, 1.0, Vectors.dense(0.520, 1.151)),
(4.199, 0.0, Vectors.dense(0.795, -0.226))], ["标签", "删失", "特征"])
分位数概率 = [0.3, 0.6]
aft = AFTSurvivalRegression(分位数概率=分位数概率,
quantilesCol="quantiles")
模型 = aft.fit(训练)
# 打印 AFT 生存回归的系数、截距和尺度参数
print("系数: " + str(模型.coefficients))
print("截距: " + str(模型.intercept))
print("尺度: " + str(模型.scale))
模型.transform(训练).show(truncate=False)
Find full example code at "examples/src/main/python/ml/aft_survival_regression.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.AFTSurvivalRegression
val training = spark.createDataFrame(Seq(
(1.218, 1.0, Vectors.dense(1.560, -0.605)),
(2.949, 0.0, Vectors.dense(0.346, 2.158)),
(3.627, 0.0, Vectors.dense(1.380, 0.231)),
(0.273, 1.0, Vectors.dense(0.520, 1.151)),
(4.199, 0.0, Vectors.dense(0.795, -0.226))
)).toDF("标签", "审查", "特征")
val quantileProbabilities = Array(0.3, 0.6)
val aft = new AFTSurvivalRegression()
.setQuantileProbabilities(quantileProbabilities)
.setQuantilesCol("分位数")
val model = aft.fit(training)
// 打印 AFT 生存回归的系数、截距和尺度参数
println(s"系数: ${model.coefficients}")
println(s"截距: ${model.intercept}")
println(s"尺度: ${model.scale}")
model.transform(training).show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/AFTSurvivalRegressionExample.scala" in the Spark repo.

有关更多详细信息,请参阅 Java API 文档

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(1.218, 1.0, Vectors.dense(1.560, -0.605)),
RowFactory.create(2.949, 0.0, Vectors.dense(0.346, 2.158)),
RowFactory.create(3.627, 0.0, Vectors.dense(1.380, 0.231)),
RowFactory.create(0.273, 1.0, Vectors.dense(0.520, 1.151)),
RowFactory.create(4.199, 0.0, Vectors.dense(0.795, -0.226))
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("censor", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(data, schema);
double[] quantileProbabilities = new double[]{0.3, 0.6};
AFTSurvivalRegression aft = new AFTSurvivalRegression()
.setQuantileProbabilities(quantileProbabilities)
.setQuantilesCol("quantiles");
AFTSurvivalRegressionModel model = aft.fit(training);
// 打印AFT生存回归的系数、截距和缩放参数
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());
System.out.println("Scale: " + model.scale());
model.transform(training).show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java" in the Spark repo.

请参阅 R API 文档 以获取更多详细信息。

# 使用R生存包中的卵巢数据集
library(survival)
# 使用spark.survreg拟合加速失效时间(AFT)生存回归模型
ovarianDF <- suppressWarnings(createDataFrame(ovarian))
aftDF <- ovarianDF
aftTestDF <- ovarianDF
aftModel <- spark.survreg(aftDF, Surv(futime, fustat) ~ ecog_ps + rx)
# 模型摘要
summary(aftModel)
# 预测
aftPredictions <- predict(aftModel, aftTestDF)
head(aftPredictions)
Find full example code at "examples/src/main/r/ml/survreg.R" in the Spark repo.

等距回归

等距回归 属于回归算法的家族。严谨地说,等距回归是一个问题,其中给定一个有限的实数集 $Y = {y_1, y_2, ..., y_n}$ 代表观察到的响应 和 $X = {x_1, x_2, ..., x_n}$ 需要拟合的未知响应值 寻找一个使得最小化的函数

\begin{equation} f(x) = \sum_{i=1}^n w_i (y_i - x_i)^2 \end{equation}

关于满足完整顺序的条件,即 $x_1\le x_2\le ...\le x_n$ 其中 $w_i$ 是正权重。 结果函数称为单调回归,并且是唯一的。 它可以被视为在顺序限制下的最小二乘问题。 本质上,单调回归是一个 单调函数 最佳拟合原始数据点。

我们实现了一个 邻接违犯者算法 它采用了一种 并行化等距回归 的方法。 训练输入是一个包含三列的 DataFrame, 标签、特征和权重。此外,IsotonicRegression 算法有一个 可选参数叫做 $isotonic$,默认为 true。 该参数指定等距回归是 等距(单调递增)还是反等距(单调递减)。

训练返回一个 IsotonicRegressionModel,可用于预测已知和未知特征的标签。等温回归的结果被视为分段线性函数。因此,预测的规则为:

示例

有关API的更多详细信息,请参阅 IsotonicRegression Python 文档

from pyspark.ml.regression import IsotonicRegression
# 加载数据。
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")
# 训练一个等温回归模型。
model = IsotonicRegression().fit(dataset)
print("边界按升序排列: %s\n" % str(model.boundaries))
print("与边界相关的预测: %s\n" % str(model.predictions))
# 进行预测。
model.transform(dataset).show()
Find full example code at "examples/src/main/python/ml/isotonic_regression_example.py" in the Spark repo.

有关API的详细信息,请参阅 IsotonicRegression Scala文档

import org.apache.spark.ml.regression.IsotonicRegression
// 加载数据。
val dataset = spark.read.format("libsvm")
.load("data/mllib/sample_isotonic_regression_libsvm_data.txt")
// 训练一个等距回归模型。
val ir = new IsotonicRegression()
val model = ir.fit(dataset)
println(s"边界按递增顺序排列: ${model.boundaries}\n")
println(s"与边界相关的预测: ${model.predictions}\n")
// 进行预测。
model.transform(dataset).show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/IsotonicRegressionExample.scala" in the Spark repo.

有关API的详细信息,请参阅 IsotonicRegression Java文档

import org.apache.spark.ml.regression.IsotonicRegression;
import org.apache.spark.ml.regression.IsotonicRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载数据。
Dataset<Row> dataset = spark.read().format("libsvm")
.load("data/mllib/sample_isotonic_regression_libsvm_data.txt");
// 训练一个等距回归模型。
IsotonicRegression ir = new IsotonicRegression();
IsotonicRegressionModel model = ir.fit(dataset);
System.out.println("边界按升序排列: " + model.boundaries() + "\n");
System.out.println("与边界相关的预测: " + model.predictions() + "\n");
// 进行预测。
model.transform(dataset).show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaIsotonicRegressionExample.java" in the Spark repo.

请参考 IsotonicRegression R API 文档 ,获取有关该API的更多详细信息。

# 加载训练数据
df <- read.df("data/mllib/sample_isotonic_regression_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.isoreg 拟合等单调回归模型
model <- spark.isoreg(training, label ~ features, isotonic = FALSE)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/isoreg.R" in the Spark repo.

矩阵分解机器回归器

有关因子分解机的更多背景和详细信息,请参考 因子分解机部分

示例

以下示例加载LibSVM格式的数据集,将其拆分为训练集和测试集, 在第一个数据集上进行训练,然后在保留的测试集上进行评估。 我们将特征缩放到0与1之间,以防止梯度爆炸问题。

查看 Python API 文档 以获取更多详细信息。

from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
# 加载和解析数据文件,将其转换为DataFrame。
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# 缩放特征。
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)
# 将数据分为训练集和测试集(30%用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练一个FM模型。
fm = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)
# 创建一个管道。
pipeline = Pipeline(stages=[featureScaler, fm])
# 训练模型。
model = pipeline.fit(trainingData)
# 进行预测。
predictions = model.transform(testData)
# 选择示例行以显示。
predictions.select("prediction", "label", "features").show(5)
# 选择(预测值,真实标签)并计算测试错误
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("测试数据的均方根误差(RMSE)= %g" % rmse)
fmModel = model.stages[1]
print("因素: " + str(fmModel.factors)) # type: ignore
print("线性: " + str(fmModel.linear)) # type: ignore
print("截距: " + str(fmModel.intercept)) # type: ignore
Find full example code at "examples/src/main/python/ml/fm_regressor_example.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.regression.{FMRegressionModel, FMRegressor}
// 加载并解析数据文件,将其转换为 DataFrame。
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// 特征缩放。
val featureScaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.fit(data)
// 将数据拆分为训练和测试集(30% 用于测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// 训练一个 FM 模型。
val fm = new FMRegressor()
.setLabelCol("label")
.setFeaturesCol("scaledFeatures")
.setStepSize(0.001)
// 创建一个管道。
val pipeline = new Pipeline()
.setStages(Array(featureScaler, fm))
// 训练模型。
val model = pipeline.fit(trainingData)
// 进行预测。
val predictions = model.transform(testData)
// 选择示例行进行显示。
predictions.select("prediction", "label", "features").show(5)
// 选择(预测,真实标签)并计算测试误差。
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"测试数据的均方根误差 (RMSE) = $rmse")
val fmModel = model.stages(1).asInstanceOf[FMRegressionModel]
println(s"因子: ${fmModel.factors} 线性: ${fmModel.linear} " +
s"截距: ${fmModel.intercept}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/FMRegressorExample.scala" in the Spark repo.

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.regression.FMRegressionModel;
import org.apache.spark.ml.regression.FMRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载并解析数据文件,将其转换为 DataFrame。
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// 特征缩放。
MinMaxScalerModel featureScaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.fit(data);
// 将数据拆分为训练集和测试集(30%用于测试)。
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 训练一个 FM 模型。
FMRegressor fm = new FMRegressor()
.setLabelCol("label")
.setFeaturesCol("scaledFeatures")
.setStepSize(0.001);
// 创建一个 Pipeline。
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureScaler, fm});
// 训练模型。
PipelineModel model = pipeline.fit(trainingData);
// 进行预测。
Dataset<Row> predictions = model.transform(testData);
// 选择示例行进行显示。
predictions.select("prediction", "label", "features").show(5);
// 选择(预测,真实标签)并计算测试误差。
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("测试数据的均方根误差 (RMSE) = " + rmse);
FMRegressionModel fmModel = (FMRegressionModel)(model.stages()[1]);
System.out.println("因子: " + fmModel.factors());
System.out.println("线性: " + fmModel.linear());
System.out.println("截距: " + fmModel.intercept());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaFMRegressorExample.java" in the Spark repo.

有关更多详情,请参阅 R API文档

注意:目前 SparkR 不支持特征缩放。

# 加载训练数据
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training_test <- randomSplit(df, c(0.7, 0.3))
training <- training_test[[1]]
test <- training_test[[2]]
# 拟合一个FM回归模型
model <- spark.fmRegressor(training, label ~ features)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/fmRegressor.R" in the Spark repo.

线性方法

我们实现了流行的线性方法,如逻辑回归和线性最小二乘法,并使用 $L_1$ 或 $L_2$ 正则化。有关实现和调优的详细信息,请参考 RDD 基于 API 的线性方法指南 ;这些信息仍然相关。

我们还为 弹性网络 包含了一个DataFrame API,这是一种 $L_1$和$L_2$正则化的混合,在 Zou等人的《通过弹性网络进行正则化和变量选择 中提出。 从数学上讲,它被定义为$L_1$和$L_2$正则化项的一个凸组合: \[ \alpha \left( \lambda \|\wv\|_1 \right) + (1-\alpha) \left( \frac{\lambda}{2}\|\wv\|_2^2 \right) , \alpha \in [0, 1], \lambda \geq 0 \] 通过适当设置$\alpha$,弹性网络将$L_1$和$L_2$正则化作为特殊情况包含在内。例如,如果一个 线性回归 模型 以弹性网络参数$\alpha$设置为$1$进行训练,它相当于一个 Lasso 模型。 另一方面,如果$\alpha$设置为$0$,则训练后的模型简化为 岭回归 模型。 我们为线性回归和逻辑回归实现了带有弹性网络正则化的管道API。

分解机

分解机 能够估计特征之间的交互,即使在稀疏性很大的问题中(例如广告和推荐系统)。 spark.ml 实现支持用于二元分类和回归的分解机。

因式分解机的公式是:

\[\hat{y} = w_0 + \sum\limits^n_{i-1} w_i x_i + \sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j\]

前两个项表示截距和线性项(与线性回归相同),最后一个项表示成对交互项。\(v_i\) 描述第 i 个变量,具有 k 个因子。

FM可用于回归,优化标准是均方误差。FM还可以通过sigmoid函数用于二元分类。优化标准是对数损失。

成对交互可以重新表达为:

\[\sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j = \frac{1}{2}\sum\limits^k_{f=1} \left(\left( \sum\limits^n_{i=1}v_{i,f}x_i \right)^2 - \sum\limits^n_{i=1}v_{i,f}^2x_i^2 \right)\]

这个方程在 k 和 n 上只有线性复杂度 - 也就是说,它的计算在 \(O(kn)\) 中。

一般来说,为了防止梯度爆炸问题,最好将连续特征缩放到0和1之间,或者将连续特征进行分箱并使用独热编码。

决策树

决策树 及其集成方法是分类和回归的机器学习任务中流行的工具。由于决策树易于解释、处理分类特征、适用于多类分类场景、不需要特征缩放,并且能够捕捉非线性和特征交互,因此被广泛使用。树集成算法,如随机森林和提升,是分类和回归任务中的顶尖表现者。

spark.ml 实现支持用于二分类和多分类的决策树,以及回归,使用连续和分类特征。该实现通过行对数据进行分区,允许对数百万甚至数十亿个实例进行分布式训练。

用户可以在 MLlib 决策树指南 中找到有关决策树算法的更多信息。 此API与 原始 MLlib 决策树 API 之间的主要区别是:

决策树的管道API提供比原始API更多的功能。特别地,对于分类,用户可以获得每个类别的预测概率(即类别条件概率);对于回归,用户可以获得预测的有偏样本方差。

树的集合(随机森林和梯度提升树)在 树的集合部分 中进行了描述。

输入与输出

我们在这里列出了输入和输出(预测)列的类型。所有输出列都是可选的;要排除一个输出列,请将其对应的参数设置为空字符串。

输入列

参数名称 类型 默认值 描述
labelCol Double "label" 要预测的标签
featuresCol Vector "features" 特征向量

输出列

参数名称 类型 默认值 描述 备注
predictionCol Double "prediction" 预测标签
rawPredictionCol Vector "rawPrediction" 长度为类别数量的向量,包含在进行预测时训练实例标签在树节点的计数 仅限分类
probabilityCol Vector "probability" 长度为类别数量的向量,等于经过归一化的rawPrediction的多项分布 仅限分类
varianceCol Double 预测的偏差样本方差 仅限回归

树集成

DataFrame API支持两种主要的树集成算法: 随机森林 梯度提升树 (GBTs) 。两者都使用 spark.ml 决策树 作为基模型。

用户可以在 MLlib Ensemble guide 中找到有关集成算法的更多信息。在本节中,我们演示了集成的 DataFrame API。

这个API和 原始的MLlib集成API 之间的主要区别是:

随机森林

随机森林 决策树 的集成。 随机森林结合了许多决策树,以减少过拟合的风险。 spark.ml 实现支持二分类和多分类以及回归的随机森林, 使用连续特征和分类特征。

有关算法本身的更多信息,请查看 spark.mllib 关于随机森林的文档

输入与输出

我们在这里列出了输入和输出(预测)列的类型。所有输出列都是可选的;要排除一个输出列,请将其对应的参数设置为空字符串。

输入列

参数名称 类型 默认值 描述
labelCol Double "label" 要预测的标签
featuresCol Vector "features" 特征向量

输出列(预测)

参数名称 类型 默认值 描述 备注
predictionCol Double "prediction" 预测标签
rawPredictionCol Vector "rawPrediction" 长度为类数量的向量,包含在做出预测时树节点的训练实例标签计数 仅限分类
probabilityCol Vector "probability" 长度与原始预测相等的向量,归一化为多项分布的概率 仅限分类

梯度增强树 (GBT)

梯度提升树 (GBTs) 决策树 的集成。 GBTs 通过迭代训练决策树来最小化损失函数。 spark.ml 实现支持用于二元分类和回归的 GBTs, 使用连续特征和类别特征。

有关算法本身的更多信息,请参见 spark.mllib 关于GBT的文档

输入和输出

我们在这里列出了输入和输出(预测)列的类型。所有输出列都是可选的;要排除一个输出列,请将其对应的参数设置为空字符串。

输入列

参数名称 类型 默认值 描述
labelCol Double "label" 要预测的标签
featuresCol Vector "features" 特征向量

请注意, GBTClassifier 目前仅支持二元标签。

输出列(预测)

参数名称 类型 默认值 描述 备注
predictionCol Double "prediction" 预测标签

在未来, GBTClassifier 也将输出 rawPrediction probability 列,就像 RandomForestClassifier 所做的那样。