社区检测
标签传播算法 (LPA)
运行静态标签传播算法以检测网络中的社区。网络中的每个节点最初被分配到其自身的社区。在每个超级步骤中,节点将其社区归属发送给所有邻居,并将其状态更新为传入消息的模式社区归属。LPA 是一种标准的图社区检测算法。它在计算上非常廉价,尽管(1)不能保证收敛,且(2)可能最终得到平凡解(所有节点被识别为单一社区)。
请参阅 维基百科 了解背景信息。
注意
Be aware, that returned DataFrame is persistent and should be unpersisted manually after processing to avoid memory leaks!
Python API
有关API详情,请参阅 graphframes.GraphFrame.labelPropagation。
from graphframes.examples import Graphs
g = Graphs(spark).friends() # Get example graph
result = g.labelPropagation(maxIter=5)
result.select("id", "label").show()
Scala API
有关API详情,请参阅org.grapimport org.graphframes.lib.LabelPropagation。
import org.graphframes.{examples,GraphFrame}
val g: GraphFrame = examples.Graphs.friends // get example graph
val result = g.labelPropagation.maxIter(5).run()
result.select("id", "label").show()
Arguments
maxIter
Pregel迭代的次数。理论上,标签传播算法迟早会收敛到某个稳定状态,但在现实世界的图中存在许多问题。首先是振荡现象:即使算法几乎收敛,在大型图中,位于检测到的社区边界处的一些顶点仍可能在迭代之间持续振荡。然而,最大的问题是算法可能轻易收敛到所有顶点具有相同标签的状态。强烈建议将maxIter设置为从5到10之间的合理值,并根据任务和目标进行一些实验。
algorithm
可能的取值为 graphx 和 graphframes。两种实现基于相同的逻辑。对于中小型图,GraphX 速度更快,但由于 RDD 序列化效率较低且基于三元组的特性,需要更多内存。GraphFrames 由于高效的 Tungsten 序列化以及核心结构是边和消息而非三元组,所需内存显著减少。
checkpoint_interval
仅适用于 graphframes。为避免 Spark 逻辑计划、DataFrame 血缘关系和查询优化时间呈指数级增长,需要定期执行检查点操作。虽然检查点本身并非零成本,但仍建议将此值设置为小于 5。
use_local_checkpoints
仅适用于 graphframes。默认情况下,GraphFrames 使用持久化检查点。它们可靠且能降低错误率。持久化检查点的缺点是需要设置持久化存储中的 checkpointDir,例如 S3 或 HDFS。通过提供 use_local_checkpoints=True,用户可以指示 GraphFrames 使用 Spark 执行器的本地磁盘进行检查点操作。本地检查点速度更快,但可靠性较低:例如,如果执行器因被更高优先级的任务占用而丢失,检查点将会丢失,整个任务将失败。
storage_level
中间结果和输出DataFrame的存储级别。默认情况下使用内存和磁盘反序列化,以在性能和可靠性之间取得良好平衡。对于非常大的图和核外计算场景,使用DISK_ONLY可能会更快。
幂迭代聚类 (PIC)
GraphFrames 为 SparkML 库中的 幂迭代聚类 算法提供了一个封装器。
注意
Be aware, that returned DataFrame is persistent and should be unpersisted manually after processing to avoid memory leaks!
Python API
g = GraphFrame(vertices, edges)
g.powerIterationClustering(k=2, maxIter=40, weightCol="weight")
Scala API
val gf = GraphFrame(vertices, edges)
val clusters = gf
.powerIterationClustering(k = 2, maxIter = 40, weightCol = Some("weight"))
最大独立集
MIS(最大独立集)是一组顶点,使得集合中任意两个顶点都不相邻(即集合中任意两个顶点之间没有边),并且该集合是极大的,意味着向集合中添加任何其他顶点都会违反独立性属性。请注意,极大独立集不一定是最大独立集;也就是说,它确保无法向集合中添加更多顶点,但不能保证该集合在图的所有可能独立集中具有最大可能的顶点数量。寻找最大独立集是一个NP难问题。
GraphFrames 中实现的算法基于以下论文:Ghaffari, Mohsen. "一种改进的分布式最大独立集算法"。第二十七届 ACM-SIAM 离散算法研讨会论文集。工业与应用数学学会,2016年。(https://doi.org/10.1137/1.9781611974331.ch20)
该算法非常实用,例如当您计划开展营销活动时,希望通过最小化沟通成本来覆盖社交网络中的大多数用户。在这种情况下,您需要找到一个庞大的节点子集,这些节点彼此不直接相连,但都与网络中的其他节点相连接。这正是最大独立集算法所实现的功能。它会返回一组互不连通的顶点,使得在保持独立性的前提下无法再向该集合添加更多顶点。
注意:这是一个随机化、非确定性的算法。即使提供了固定的随机种子,由于Apache Spark的工作方式,不同运行之间的结果可能会有所不同。
Python API
vertices = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "d")], ["id", "name"]
)
edges = spark.createDataFrame([(0, 1, "edge1")], ["src", "dst", "name"])
graph = GraphFrame(vertices, edges)
mis = graph.maximal_independent_set(storage_level=storage_level, seed=12345)
Scala API
val vertices =
spark.createDataFrame(Seq((0L, "a"), (1L, "b"), (2L, "c"), (3L, "d"))).toDF("id", "name")
val edges = spark.createDataFrame(Seq((0L, 1L, "edge1"))).toDF("src", "dst", "name")
val graph = GraphFrame(vertices, edges)
val mis = graph.maximalIndependentSet.run(seed = 12345L)