优化 - 基于RDD的API

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

数学描述

梯度下降

解决形式为 $\min_{\wv \in\R^d} \; f(\wv)$ 的优化问题的最简单方法是 梯度下降 。这种一阶优化方法(包括梯度下降及其随机变体)非常适合大规模和分布式计算。

梯度下降方法旨在通过迭代地朝着最陡下降的方向(也就是当前点的导数的负值,称为 梯度 )采取步骤,以找到函数的局部最小值,即在当前参数值下。如果目标函数 $f$ 在所有参数上不可微,但仍然是凸的,那么 次梯度 是梯度的自然推广,并承担步长方向的角色。无论如何,计算 $f$ 的梯度或次梯度是昂贵的——这需要对完整数据集进行完整的遍历,以计算所有损失项的贡献。

随机梯度下降 (SGD)

目标函数 $f$ 被写成和的优化问题特别适合使用 随机梯度下降法 (SGD) 进行求解。 在我们的案例中,对于在 监督机器学习 中常用的优化形式, \begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation} 这尤其自然,因为损失被写成来自每个数据点的个别损失的平均值。

随机子梯度是对一个向量的随机选择,使得在期望上,我们能够获得原始目标函数的真实子梯度。 均匀随机选择一个数据点 $i\in[1..n]$ ,我们获得关于 $\wv$ 的随机子梯度如下: \[ f'_{\wv,i} := L'_{\wv,i} + \lambda\, R'_\wv \ , \] 其中 $L'_{\wv,i} \in \R^d$ 是由 $i$ -th 数据点确定的损失函数部分的子梯度,即 $L'_{\wv,i} \in \frac{\partial}{\partial \wv} L(\wv;\x_i,y_i)$ 。 此外, $R'_\wv$ 是正则化项 $R(\wv)$ 的子梯度,即 $R'_\wv \in \frac{\partial}{\partial \wv} R(\wv)$ 。项 $R'_\wv$ 不依赖于选择哪个随机数据点。 显然,在对随机选择 $i\in[1..n]$ 的期望中,我们有 $f'_{\wv,i}$ 是原始目标 $f$ 的子梯度,这意味着 $\E\left[f'_{\wv,i}\right] \in \frac{\partial}{\partial \wv} f(\wv)$

现在运行 SGD 只是朝负随机子梯度的方向行走 $f'_{\wv,i}$ ,即 \begin{equation}\label{eq:SGDupdate} \wv^{(t+1)} := \wv^{(t)} - \gamma \; f'_{\wv,i} \ . \end{equation} 步长. 参数 $\gamma$ 是步长,在默认实现中选择 与迭代计数的平方根成比例递减,即 $\gamma := \frac{s}{\sqrt{t}}$ 在第 $t$ 次迭代中,输入参数 $s=$ stepSize 。请注意,为 SGD 方法选择最佳步长在实践中常常是微妙的,并且是一个活跃研究的主题。

梯度。 spark.mllib 中实现的机器学习方法的(子)梯度表可在 分类和回归 部分中找到。

近端更新。 作为一种替代,仅使用正则化器的次梯度 $R'(\wv)$ 在步方向上,某些情况下可以通过使用近端算子获得改进的更新。 对于L1正则化器,近端算子通过软阈值来给出,如在 L1Updater 中实现。

分布式SGD的更新方案

SGD 实现在 GradientDescent 使用 简单的(分布式)数据示例采样。 我们回顾一下优化问题的损失部分 $\eqref{eq:regPrimal}$ $\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)$ ,因此 $\frac1n \sum_{i=1}^n L'_{\wv,i}$ 将 是真实的(子)梯度。 由于这需要访问完整数据集,参数 miniBatchFraction 指定 要使用的完整数据的比例。 在此子集上,梯度的平均值,即 \[ \frac1{|S|} \sum_{i\in S} L'_{\wv,i} \ , \] 是一个随机梯度。这里 $S$ 是大小为 $|S|=$ miniBatchFraction $\cdot n$ 的抽样子集。

在每次迭代中,针对分布式数据集的采样( RDD )以及从每个工作机器的部分结果计算总和是由标准的Spark例程执行的。

如果点的比例 miniBatchFraction 被设置为 1(默认值),则每次迭代中的结果步骤是精确的(子)梯度下降。在这种情况下,使用的步骤方向没有随机性也没有方差。另一方面,如果 miniBatchFraction 选择得非常小,以至于只采样一个点,即 $|S|=$ miniBatchFraction $\cdot n = 1$ ,那么该算法等同于标准的 SGD。在这种情况下,步骤方向依赖于对点的均匀随机采样。

有限内存BFGS (L-BFGS)

L-BFGS 是一种优化算法,属于拟牛顿方法家族,旨在解决形式为 $\min_{\wv \in\R^d} \; f(\wv)$ 的优化问题。L-BFGS 方法在局部将目标函数近似为二次函数,而无需评估目标函数的二阶偏导数来构建海森矩阵。海森矩阵是通过先前的梯度评估来近似的,因此在使用牛顿法显式计算海森矩阵时,没有垂直可扩展性问题(训练特征的数量)。因此,与其他一阶优化方法相比,L-BFGS 通常实现更快速的收敛。

选择优化方法

线性方法 在内部使用优化,一些 spark.mllib 中的线性方法支持 SGD 和 L-BFGS。 不同的优化方法可以根据目标函数的属性具有不同的收敛保证,我们无法在这里涵盖文献。 一般来说,当 L-BFGS 可用时,我们建议使用它而不是 SGD,因为 L-BFGS 通常收敛得更快(在更少的迭代中)。

在 MLlib 中的实现

梯度下降和随机梯度下降

梯度下降方法包括随机子梯度下降(SGD),作为 MLlib 中的一种低级原语,基于此开发了各种机器学习算法,具体见 线性方法 部分的例子。

SGD 类 GradientDescent 设置以下参数:

L-BFGS

L-BFGS 目前仅在 MLlib 中作为一种低级优化原语。如果您想在各种 ML 算法中使用 L-BFGS,例如线性回归和逻辑回归,您需要自己将目标函数的梯度和更新器传递给优化器,而不是使用像 LogisticRegressionWithSGD 这样的训练 API。请参阅下面的示例。将在下一个版本中解决。

使用 L1Updater 的 L1 正则化将无法工作,因为 L1Updater 中的软阈值逻辑是为梯度下降设计的。请参见开发者注释。

L-BFGS 方法 LBFGS.runLBFGS 具有以下参数:

返回的是一个包含两个元素的元组。第一个元素是一个列矩阵,包含每个特征的权重,第二个元素是一个数组,包含每次迭代计算的损失。

这里是一个使用L-BFGS优化器训练二元逻辑回归的示例,带有L2正则化。

请参阅 LBFGS Scala 文档 SquaredL2Updater Scala 文档 获取关于 API 的详细信息。

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size
// 将数据分为训练集(60%)和测试集(40%)。
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
// 在训练数据中添加1作为截距项。
val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()
val test = splits(1)
// 运行训练算法以构建模型
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
training,
new LogisticGradient(),
new SquaredL2Updater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
initialWeightsWithIntercept)
val model = new LogisticRegressionModel(
Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
weightsWithIntercept(weightsWithIntercept.size - 1))
// 清除默认阈值。
model.clearThreshold()
// 计算测试集上的原始得分。
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
// 获取评估指标。
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("训练过程中每一步的损失")
loss.foreach(println)
println(s"ROC下面积 = $auROC")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/LBFGSExample.scala" in the Spark repo.

有关API的详细信息,请参阅 LBFGS Java文档 SquaredL2Updater Java文档

import java.util.Arrays;
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
int numFeatures = data.take(1).get(0).features().size();
// 将初始 RDD 分成两个... [60% 训练数据, 40% 测试数据].
JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L);
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);
// 将 1 附加到训练数据中作为截距.
JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
training.cache();
// 运行训练算法以构建模型.
int numCorrections = 10;
double convergenceTol = 1e-4;
int maxNumIterations = 20;
double regParam = 0.1;
Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]);
Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
training.rdd(),
new LogisticGradient(),
new SquaredL2Updater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
initialWeightsWithIntercept);
Vector weightsWithIntercept = result._1();
double[] loss = result._2();
LogisticRegressionModel model = new LogisticRegressionModel(
Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
(weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);
// 清除默认阈值.
model.clearThreshold();
// 在测试集上计算原始分数.
JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// 获取评估指标.
BinaryClassificationMetrics metrics =
new BinaryClassificationMetrics(scoreAndLabels.rdd());
double auROC = metrics.areaUnderROC();
System.out.println("训练过程中的每一步损失");
for (double l : loss) {
System.out.println(l);
}
System.out.println("ROC 曲线下面积 = " + auROC);
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java" in the Spark repo.

开发者的笔记

由于海森矩阵是根据之前的梯度评估近似构造的,因此在优化过程中目标函数不能改变。因此,随机L-BFGS不能仅仅依靠miniBatch工作;因此,在我们有更好的理解之前,我们不提供这个。

Updater 是一个最初为梯度下降设计的类,用于计算实际的梯度下降步骤。 然而,我们能够通过忽略仅针对梯度下降的逻辑部分,例如自适应步长等,将梯度和正则化目标函数的损失用于L-BFGS。我们将把它重构为正则化器,以替换更新器,从而分离正则化和步骤更新之间的逻辑。