线性方法 - 基于RDD的API
\[
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\]
数学公式
许多标准的
机器学习
方法可以被表述为一个凸优化问题,即寻找一个依赖于变量向量
$f$
的最小化器,该向量
$\wv$
(在代码中称为
weights
),具有
$d$
个元素。正式地,我们可以将其写为优化问题
$\min_{\wv \in\R^d} \; f(\wv)$
,其中目标函数的形式为
\begin{equation}
f(\wv) := \lambda\, R(\wv) +
\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)
\label{eq:regPrimal}
\ .
\end{equation}
这里的向量
$\x_i\in\R^d$
是训练数据示例,对于
$1\le i\le n$
,而
$y_i\in\R$
是它们对应的标签,我们希望进行预测。如果 $L(\wv; \x, y)$ 可以表示为 $\wv^T x$ 和 $y$ 的函数,我们称这种方法为
线性
。
spark.mllib
中的几种分类和回归算法属于这一类别,并在这里进行了讨论。
目标函数
$f$
有两个部分:
控制模型复杂度的正则项,
以及衡量模型在训练数据上误差的损失。
损失函数
$L(\wv;.)$
通常是一个关于
$\wv$
的凸函数。
固定的正则化参数
$\lambda \ge 0$
(代码中的
regParam
)
定义了最小化损失(即,训练误差)和最小化模型复杂度(即,避免过拟合)这两个目标之间的权衡。
损失函数
下表总结了损失函数及其梯度或次梯度,适用于
spark.mllib
支持的方法:
| 损失函数 $L(\wv; \x, y)$ | 梯度或次梯度 | |
|---|---|---|
| 铰链损失 | $\max \{0, 1-y \wv^T \x \}, \quad y \in \{-1, +1\}$ | $\begin{cases}-y \cdot \x & \text{如果 $y \wv^T \x <1$}, \\ 0 & \text{否则}.\end{cases}$ |
| 逻辑损失 | $\log(1+\exp( -y \wv^T \x)), \quad y \in \{-1, +1\}$ | $-y \left(1-\frac1{1+\exp(-y \wv^T \x)} \right) \cdot \x$ |
| 平方损失 | $\frac{1}{2} (\wv^T \x - y)^2, \quad y \in \R$ | $(\wv^T \x - y) \cdot \x$ |
请注意,在上述数学公式中,二元标签 $y$ 被表示为 $+1$(正类)或 $-1$(负类),这对于公式是方便的。
然而
,负标签在
spark.mllib
中被表示为 $0$ 而不是 $-1$,以保持与多类标记的一致性。
正则化器
正则化的目的是
regularizer
,旨在
鼓励简单模型并避免过拟合。我们在
spark.mllib
中支持以下正则化器:
| 正则化项 $R(\wv)$ | 梯度或次梯度 | |
|---|---|---|
| 零 (未正则化) | 0 | $\0$ |
| L2 | $\frac{1}{2}\|\wv\|_2^2$ | $\wv$ |
| L1 | $\|\wv\|_1$ | $\mathrm{sign}(\wv)$ |
| 弹性网 | $\alpha \|\wv\|_1 + (1-\alpha)\frac{1}{2}\|\wv\|_2^2$ | $\alpha \mathrm{sign}(\wv) + (1-\alpha) \wv$ |
这里
$\mathrm{sign}(\wv)$
是一个向量,包含了
$\wv$
所有条目的符号 (
$\pm1$
)。
L2正则化问题通常比L1正则化更容易解决,因为其光滑性。 然而,L1正则化可以帮助促进权重的稀疏性,从而导致更小且更易于解释的模型,后者在特征选择中可能会非常有用。 弹性网 是L1和L2正则化的结合。 不建议在没有任何正则化的情况下训练模型, 特别是当训练样本数量较少时。
优化
在底层,线性方法使用凸优化方法来优化目标函数。
spark.mllib
使用两种方法,SGD 和 L-BFGS,这在
优化部分
中进行了描述。目前,大多数算法 API 支持随机梯度下降 (SGD),而少数支持 L-BFGS。有关选择优化方法的指南,请参阅
此优化部分
。
分类
分类
旨在将项目划分为类别。
最常见的分类类型是
二元分类
,其中有两个类别,通常称为正类和负类。
如果有超过两个类别,则称为
多类别分类
。
spark.mllib
支持两种线性分类方法:线性支持向量机 (SVM) 和逻辑回归。
线性 SVM 仅支持二元分类,而逻辑回归支持二元和多类别分类问题。
对于这两种方法,
spark.mllib
支持 L1 和 L2 正则化变体。
训练数据集由 MLlib 中的 RDD 的
LabeledPoint
表示,其中标签是从零开始的类索引:$0, 1, 2, \ldots$。
线性支持向量机 (SVMs)
线性 SVM 是大规模分类任务的标准方法。它是一种线性方法,如上述方程
$\eqref{eq:regPrimal}$
所述,损失函数的形式由铰链损失给出:
\[
L(\wv;\x,y) := \max \{0, 1-y \wv^T \x \}.
\]
默认情况下,线性支持向量机使用L2正则化进行训练。
我们也支持替代的L1正则化。在这种情况下,
问题变成了一个
线性规划
。
线性 SVM 算法输出一个 SVM 模型。给定一个新数据点,记为 $\x$,模型根据 $\wv^T \x$ 的值做出预测。默认情况下,如果 $\wv^T \x \geq 0$,则结果为正,否则为负。
示例
以下示例展示了如何加载一个示例数据集,构建 SVM 模型,并使用生成的模型进行预测以计算训练误差。
有关API的更多详细信息,请参考
SVMWithSGD
Python文档
和
SVMModel
Python文档
。
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
# 加载和解析数据
def parsePoint(line):
values = [float(x) for x in line.split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)
# 构建模型
<span
以下代码片段说明了如何加载一个示例数据集,使用算法对象中的静态方法在这些训练数据上执行训练算法,并利用生成的模型进行预测以计算训练误差。
有关API的详细信息,请参阅
SVMWithSGD
的Scala文档
和
SVMModel
的Scala文档
。
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils
// 以LIBSVM格式加载训练数据。
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 将数据拆分为训练(60%)和测试(40%)。
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
// 运行训练算法以构建模型
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)
// 清除默认阈值。
model.clearThreshold()
// 在测试集上计算原始分数。
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
// 获取评估指标。
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println(s"ROC曲线下面积 = $auROC")
// 保存和加载模型
model.save(sc, "target/tmp/scalaSVMWithSGDModel")
val sameModel = SVMModel.load(sc, "target/tmp/scalaSVMWithSGDModel")
默认情况下,
SVMWithSGD.train()
方法执行 L2 正则化,正则化参数设置为 1.0。如果我们想要配置这个算法,我们可以通过直接创建一个新对象并调用设置方法来进一步定制
SVMWithSGD
。所有其他
spark.mllib
算法也支持以这种方式进行定制。例如,以下代码生成了一个 L1 正则化的 SVM 变体,正则化参数设置为 0.1,并运行训练算法 200 次迭代。
import org.apache.spark.mllib.optimization.L1Updater
val svmAlg = new SVMWithSGD()
svmAlg.optimizer
.setNumIterations(200)
.setRegParam(0.1)
.setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)
MLlib 的所有方法都使用 Java 友好的类型,因此您可以以与在 Scala 中相同的方式导入并调用它们。唯一的注意事项是这些方法需要 Scala RDD 对象,而 Spark Java API 使用单独的
JavaRDD
类。您可以通过在您的
JavaRDD
对象上调用
.rdd()
将 Java RDD 转换为 Scala RDD。下面提供的自包含应用示例与 Scala 中提供的示例等价:
请参考
SVMWithSGD
Java 文档
和
SVMModel
Java 文档
以获取有关API的详细信息。
import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
// 将初始 RDD 分割成两部分... [60% 训练数据, 40% 测试数据].
JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
training.cache();
JavaRDD<LabeledPoint> test = data.subtract(training);
// 运行训练算法以构建模型.
int numIterations = 100;
SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);
// 清除默认阈值.
model.clearThreshold();
// 计算测试集的原始分数.
JavaRDD<Tuple2Object, Object>> scoreAndLabels = test.map(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// 获取评估指标.
BinaryClassificationMetrics metrics =
new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
double auROC = metrics.areaUnderROC();
System.out.println("ROC曲线下的面积 = " + auROC);
// 保存和加载模型
model.save(sc, "target/tmp/javaSVMWithSGDModel");
SVMModel sameModel = SVMModel.load(sc, "target/tmp/javaSVMWithSGDModel");
默认情况下,
SVMWithSGD.train()
方法执行 L2 正则化,正则化参数设置为 1.0。如果我们想配置该算法,可以通过直接创建一个新对象并调用设置方法来自定义
SVMWithSGD
。所有其他
spark.mllib
算法也支持以这种方式进行自定义。例如,以下代码生成了一个 L1 正则化的 SVM 变体,正则化参数设置为 0.1,并运行训练算法 200 次迭代。
import org.apache.spark.mllib.optimization.L1Updater;
SVMWithSGD svmAlg = new SVMWithSGD();
svmAlg.optimizer()
.setNumIterations(200)
.setRegParam(0.1)
.setUpdater(new L1Updater());
SVMModel modelL1 = svmAlg.run(training.rdd());
为了运行上述应用程序,按照Spark快速入门指南中 自包含应用程序 部分提供的说明进行操作。确保在构建文件中将 spark-mllib 作为依赖项包含在内。
逻辑回归
逻辑回归
广泛用于预测二元响应。它是一种线性方法,如上面公式中所述
$\eqref{eq:regPrimal}$
,损失函数的公式由逻辑损失给出:
\[
L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x)).
\]
对于二元分类问题,该算法输出一个二元逻辑回归模型。
给定一个新的数据点,记作 $\x$,该模型通过
应用逻辑函数来进行预测
\[
\mathrm{f}(z) = \frac{1}{1 + e^{-z}}
\]
其中 $z = \wv^T \x$。
默认情况下,如果 $\mathrm{f}(\wv^T x) > 0.5$,则结果为正,否则为负,尽管与线性 SVM 不同,逻辑回归模型的原始输出 $\mathrm{f}(z)$ 具有概率解释(即,$\x$ 为正的概率)。
二元逻辑回归可以推广为
多项逻辑回归
,以
训练和预测多类分类问题。
例如,对于 $K$ 个可能的结果,可以选择其中一个结果作为“枢轴”,
其余的 $K - 1$ 个结果可以单独回归于枢轴结果。
在
spark.mllib
中,第一个类别 $0$ 被选为“枢轴”类别。
参考
统计学习的要素
第 4.4 节。
这里有一个
详细的数学推导
。
对于多类分类问题,该算法将输出一个多项式逻辑回归模型,其中包含 $K - 1$ 个针对第一类回归的二元逻辑回归模型。给定一个新的数据点,将运行 $K - 1$ 个模型,并选择具有最大概率的类作为预测类。
我们实现了两种算法来解决逻辑回归:小批量梯度下降和L-BFGS。我们推荐使用L-BFGS,因其收敛速度比小批量梯度下降更快。
示例
以下示例展示了如何加载一个样本数据集,构建逻辑回归模型,并使用生成的模型进行预测以计算训练误差。
请注意,Python API 目前还不支持多类分类和模型的保存/加载,但未来会支持。
有关API的更多详细信息,请参阅
LogisticRegressionWithLBFGS
Python文档
和
LogisticRegressionModel
Python文档
。
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
# 加载和解析数据
def parsePoint(line):
values = [float(x) for x in line.split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)
# 构建模型
model = LogisticRegressionWithLBFGS.train(parsedData)
# 在训练数据上评估模型
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("训练错误 = " + str(trainErr))
# 保存和加载模型
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
"target/tmp/pythonLogisticRegressionWithLBFGSModel")
以下代码演示了如何加载一个示例多类数据集,将其拆分为训练集和测试集,并使用 LogisticRegressionWithLBFGS 来拟合逻辑回归模型。 然后该模型将在测试数据集上进行评估并保存到磁盘上。
请参阅
LogisticRegressionWithLBFGS
的 Scala 文档
和
LogisticRegressionModel
的 Scala 文档
以获取有关 API 的详细信息。
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// 以 LIBSVM 格式加载训练数据。
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 将数据分成训练集 (60%) 和测试集 (40%)。
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
// 运行训练算法构建模型
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)
.run(training)
// 在测试集上计算原始分数。
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
// 获取评估指标。
val metrics = new MulticlassMetrics(predictionAndLabels)
val accuracy = metrics.accuracy
println(s"准确率 = $accuracy")
// 保存和加载模型
model.save(sc, "target/tmp/scalaLogisticRegressionWithLBFGSModel")
val sameModel = LogisticRegressionModel.load(sc,
"target/tmp/scalaLogisticRegressionWithLBFGSModel")
以下代码演示了如何加载一个示例多类数据集,将其拆分为训练集和测试集,并使用 LogisticRegressionWithLBFGS 来拟合逻辑回归模型。然后对模型进行测试数据集的评估并保存到磁盘。
请参考
LogisticRegressionWithLBFGS
Java 文档
和
LogisticRegressionModel
Java 文档
以获取有关API的详细信息。
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
// 将初始 RDD 分割为两个... [60% 训练数据,40% 测试数据]。
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];
// 运行训练算法以构建模型。
LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)
.run(training.rdd());
// 计算测试集上的原始分数。
JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// 获取评估指标。
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
double accuracy = metrics.accuracy();
System.out.println("准确率 = " + accuracy);
// 保存和加载模型
model.save(sc, "target/tmp/javaLogisticRegressionWithLBFGSModel");
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
"target/tmp/javaLogisticRegressionWithLBFGSModel");
回归
线性最小二乘法、Lasso回归和岭回归
线性最小二乘是回归问题最常用的公式。它是一种线性方法,如上面方程
$\eqref{eq:regPrimal}$
所述,其损失函数由平方损失给出:
\[
L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2.
\]
通过使用不同类型的正则化,可以得出各种相关的回归方法: 普通最小二乘法 或 线性最小二乘法 不使用 正则化; 岭回归 使用 L2 正则化;而 Lasso 使用 L1 正则化。对于所有这些模型,平均损失或训练误差,$\frac{1}{n} \sum_{i=1}^n (\wv^T x_i - y_i)^2$,被称为 均方误差 。
流式线性回归
当数据以流的方式到达时,实时拟合回归模型是有用的,随着新数据的到来更新模型的参数。
spark.mllib
目前支持使用普通最小二乘法的流线性回归。 拟合与离线进行的类似,只是拟合发生在每一批数据上,以便模型不断更新以反映来自流的数据。
示例
以下示例演示了如何从两个不同的文本文件输入流加载训练和测试数据,将流解析为带标签的点,在线拟合线性回归模型到第一个流,并对第二个流进行预测。
首先,我们导入解析输入数据和创建模型所需的类。
然后我们为训练和测试数据创建输入流。我们假设已经创建了一个 StreamingContext
ssc
,有关更多信息,请参见
Spark Streaming 编程指南
。对于这个示例,我们在训练和测试流中使用带标签的点,但在实际操作中,您可能更想使用无标签的向量作为测试数据。
我们通过将权重初始化为0来创建我们的模型。
现在我们注册训练和测试的流并开始作业。
我们现在可以将带有数据的文本文件保存到训练或测试文件夹中。
每行应该是格式为
(y,[x1,x2,x3])
的数据点,其中
y
是标签
而
x1,x2,x3
是特征。每当一个文本文件被放置在
sys.argv[1]
时,模型将会更新。每当一个文本文件被放置在
sys.argv[2]
时,你将看到预测结果。
随着你向训练目录提供更多数据,预测结果
会越来越好!
这是一个完整的示例:
import sys
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(',')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
testData = ssc.textFileStream(sys.argv[2]).map(parse)
numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0, 0.0, 0.0])
model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
ssc.start()
ssc.awaitTermination()
首先,我们导入解析输入数据和创建模型所需的类。
然后我们为训练和测试数据创建输入流。我们假设已经创建了一个 StreamingContext
ssc
,更多信息见
Spark Streaming 编程指南
。在这个例子中,我们在训练和测试流中使用带标签的点,但实际上您可能希望使用无标签的向量作为测试数据。
我们通过将权重初始化为零来创建模型,并注册用于训练和测试的流,然后启动任务。打印预测结果以及真实标签使我们能够轻松查看结果。
最后,我们可以将包含数据的文本文件保存到训练或测试文件夹中。每一行应该是一个数据点,格式为
(y,[x1,x2,x3])
,其中
y
是标签,
x1,x2,x3
是特征。每当将一个文本文件放入
args(0)
时,模型会更新。每当将一个文本文件放入
args(1)
时,您将看到预测。随着您向训练目录提供更多数据,预测将变得更好!
这是一个完整的示例:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.zeros(numFeatures))
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
实现(开发者)
在后台,
spark.mllib
实现了一种简单的分布式随机梯度下降(SGD)版本,建立在基础的梯度下降原语之上(如
优化
部分所述)。所有提供的算法都以正则化参数 (
regParam
) 和与随机梯度下降相关的各种参数作为输入 (
stepSize
,
numIterations
,
miniBatchFraction
)。对于它们中的每一个,我们支持三种可能的正则化(无,L1 或 L2)。
对于逻辑回归, L-BFGS 版本是在 LogisticRegressionWithLBFGS 下实现的,该版本支持二元和多元逻辑回归,而 SGD 版本仅支持二元逻辑回归。然而,L-BFGS 版本不支持 L1 正则化,但 SGD 版本支持 L1 正则化。当不需要 L1 正则化时,强烈建议使用 L-BFGS 版本,因为它通过使用拟牛顿法近似逆海森矩阵,相比于 SGD 收敛得更快且更准确。
算法都用Scala实现: