降维 - 基于RDD的API
降维
是减少考虑的变量数量的过程。
它可以用于从原始和噪声特征中提取潜在特征,
或在保持结构的同时压缩数据。
spark.mllib
在
RowMatrix
类上提供对降维的支持。
奇异值分解 (SVD)
奇异值分解 (SVD) 将一个矩阵分解为三个矩阵:$U$,$\Sigma$ 和 $V$,使得
\[
A = U \Sigma V^T,
\]
在这里
- $U$ 是一个正交归一矩阵,其列称为左奇异向量,
- $\Sigma$ 是一个对角矩阵,其对角线上的元素为非负数且按降序排列,对角线上的元素称为奇异值,
- $V$ 是一个正交归一矩阵,其列称为右奇异向量。
对于大型矩阵,我们通常不需要完整的分解,只需要前几个奇异值及其相关的奇异向量。这可以节省存储、去噪并恢复矩阵的低秩结构。
如果我们保留前 $k$ 个奇异值,那么结果低秩矩阵的维度将是:
-
$U$:$m \times k$, -
$\Sigma$:$k \times k$, -
$V$:$n \times k$。
性能
我们假设 $n$ 小于 $m$。奇异值和右奇异向量来自Gram矩阵 $A^T A$ 的特征值和特征向量。存储左奇异向量的矩阵 $U$,通过矩阵乘法计算得出,即 $U = A (V S^{-1})$,如果用户通过 computeU 参数请求的话。实际使用的方法会根据计算成本自动确定:
- 如果 $n$ 较小 ($n < 100$) 或 $k$ 相比于 $n$ 较大 ($k > n / 2$),我们首先计算Gramian矩阵,然后在驱动程序上局部计算其前几个特征值和特征向量。这需要在每个执行器和驱动程序上进行一次通过,存储复杂度为 $O(n^2)$,在驱动程序上需要 $O(n^2 k)$ 的时间。
- 否则,我们以分布式的方式计算 $(A^T A) v$ 并将其发送到 ARPACK ,以在驱动节点上计算 $(A^T A)$ 的前几个特征值和特征向量。这需要 $O(k)$ 次传递,在每个执行器上存储复杂度为 $O(n)$,在驱动程序上存储复杂度为 $O(n k)$。
SVD 示例
spark.mllib
提供了对行向矩阵的SVD功能,该功能在
RowMatrix
类中提供。
有关API的详细信息,请参阅
SingularValueDecomposition
Python文档
。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# 计算前 5 个奇异值及其对应的奇异向量。
svd = mat.computeSVD(5, computeU=True)
U = svd.U # U 因子是一个 RowMatrix。
s = svd.s # 奇异值存储在一个本地密集向量中。
V = svd.V # V 因子是一个本地密集矩阵。
相同的代码适用于
IndexedRowMatrix
如果
U
被定义为一个
IndexedRowMatrix
。
有关API的详细信息,请参阅
SingularValueDecomposition
Scala文档
。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(data)
val mat: RowMatrix = new RowMatrix(rows)
// 计算前 5 个奇异值和相应的奇异向量。
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
val U: RowMatrix = svd.U // U 因子是一个 RowMatrix。
val s: Vector = svd.s // 奇异值存储在一个本地的稠密向量中。
val V: Matrix = svd.V // V 因子是一个本地的稠密矩阵。
相同的代码适用于
IndexedRowMatrix
如果
U
被定义为一个
IndexedRowMatrix
。
有关API的详细信息,请参考
SingularValueDecomposition
Java文档
。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// 从 JavaRDD 创建一个 RowMatrix。
RowMatrix mat = new RowMatrix(rows.rdd());
// 计算前 5 个奇异值和对应的奇异向量。
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U(); // U 因子是一个 RowMatrix。
Vector s = svd.s(); // 奇异值存储在一个本地稠密向量中。
Matrix V = svd.V(); // V 因子是一个本地稠密矩阵。
相同的代码适用于
IndexedRowMatrix
如果
U
被定义为一个
IndexedRowMatrix
。
主成分分析 (PCA)
主成分分析 (PCA) 是一种统计方法,用于寻找一种旋转,使得第一坐标具有最大的方差,而随后的每个坐标依次具有最大的方差。旋转矩阵的列称为主成分。PCA 广泛应用于降维。
spark.mllib
支持对以行优先格式存储的高浅矩阵和任何向量进行主成分分析 (PCA)。
以下代码演示了如何在
RowMatrix
上计算主成分,并使用它们将向量投影到低维空间中。
有关API的详细信息,请参阅
RowMatrix
Python文档
。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# 计算前 4 个主成分。
# 主成分存储在一个本地密集矩阵中。
pc = mat.computePrincipalComponents(4)
# 将行投影到前 4 个主成分所张成的线性空间中。
projected = mat.multiply(pc)
以下代码演示了如何在
RowMatrix
上计算主成分,并利用它们将向量投影到低维空间中。
有关API的详细信息,请参阅
RowMatrix
Scala文档
。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(data)
val mat: RowMatrix = new RowMatrix(rows)
// 计算前 4 个主成分。
// 主成分存储在一个本地稠密矩阵中。
val pc: Matrix = mat.computePrincipalComponents(4)
// 将行投影到由前 4 个主成分构成的线性空间中。
val projected: RowMatrix = mat.multiply(pc)
以下代码演示了如何计算源向量的主成分,并使用它们将向量投影到低维空间,同时保留相关标签:
有关API的详细信息,请参阅
PCA
Scala文档
。
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
val data: RDD[LabeledPoint] = sc.parallelize(Seq(
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 1)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 1, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0)),
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0))))
// 计算前 5 个主成分。
val pca = new PCA(5).fit(data.map(_.features))
// 将向量投影到前 5 个主成分所展开的线性空间中,保留标签
val projected = data.map(p => p.copy(features = pca.transform(p.features)))
以下代码演示如何在
RowMatrix
上计算主成分,并使用它们将向量投影到低维空间中。
有关API的详细信息,请参阅
RowMatrix
Java文档
。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// 从 JavaRDD 创建 RowMatrix。
RowMatrix mat = new RowMatrix(rows.rdd());
// 计算前 4 个主成分。
// 主成分存储在一个本地密集矩阵中。
Matrix pc = mat.computePrincipalComponents(4);
// 将行投影到由前 4 个主成分张成的线性空间。
RowMatrix projected = mat.multiply(pc);