数据类型 - 基于RDD的API
MLlib支持存储在单台机器上的本地向量和矩阵,以及由一个或多个RDD支持的分布式矩阵。本地向量和本地矩阵是简单的数据模型,作为公共接口。底层的线性代数操作由 Breeze 提供。在监督学习中使用的训练示例称为MLlib中的“标签点”。
本地向量
本地向量具有整数类型和以 0 为基础的索引,以及存储在单台机器上的双精度值。 MLlib 支持两种类型的本地向量:密集型和稀疏型。 密集向量由表示其条目值的双精度数组支持,而稀疏向量由两个并行数组支持:索引和值。 例如,一个向量
(1.0, 0.0, 3.0)
可以用密集格式表示为
[1.0, 0.0, 3.0]
,或者用稀疏格式表示为
(3, [0, 2], [1.0, 3.0])
,其中
3
是向量的大小。
MLlib 识别以下类型作为稠密向量:
-
NumPy的
array -
Python的列表,例如
[1, 2, 3]
以及以下作为稀疏向量:
-
MLlib的
SparseVector. -
SciPy的
csc_matrix具有单列
我们建议使用 NumPy 数组而不是列表以提高效率,并使用在
Vectors
中实现的工厂方法来创建稀疏向量。
有关API的更多详细信息,请参阅
Vectors
Python文档
。
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
# 使用NumPy数组作为稠密向量。
dv1 = np.array([1.0, 0.0, 3.0])
# 使用Python列表作为稠密向量。
dv2 = [1.0, 0.0, 3.0]
# 创建一个SparseVector。
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# 使用单列SciPy csc_matrix作为稀疏向量。
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))
本地向量的基类是
Vector
,我们提供了两种实现:
DenseVector
和
SparseVector
。我们推荐使用
Vectors
中实现的工厂方法来创建本地向量。
有关API的详细信息,请参阅
Vector
Scala文档
和
Vectors
Scala文档
.
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// 创建一个稠密向量 (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// 通过指定其索引和对应于非零条目的值来创建一个稀疏向量 (1.0, 0.0, 3.0).
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// 通过指定其非零条目来创建一个稀疏向量 (1.0, 0.0, 3.0).
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
注意:
Scala 默认导入
scala.collection.immutable.Vector
,因此你必须显式导入
org.apache.spark.mllib.linalg.Vector
才能使用 MLlib 的
Vector
。
局部向量的基本类是
Vector
,我们提供了两种实现:
DenseVector
和
SparseVector
。我们建议使用
Vectors
中实现的工厂方法来创建局部向量。
请参阅
Vector
Java 文档
和
Vectors
Java 文档
以获取有关API的详细信息。
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// 创建一个稠密向量 (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// 通过指定其索引和对应于非零条目的值来创建一个稀疏向量 (1.0, 0.0, 3.0).
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});
标记点
标记点是一个局部向量,可以是稠密的或稀疏的,与标签/响应相关联。
在MLlib中,标记点用于监督学习算法。
我们使用双精度浮点数来存储标签,因此我们可以在回归和分类中使用标记点。
对于二元分类,标签应为
0
(负)或
1
(正)。
对于多类分类,标签应为从零开始的类索引:
0, 1, 2, ...
。
一个标记点由
LabeledPoint
表示。
有关API的更多详细信息,请参阅
LabeledPoint
Python文档
。
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
# 创建一个具有正标签和稠密特征向量的标记点。
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
# 创建一个具有负标签和稀疏特征向量的标记点。
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
标记点由案例类表示
LabeledPoint
.
有关API的详细信息,请参考
LabeledPoint
Scala 文档
。
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// 创建一个带有正标签和稠密特征向量的标记点。
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// 创建一个带有负标签和稀疏特征向量的标记点。
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
一个标记的点用
LabeledPoint
表示。
请参阅
LabeledPoint
Java文档
获取API的详细信息。
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
// 创建一个带有正标签的标记点和一个稠密特征向量。
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));
// 创建一个带有负标签的标记点和一个稀疏特征向量。
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));
稀疏数据
在实践中,稀疏训练数据是非常常见的。MLlib 支持读取存储为
LIBSVM
格式的训练示例,这是
LIBSVM
和
LIBLINEAR
使用的默认格式。它是一种文本格式,其中每一行表示一个带标签的稀疏特征向量,使用以下格式:
标签 index1:值1 index2:值2 ...
其中索引是从一开始的,并按照升序排列。加载后,特征索引被转换为从零开始。
MLUtils.loadLibSVMFile
读取以 LIBSVM 格式存储的训练示例。
有关API的更多详细信息,请参考
MLUtils
Python文档
。
从 pyspark.mllib.util 导入 MLUtils
示例 = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
MLUtils.loadLibSVMFile
读取以 LIBSVM 格式存储的训练示例。
有关API的详细信息,请参阅
MLUtils
Scala文档
。
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
MLUtils.loadLibSVMFile
读取存储在 LIBSVM 格式中的训练示例。
有关API的详细信息,请参考
MLUtils
Java文档
。
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;
JavaRDD<LabeledPoint> examples =
MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();
本地矩阵
一个本地矩阵具有整数类型的行和列索引以及双精度类型的值,存储在单台机器上。MLlib 支持密集矩阵,其条目值存储在一个按列优先顺序的单一双精度数组中,以及稀疏矩阵,其非零条目值以压缩稀疏列(CSC)格式存储,按列优先顺序。例如,以下密集矩阵
\[ \begin{pmatrix}
1.0 & 2.0 \\
3.0 & 4.0 \\
5.0 & 6.0
\end{pmatrix}
\]
存储在一个一维数组
[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]
中,矩阵大小为
(3, 2)
。
本地矩阵的基础类是
Matrix
,我们提供两种实现:
DenseMatrix
,
和
SparseMatrix
。
我们建议使用在
Matrices
中实现的工厂方法来创建本地
矩阵。请记住,MLlib 中的本地矩阵是以列主序存储的。
有关API的更多详细信息,请参阅
Matrix
Python文档
和
Matrices
Python文档
。
from pyspark.mllib.linalg import Matrix, Matrices
# 创建一个稠密矩阵 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])
# 创建一个稀疏矩阵 ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
局部矩阵的基类是
Matrix
,我们提供了两种实现:
DenseMatrix
,
和
SparseMatrix
。
我们建议使用在
Matrices
中实现的工厂方法来创建局部矩阵。请记住,MLlib 中的局部矩阵以列主顺序存储。
有关API的详细信息,请参阅
Matrix
Scala文档
和
Matrices
Scala文档
。
import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// 创建一个稠密矩阵 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
// 创建一个稀疏矩阵 ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
本地矩阵的基类是
Matrix
,我们提供了两种
实现:
DenseMatrix
,
和
SparseMatrix
。
我们建议使用在
Matrices
中实现的工厂方法
来创建本地矩阵。请记住,MLlib 中的本地矩阵按列优先顺序存储。
请参阅
Matrix
Java 文档
和
Matrices
Java 文档
获取有关 API 的详细信息。
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Matrices;
// 创建一个稠密矩阵 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix dm = Matrices.dense(3, 2, new double[] 1.0, 3.0, 5.0, 2.0, 4.0, 6.0});
// 创建一个稀疏矩阵 ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
Matrix sm = Matrices.sparse(3, 2, new int[] 0, 1, 3}, new int[] 0, 2, 1}, new double[] 9, 6, 8});
分布式矩阵
分布式矩阵具有长整型的行和列索引以及双精度值,分布存储在一个或多个 RDD 中。选择合适的格式来存储大型分布式矩阵是非常重要的。将分布式矩阵转换为不同格式可能需要全局洗牌,这个过程相当耗费资源。到目前为止,已经实现了四种类型的分布式矩阵。
基本类型称为
RowMatrix
。
RowMatrix
是一种面向行的分布式矩阵,没有有意义的行索引,例如,一系列特征向量。它由其行的 RDD 支持,其中每一行是一个本地向量。我们假设
RowMatrix
的列数不是非常庞大,以便可以合理地将单个本地向量传送给驱动程序,并且也可以通过单个节点进行存储/操作。
IndexedRowMatrix
类似于
RowMatrix
,但带有行索引,可用于识别行和执行连接。
CoordinateMatrix
是一种以
坐标列表 (COO)
格式存储的分布式矩阵,由其条目的 RDD 支持。
BlockMatrix
是一种由
MatrixBlock
(一个
(Int, Int, Matrix)
的元组)支持的分布式矩阵。
注意
分布式矩阵的底层 RDD 必须是确定性的,因为我们缓存了矩阵大小。一般来说,使用非确定性 RDD 可能会导致错误。
行矩阵
一个
RowMatrix
是一个行导向的分布式矩阵,没有有意义的行索引,由其行的 RDD 支持,其中每行是一个本地向量。由于每行由一个本地向量表示,列的数量受到整数范围的限制,但在实际中应该小得多。
一个
RowMatrix
可以从一个
RDD
向量创建。
有关API的更多详细信息,请参阅
RowMatrix
Python文档
。
from pyspark.mllib.linalg.distributed import RowMatrix
# 创建一个向量的 RDD。
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# 从向量的 RDD 创建一个 RowMatrix。
mat = RowMatrix(rows)
# 获取它的大小。
m = mat.numRows() # 4
n = mat.numCols() # 3
# 重新获取行作为向量的 RDD。
rowsRDD = mat.rows
一个
RowMatrix
可以通过一个
RDD[Vector]
实例创建。然后我们可以计算它的列摘要统计和分解。
QR分解
形式为 A = QR,其中 Q 是一个正交矩阵,R 是一个上三角矩阵。关于
奇异值分解 (SVD)
和
主成分分析 (PCA)
,请参阅
降维
。
有关API的详细信息,请参阅
RowMatrix
Scala文档
。
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows: RDD[Vector] = ... // 一个本地向量的 RDD
// 从 RDD[Vector] 创建 RowMatrix。
val mat: RowMatrix = new RowMatrix(rows)
// 获取它的大小。
val m = mat.numRows()
val n = mat.numCols()
// QR 分解
val qrResult = mat.tallSkinnyQR(true)
一个
RowMatrix
可以从一个
JavaRDD
实例中创建。然后我们可以计算它的列摘要统计信息。
有关API的详细信息,请参阅
RowMatrix
Java文档
。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
JavaRDD<Vector> rows = ... // 一个本地向量的 JavaRDD
// 从 JavaRDD 创建一个 RowMatrix。
RowMatrix mat = new RowMatrix(rows.rdd());
// 获取它的大小。
long m = mat.numRows();
long n = mat.numCols();
// QR 分解
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);
索引行矩阵
一个
IndexedRowMatrix
类似于一个
RowMatrix
,但具有有意义的行索引。它由一个索引行的 RDD 支持,确保每一行由其索引(长整型)和一个局部向量表示。
一个
IndexedRowMatrix
可以从一个
RDD
的
IndexedRow
创建,其中
IndexedRow
是一个
(long, vector)
的封装。一个
IndexedRowMatrix
可以通过丢弃其行索引转换为
RowMatrix
。
有关API的更多详细信息,请参阅
IndexedRowMatrix
Python 文档
。
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# 创建一个索引行的RDD。
# - 这可以通过使用IndexedRow类显式完成:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
IndexedRow(1, [4, 5, 6]),
IndexedRow(2, [7, 8, 9]),
IndexedRow(3, [10, 11, 12])])
# - 或者通过使用(长整型,向量)元组:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
(2, [7, 8, 9]), (3, [10, 11, 12])])
# 从IndexedRows的RDD创建IndexedRowMatrix。
mat = IndexedRowMatrix(indexedRows)
# 获取它的大小。
m = mat.numRows() # 4
n = mat.numCols() # 3
# 获取行作为IndexedRows的RDD。
rowsRDD = mat.rows
# 通过删除行索引将其转换为RowMatrix。
rowMat = mat.toRowMatrix()
可以从一个
IndexedRowMatrix
实例创建一个
RDD[IndexedRow]
实例,其中
IndexedRow
是
(Long, Vector)
的一个封装。一个
IndexedRowMatrix
可以通过去掉
它的行索引被转换为
RowMatrix
。
有关API的详细信息,请参阅
IndexedRowMatrix
Scala文档
。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // 一个 IndexedRow 的 RDD
// 从 RDD[IndexedRow] 创建一个 IndexedRowMatrix。
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
// 获取它的大小。
val m = mat.numRows()
val n = mat.numCols()
// 删除它的行索引。
val rowMat: RowMatrix = mat.toRowMatrix()
一个
IndexedRowMatrix
可以从一个
JavaRDD
实例创建,其中
IndexedRow
是
对
(long, Vector)
的封装。 一个
IndexedRowMatrix
可以通过删除
其行索引转换为
RowMatrix
。
有关API的详细信息,请参阅
IndexedRowMatrix
Java文档
。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
JavaRDD<IndexedRow> rows = ... // 一个 JavaRDD 的索引行
// 从一个 JavaRDD 创建一个 IndexedRowMatrix。
IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());
// 获取它的大小。
long m = mat.numRows();
long n = mat.numCols();
// 删除其行索引。
RowMatrix rowMat = mat.toRowMatrix();
坐标矩阵
A
CoordinateMatrix
是一个由其条目的 RDD 支持的分布式矩阵。每个条目是一个元组
(i: Long, j: Long, value: Double)
,其中
i
是行索引,
j
是列索引,而
value
是条目值。只有当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用
CoordinateMatrix
。
一个
CoordinateMatrix
可以从
RDD
的
MatrixEntry
条目创建,其中
MatrixEntry
是一个对
(long, long, float)
的封装。一个
CoordinateMatrix
可以通过调用
toRowMatrix
转换为
RowMatrix
,或者通过调用
toIndexedRowMatrix
转换为带稀疏行的
IndexedRowMatrix
。
有关API的更多详细信息,请参阅
CoordinateMatrix
Python文档
。
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
# 创建一个坐标条目的RDD。
# - 这可以通过MatrixEntry类显式完成:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
# - 或使用(long,long,float)元组:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])
# 从MatrixEntries的RDD创建一个CoordinateMatrix。
mat = CoordinateMatrix(entries)
# 获取它的大小。
m = mat.numRows() # 3
n = mat.numCols() # 2
# 获取条目作为MatrixEntries的RDD。
entriesRDD = mat.entries
# 转换为RowMatrix。
rowMat = mat.toRowMatrix()
# 转换为IndexedRowMatrix。
indexedRowMat = mat.toIndexedRowMatrix()
# 转换为BlockMatrix。
blockMat = mat.toBlockMatrix()
A
CoordinateMatrix
可以从一个
RDD[MatrixEntry]
实例创建,其中
MatrixEntry
是一个
对
(Long, Long, Double)
的封装。一个
CoordinateMatrix
可以通过调用
toIndexedRowMatrix
转换为具有稀疏行的
IndexedRowMatrix
。
目前对
CoordinateMatrix
的其他计算不支持。
请参考
CoordinateMatrix
Scala 文档
以获取有关 API 的详细信息。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // 一个矩阵条目的 RDD
// 从 RDD[MatrixEntry] 创建一个 CoordinateMatrix。
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
// 获取它的大小。
val m = mat.numRows()
val n = mat.numCols()
// 将其转换为行是稀疏向量的 IndexRowMatrix。
val indexedRowMatrix = mat.toIndexedRowMatrix()
A
CoordinateMatrix
可以从一个
JavaRDD
实例中创建,其中
MatrixEntry
是一个
对
(long, long, double)
的封装。一个
CoordinateMatrix
可以通过调用
toIndexedRowMatrix
转换为具有稀疏行的
IndexedRowMatrix
。目前对
CoordinateMatrix
的其他计算不支持。
请参考
CoordinateMatrix
Java 文档
以获取有关 API 的详细信息。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
JavaRDD<MatrixEntry> entries = ... // 一个包含矩阵条目的 JavaRDD
// 从 JavaRDD 创建一个 CoordinateMatrix。
CoordinateMatrix mat = new CoordinateMatrix(entries.rdd());
// 获取它的大小。
long m = mat.numRows();
long n = mat.numCols();
// 将其转换为一个索引行矩阵,其行是稀疏向量。
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
块矩阵
A
BlockMatrix
是一个由
MatrixBlock
的 RDD 支持的分布式矩阵,其中
MatrixBlock
是一个元组
((Int, Int), Matrix)
,其中
(Int, Int)
是块的索引,
Matrix
是给定索引的子矩阵,大小为
rowsPerBlock
x
colsPerBlock
。
BlockMatrix
支持与另一个
BlockMatrix
的
add
和
multiply
等方法。
BlockMatrix
还有一个辅助函数
validate
,可用于检查
BlockMatrix
是否正确设置。
A
BlockMatrix
可以由一个
RDD
的子矩阵块创建,其中一个子矩阵块是一个
((blockRowIndex, blockColIndex), sub-matrix)
元组。
请参阅
BlockMatrix
Python 文档
以获取有关 API 的更多详细信息。
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
# 创建一个子矩阵块的RDD。
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
# 从子矩阵块的RDD创建一个BlockMatrix。
mat = BlockMatrix(blocks, 3, 2)
# 获取它的大小。
m = mat.numRows() # 6
n = mat.numCols() # 2
# 将块作为子矩阵块的RDD获取。
blocksRDD = mat.blocks
# 转换为LocalMatrix。
localMat = mat.toLocalMatrix()
# 转换为IndexedRowMatrix。
indexedRowMat = mat.toIndexedRowMatrix()
# 转换为CoordinateMatrix。
coordinateMat = mat.toCoordinateMatrix()
A
BlockMatrix
可以通过调用
toBlockMatrix
从
IndexedRowMatrix
或
CoordinateMatrix
最简单地创建。
toBlockMatrix
默认创建1024 x 1024大小的块。
用户可以通过提供
toBlockMatrix(rowsPerBlock, colsPerBlock)
的值来更改块大小。
有关API的详细信息,请参阅
BlockMatrix
Scala文档
。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // 一个 (i, j, v) 矩阵条目的 RDD
// 从 RDD[MatrixEntry] 创建一个 CoordinateMatrix。
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// 将 CoordinateMatrix 转换为 BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
// 验证 BlockMatrix 是否设置正确。如果不有效则抛出异常。
// 如果有效则什么都不会发生。
matA.validate()
// 计算 A^T A。
val ata = matA.transpose.multiply(matA)
一个
BlockMatrix
可以通过调用
toBlockMatrix
从
IndexedRowMatrix
或
CoordinateMatrix
最容易创建。
toBlockMatrix
默认创建大小为 1024 x 1024 的块。用户可以通过提供
toBlockMatrix(rowsPerBlock, colsPerBlock)
中的值来更改块大小。
有关API的详细信息,请参阅
BlockMatrix
Java文档
。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
JavaRDD<MatrixEntry> entries = ... // 一个包含 (i, j, v) 矩阵条目的 JavaRDD
// 从 JavaRDD 创建一个 CoordinateMatrix。
CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// 将 CoordinateMatrix 转换为 BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();
// 验证 BlockMatrix 是否已正确设置。如果无效,则抛出异常。
// 如果有效,则不会发生任何事情。
matA.validate();
// 计算 A^T A。
BlockMatrix ata = matA.transpose().multiply(matA);