Centrality Metrics

GraphFrames provides three main APIs for computing degrees:

Python API

from graphframes.examples import Graphs

g = Graphs(spark).friends()
in_degrees = g.inDegrees
out_degrees = g.outDegrees
degrees = g.degrees
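
Each of these is a DataFrame keyed by vertex id with an inDegree, outDegree, or degree column (vertices with no matching edges are omitted), so ordinary DataFrame operations apply. Continuing the snippet above, a small illustrative sketch:

# Show the five best-connected vertices by total degree
g.degrees.orderBy("degree", ascending=False).show(5)

# Vertices that never appear as an edge destination are absent from inDegrees
g.inDegrees.show()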

Scala API

import org.graphframes.{examples,GraphFrame}

val g: GraphFrame = examples.Graphs.friends

val inDegrees: DataFrame = g.inDegrees
val outDegrees: DataFrame = g.outDegrees
val degrees: DataFrame = g.degrees

PageRank

There are two implementations of PageRank: one runs for a fixed number of iterations (set maxIter), the other runs until convergence to a given tolerance (set tol).

Both implementations support non-personalized and personalized PageRank; setting sourceId personalizes the results for that vertex.

See the Wikipedia article on PageRank for background.

Note: pageRank is currently the only API in GraphFrames that returns a GraphFrame object rather than a DataFrame. It is very likely that this behavior will change in the next major release to keep the API consistent. It is strongly recommended not to rely on the edges of the returned GraphFrame at all.
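
In practice, that usually means keeping only the vertex scores from the result. A one-line sketch, assuming a GraphFrame g as in the examples below:

# Keep only the vertex scores; ignore the edges of the returned GraphFrame
pagerank_df = g.pageRank(resetProbability=0.15, tol=0.01).vertices.select("id", "pagerank")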

Python API

For API details, see graphframes.GraphFrame.pageRank.

from graphframes.examples import Graphs

g = Graphs(spark).friends()  # Get example graph

# Run PageRank until convergence to tolerance "tol"
results = g.pageRank(resetProbability=0.15, tol=0.01)

# Display resulting pageranks and final edge weights
# Note that the displayed pagerank may be truncated, e.g., missing the E notation
# In Spark 1.5+, you can use show(truncate=False) to avoid truncation
results.vertices.select("id", "pagerank").show()
results.edges.select("src", "dst", "weight").show()

# Run PageRank for a fixed number of iterations
results2 = g.pageRank(resetProbability=0.15, maxIter=10)

# Run PageRank personalized for vertex "a"
results3 = g.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")

# Run PageRank personalized for vertex ["a", "b", "c", "d"] in parallel
results4 = g.parallelPersonalizedPageRank(
    resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10
)

Scala API

For API details, see org.graphframes.lib.PageRank.

import org.graphframes.{examples,GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

// Run PageRank until convergence to tolerance "tol".
val results: GraphFrame = g.pageRank.resetProbability(0.15).tol(0.01).run()

// Display resulting pageranks and final edge weights
// Note that the displayed pagerank may be truncated, e.g., missing the E notation.
// In Spark 1.5+, you can use show(truncate=false) to avoid truncation.
results.vertices.select("id", "pagerank").show()
results.edges.select("src", "dst", "weight").show()

// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()

// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()

// Run PageRank personalized for vertex ["a", "b", "c", "d"] in parallel
val results4 = g.parallelPersonalizedPageRank.resetProbability(0.15).maxIter(10).sourceIds(Array("a", "b", "c", "d"))
.run()
results4.vertices.show()
results4.edges.show()

Parallel Personalized PageRank

GraphFrames also supports parallel personalized PageRank, which lets users compute rankings with respect to a set of source vertices in a single run.

For API details, see graphframes.GraphFrame.parallelPersonalizedPageRank (Python) and org.graphframes.lib.ParallelPersonalizedPageRank (Scala).
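
Continuing the friends example, a minimal sketch of running it and inspecting the output. The per-source scores come back as a vector-valued vertex column with one entry per element of sourceIds; the column name "pageranks" used below is an assumption, so check the API docs of your GraphFrames version:

from graphframes.examples import Graphs

g = Graphs(spark).friends()

# One personalized PageRank per source vertex, computed in a single pass
results = g.parallelPersonalizedPageRank(
    resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10
)

# "pageranks" (assumed column name) holds a vector with one score per source vertex
results.vertices.select("id", "pageranks").show(truncate=False)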

K-Core

K-core decomposition is a method for identifying the most tightly connected subgraphs of a network. The k-core is the maximal subgraph in which every vertex has degree at least k. By filtering out weakly connected nodes, this metric exposes the core of highly interconnected entities and helps to understand the internal structure of a network. K-core centrality is useful in many domains, for example finding influential users in social network analysis, detecting stable protein complexes in biology, or assessing robustness and vulnerability in infrastructure networks.
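
To make the definition concrete, here is a minimal single-machine peeling sketch in plain Python; it only illustrates the concept and is not the distributed GraphFrames implementation. Vertices whose current degree falls below k are repeatedly removed, and the level at which a vertex drops out determines its core number.

from collections import defaultdict

def core_numbers(edges):
    # Build an undirected adjacency map.
    adj = defaultdict(set)
    for u, v in edges:
        adj[u].add(v)
        adj[v].add(u)

    core = {}
    k = 1
    while adj:
        # Repeatedly remove vertices whose current degree is below k;
        # a vertex removed at level k belongs to the (k-1)-core but not the k-core.
        removed = True
        while removed:
            removed = False
            for u in list(adj):
                if len(adj[u]) < k:
                    core[u] = k - 1
                    for v in adj.pop(u):
                        adj[v].discard(u)
                    removed = True
        k += 1
    return core

# Toy example: a triangle (core number 2) with one pendant vertex (core number 1).
print(core_numbers([(1, 2), (2, 3), (1, 3), (3, 4)]))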

The K-core decomposition implementation provided in GraphFrames is based on the research described in a paper available on IEEE Xplore. It takes a vertex-centric, think-like-a-vertex approach and solves the k-core decomposition problem with a message-passing paradigm, which significantly reduces I/O overhead.

For more information, see:

A. Farajollahi, S. G. Khaki, and L. Wang, "Efficient Distributed k-Core Decomposition for Large-Scale Graphs," 2017 IEEE International Conference on Big Data (Big Data), Boston, MA, USA, 2017, pp. 1430-1435.

Arguments

checkpoint_interval

GraphFrames only. To avoid exponential growth of Spark's logical plan, of the DataFrame lineage, and of query-optimization time, periodic checkpointing is required. Checkpointing is not free, but it is still recommended to set this value to something less than 5.

use_local_checkpoints

GraphFrames only. By default, GraphFrames uses persistent checkpoints; they are reliable and lower the failure rate, but they require a checkpointDir on persistent storage such as S3 or HDFS. By passing use_local_checkpoints=True, users can tell GraphFrames to checkpoint to the local disks of the Spark executors instead. Local checkpoints are faster but less reliable: for example, if an executor is lost because a higher-priority job preempts it, the checkpoint is lost and the whole job fails.

storage_level

The storage level used for intermediate results and for the output DataFrame. The default is memory-and-disk, deserialized, which is a good balance between performance and reliability. For very large graphs and out-of-core scenarios, DISK_ONLY may be faster.
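
Before running k_core with the default persistent checkpoints, a checkpoint directory must be configured on the SparkContext. A minimal setup sketch for the Python API below; the path is a placeholder, and whether storage_level accepts a pyspark StorageLevel object or a string name may depend on your GraphFrames version:

from pyspark import StorageLevel

# Persistent checkpoints (the default) need a checkpoint directory on reliable
# storage such as HDFS or S3; the path below is only a placeholder.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/graphframes-checkpoints")

# Storage level for intermediate results; DISK_ONLY can be faster for very large,
# out-of-core graphs. MEMORY_AND_DISK_DESER needs a recent PySpark
# (use MEMORY_AND_DISK on older versions).
level = StorageLevel.MEMORY_AND_DISK_DESER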

Python API

from graphframes import GraphFrame
from pyspark import StorageLevel

v = spark.createDataFrame([(i, f"v{i}") for i in range(30)], ["id", "name"])

# Build edges to create a hierarchical structure:
# Core (k=5): vertices 0-4 - fully connected
core_edges = [(i, j) for i in range(5) for j in range(i + 1, 5)]

# Next layer (k=3): vertices 5-14 - each connects to multiple core vertices
mid_layer_edges = [
    (5, 0),
    (5, 1),
    (5, 2),  # Connect to core
    (6, 0),
    (6, 1),
    (6, 3),
    (7, 1),
    (7, 2),
    (7, 4),
    (8, 0),
    (8, 3),
    (8, 4),
    (9, 1),
    (9, 2),
    (9, 3),
    (10, 0),
    (10, 4),
    (11, 2),
    (11, 3),
    (12, 1),
    (12, 4),
    (13, 0),
    (13, 2),
    (14, 3),
    (14, 4),
]

# Outer layer (k=1): vertices 15-29 - sparse connections
outer_edges = [
    (15, 5),
    (16, 6),
    (17, 7),
    (18, 8),
    (19, 9),
    (20, 10),
    (21, 11),
    (22, 12),
    (23, 13),
    (24, 14),
    (25, 15),
    (26, 16),
    (27, 17),
    (28, 18),
    (29, 19),
]

all_edges = core_edges + mid_layer_edges + outer_edges
e = spark.createDataFrame(all_edges, ["src", "dst"])
g = GraphFrame(v, e)
result = g.k_core(
    checkpoint_interval=2,  # checkpoint every 2 iterations (keep below 5)
    use_local_checkpoints=False,  # persistent checkpoints; requires a checkpoint dir
    storage_level=StorageLevel.MEMORY_AND_DISK_DESER,
)

Scala API

import org.graphframes.GraphFrame

val v = spark.createDataFrame((0L until 30L).map(id => (id, s"v$id"))).toDF("id", "name")

// Build edges to create a hierarchical structure:
// Core (k=5): vertices 0-4 - fully connected
// Next layer (k=3): vertices 5-14 - each connects to multiple core vertices
// Outer layer (k=1): vertices 15-29 - sparse connections
val coreEdges = for {
  i <- 0 until 5
  j <- (i + 1) until 5
} yield (i.toLong, j.toLong)

val midLayerEdges = Seq(
  (5L, 0L),
  (5L, 1L),
  (5L, 2L), // Connect to core
  (6L, 0L),
  (6L, 1L),
  (6L, 3L),
  (7L, 1L),
  (7L, 2L),
  (7L, 4L),
  (8L, 0L),
  (8L, 3L),
  (8L, 4L),
  (9L, 1L),
  (9L, 2L),
  (9L, 3L),
  (10L, 0L),
  (10L, 4L),
  (11L, 2L),
  (11L, 3L),
  (12L, 1L),
  (12L, 4L),
  (13L, 0L),
  (13L, 2L),
  (14L, 3L),
  (14L, 4L))

val outerEdges = Seq(
  (15L, 5L),
  (16L, 6L),
  (17L, 7L),
  (18L, 8L),
  (19L, 9L),
  (20L, 10L),
  (21L, 11L),
  (22L, 12L),
  (23L, 13L),
  (24L, 14L),
  (25L, 15L),
  (26L, 16L),
  (27L, 17L),
  (28L, 18L),
  (29L, 19L))

val allEdges = coreEdges ++ midLayerEdges ++ outerEdges
val e = spark.createDataFrame(allEdges).toDF("src", "dst")
val g = GraphFrame(v, e)
val result = g.kCore.run()