聚类 - 基于RDD的API

聚类 是一个无监督学习问题,我们的目标是根据某种相似性的概念将实体的子集彼此分组。聚类通常用于探索性分析和/或作为层次 监督学习 管道的组成部分(在该管道中,为每个聚类训练不同的分类器或回归模型)。

spark.mllib 包支持以下模型:

K均值

K均值 是最常用的聚类算法之一,它将数据点聚类为预定义数量的簇。 spark.mllib 的实现包括一种并行化的变体 k-means++ 方法,称为 kmeans|| spark.mllib 中的实现具有以下参数:

示例

以下示例可以在PySpark命令行中进行测试。

在下面的例子中,加载和解析数据后,我们使用KMeans对象将数据聚类为两个簇。所需簇的数量传递给算法。然后我们计算簇内平方和误差(WSSSE)。通过增加 k ,您可以减小此误差度量。实际上,最佳的 k 通常是在WSSSE图中存在“肘部”的地方。

有关API的更多详细信息,请参阅 KMeans Python文档 KMeansModel Python文档

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# 加载和解析数据
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# 构建模型(对数据进行聚类)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# 通过计算组内平方和来评估聚类
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("组内平方和 = " + str(WSSSE))
# 保存和加载模型
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
Find full example code at "examples/src/main/python/mllib/k_means_example.py" in the Spark repo.

以下代码片段可以在 spark-shell 中执行。

在下面的示例中,加载和解析数据后,我们使用 KMeans 对象将数据聚类 为两个簇。所需簇的数量被传递给算法。然后我们计算组内平方误差和(WSSSE)。通过增加 k ,您可以减少这个误差度量。实际上,最佳的 k 通常是在 WSSSE 图中出现“肘部”的那一点。

有关API的详细信息,请参阅 KMeans Scala文档 KMeansModel Scala文档

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// 加载和解析数据
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// 使用KMeans将数据聚类为两个类别
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// 通过计算集合内平方误差之和来评估聚类效果
val WSSSE = clusters.computeCost(parsedData)
println(s"集合内平方误差之和 = $WSSSE")
// 保存和加载模型
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala" in the Spark repo.

MLlib 的所有方法都使用 Java 友好的类型,因此您可以以与在 Scala 中相同的方式在 Java 中导入和调用它们。唯一的注意事项是,这些方法使用 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD 类。您可以通过在您的 JavaRDD 对象上调用 .rdd() 将 Java RDD 转换为 Scala RDD。下面给出了一个自包含的应用程序示例,它与提供的 Scala 示例相当:

请参考 KMeans Java 文档 KMeansModel Java 文档 以获取有关API的详细信息。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// 加载并解析数据
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// 使用 KMeans 将数据聚类为两个类别
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
System.out.println("聚类中心:");
for (Vector center: clusters.clusterCenters()) {
System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("成本: " + cost);
// 通过计算集合内部平方误差之和来评估聚类
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("集合内部平方误差之和 = " + WSSSE);
// 保存和加载模型
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
"target/org/apache/spark/JavaKMeansExample/KMeansModel");
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java" in the Spark repo.

高斯混合

一个 高斯混合模型 代表了一种复合分布,其中点是从 k 个高斯子分布中的一个抽取的,每个子分布都有其自身的概率。 spark.mllib 实现使用 期望最大化 算法,根据一组样本来推导最大似然模型。该实现具有以下参数:

示例

在下面的示例中,加载和解析数据后,我们使用一个 GaussianMixture 对象将数据分成两个聚类。所需的聚类数量传递给算法。然后我们输出混合模型的参数。

有关API的更多细节,请参考 GaussianMixture Python文档 GaussianMixtureModel Python文档

from numpy import array
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
# 加载和解析数据
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# 构建模型(对数据进行聚类)
gmm = GaussianMixture.train(parsedData, 2)
# 保存和加载模型
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
    .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
# 输出模型的参数
for i in range(2):
print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())
Find full example code at "examples/src/main/python/mllib/gaussian_mixture_example.py" in the Spark repo.

在以下示例中,在加载和解析数据后,我们使用一个 GaussianMixture 对象将数据聚成两个聚类。所需的聚类数量被传递给算法。然后我们输出混合模型的参数。

请参考 GaussianMixture 的 Scala 文档 GaussianMixtureModel 的 Scala 文档 以获取有关 API 的详细信息。

import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
// 加载和解析数据
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// 使用高斯混合模型将数据聚类为两个类别
val gmm = new GaussianMixture().setK(2).run(parsedData)
// 保存和加载模型
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
"target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
// 输出最大似然模型的参数
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala" in the Spark repo.

MLlib的所有方法都使用Java友好的类型,因此您可以像在Scala中一样导入并调用它们。唯一的注意事项是,这些方法接受Scala RDD对象,而Spark Java API使用一个单独的 JavaRDD 类。您可以通过在您的 JavaRDD 对象上调用 .rdd() 将Java RDD转换为Scala RDD。以下是一个与提供的Scala示例等效的自包含应用程序示例:

请参阅 GaussianMixture Java 文档 GaussianMixtureModel Java 文档 以获取有关API的详细信息。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// 加载并解析数据
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// 使用GaussianMixture将数据聚类为两类
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// 保存和加载GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
"target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// 输出混合模型的参数
for (int j = 0; j < gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java" in the Spark repo.

幂迭代聚类(PIC)

幂迭代聚类(PIC)是一种可扩展且高效的算法,用于根据边属性的成对相似性对图的顶点进行聚类,描述见 Lin and Cohen, Power Iteration Clustering 。它通过 幂迭代 计算图的归一化亲和矩阵的伪特征向量,并用它来聚类顶点。 spark.mllib 包括一个使用GraphX作为后端的PIC实现。它接受一个 RDD ,包含 (srcId, dstId, similarity) 元组,并输出一个包含聚类分配的模型。相似性必须是非负的。PIC假设相似性度量是对称的。无论顺序如何,一对 (srcId, dstId) 在输入数据中至多出现一次。如果一对在输入中缺失,则它们的相似性被视为零。 spark.mllib 的PIC实现接受以下(超)参数:

示例

在下面,我们展示代码片段以演示如何在 spark.mllib 中使用 PIC。

PowerIterationClustering 实现了PIC算法。 它接受一个 RDD ,内容为 (srcId: Long, dstId: Long, similarity: Double) 元组,表示 亲和力矩阵。 调用 PowerIterationClustering.run 返回一个 PowerIterationClusteringModel , 其中包含计算出的聚类分配。

有关API的更多详细信息,请参考 PowerIterationClustering Python文档 PowerIterationClusteringModel Python文档

from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# 加载并解析数据
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# 使用PowerIterationClustering将数据聚类为两个类别
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# 保存和加载模型
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
    .load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
Find full example code at "examples/src/main/python/mllib/power_iteration_clustering_example.py" in the Spark repo.

PowerIterationClustering 实现了PIC算法。 它接受一个 RDD ,该 (srcId: Long, dstId: Long, similarity: Double) 元组代表亲和度矩阵。 调用 PowerIterationClustering.run 返回一个 PowerIterationClusteringModel , 其中包含计算出的聚类分配。

有关API的详细信息,请参阅 PowerIterationClustering Scala文档 PowerIterationClusteringModel Scala文档

import org.apache.spark.mllib.clustering.PowerIterationClustering
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)
val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(", ")
val sizesStr = assignments.map {
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"簇分配: $assignmentsStr\n簇大小: $sizesStr")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala" in the Spark repo.

PowerIterationClustering 实现了PIC算法。 它接受一个 JavaRDD ,包含 (srcId: Long, dstId: Long, similarity: Double) 元组,表示 亲和矩阵。 调用 PowerIterationClustering.run 将返回一个 PowerIterationClusteringModel ,其中包含计算出的聚类分配。

有关API的详细信息,请参阅 PowerIterationClustering Java文档 PowerIterationClusteringModel Java文档

import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
new Tuple3<>(0L, 1L, 0.9),
new Tuple3<>(1L, 2L, 0.9),
new Tuple3<>(2L, 3L, 0.9),
new Tuple3<>(3L, 4L, 0.1),
new Tuple3<>(4L, 5L, 0.9)));
PowerIterationClustering pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java" in the Spark repo.

潜在狄利克雷分配 (LDA)

潜在狄利克雷分配(LDA) 是一种主题模型,用于从文本文档集合中推断主题。 LDA可以被视为一种聚类算法,如下所示:

LDA 支持通过 setOptimizer 函数使用不同的推断算法。 EMLDAOptimizer 使用 期望最大化 在似然函数上学习聚类,并产生全面的结果,而 OnlineLDAOptimizer 则使用迭代的小批量抽样进行 在线变分推断 ,并且通常对内存友好。

LDA 将一组文档作为词频向量输入,并使用以下参数(通过构建器模式设置):

所有的 spark.mllib 的 LDA 模型支持:

注意 : LDA仍然是一个正在积极开发的实验性功能。因此,某些功能仅在优化器生成的两个优化器/模型之一中可用。目前,分布式模型可以转换为本地模型,但反之则不行。

接下来的讨论将分别描述每个优化器/模型对。

期望最大化

EMLDAOptimizer DistributedLDAModel 中实现。

对于提供给 LDA 的参数:

注意 : 进行足够的迭代非常重要。在早期的迭代中,EM通常会产生无用的主题,但这些主题在更多迭代后会显著改善。根据您的数据集,使用至少20次,可能50-100次迭代通常是合理的。

EMLDAOptimizer 产生一个 DistributedLDAModel ,它不仅存储推断的主题,还存储完整的训练语料库和训练语料库中每个文档的主题分布。 一个 DistributedLDAModel 支持:

在线变分贝叶斯

实现于 OnlineLDAOptimizer LocalLDAModel

对于提供给 LDA 的参数:

此外, OnlineLDAOptimizer 接受以下参数:

OnlineLDAOptimizer 生成一个 LocalLDAModel ,它只存储 推断出的主题。一个 LocalLDAModel 支持:

示例

在下面的示例中,我们加载表示文档语料库的词计数向量。 然后我们使用 LDA 从文档中推断出三个主题。所需的聚类数量被传递给算法。然后我们输出主题,表示为单词的概率分布。

有关API的更多详细信息,请参考 LDA Python文档 LDAModel Python文档

from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# 加载和解析数据
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# 用唯一ID索引文档
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# 使用LDA将文档聚类为三个主题
ldaModel = LDA.train(corpus, k=3)
# 输出主题。每个主题是一个单词的分布(与单词计数向量匹配)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# 保存和加载模型
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
    .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
Find full example code at "examples/src/main/python/mllib/latent_dirichlet_allocation_example.py" in the Spark repo.

请参考 LDA Scala 文档 DistributedLDAModel Scala 文档 以获取API的详细信息。

import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors
// 加载和解析数据
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// 为文档索引唯一ID
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// 使用LDA将文档聚类为三个主题
val ldaModel = new LDA().setK(3).run(corpus)
// 输出主题。每个主题都是对词的分布(与词频向量相匹配)
println(s"学习到的主题(根据${ldaModel.vocabSize}个词的分布):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print(s"主题 $topic :")
for (word <- Range(0, ldaModel.vocabSize)) {
print(s"${topics(word, topic)}")
}
println()
}
// 保存和加载模型。
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
"target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala" in the Spark repo.

有关API的详细信息,请参阅 LDA Java文档 DistributedLDAModel Java文档

import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// 加载和解析数据
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
// 使用唯一ID索引文档
JavaPairRDD<Long, Vector> corpus =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();
// 使用LDA将文档聚类为三个主题
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// 输出主题。每个主题是对单词的分布(匹配单词计数向量)
System.out.println("学习到的主题(作为对 " + ldaModel.vocabSize()
+ " 个单词的分布):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("主题 " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java" in the Spark repo.

二分k均值

二分法K均值通常比常规K均值快得多,但它通常会产生不同的聚类结果。

二分k-means是一种 层次聚类 。层次聚类是最常用的聚类分析方法之一,它旨在建立聚类的层次结构。层次聚类的策略通常分为两种类型:

二分k均值算法是一种分割算法。 MLlib中的实现具有以下参数:

示例

请参考 BisectingKMeans Python 文档 BisectingKMeansModel Python 文档 获取有关该 API 的更多详细信息。

from numpy import array
from pyspark.mllib.clustering import BisectingKMeans
# 加载并解析数据
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# 构建模型(对数据进行聚类)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)
# 评估聚类
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))
Find full example code at "examples/src/main/python/mllib/bisecting_k_means_example.py" in the Spark repo.

查看 BisectingKMeans Scala文档 BisectingKMeansModel Scala文档 以获取有关API的详细信息。

import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// 加载和解析数据
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// 使用 BisectingKMeans 将数据聚类为 6 个簇。
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// 显示计算成本和聚类中心
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
println(s"Cluster Center ${idx}: ${center}")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala" in the Spark repo.

请参考 BisectingKMeans Java 文档 BisectingKMeansModel Java 文档 以获取有关API的详细信息。

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
List<Vector> localData = Arrays.asList(
Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);
BisectingKMeans bkm = new BisectingKMeans()
.setK(4);
BisectingKMeansModel model = bkm.run(data);
System.out.println("计算成本: " + model.computeCost(data));
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
System.out.println("聚类中心 " + i + ": " + clusterCenter);
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java" in the Spark repo.

流式K均值

当数据以流的形式到达时,我们可能希望动态地估计聚类,随着新数据到达而更新它们。 spark.mllib 提供了流式 k-means 聚类的支持,具有控制估计衰退(或“遗忘”)的参数。该算法使用了迷你批量 k-means 更新规则的一个推广。对于每一批数据,我们将所有点分配到其最近的聚类,计算新的聚类中心,然后使用以下方式更新每个聚类:

\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation} \begin{equation} n_{t+1} = n_t + m_t \end{equation}

其中 $c_t$ 是簇的先前中心, $n_t$ 是到目前为止分配给簇的点的数量, $x_t$ 是来自当前批次的新簇中心, $m_t$ 是在当前批次中添加到簇的点的数量。衰减因子 $\alpha$ 可以用于忽略过去:当 $\alpha$=1 时,所有数据将从一开始就被使用;当 $\alpha$=0 时,仅使用最近的数据。这类似于指数加权移动平均。

衰减可以通过一个 halfLife 参数来指定,该参数确定正确的衰减因子 a ,使得对于在时间 t 获得的数据,其在时间 t + halfLife 时的贡献将降至 0.5。时间单位可以指定为 batches points ,更新规则将相应调整。

示例

该示例展示了如何在流数据上估计集群。

有关API的更多细节,请参考 StreamingKMeans Python文档 。并且请参考 Spark Streaming 编程指南 以获取有关StreamingContext的详细信息。

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# 我们创建一个用于训练的向量输入流,
# 以及一个用于测试的向量流
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(')')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
    .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# 我们创建一个随机聚类模型并指定要寻找的聚类数量
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# 现在注册用于训练和测试的流并开始工作,
# 打印新数据点到达时的预测聚类分配。
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
Find full example code at "examples/src/main/python/mllib/streaming_k_means_example.py" in the Spark repo.

有关API的详细信息,请参阅 StreamingKMeans Scala文档 。并请参阅 Spark Streaming编程指南 以获取有关StreamingContext的详细信息。

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt, 0.0)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala" in the Spark repo.

当你添加新的文本文件时,集群中心将会更新。每个训练点应格式化为 [x1, x2, x3] ,而每个测试数据点应格式化为 (y, [x1, x2, x3]) ,其中 y 是某个有用的标签或标识符(例如,真实类别分配)。每次一个文本文件放置在 /training/data/dir 时模型将会更新。每次一个文本文件放置在 /testing/data/dir 时你将看到预测结果。随着新数据的加入,集群中心将会改变!