聚类

本页面描述了MLlib中的聚类算法。 基于RDD的API中的聚类指南 也包含有关这些算法的相关信息。

目录

K均值

k-means 是最常用的聚类算法之一,它将数据点聚簇到预定义数量的簇中。MLlib 实现包括一种并行化的 k-means++ 方法的变种,称为 kmeans||

KMeans 被实现为一个 Estimator ,并生成一个 KMeansModel 作为基础模型。

输入列

参数名称 类型 默认值 描述
featuresCol 向量 "features" 特征向量

输出列

参数名称 类型 默认值 描述
predictionCol Int "prediction" 预测的聚类中心

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# 加载数据.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# 训练一个 k-means 模型.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# 进行预测
predictions = model.transform(dataset)
# 通过计算轮廓得分来评估聚类
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("使用平方欧几里得距离的轮廓 = " + str(silhouette))
# 显示结果.
centers = model.clusterCenters()
print("聚类中心: ")
for center in centers:
print(center)
Find full example code at "examples/src/main/python/ml/kmeans_example.py" in the Spark repo.

有关更多详细信息,请参考 Scala API 文档

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// 加载数据。
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// 训练一个k-means模型。
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)
// 进行预测
val predictions = model.transform(dataset)
// 通过计算轮廓分数评估聚类
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"使用平方欧几里得距离的轮廓 = $silhouette")
// 显示结果。
println("聚类中心: ")
model.clusterCenters.foreach(println)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala" in the Spark repo.

有关更多详细信息,请参考 Java API 文档

import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载数据。
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// 训练一个 k-means 模型。
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);
// 进行预测
Dataset<Row> predictions = model.transform(dataset);
// 通过计算轮廓系数来评估聚类
ClusteringEvaluator evaluator = new ClusteringEvaluator();
double silhouette = evaluator.evaluate(predictions);
System.out.println("使用平方欧几里得距离的轮廓系数 = " + silhouette);
// 显示结果。
Vector[] centers = model.clusterCenters();
System.out.println("聚类中心: ");
for (Vector center: centers) {
System.out.println(center);
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java" in the Spark repo.

有关更多详细信息,请参考 R API 文档

# 使用 spark.kmeans 拟合 k-means 模型
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
k = 3)
# 模型摘要
summary(kmeansModel)
# 从 k-means 模型获取拟合结果
head(fitted(kmeansModel))
# 预测
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
Find full example code at "examples/src/main/r/ml/kmeans.R" in the Spark repo.

潜在狄利克雷分配 (LDA)

LDA 被实现为一个 Estimator ,支持 EMLDAOptimizer OnlineLDAOptimizer , 并生成一个 LDAModel 作为基础模型。专业用户如果需要,可以将由 EMLDAOptimizer 生成的 LDAModel 转换为 DistributedLDAModel

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import LDA
# 加载数据。
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")
# 训练 LDA 模型。
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("整个语料库的日志似然的下界: " + str(ll))
print("困惑度的上界: " + str(lp))
# 描述主题。
topics = model.describeTopics(3)
print("由其最高权重词描述的主题:")
topics.show(truncate=False)
# 显示结果
transformed = model.transform(dataset)
transformed.show(truncate=False)
Find full example code at "examples/src/main/python/ml/lda_example.py" in the Spark repo.

有关更多详细信息,请参考 Scala API 文档

import org.apache.spark.ml.clustering.LDA
// 加载数据。
val dataset = spark.read.format("libsvm")
.load("data/mllib/sample_lda_libsvm_data.txt")
// 训练一个 LDA 模型。
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)
val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"整个语料库的对数似然下界: $ll")
println(s"困惑度的上界: $lp")
// 描述主题。
val topics = model.describeTopics(3)
println("由其最高权重的词描述的主题:")
topics.show(false)
// 显示结果。
val transformed = model.transform(dataset)
transformed.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala" in the Spark repo.

有关更多详细信息,请参考 Java API 文档

import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载数据。
Dataset<Row> dataset = spark.read().format("libsvm")
.load("data/mllib/sample_lda_libsvm_data.txt");
// 训练LDA模型。
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);
double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("整个语料库的对数似然的下界: " + ll);
System.out.println("困惑度的上界: " + lp);
// 描述主题。
Dataset<Row> topics = model.describeTopics(3);
System.out.println("由其最高权重术语描述的主题:");
topics.show(false);
// 显示结果。
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.lda 拟合潜在狄利克雷分配模型
model <- spark.lda(training, k = 10, maxIter = 10)
# 模型摘要
summary(model)
# 后验概率
posterior <- spark.posterior(model, test)
head(posterior)
# LDA模型的对数困惑度
logPerplexity <- spark.perplexity(model, test)
print(paste0("困惑度的上限: ", logPerplexity))
Find full example code at "examples/src/main/r/ml/lda.R" in the Spark repo.

二分k均值

二分k均值是一种使用 层次聚类 的分割(或“自上而下”)方法:所有观测值开始时都在一个聚类中,并且在向下移动层次结构时会递归地进行拆分。

二分K均值通常比常规K均值快得多,但它通常会产生不同的聚类。

BisectingKMeans 被实现为一个 Estimator ,并生成一个 BisectingKMeansModel 作为基础模型。

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# 加载数据。
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# 训练一个二分K均值模型。
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)
# 进行预测
predictions = model.transform(dataset)
# 通过计算轮廓系数评估聚类效果
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# 显示结果。
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
print(center)
Find full example code at "examples/src/main/python/ml/bisecting_k_means_example.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// 加载数据。
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// 训练一个二分 k-means 模型。
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)
// 进行预测
val predictions = model.transform(dataset)
// 通过计算轮廓系数评估聚类
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"基于平方欧几里得距离的轮廓系数 = $silhouette")
// 显示结果。
println("聚类中心: ")
val centers = model.clusterCenters
centers.foreach(println)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/BisectingKMeansExample.scala" in the Spark repo.

请参考 Java API 文档 以获取更多详细信息。

import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载数据。
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// 训练一个二分K均值模型。
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);
// 进行预测
Dataset<Row> predictions = model.transform(dataset);
// 通过计算轮廓系数评估聚类
ClusteringEvaluator evaluator = new ClusteringEvaluator();
double silhouette = evaluator.evaluate(predictions);
System.out.println("基于平方欧几里得距离的轮廓系数 = " + silhouette);
// 显示结果。
System.out.println("聚类中心: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
System.out.println(center);
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API文档

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# 拟合具有四个中心的二分k均值模型
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)
# 从二分k均值模型获取拟合结果
fitted.model <- fitted(model, "centers")
# 模型摘要
head(summary(fitted.model))
# 训练数据上的拟合值
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
Find full example code at "examples/src/main/r/ml/bisectingKmeans.R" in the Spark repo.

高斯混合模型 (GMM)

A 高斯混合模型 表示一个复合分布,其中点是从一个 k 个高斯子分布中抽取的,每个子分布都有其自己的概率。 spark.ml 实现使用 期望-maximization 算法来引导给定一组样本的最大似然模型。

GaussianMixture 被实现为一个 Estimator ,并生成一个 GaussianMixtureModel 作为基础模型。

输入列

参数名称 类型 默认值 描述
featuresCol 向量 "features" 特征向量

输出列

参数名称 类型 默认值 描述
predictionCol Int "prediction" 预测的聚类中心
probabilityCol Vector "probability" 每个聚类的概率

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import GaussianMixture
# 加载数据
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)
print("高斯模型显示为一个数据框: ")
model.gaussiansDF.show(truncate=False)
Find full example code at "examples/src/main/python/ml/gaussian_mixture_example.py" in the Spark repo.

请参考 Scala API 文档 以获取更多细节。

import org.apache.spark.ml.clustering.GaussianMixture
// 加载数据
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// 训练高斯混合模型
val gmm = new GaussianMixture()
.setK(2)
val model = gmm.fit(dataset)
// 输出混合模型的参数
for (i <- 0 until model.getK) {
println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala" in the Spark repo.

请参考 Java API 文档 获取更多详情。

import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载数据
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// 训练一个 GaussianMixture 模型
GaussianMixture gmm = new GaussianMixture()
.setK(2);
GaussianMixtureModel model = gmm.fit(dataset);
// 输出混合模型的参数
for (int i = 0; i < model.getK(); i++) {
System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java" in the Spark repo.

有关更多详细信息,请参阅 R API 文档

# 加载训练数据
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.gaussianMixture 拟合高斯混合聚类模型
model <- spark.gaussianMixture(training, ~ features, k = 2)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/gaussianMixture.R" in the Spark repo.

幂迭代聚类 (PIC)

幂迭代聚类 (PIC) 是一种可扩展的图聚类算法,由 Lin and Cohen 开发。 摘要中提到:PIC 通过在数据的归一化成对相似性矩阵上使用截断幂迭代,寻找数据集的非常低维嵌入。

spark.ml 的PowerIterationClustering实现接受以下参数:

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import PowerIterationClustering
df = spark.createDataFrame([
(0, 1, 1.0),
(0, 2, 1.0),
(1, 2, 1.0),
(3, 4, 1.0),
(4, 0, 0.1)
], ["src", "dst", "weight"])
pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")
# 显示聚类分配
pic.assignClusters(df).show()
Find full example code at "examples/src/main/python/ml/power_iteration_clustering_example.py" in the Spark repo.

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.clustering.PowerIterationClustering
val dataset = spark.createDataFrame(Seq(
(0L, 1L, 1.0),
(0L, 2L, 1.0),
(1L, 2L, 1.0),
(3L, 4L, 1.0),
(4L, 0L, 0.1)
)).toDF("src", "dst", "weight")
val model = new PowerIterationClustering().
setK(2).
setMaxIter(20).
setInitMode("degree").
setWeightCol("weight")
val prediction = model.assignClusters(dataset).select("id", "cluster")
//  显示聚类分配
prediction.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala" in the Spark repo.

有关更多详细信息,请参阅 Java API 文档

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0L, 1L, 1.0),
RowFactory.create(0L, 2L, 1.0),
RowFactory.create(1L, 2L, 1.0),
RowFactory.create(3L, 4L, 1.0),
RowFactory.create(4L, 0L, 0.1)
);
StructType schema = new StructType(new StructField[]{
new StructField("src", DataTypes.LongType, false, Metadata.empty()),
new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PowerIterationClustering model = new PowerIterationClustering()
.setK(2)
.setMaxIter(10)
.setInitMode("degree")
.setWeightCol("weight");
Dataset<Row> result = model.assignClusters(df);
result.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java" in the Spark repo.

有关更多详细信息,请参考 R API 文档

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
list(1L, 2L, 1.0), list(3L, 4L, 1.0),
list(4L, 0L, 0.1)),
schema = c("src", "dst", "weight"))
# 分配聚类
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
initMode = "degree", weightCol = "weight")
showDF(arrange(clusters, clusters$id))
Find full example code at "examples/src/main/r/ml/powerIterationClustering.R" in the Spark repo.