优化 - 基于RDD的API
\[
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\]
数学描述
梯度下降
解决形式为
$\min_{\wv \in\R^d} \; f(\wv)$
的优化问题的最简单方法是
梯度下降
。这种一阶优化方法(包括梯度下降及其随机变体)非常适合大规模和分布式计算。
梯度下降方法旨在通过迭代地朝着最陡下降的方向(也就是当前点的导数的负值,称为
梯度
)采取步骤,以找到函数的局部最小值,即在当前参数值下。如果目标函数
$f$
在所有参数上不可微,但仍然是凸的,那么
次梯度
是梯度的自然推广,并承担步长方向的角色。无论如何,计算
$f$
的梯度或次梯度是昂贵的——这需要对完整数据集进行完整的遍历,以计算所有损失项的贡献。
随机梯度下降 (SGD)
目标函数
$f$
被写成和的优化问题特别适合使用
随机梯度下降法 (SGD)
进行求解。 在我们的案例中,对于在
监督机器学习
中常用的优化形式,
\begin{equation}
f(\wv) :=
\lambda\, R(\wv) +
\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)
\label{eq:regPrimal}
\ .
\end{equation}
这尤其自然,因为损失被写成来自每个数据点的个别损失的平均值。
随机子梯度是对一个向量的随机选择,使得在期望上,我们能够获得原始目标函数的真实子梯度。
均匀随机选择一个数据点
$i\in[1..n]$
,我们获得关于
$\wv$
的随机子梯度如下:
\[
f'_{\wv,i} := L'_{\wv,i} + \lambda\, R'_\wv \ ,
\]
其中
$L'_{\wv,i} \in \R^d$
是由
$i$
-th 数据点确定的损失函数部分的子梯度,即
$L'_{\wv,i} \in \frac{\partial}{\partial \wv} L(\wv;\x_i,y_i)$
。
此外,
$R'_\wv$
是正则化项
$R(\wv)$
的子梯度,即
$R'_\wv \in
\frac{\partial}{\partial \wv} R(\wv)$
。项
$R'_\wv$
不依赖于选择哪个随机数据点。
显然,在对随机选择
$i\in[1..n]$
的期望中,我们有
$f'_{\wv,i}$
是原始目标
$f$
的子梯度,这意味着
$\E\left[f'_{\wv,i}\right] \in
\frac{\partial}{\partial \wv} f(\wv)$
。
现在运行 SGD 只是朝负随机子梯度的方向行走
$f'_{\wv,i}$
,即
\begin{equation}\label{eq:SGDupdate}
\wv^{(t+1)} := \wv^{(t)} - \gamma \; f'_{\wv,i} \ .
\end{equation}
步长.
参数
$\gamma$
是步长,在默认实现中选择
与迭代计数的平方根成比例递减,即
$\gamma := \frac{s}{\sqrt{t}}$
在第
$t$
次迭代中,输入参数
$s=$ stepSize
。请注意,为 SGD 方法选择最佳步长在实践中常常是微妙的,并且是一个活跃研究的主题。
梯度。
在
spark.mllib
中实现的机器学习方法的(子)梯度表可在
分类和回归
部分中找到。
近端更新。
作为一种替代,仅使用正则化器的次梯度
$R'(\wv)$
在步方向上,某些情况下可以通过使用近端算子获得改进的更新。
对于L1正则化器,近端算子通过软阈值来给出,如在
L1Updater
中实现。
分布式SGD的更新方案
SGD 实现在
GradientDescent
使用
简单的(分布式)数据示例采样。
我们回顾一下优化问题的损失部分
$\eqref{eq:regPrimal}$
是
$\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)$
,因此
$\frac1n \sum_{i=1}^n L'_{\wv,i}$
将
是真实的(子)梯度。
由于这需要访问完整数据集,参数
miniBatchFraction
指定
要使用的完整数据的比例。
在此子集上,梯度的平均值,即
\[
\frac1{|S|} \sum_{i\in S} L'_{\wv,i} \ ,
\]
是一个随机梯度。这里
$S$
是大小为
$|S|=$ miniBatchFraction
$\cdot n$
的抽样子集。
在每次迭代中,针对分布式数据集的采样( RDD )以及从每个工作机器的部分结果计算总和是由标准的Spark例程执行的。
如果点的比例
miniBatchFraction
被设置为 1(默认值),则每次迭代中的结果步骤是精确的(子)梯度下降。在这种情况下,使用的步骤方向没有随机性也没有方差。另一方面,如果
miniBatchFraction
选择得非常小,以至于只采样一个点,即
$|S|=$ miniBatchFraction $\cdot n = 1$
,那么该算法等同于标准的 SGD。在这种情况下,步骤方向依赖于对点的均匀随机采样。
有限内存BFGS (L-BFGS)
L-BFGS
是一种优化算法,属于拟牛顿方法家族,旨在解决形式为
$\min_{\wv \in\R^d} \; f(\wv)$
的优化问题。L-BFGS 方法在局部将目标函数近似为二次函数,而无需评估目标函数的二阶偏导数来构建海森矩阵。海森矩阵是通过先前的梯度评估来近似的,因此在使用牛顿法显式计算海森矩阵时,没有垂直可扩展性问题(训练特征的数量)。因此,与其他一阶优化方法相比,L-BFGS 通常实现更快速的收敛。
选择优化方法
线性方法
在内部使用优化,一些
spark.mllib
中的线性方法支持 SGD 和 L-BFGS。
不同的优化方法可以根据目标函数的属性具有不同的收敛保证,我们无法在这里涵盖文献。
一般来说,当 L-BFGS 可用时,我们建议使用它而不是 SGD,因为 L-BFGS 通常收敛得更快(在更少的迭代中)。
在 MLlib 中的实现
梯度下降和随机梯度下降
梯度下降方法包括随机子梯度下降(SGD),作为
MLlib
中的一种低级原语,基于此开发了各种机器学习算法,具体见
线性方法
部分的例子。
SGD 类 GradientDescent 设置以下参数:
-
Gradient是一个计算被优化函数随机梯度的类,即相对于当前参数值的单个训练示例。 MLlib 包含常见损失函数的梯度类,例如,铰链损失、逻辑损失、最小二乘损失。梯度类接受训练示例、其标签和当前参数值作为输入。 -
Updater是一个执行实际梯度下降步骤的类,即在每次迭代中更新权重,针对给定的损失部分的梯度。更新器还负责执行来自正则化部分的更新。MLlib 包含没有正则化的情况以及 L1 和 L2 正则化器的更新器。 -
stepSize是一个标量值,表示梯度下降的初始步长。 MLlib 中的所有更新器使用在第 t 步的步长等于stepSize $/ \sqrt{t}$。 -
numIterations是要运行的迭代次数。 -
regParam是使用 L1 或 L2 正则化时的正则化参数。 -
miniBatchFraction是在每次迭代中抽样的总数据的比例,用于计算梯度方向。-
抽样仍然需要遍历整个 RDD,因此减少
miniBatchFraction可能不会显著加快优化。当计算梯度的成本高昂时,用户将会看到最大的加速,因为仅选择的样本用于计算梯度。
-
抽样仍然需要遍历整个 RDD,因此减少
L-BFGS
L-BFGS 目前仅在
MLlib
中作为一种低级优化原语。如果您想在各种 ML 算法中使用 L-BFGS,例如线性回归和逻辑回归,您需要自己将目标函数的梯度和更新器传递给优化器,而不是使用像
LogisticRegressionWithSGD
这样的训练 API。请参阅下面的示例。将在下一个版本中解决。
使用 L1Updater 的 L1 正则化将无法工作,因为 L1Updater 中的软阈值逻辑是为梯度下降设计的。请参见开发者注释。
L-BFGS 方法 LBFGS.runLBFGS 具有以下参数:
-
Gradient是一个计算优化目标函数梯度的类,即在当前参数值下,针对单个训练示例的梯度。MLlib 包含用于常见损失函数的梯度类,例如 hinge、logistic、最小二乘。梯度类输入一个训练示例、其标签和当前参数值。 -
Updater是一个计算 L-BFGS 的目标函数的梯度和损失的类。MLlib 包含不带正则化的更新器,以及 L2 正则化器。 -
numCorrections是 L-BFGS 更新中使用的修正次数。推荐使用 10。 -
maxNumIterations是 L-BFGS 可以运行的最大迭代次数。 -
regParam是使用正则化时的正则化参数。 -
convergenceTol控制 L-BFGS 被认为收敛时允许的相对变化量。这必须是非负的。较低的值容忍度较低,因此通常导致更多的迭代。该值同时考虑平均改善和梯度的范数,位于 Breeze LBFGS 中。
返回的是一个包含两个元素的元组。第一个元素是一个列矩阵,包含每个特征的权重,第二个元素是一个数组,包含每次迭代计算的损失。
这里是一个使用L-BFGS优化器训练二元逻辑回归的示例,带有L2正则化。
请参阅
LBFGS
Scala 文档
和
SquaredL2Updater
Scala 文档
获取关于 API 的详细信息。
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size
// 将数据分为训练集(60%)和测试集(40%)。
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
// 在训练数据中添加1作为截距项。
val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()
val test = splits(1)
// 运行训练算法以构建模型
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
training,
new LogisticGradient(),
new SquaredL2Updater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
initialWeightsWithIntercept)
val model = new LogisticRegressionModel(
Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
weightsWithIntercept(weightsWithIntercept.size - 1))
// 清除默认阈值。
model.clearThreshold()
// 计算测试集上的原始得分。
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
// 获取评估指标。
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("训练过程中每一步的损失")
loss.foreach(println)
println(s"ROC下面积 = $auROC")
有关API的详细信息,请参阅
LBFGS
Java文档
和
SquaredL2Updater
Java文档
。
import java.util.Arrays;
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
int numFeatures = data.take(1).get(0).features().size();
// 将初始 RDD 分成两个... [60% 训练数据, 40% 测试数据].
JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L);
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);
// 将 1 附加到训练数据中作为截距.
JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
training.cache();
// 运行训练算法以构建模型.
int numCorrections = 10;
double convergenceTol = 1e-4;
int maxNumIterations = 20;
double regParam = 0.1;
Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]);
Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
training.rdd(),
new LogisticGradient(),
new SquaredL2Updater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
initialWeightsWithIntercept);
Vector weightsWithIntercept = result._1();
double[] loss = result._2();
LogisticRegressionModel model = new LogisticRegressionModel(
Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
(weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);
// 清除默认阈值.
model.clearThreshold();
// 在测试集上计算原始分数.
JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// 获取评估指标.
BinaryClassificationMetrics metrics =
new BinaryClassificationMetrics(scoreAndLabels.rdd());
double auROC = metrics.areaUnderROC();
System.out.println("训练过程中的每一步损失");
for (double l : loss) {
System.out.println(l);
}
System.out.println("ROC 曲线下面积 = " + auROC);
开发者的笔记
由于海森矩阵是根据之前的梯度评估近似构造的,因此在优化过程中目标函数不能改变。因此,随机L-BFGS不能仅仅依靠miniBatch工作;因此,在我们有更好的理解之前,我们不提供这个。
Updater
是一个最初为梯度下降设计的类,用于计算实际的梯度下降步骤。 然而,我们能够通过忽略仅针对梯度下降的逻辑部分,例如自适应步长等,将梯度和正则化目标函数的损失用于L-BFGS。我们将把它重构为正则化器,以替换更新器,从而分离正则化和步骤更新之间的逻辑。