Pregel API

Pregel API 是 GraphFrames 的核心支柱之一。它基于 Pregel 算法 的实现,通过使用 Apache Spark DataFrame / Dataset API 的关系操作来实现。


注意

Be aware, that returned DataFrame is persistent and should be unpersisted manually after processing to avoid memory leaks!


应用程序接口

有关API详情,请参阅:

参数

消息列

在内部,GraphFrames通过特定列处理所有消息:

val pregelMessageColumn = org.graphframes.lib.Pregel.msg
val pregelMessageColumnName = org.graphframes.lib.Pregel.MSG_COL_NAME

此名称同时应用于原始消息和聚合消息,并应在条件和聚合中使用。例如,在PageRank算法的最简单实现中,它可能如下所示:

val aggregateMessagesExpression = sum(Pregel.msg)
val updateExpression = coalesce(Pregel.msg, lit(0.0)) * (1 - alpha) + alpha / numVertices

三元组列

在消息生成阶段,GraphFrames 内部会创建三元组,其中包含源顶点、边和目标顶点的 StructType 列。这些结构体包含来自原始源、目标和边的所有列,不仅包括初始属性,还包括 Pregel 列,如活动列、所有顶点列等。要访问这些数据,用户应依赖以下 API:

val sourceVertexColumn1 = Pregel.src("vertexColumn1")
val sourceVertexColumn2 = Pregel.src("vertexColumn2")
val destinationVertexColumn1 = Pregel.dst("vertexColumn1")
val edgeWeight = Pregel.edge("weight")

在底层,传递的列名将被解析以获取三元组结构对应的元素。

发送消息

GraphFrames Pregel API 支持每个顶点发送任意数量的消息。在 Pregel API 内部图始终被视为有向图。这意味着如果一个顶点有指向另一个顶点的出边,则消息将被发送到目标顶点。为了模拟无向图的行为,用户可以同时向源顶点和目标顶点发送相同的消息。

graph
  .pregel
  .sendMsgToDst(Pregel.src("vertexColumn")) // Sending the vertex column of the destination vertex to the source vertex.
  .sendMsgToSrc(Pregel.dst("vertexColumn")) // Sending the vertex column of the source vertex to the destination vertex.
  .run()

聚合

GraphFrames Pregel API 要求用户为消息指定聚合函数。无论方向如何,该函数都会在所有发送到顶点的消息上调用。

graph.pregel.aggMsgs(sum(Pregel.msg))

终止条件

GraphFrames Pregel API 提供以下终止条件: