Message passing

Like GraphX, GraphFrames provides primitives for developing graph algorithms. The two key components are:


Note

The returned DataFrame is persisted. Unpersist it manually once you have finished processing it to avoid memory leaks!
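One way to make sure the unpersist step is not forgotten is to wrap the result in a small context manager. This is only a sketch: `unpersisting` is a hypothetical helper name introduced here, not part of GraphFrames, and it assumes nothing beyond the object having an `unpersist()` method, as PySpark DataFrames do.

```python
from contextlib import contextmanager


@contextmanager
def unpersisting(df):
    """Yield df, then call df.unpersist() even if the body raises."""
    try:
        yield df
    finally:
        # Release the cached data once the caller is done with it.
        df.unpersist()


# Intended usage (assumes `agg_df` is a DataFrame returned by aggregateMessages):
#   with unpersisting(agg_df) as agg:
#       agg.show()
```

The `finally` clause means the cached data is released even when processing fails partway through, which is the case most likely to leak in practice.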


The following code snippet shows how to use aggregateMessages to compute the sum of the ages of each user's adjacent users.

Python API

For API details, see graphframes.GraphFrame.aggregateMessages.

from graphframes.lib import AggregateMessages as AM
from graphframes.examples import Graphs
from pyspark.sql.functions import sum as sqlsum


g = Graphs(spark).friends()  # Get example graph

# For each user, sum the ages of the adjacent users
msgToSrc = AM.dst["age"]
msgToDst = AM.src["age"]
agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("summedAges"),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst)
agg.show()
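To make the message flow concrete, the same computation can be emulated in plain Python without Spark. This is a minimal sketch of the semantics only, not how GraphFrames implements it; the toy graph and the function name `sum_neighbor_ages` are made up for illustration and are not the `friends` example graph.

```python
from collections import defaultdict

# Hypothetical toy graph (not the GraphFrames "friends" example).
ages = {"a": 34, "b": 36, "c": 30}
edges = [("a", "b"), ("b", "c"), ("c", "b")]  # (src, dst) pairs


def sum_neighbor_ages(ages, edges):
    """Emulate sendToSrc=dst age, sendToDst=src age, aggregated with sum."""
    inbox = defaultdict(list)
    for src, dst in edges:
        inbox[src].append(ages[dst])  # message to the source: destination's age
        inbox[dst].append(ages[src])  # message to the destination: source's age
    # In this sketch, a vertex that receives no message does not appear at all.
    return {vertex: sum(msgs) for vertex, msgs in inbox.items()}


summed_ages = sum_neighbor_ages(ages, edges)
print(summed_ages)
```

Every edge sends one message in each direction, so a pair of users connected by edges both ways (here `b` and `c`) contributes twice to each other's sum.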

Scala API

For API details, see org.graphframes.lib.AggregateMessages.

import org.apache.spark.sql.functions.sum  // needed outside spark-shell for sum(...)
import org.graphframes.{examples, GraphFrame}
import org.graphframes.lib.AggregateMessages

val g: GraphFrame = examples.Graphs.friends  // get example graph

// We will use AggregateMessages utilities later, so name it "AM" for short.
val AM = AggregateMessages

// For each user, sum the ages of the adjacent users.
val msgToSrc = AM.dst("age")
val msgToDst = AM.src("age")
val agg = { g.aggregateMessages
  .sendToSrc(msgToSrc)  // send destination user's age to source
  .sendToDst(msgToDst)  // send source user's age to destination
  .agg(sum(AM.msg).as("summedAges")) } // sum up ages, stored in AM.msg column
agg.show()

For a more complex example, see the code that implements graphframes.examples.BeliefPropagation.