频繁模式挖掘 - 基于RDD的API

挖掘频繁项、项集、子序列或其他子结构通常是分析大规模数据集的第一步,这多年来一直是数据挖掘的一个活跃研究主题。我们建议用户查阅维基百科的 关联规则学习 以获取更多信息。 spark.mllib 提供了 FP-growth 的并行实现,这是一种流行的频繁项集挖掘算法。

FP-growth

FP-growth算法在论文中进行了描述 Han et al., Mining frequent patterns without candidate generation , 其中“FP”代表频繁模式。 给定一个交易数据集,FP-growth的第一步是计算项频率并识别频繁项。 不同于为相同目的而设计的 Apriori-like 算法, FP-growth的第二步使用后缀树(FP-tree)结构来编码交易,而不是显式生成候选集, 这通常是代价昂贵的。 在第二步之后,可以从FP-tree中提取频繁项集。 在 spark.mllib 中,我们实现了一个名为PFP的FP-growth并行版本, 如在 Li et al., PFP: Parallel FP-growth for query recommendation 中所述。 PFP根据交易的后缀分配FP-tree生长的工作, 因此比单机实现更具可扩展性。 我们建议用户查看论文以获取更多细节。

spark.mllib 的FP-growth实现包含以下(超)参数:

示例

FPGrowth 实现了 FP-growth 算法。 它接受一个 RDD 的事务,每个事务是一个 List ,包含了一个通用类型的项目。 调用 FPGrowth.train 方法,传入事务,将返回一个 FPGrowthModel ,存储频繁项集及其频率。

有关API的更多详细信息,请参阅 FPGrowth Python文档

 pyspark.mllib.fpm 导入 FPGrowth
数据 = sc.textFile("data/mllib/sample_fpgrowth.txt")
事务 = 数据.map(lambda line: line.strip().split(' '))
模型 = FPGrowth.train(事务, minSupport=0.2, numPartitions=10)
结果 = 模型.freqItemsets().collect()
对于 fi  结果:
print(fi)
Find full example code at "examples/src/main/python/mllib/fpgrowth_example.py" in the Spark repo.

FPGrowth 实现了 FP-growth 算法。 它接受一个 RDD 的事务,其中每个事务是一个 Array 的通用类型项目。 调用 FPGrowth.run 并传入事务,将返回一个 FPGrowthModel ,存储频繁项集及其频率。以下示例说明了如何从 transactions 中挖掘频繁项集和关联规则 (有关详细信息,请参见 关联规则 )。

有关API的详细信息,请参阅 FPGrowth Scala文档

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val data = sc.textFile("data/mllib/sample_fpgrowth.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
val fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10)
val model = fpg.run(transactions)
model.freqItemsets.collect().foreach { itemset =>
println(s"${itemset.items.mkString("[", ",", "]")},${itemset.freq}")
}
val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
println(s"${rule.antecedent.mkString("[", ",", "]")}=> " +
s"${rule.consequent .mkString("[", ",", "]")},${rule.confidence}")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/SimpleFPGrowth.scala" in the Spark repo.

FPGrowth 实现了 FP-growth 算法。 它接受一个 JavaRDD 的交易,每个交易都是一个 Iterable 的泛型类型的项目。 调用 FPGrowth.run 进行交易返回一个 FPGrowthModel ,存储频繁项集及其频率。以下 示例说明如何从 transactions 中挖掘频繁项集和关联规则 (有关详细信息,请参见 关联规则 )。

有关API的详细信息,请参阅 FPGrowth Java文档

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;
JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");
JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));
FPGrowth fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);
for (FPGrowth.FreqItemset<String> itemset: model.freqItemsets().toJavaRDD().collect()) {
System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}
double minConfidence = 0.8;
for (AssociationRules.RuleString> rule
: model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java" in the Spark repo.

关联规则

AssociationRules 实现了一种并行规则生成算法,用于构建以单个项作为结果的规则。

有关API的详细信息,请参见 AssociationRules Scala 文档

import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
val freqItemsets = sc.parallelize(Seq(
new FreqItemset(Array("a"), 15L),
new FreqItemset(Array("b"), 35L),
new FreqItemset(Array("a", "b"), 12L)
))
val ar = new AssociationRules()
.setMinConfidence(0.8)
val results = ar.run(freqItemsets)
results.collect().foreach { rule =>
println(s"[${rule.antecedent.mkString(",")}=>${rule.consequent.mkString(",")} ]" +
s" ${rule.confidence}")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/AssociationRulesExample.scala" in the Spark repo.

AssociationRules 实现了一种并行规则生成算法,用于构建具有单个项作为结果的规则。

请参阅 AssociationRules Java文档 以获取API的详细信息。

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset;
JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
new FreqItemset<>(new String[] {"a"}, 15L),
new FreqItemset<>(new String[] {"b"}, 35L),
new FreqItemset<>(new String[] {"a", "b"}, 12L)
));
AssociationRules arules = new AssociationRules()
.setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);
for (AssociationRules.Rule<String> rule : results.collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java" in the Spark repo.

前缀序列模式挖掘

PrefixSpan 是一种序列模式挖掘算法,描述在 Pei et al., Mining Sequential Patterns by Pattern-Growth: The PrefixSpan Approach 。我们建议读者参考引用的论文以形式化序列模式挖掘问题。

spark.mllib 的 PrefixSpan 实现接受以下参数:

示例

以下示例说明了在序列上运行的 PrefixSpan(使用与 Pei 等相同的记号):

  <(12)3>
  <1(32)(12)>
  <(12)5>
  <6>

PrefixSpan 实现了 PrefixSpan 算法。 调用 PrefixSpan.run 返回一个 PrefixSpanModel ,它存储了频繁序列及其频率。

有关API的详细信息,请参阅 PrefixSpan Scala文档 PrefixSpanModel Scala文档

import org.apache.spark.mllib.fpm.PrefixSpan
val sequences = sc.parallelize(Seq(
Array(Array(1, 2), Array(3)),
Array(Array(1), Array(3, 2), Array(1, 2)),
Array(Array(1, 2), Array(5)),
Array(Array(6))
), 2).cache()
val prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5)
val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { freqSequence =>
println(
s"${freqSequence.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]")}," +
s" ${freqSequence.freq}")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/PrefixSpanExample.scala" in the Spark repo.

PrefixSpan 实现了 PrefixSpan 算法。 调用 PrefixSpan.run 返回一个 PrefixSpanModel ,存储频繁序列及其频率。

请参阅 PrefixSpan Java 文档 PrefixSpanModel Java 文档 以获取有关 API 的详细信息。

import java.util.Arrays;
import java.util.List;
import org.apache.spark.mllib.fpm.PrefixSpan;
import org.apache.spark.mllib.fpm.PrefixSpanModel;
JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
Arrays.asList(1, 2), Arrays.asList(3)),
Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
Arrays.asList(Arrays.asList(6))
), 2);
PrefixSpan prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5);
PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
for (PrefixSpan.FreqSequenceInteger> freqSeq: model.freqSequences().toJavaRDD().collect()) {
System.out.println(freqSeq.javaSequence() + ", " + freqSeq.freq());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaPrefixSpanExample.java" in the Spark repo.