聚类
本页面描述了MLlib中的聚类算法。 基于RDD的API中的聚类指南 也包含有关这些算法的相关信息。
目录
K均值
k-means 是最常用的聚类算法之一,它将数据点聚簇到预定义数量的簇中。MLlib 实现包括一种并行化的 k-means++ 方法的变种,称为 kmeans|| 。
KMeans
被实现为一个
Estimator
,并生成一个
KMeansModel
作为基础模型。
输入列
| 参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| featuresCol | 向量 | "features" | 特征向量 |
输出列
| 参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| predictionCol | Int | "prediction" | 预测的聚类中心 |
示例
有关更多详细信息,请参阅 Python API 文档 。
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# 加载数据.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# 训练一个 k-means 模型.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# 进行预测
predictions = model.transform(dataset)
# 通过计算轮廓得分来评估聚类
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("使用平方欧几里得距离的轮廓 = " + str(silhouette))
# 显示结果.
centers = model.clusterCenters()
print("聚类中心: ")
for center in centers:
print(center)
有关更多详细信息,请参考 Scala API 文档 。
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// 加载数据。
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// 训练一个k-means模型。
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)
// 进行预测
val predictions = model.transform(dataset)
// 通过计算轮廓分数评估聚类
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"使用平方欧几里得距离的轮廓 = $silhouette")
// 显示结果。
println("聚类中心: ")
model.clusterCenters.foreach(println)
有关更多详细信息,请参考 Java API 文档 。
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载数据。
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// 训练一个 k-means 模型。
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);
// 进行预测
Dataset<Row> predictions = model.transform(dataset);
// 通过计算轮廓系数来评估聚类
ClusteringEvaluator evaluator = new ClusteringEvaluator();
double silhouette = evaluator.evaluate(predictions);
System.out.println("使用平方欧几里得距离的轮廓系数 = " + silhouette);
// 显示结果。
Vector[] centers = model.clusterCenters();
System.out.println("聚类中心: ");
for (Vector center: centers) {
System.out.println(center);
}
有关更多详细信息,请参考 R API 文档 。
# 使用 spark.kmeans 拟合 k-means 模型
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
k = 3)
# 模型摘要
summary(kmeansModel)
# 从 k-means 模型获取拟合结果
head(fitted(kmeansModel))
# 预测
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
潜在狄利克雷分配 (LDA)
LDA
被实现为一个
Estimator
,支持
EMLDAOptimizer
和
OnlineLDAOptimizer
,
并生成一个
LDAModel
作为基础模型。专业用户如果需要,可以将由
EMLDAOptimizer
生成的
LDAModel
转换为
DistributedLDAModel
。
示例
有关更多详细信息,请参阅 Python API 文档 。
from pyspark.ml.clustering import LDA
# 加载数据。
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")
# 训练 LDA 模型。
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("整个语料库的日志似然的下界: " + str(ll))
print("困惑度的上界: " + str(lp))
# 描述主题。
topics = model.describeTopics(3)
print("由其最高权重词描述的主题:")
topics.show(truncate=False)
# 显示结果
transformed = model.transform(dataset)
transformed.show(truncate=False)
有关更多详细信息,请参考 Scala API 文档 。
import org.apache.spark.ml.clustering.LDA
// 加载数据。
val dataset = spark.read.format("libsvm")
.load("data/mllib/sample_lda_libsvm_data.txt")
// 训练一个 LDA 模型。
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)
val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"整个语料库的对数似然下界: $ll")
println(s"困惑度的上界: $lp")
// 描述主题。
val topics = model.describeTopics(3)
println("由其最高权重的词描述的主题:")
topics.show(false)
// 显示结果。
val transformed = model.transform(dataset)
transformed.show(false)
有关更多详细信息,请参考 Java API 文档 。
import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// 加载数据。
Dataset<Row> dataset = spark.read().format("libsvm")
.load("data/mllib/sample_lda_libsvm_data.txt");
// 训练LDA模型。
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);
double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("整个语料库的对数似然的下界: " + ll);
System.out.println("困惑度的上界: " + lp);
// 描述主题。
Dataset<Row> topics = model.describeTopics(3);
System.out.println("由其最高权重术语描述的主题:");
topics.show(false);
// 显示结果。
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
有关更多详细信息,请参阅 R API 文档 。
# 加载训练数据
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.lda 拟合潜在狄利克雷分配模型
model <- spark.lda(training, k = 10, maxIter = 10)
# 模型摘要
summary(model)
# 后验概率
posterior <- spark.posterior(model, test)
head(posterior)
# LDA模型的对数困惑度
logPerplexity <- spark.perplexity(model, test)
print(paste0("困惑度的上限: ", logPerplexity))
二分k均值
二分k均值是一种使用 层次聚类 的分割(或“自上而下”)方法:所有观测值开始时都在一个聚类中,并且在向下移动层次结构时会递归地进行拆分。
二分K均值通常比常规K均值快得多,但它通常会产生不同的聚类。
BisectingKMeans
被实现为一个
Estimator
,并生成一个
BisectingKMeansModel
作为基础模型。
示例
有关更多详细信息,请参阅 Python API 文档 。
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# 加载数据。
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# 训练一个二分K均值模型。
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)
# 进行预测
predictions = model.transform(dataset)
# 通过计算轮廓系数评估聚类效果
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# 显示结果。
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
print(center)
有关更多详细信息,请参阅 Scala API 文档 。
import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// 加载数据。
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// 训练一个二分 k-means 模型。
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)
// 进行预测
val predictions = model.transform(dataset)
// 通过计算轮廓系数评估聚类
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"基于平方欧几里得距离的轮廓系数 = $silhouette")
// 显示结果。
println("聚类中心: ")
val centers = model.clusterCenters
centers.foreach(println)
请参考 Java API 文档 以获取更多详细信息。
import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载数据。
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// 训练一个二分K均值模型。
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);
// 进行预测
Dataset<Row> predictions = model.transform(dataset);
// 通过计算轮廓系数评估聚类
ClusteringEvaluator evaluator = new ClusteringEvaluator();
double silhouette = evaluator.evaluate(predictions);
System.out.println("基于平方欧几里得距离的轮廓系数 = " + silhouette);
// 显示结果。
System.out.println("聚类中心: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
System.out.println(center);
}
有关更多详细信息,请参阅 R API文档 。
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# 拟合具有四个中心的二分k均值模型
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)
# 从二分k均值模型获取拟合结果
fitted.model <- fitted(model, "centers")
# 模型摘要
head(summary(fitted.model))
# 训练数据上的拟合值
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
高斯混合模型 (GMM)
A
高斯混合模型
表示一个复合分布,其中点是从一个
k
个高斯子分布中抽取的,每个子分布都有其自己的概率。
spark.ml
实现使用
期望-maximization
算法来引导给定一组样本的最大似然模型。
GaussianMixture
被实现为一个
Estimator
,并生成一个
GaussianMixtureModel
作为基础模型。
输入列
| 参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| featuresCol | 向量 | "features" | 特征向量 |
输出列
| 参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| predictionCol | Int | "prediction" | 预测的聚类中心 |
| probabilityCol | Vector | "probability" | 每个聚类的概率 |
示例
有关更多详细信息,请参阅 Python API 文档 。
from pyspark.ml.clustering import GaussianMixture
# 加载数据
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)
print("高斯模型显示为一个数据框: ")
model.gaussiansDF.show(truncate=False)
请参考 Scala API 文档 以获取更多细节。
import org.apache.spark.ml.clustering.GaussianMixture
// 加载数据
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// 训练高斯混合模型
val gmm = new GaussianMixture()
.setK(2)
val model = gmm.fit(dataset)
// 输出混合模型的参数
for (i <- 0 until model.getK) {
println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
请参考 Java API 文档 获取更多详情。
import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 加载数据
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");
// 训练一个 GaussianMixture 模型
GaussianMixture gmm = new GaussianMixture()
.setK(2);
GaussianMixtureModel model = gmm.fit(dataset);
// 输出混合模型的参数
for (int i = 0; i < model.getK(); i++) {
System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
有关更多详细信息,请参阅 R API 文档 。
# 加载训练数据
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df
# 使用 spark.gaussianMixture 拟合高斯混合聚类模型
model <- spark.gaussianMixture(training, ~ features, k = 2)
# 模型摘要
summary(model)
# 预测
predictions <- predict(model, test)
head(predictions)
幂迭代聚类 (PIC)
幂迭代聚类 (PIC) 是一种可扩展的图聚类算法,由 Lin and Cohen 开发。 摘要中提到:PIC 通过在数据的归一化成对相似性矩阵上使用截断幂迭代,寻找数据集的非常低维嵌入。
spark.ml
的PowerIterationClustering实现接受以下参数:
-
k: 要创建的簇的数量 -
initMode: 初始化算法的参数 -
maxIter: 最大迭代次数的参数 -
srcCol: 源顶点ID的输入列名称的参数 -
dstCol: 目标顶点ID的输入列名称 -
weightCol: 权重列名称的参数
示例
有关更多详细信息,请参阅 Python API 文档 。
from pyspark.ml.clustering import PowerIterationClustering
df = spark.createDataFrame([
(0, 1, 1.0),
(0, 2, 1.0),
(1, 2, 1.0),
(3, 4, 1.0),
(4, 0, 0.1)
], ["src", "dst", "weight"])
pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")
# 显示聚类分配
pic.assignClusters(df).show()
有关更多详细信息,请参阅 Scala API 文档 。
import org.apache.spark.ml.clustering.PowerIterationClustering
val dataset = spark.createDataFrame(Seq(
(0L, 1L, 1.0),
(0L, 2L, 1.0),
(1L, 2L, 1.0),
(3L, 4L, 1.0),
(4L, 0L, 0.1)
)).toDF("src", "dst", "weight")
val model = new PowerIterationClustering().
setK(2).
setMaxIter(20).
setInitMode("degree").
setWeightCol("weight")
val prediction = model.assignClusters(dataset).select("id", "cluster")
// 显示聚类分配
prediction.show(false)
有关更多详细信息,请参阅 Java API 文档 。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0L, 1L, 1.0),
RowFactory.create(0L, 2L, 1.0),
RowFactory.create(1L, 2L, 1.0),
RowFactory.create(3L, 4L, 1.0),
RowFactory.create(4L, 0L, 0.1)
);
StructType schema = new StructType(new StructField[]{
new StructField("src", DataTypes.LongType, false, Metadata.empty()),
new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PowerIterationClustering model = new PowerIterationClustering()
.setK(2)
.setMaxIter(10)
.setInitMode("degree")
.setWeightCol("weight");
Dataset<Row> result = model.assignClusters(df);
result.show(false);
有关更多详细信息,请参考 R API 文档 。
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
list(1L, 2L, 1.0), list(3L, 4L, 1.0),
list(4L, 0L, 0.1)),
schema = c("src", "dst", "weight"))
# 分配聚类
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
initMode = "degree", weightCol = "weight")
showDF(arrange(clusters, clusters$id))