Traversals and Connectivity

Shortest Paths

Computes shortest paths from each vertex to the given set of landmark vertices, where landmarks are specified by vertex ID. Note that this computation takes edge direction into account.

See Wikipedia for background.


Note

Be aware that the returned DataFrame is persisted and should be unpersisted manually after processing to avoid memory leaks!


Python API

For API details, refer to graphframes.GraphFrame.shortestPaths.

from graphframes.examples import Graphs

g = Graphs(spark).friends()  # Get example graph

results = g.shortestPaths(landmarks=["a", "d"])
results.select("id", "distances").show()

Scala API

For API details, refer to org.graphframes.lib.ShortestPaths.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

val results = g.shortestPaths.landmarks(Seq("a", "d")).run()
results.select("id", "distances").show()
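The distances column is a map from each landmark ID to the hop distance from that vertex (unreachable landmarks are absent from the map). A short follow-up sketch (Scala) that extracts the distance to landmark "d" and then unpersists the result, as the note above recommends:

import org.apache.spark.sql.functions.col

// Distance from every vertex to landmark "d" (null where "d" is unreachable)
results.select(col("id"), col("distances").getItem("d").alias("distToD")).show()

// Release the cached result once processing is finished (see the note above)
results.unpersist()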

Arguments

The list (Seq) of vertices to use as landmarks for computing shortest paths between them and all other vertices.

Possible values are graphx and graphframes. Both implementations are based on the same logic. GraphX is faster for small to medium-sized graphs but requires more memory due to less efficient RDD serialization and its triplet-based nature. GraphFrames requires much less memory due to efficient Tungsten serialization and because the core structures are edges and messages, not triplets.

For graphframes only. To avoid exponential growth of the Spark logical plan, the DataFrame lineage, and query optimization time, periodic checkpointing is required. While checkpointing itself is not free, it is still recommended to set this value to something lower than 5.

For graphframes only. By default, GraphFrames uses persistent checkpoints. They are reliable and reduce the error rate. The downside of persistent checkpoints is that they require setting up a checkpointDir in persistent storage such as S3 or HDFS. By providing use_local_checkpoints=True, the user can tell GraphFrames to use the local disks of the Spark executors for checkpointing. Local checkpoints are faster but less reliable: if an executor is lost, for example because it is taken over by a higher-priority job, the checkpoints are lost and the whole job fails.

The storage level for intermediate results and the output DataFrame. By default it is memory-and-disk (deserialized), a good balance between performance and reliability. For very big graphs and out-of-core scenarios, using DISK_ONLY may be faster.

By default this value is true and the algorithm will only look for directed paths. By passing false, the graph is treated as undirected and the algorithm will find any shortest path.
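For illustration, a hedged Scala sketch combining the documented landmarks call with the options described above. Only landmarks(...) and run() appear in the example earlier on this page; the other setter names are assumptions borrowed from the Connected Components builder and may differ, so check the ShortestPaths API reference for the exact names:

val sp = g.shortestPaths
  .landmarks(Seq("a", "d"))       // documented above
  .setAlgorithm("graphframes")    // assumed setter name: "graphx" or "graphframes"
  .setUseLocalCheckpoints(true)   // assumed setter name: local instead of persistent checkpoints
  .run()
sp.select("id", "distances").show()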

Breadth-First Search (BFS)

Breadth-first search (BFS) finds the shortest path(s) from one vertex (or a set of vertices) to another vertex (or a set of vertices). The beginning and end vertices are specified as Spark DataFrame expressions.

See the Wikipedia article on breadth-first search (BFS) for more background.

The following code snippets use BFS to find the paths between the vertex named "Esther" and vertices with age < 32.

Python API

For API details, refer to graphframes.GraphFrame.bfs.

from graphframes.examples import Graphs

g = Graphs(spark).friends()  # Get example graph

# Search from "Esther" for users of age < 32

paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()

# Specify edge filters or max path lengths

g.bfs("name = 'Esther'", "age < 32",
      edgeFilter="relationship != 'friend'", maxPathLength=3)

Scala API

For API details, refer to org.graphframes.lib.BFS.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

// Search from "Esther" for users of age < 32.
val paths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
paths.show()

// Specify edge filters or max path lengths.
val filteredPaths = {
  g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
    .edgeFilter("relationship != 'friend'")
    .maxPathLength(3).run()
}
filteredPaths.show()
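The returned DataFrame contains one row per discovered path; its columns are the start vertex, the traversed edges and intermediate vertices, and the end vertex (from, e0, v1, ..., to in the GraphFrames output). A short follow-up sketch that keeps only the path endpoints:

// Show only the start and end vertices of each path found by the first query above
paths.select("from", "to").show(truncate = false)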

Connected Components

Computes the connected component membership of each vertex and returns a graph with each vertex assigned a component ID.

See Wikipedia for background.


Note:

As of GraphFrames 0.3.0 and later releases, the default Connected Components algorithm requires setting a Spark checkpoint directory. Users can revert to the old algorithm with connectedComponents.setAlgorithm("graphx"). Since GraphFrames 0.9.3, users can also use localCheckpoints, which do not require setting a Spark checkpoint directory. To use localCheckpoints, set the configuration option spark.graphframes.useLocalCheckpoints to true or call the API connectedComponents.setUseLocalCheckpoints(true). While localCheckpoints offer better performance, they are less reliable than persistent checkpoints.
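For example (Scala), the two checkpointing options from this note look as follows; the configuration key and setter are the ones named above:

// Option 1: persistent checkpoints (default) - requires a checkpoint directory
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

// Option 2: local checkpoints - no checkpoint directory needed, but less reliable
spark.conf.set("spark.graphframes.useLocalCheckpoints", "true")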

Note

Be aware that the returned DataFrame is persisted and should be unpersisted manually after processing to avoid memory leaks!


Python API

For API details, refer to graphframes.GraphFrame.connectedComponents.

from graphframes.examples import Graphs

sc.setCheckpointDir("/tmp/spark-checkpoints")

g = Graphs(spark).friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

Scala API

For API details, refer to org.graphframes.lib.ConnectedComponents.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

val result = g.connectedComponents.setUseLocalCheckpoints(true).run()
result.select("id", "component").orderBy("component").show()

Arguments

Possible values are graphx and graphframes. The GraphX-based implementation is a simple vertex-by-vertex Pregel algorithm. While it may be slightly faster on small to medium-sized graphs, it has worse convergence complexity and requires more memory due to less efficient RDD serialization. The GraphFrame-based implementation is based on ideas from Kiveris, Raimondas, et al. "Connected components in MapReduce and beyond." Proceedings of the ACM Symposium on Cloud Computing. 2014. This implementation has better convergence complexity and requires less memory.

For graphx only. Limits the maximum number of Pregel iterations. The default is unlimited (Integer.maxValue). It is recommended not to change this value: if the algorithm gets stuck, it is a problem with the graph, not with the algorithm.

For graphframes only. To avoid exponential growth of the Spark logical plan, the DataFrame lineage, and query optimization time, periodic checkpointing is required. While checkpointing itself is not free, it is still recommended to set this value to something lower than 5.

For graphframes only. See this section for details.

For graphframes only. When the type of the input graph's vertices is not one of Long, Int, Short, or Byte, the output labels (components) are random Long numbers by default. By providing use_labels_as_components=True, users can ask GraphFrames to use the original vertex labels as the output components. In that case, the minimum of all the original IDs is used for each found component. This operation is not free and requires an additional groupBy + agg + join.

For graphframes only. By default, GraphFrames uses persistent checkpoints. They are reliable and reduce the error rate. The downside of persistent checkpoints is that they require setting up a checkpointDir in persistent storage such as S3 or HDFS. By providing use_local_checkpoints=True, the user can tell GraphFrames to use the local disks of the Spark executors for checkpointing. Local checkpoints are faster but less reliable: if an executor is lost, for example because it is taken over by a higher-priority job, the checkpoints are lost and the whole job fails.

The storage level for intermediate results and the output DataFrame with components. By default it is memory-and-disk (deserialized), a good balance between performance and reliability. For very big graphs and out-of-core scenarios, using DISK_ONLY may be faster.
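For illustration, a minimal Scala sketch that combines the setters named on this page (setAlgorithm, setUseLocalCheckpoints, setBroadcastThreshold); for the remaining options described above, see the ConnectedComponents API reference for the exact setter names:

val components = g.connectedComponents
  .setAlgorithm("graphframes")      // "graphx" or "graphframes"
  .setUseLocalCheckpoints(true)     // faster, but less reliable than a persistent checkpointDir
  .setBroadcastThreshold(1000000)   // manual broadcast threshold, see the AQE section below
  .run()
components.select("id", "component").show()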

Adaptive Query Execution Broadcast Mode

Since version 0.10.0

For graphframes algorithms only. During iterations, the algorithm may generate new edges, and because some vertices have very high degree, this can lead to heavy skew in joins and aggregations. In earlier versions of GraphFrames, this problem was solved by manually broadcasting the very high-degree nodes. Unfortunately, Apache Spark's Adaptive Query Execution optimization breaks down in such cases, which is why AQE is disabled in the Connected Components algorithm.

In newer versions of GraphFrames (0.10+), there is a way to disable the manual broadcast, enable AQE, and let it handle the data skew. To enable this mode, pass -1 to setBroadcastThreshold. Based on benchmarks, this mode provides roughly a 5x speedup. In future versions, the default value of broadcastThreshold may be changed to -1.
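A minimal Scala sketch of enabling this mode via the setBroadcastThreshold setter mentioned above:

// Disable the manual broadcast of high-degree vertices and let AQE handle the skew
val resultWithAQE = g.connectedComponents
  .setBroadcastThreshold(-1)
  .run()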

Strongly Connected Components

Computes the strongly connected component (SCC) of each vertex and returns a graph with each vertex assigned to the SCC containing that vertex. Currently, SCC in GraphFrames is a wrapper around the GraphX implementation.

See Wikipedia for background.


Note

Be aware that the returned DataFrame is persisted and should be unpersisted manually after processing to avoid memory leaks!


Python API

For API details, refer to graphframes.GraphFrame.stronglyConnectedComponents.

from graphframes.examples import Graphs

sc.setCheckpointDir("/tmp/spark-checkpoints")

g = Graphs(spark).friends()  # Get example graph

result = g.stronglyConnectedComponents(maxIter=10)
result.select("id", "component").orderBy("component").show()

Scala API

For API details, refer to org.graphframes.lib.StronglyConnectedComponents.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

val result = g.stronglyConnectedComponents.maxIter(10).run()
result.select("id", "component").orderBy("component").show()

Triangle Count

Computes the number of triangles passing through each vertex.


Warning!

The current implementation is based on collecting the neighbor set of each vertex and computing pairwise intersections. While this works well for regular graphs, it will most likely fail on any kind of power-law graph (a graph with a few vertices of very high degree), or at least require a lot of memory for the Spark cluster. Consider an edge-sampling strategy before running the algorithm to get an approximate count of triangles.
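As a rough illustration of such a sampling strategy (a sketch, not part of the GraphFrames API): keep each edge independently with probability p, run the exact algorithm on the sampled graph, and rescale the per-vertex counts by 1/p^3, since a triangle survives the sampling only if all three of its edges do. The fraction below is an arbitrary choice.

import org.apache.spark.sql.functions.col
import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends   // tiny example graph, for illustration only
val p = 0.1                                   // sampling fraction (arbitrary; tune for your graph)

// Keep each edge independently with probability p, then count triangles on the sample
val sampledEdges = g.edges.sample(withReplacement = false, fraction = p, seed = 42L)
val sampledTriangles = GraphFrame(g.vertices, sampledEdges).triangleCount.run()
// A triangle survives only if all three of its edges do, so rescale by 1/p^3 for a rough estimate
val approx = sampledTriangles.withColumn("approxCount", col("count") / (p * p * p))
approx.select("id", "approxCount").show()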


Python API

For API details, refer to graphframes.GraphFrame.triangleCount.

from graphframes.examples import Graphs

g = Graphs(spark).friends()  # Get example graph

results = g.triangleCount()
results.select("id", "count").show()

Scala API

For API details, refer to org.graphframes.lib.TriangleCount.

import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()

Cycle Detection

GraphFrames provides an implementation of the Rocha–Thatte cycle detection algorithm.


Note

Be aware that the returned DataFrame is persisted and should be unpersisted manually after processing to avoid memory leaks!


Python API

from graphframes import GraphFrame

vertices = spark.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")], ["id", "attr"]
)
edges = spark.createDataFrame(
    [(1, 2), (2, 1), (1, 3), (3, 1), (2, 5), (5, 1)], ["src", "dst"]
)
graph = GraphFrame(vertices, edges)
res = graph.detectingCycles(
    checkpoint_interval=3,
    use_local_checkpoints=True,
)
res.show(truncate=False)

# Output:
# +----+--------------+
# | id | found_cycles |
# +----+--------------+
# |1   |[1, 3, 1]     |
# |1   |[1, 2, 1]     |
# |1   |[1, 2, 5, 1]  |
# |2   |[2, 1, 2]     |
# |2   |[2, 5, 1, 2]  |
# |3   |[3, 1, 3]     |
# |5   |[5, 1, 2, 5]  |
# +----+--------------+

Scala API

import org.graphframes.GraphFrame

val graph = GraphFrame(
  spark
    .createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e")))
    .toDF("id", "attr"),
  spark
    .createDataFrame(Seq((1L, 2L), (2L, 1L), (1L, 3L), (3L, 1L), (2L, 5L), (5L, 1L)))
    .toDF("src", "dst"))
val res = graph.detectingCycles.setUseLocalCheckpoints(true).run()
res.show(false)

// Output:
// +--------------+
// | found_cycles |
// +--------------+
// |[1, 3, 1]     |
// |[1, 2, 1]     |
// |[1, 2, 5, 1]  |

Arguments

For graphframes only. To avoid exponential growth of the Spark logical plan, the DataFrame lineage, and query optimization time, periodic checkpointing is required. While checkpointing itself is not free, it is still recommended to set this value to something lower than 5.

For graphframes only. By default, GraphFrames uses persistent checkpoints. They are reliable and reduce the error rate. The downside of persistent checkpoints is that they require setting up a checkpointDir in persistent storage such as S3 or HDFS. By providing use_local_checkpoints=True, the user can tell GraphFrames to use the local disks of the Spark executors for checkpointing. Local checkpoints are faster but less reliable: if an executor is lost, for example because it is taken over by a higher-priority job, the checkpoints are lost and the whole job fails.

The storage level for intermediate results and the output DataFrame. By default it is memory-and-disk (deserialized), a good balance between performance and reliability. For very big graphs and out-of-core scenarios, using DISK_ONLY may be faster.