Message passing
Like GraphX, GraphFrames provides primitives for developing graph algorithms. The two key components are:
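- aggregateMessages: Send messages between vertices and aggregate the messages received by each vertex. GraphFrames provides a native aggregateMessages method implemented with DataFrame operations, which can be used in the same way as its GraphX counterpart.
- joins: Join the message aggregates with the original graph. GraphFrames relies on DataFrame joins, which provide the full functionality of GraphX joins.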
Note
The returned DataFrame is persisted. Unpersist it manually once you have finished processing it to avoid a memory leak.
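To make the two building blocks concrete, here is a minimal pure-Python model of what they compute. It is a conceptual sketch, not the GraphFrames implementation: it assumes vertices are a dict of attribute dicts and edges are (src, dst) pairs, and the helper names `aggregate_messages` and `left_join` are hypothetical. Each edge may emit one message to its source and one to its destination; messages are then grouped by receiving vertex and aggregated, and the aggregates are joined back onto the vertices.

```python
from collections import defaultdict


def aggregate_messages(vertices, edges, send_to_src=None, send_to_dst=None, reduce_fn=sum):
    """Hypothetical model of aggregateMessages (not the library code).

    vertices: {vertex_id: attribute dict}; edges: list of (src_id, dst_id) pairs.
    send_to_src / send_to_dst: functions (src_attrs, dst_attrs) -> message.
    Returns {vertex_id: aggregate} only for vertices that received a message.
    """
    inbox = defaultdict(list)
    for src, dst in edges:
        if send_to_src is not None:
            inbox[src].append(send_to_src(vertices[src], vertices[dst]))
        if send_to_dst is not None:
            inbox[dst].append(send_to_dst(vertices[src], vertices[dst]))
    # Group by receiving vertex and aggregate, like groupBy("id").agg(...)
    return {vid: reduce_fn(msgs) for vid, msgs in inbox.items()}


def left_join(vertices, aggregates, column, default=None):
    # The "joins" building block: attach each aggregate to its vertex,
    # using a default for vertices that received no message.
    return {vid: {**attrs, column: aggregates.get(vid, default)}
            for vid, attrs in vertices.items()}


# Hypothetical toy graph (not the GraphFrames example data).
vertices = {"a": {"age": 30}, "b": {"age": 40}, "c": {"age": 50}, "d": {"age": 20}}
edges = [("a", "b"), ("b", "c")]

agg = aggregate_messages(
    vertices, edges,
    send_to_src=lambda s, d: d["age"],  # analogous to AM.dst["age"]
    send_to_dst=lambda s, d: s["age"],  # analogous to AM.src["age"]
)
result = left_join(vertices, agg, "summedAges", default=0)

print(agg)                      # {'a': 40, 'b': 80, 'c': 40}
print(result["d"]["summedAges"])  # 0: "d" has no edges, so it got no messages
```

Vertex "d" is absent from `agg` because aggregation only sees vertices that received messages; the join step is what brings it back with a default value.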
The following snippets show how to use aggregateMessages to compute, for each user, the sum of the ages of adjacent users.
Python API
For API details, see graphframes.GraphFrame.aggregateMessages.
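from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM
from graphframes.examples import Graphs

spark = SparkSession.builder.getOrCreate()  # reuse the active session if one exists
g = Graphs(spark).friends()  # Get example graph

# For each user, sum the ages of the adjacent users
msgToSrc = AM.dst["age"]  # message to the source vertex: the destination's age
msgToDst = AM.src["age"]  # message to the destination vertex: the source's age
agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("summedAges"),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst)
agg.show()

# The result is persisted (see the note above); release it when done
agg.unpersist()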
Scala API
For API details, see org.graphframes.lib.AggregateMessages.
import org.apache.spark.sql.functions.sum
import org.graphframes.{examples, GraphFrame}
import org.graphframes.lib.AggregateMessages

val g: GraphFrame = examples.Graphs.friends // get example graph
// We will use AggregateMessages utilities later, so name it "AM" for short.
val AM = AggregateMessages
// For each user, sum the ages of the adjacent users.
val msgToSrc = AM.dst("age")
val msgToDst = AM.src("age")
val agg = { g.aggregateMessages
  .sendToSrc(msgToSrc) // send destination user's age to source
  .sendToDst(msgToDst) // send source user's age to destination
  .agg(sum(AM.msg).as("summedAges")) } // sum up ages, stored in AM.msg column
agg.show()
// The result is persisted (see the note above); release it when done.
agg.unpersist()
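One consequence of the per-edge messages in both examples is worth checking: every edge sends one message to each endpoint, so if two users are linked by edges in both directions, each receives the other's age twice in summedAges. The pure-Python sketch below models only that counting behaviour on hypothetical data; `sum_neighbor_ages` is a hypothetical helper, not part of GraphFrames.

```python
from collections import Counter


def sum_neighbor_ages(ages, edges):
    # One message per edge endpoint, mirroring the sendToSrc/sendToDst calls:
    # the source receives the destination's age and vice versa.
    totals = Counter()
    for src, dst in edges:
        totals[src] += ages[dst]  # like sendToSrc with the destination's age
        totals[dst] += ages[src]  # like sendToDst with the source's age
    return totals


# Hypothetical data: b and c are connected by reciprocal edges.
ages = {"a": 30, "b": 40, "c": 50}
edges = [("a", "b"), ("b", "c"), ("c", "b")]

summed_ages = sum_neighbor_ages(ages, edges)
# Collapse reciprocal edges into one undirected pair so each neighbor counts once.
deduped = sum_neighbor_ages(ages, sorted({tuple(sorted(e)) for e in edges}))

print(dict(summed_ages))  # {'a': 40, 'b': 130, 'c': 80}
print(dict(deduped))      # {'a': 40, 'b': 80, 'c': 40}
```

If each neighbor should be counted once, one option is to deduplicate reciprocal edges in the edge DataFrame before building the graph and calling aggregateMessages.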
For a more complex example, see the code that implements graphframes.examples.BeliefPropagation.