Pregel API
Pregel API 是 GraphFrames 的核心支柱之一。它基于 Pregel 算法 的实现,通过使用 Apache Spark DataFrame / Dataset API 的关系操作来实现。
注意
Be aware, that returned DataFrame is persistent and should be unpersisted manually after processing to avoid memory leaks!
应用程序接口
有关API详情,请参阅:
- Scala API: org.graphframes.lib.Pregel
- Python API: graphframes.lib.Pregel
参数
消息列
在内部,GraphFrames通过特定列处理所有消息:
val pregelMessageColumn = org.graphframes.lib.Pregel.msg
val pregelMessageColumnName = org.graphframes.lib.Pregel.MSG_COL_NAME
此名称同时应用于原始消息和聚合消息,并应在条件和聚合中使用。例如,在PageRank算法的最简单实现中,它可能如下所示:
val aggregateMessagesExpression = sum(Pregel.msg)
val updateExpression = coalesce(Pregel.msg, lit(0.0)) * (1 - alpha) + alpha / numVertices
三元组列
在消息生成阶段,GraphFrames 内部会创建三元组,其中包含源顶点、边和目标顶点的 StructType 列。这些结构体包含来自原始源、目标和边的所有列,不仅包括初始属性,还包括 Pregel 列,如活动列、所有顶点列等。要访问这些数据,用户应依赖以下 API:
val sourceVertexColumn1 = Pregel.src("vertexColumn1")
val sourceVertexColumn2 = Pregel.src("vertexColumn2")
val destinationVertexColumn1 = Pregel.dst("vertexColumn1")
val edgeWeight = Pregel.edge("weight")
在底层,传递的列名将被解析以获取三元组结构对应的元素。
发送消息
GraphFrames Pregel API 支持每个顶点发送任意数量的消息。在 Pregel API 内部图始终被视为有向图。这意味着如果一个顶点有指向另一个顶点的出边,则消息将被发送到目标顶点。为了模拟无向图的行为,用户可以同时向源顶点和目标顶点发送相同的消息。
graph
.pregel
.sendMsgToDst(Pregel.src("vertexColumn")) // Sending the vertex column of the destination vertex to the source vertex.
.sendMsgToSrc(Pregel.dst("vertexColumn")) // Sending the vertex column of the source vertex to the destination vertex.
.run()
聚合
GraphFrames Pregel API 要求用户为消息指定聚合函数。无论方向如何,该函数都会在所有发送到顶点的消息上调用。
graph.pregel.aggMsgs(sum(Pregel.msg))
终止条件
GraphFrames Pregel API 提供以下终止条件:
- 通过迭代次数。 用户可以使用
setMaxIter(value: Int)指定最大迭代次数。 - 当没有新消息发送时。 如果迭代中发送的所有消息都为空(
null),用户可以通过设置 GF 来终止计算。为此,用户应指定setEarlyStopping(value: Boolean)。请注意,因为空值检查不是免费操作,而是 Apache Spark 的 action 操作! 例如,如果消息不可能为空,此条件应设置为false。例如,在类似ShortestPaths的算法中,此条件应设置为true,但在类似PageRank的算法中,此条件应设置为false,因为消息不可能为空。 - 通过顶点投票。 用户可以通过
setInitialActiveVertexExpression(expression: Column和setUpdateActiveVertexExpression(expression: Column)为每个顶点指定参与条件。如果stopIfAllNonActiveVertices(value: Boolean)设置为true,当所有顶点都处于非活跃状态时计算将停止。这对于像LabelPropagation这样的算法非常有用,因为消息始终不为null,但如果上次迭代中没有顶点更改标签,计算应当停止。请注意,因为检查顶点状态不是免费操作,而是 Apache Spark 的 action 操作!