graphframes.lib package¶
与消息聚合一起使用的实用工具集合¶
- class graphframes.lib.AggregateMessages[source]¶
与
graphframes.GraphFrame.aggregateMessages()配合使用的实用工具集合。- 目标节点¶
目标列的引用,用于指定消息。
- 边¶
边的列引用,用于指定消息。
- 消息¶
消息列的参考,用于指定聚合函数。
- 源点¶
源列的参考,用于指定消息。
- 类 graphframes.lib.Pregel(graph: GraphFrame)[源代码]¶
实现了一个基于DataFrame操作的类似Pregel的批量同步消息传递API。
请参阅 Malewicz 等人所著的《Pregel:大规模图处理系统》 获取关于 Pregel 算法的详细说明。
您可以使用此构造函数或
graphframes.GraphFrame.pregel来构建Pregel实例, 然后使用构建器模式描述操作,最后调用run()开始运行。 它将返回最后一次迭代的顶点DataFrame。当运行开始时,它会使用通过
withVertexColumn()定义的列表达式来扩展顶点数据框。 这些额外的顶点属性可以在Pregel迭代过程中被修改。 在每个Pregel迭代中,包含三个阶段:给定每条边三元组,生成消息并指定目标顶点发送, 由
sendMsgToDst()和sendMsgToSrc()描述。按目标顶点ID聚合消息,由
aggMsgs()描述。基于来自前一次迭代的聚合消息和状态更新额外的顶点属性, 由
withVertexColumn()描述。
请查阅方法API文档中每个阶段可以引用的列。
您可以通过
setMaxIter()控制迭代次数,并查阅API文档了解高级控制选项。- Parameters:
图 – 一个
graphframes.GraphFrame对象,用于存储以DataFrames形式保存顶点和边的图结构。
- aggMsgs(aggExpr: Column) Pregel[来源]¶
定义消息在按目标顶点ID分组后如何聚合。
- Parameters:
aggExpr – 消息聚合表达式,例如 sum(Pregel.msg())。 您可以通过
msg()引用消息列,通过 col(“id”) 引用顶点ID, 但后者通常不被使用。
- static dst(colName: str) Column[来源]¶
在生成要发送的消息时引用目标顶点列。
请参阅
sendMsgToSrc()和sendMsgToDst()- Parameters:
colName – 顶点列名称。
- static edge(colName: str) Column[来源]¶
在生成要发送的消息时引用边列。
请参阅
sendMsgToSrc()和sendMsgToDst()- Parameters:
colName – 边列名称。
- static msg() Column[源代码]¶
在聚合消息和更新额外顶点列时引用消息列。
请参阅
aggMsgs()和withVertexColumn()
- sendMsgToDst(msgExpr: Column) Pregel[来源]¶
定义要发送到每条边三元组目标顶点的消息。
您可以多次调用它以发送多条消息。
请参阅方法
sendMsgToSrc()。
- sendMsgToSrc(msgExpr: Column) Pregel[源代码]¶
定义要发送给每条边三元组源顶点的消息。
您可以多次调用它以发送多条消息。
请参阅方法
sendMsgToDst()。
- setCheckpointInterval(value: int) Pregel[源代码]¶
设置两次检查点之间的迭代次数(默认值:2)。
这是一个高级控制项,用于平衡查询计划优化与检查点数据I/O成本。 在大多数情况下,您应保持默认值。
如果将此值设置为0,则禁用检查点。
- setEarlyStopping(value: bool) Pregel[来源]¶
设置是否在没有新消息发送的情况下提前停止Pregel。
提前停止允许在达到最大迭代次数之前通过检查是否存在任何非空消息来终止Pregel。 虽然在某些情况下可能获得显著的性能提升,但在其他情况下可能导致性能下降, 因为检查消息DataFrame是否为空是一个操作,需要物化Spark执行计划并执行一些额外计算。
在用户可以假设 maxIter 的良好值的情况下,建议将此值保留为默认值“false”。 在难以估计收敛所需迭代次数的情况下, 建议将此值设置为“false”,以避免在达到 maxIter 之前持续迭代直至收敛。 当此值为“true”时,可以将 maxIter 设置为更大的值而无需承担风险。
- setInitialActiveVertexExpression(value: Column) Self[源代码]¶
设置活动顶点列的初始表达式。
活动顶点列用于确定每个Pregel迭代中顶点的投票结果。 该表达式在初始顶点DataFrame上进行评估,以设置活动列的初始状态。
- Parameters:
value – 用于计算顶点初始活动状态的表达式。 在此表达式中,您可以引用所有原始顶点列。
- 设置中间存储级别(storage_level: StorageLevel) Self[来源]¶
设置中间存储级别。 在每次迭代中,Pregel 使用请求的存储级别缓存结果。
对于非常大的图,建议使用 DISK_ONLY。
- Parameters:
storage_level – 使用的存储级别。
- setSkipMessagesFromNonActiveVertices(value: bool) Self[源代码]¶
设置 Pregel 是否应跳过从非活动顶点发送消息。
当启用此选项时,标记为非活跃状态的顶点将不会发送消息。 这有助于通过避免非活跃顶点产生不必要的消息传播来优化性能。
- Parameters:
value – 布尔值。
- setStopIfAllNonActiveVertices(value: bool) Self[来源]¶
如果所有顶点都投票停止,是否应停止Pregel。
活动(或投票)基于 activity_col 确定。 有关如何设置和更新 activity_col 的详细信息,请参阅方法
setInitialActiveVertexExpression()和setUpdateActiveVertexExpression()。请注意,检查投票并非免费操作,而是一个Spark动作。如果条件实际上无法达到但仍被设置,只会减慢算法速度。
- Parameters:
value – 布尔值。
- setUpdateActiveVertexExpression(value: Column) Self[源代码]¶
设置用于更新活动顶点列的表达式。
活动顶点列用于确定每个Pregel迭代中顶点的投票结果。 该表达式在更新后的顶点DataFrame上进行评估,以设置活动列的新状态。
- Parameters:
value – 用于计算顶点新活动状态的表达式。 在此表达式中,您可以引用所有原始顶点列和附加顶点列。
- setUseLocalCheckpoints(value: bool) Self[来源]¶
设置 Pregel 是否应使用本地检查点。
本地检查点速度更快且无需配置持久存储。 同时,本地检查点的可靠性较低,可能会在执行器的本地磁盘上产生较大负载。
- Parameters:
value – boolean value.
- static src(colName: str) Column[来源]¶
在生成要发送的消息时引用源顶点列。
See
sendMsgToSrc()andsendMsgToDst()- Parameters:
colName – the vertex column name.
- withVertexColumn(colName: str, initialExpr: Column, updateAfterAggMsgsExpr: Column) Pregel[来源]¶
在运行开始时定义一个额外的顶点列,以及如何在每次迭代中更新它。
您可以多次调用它来添加多个额外的顶点列。
- Parameters:
colName – 新增顶点列的名称。 它不能是图中已存在的顶点列。
initialExpr – 用于初始化附加顶点列的表达式。 在此表达式中,您可以引用所有原始顶点列。
updateAfterAggMsgsExpr – 在消息聚合后更新附加顶点列的表达式。 您可以使用
msg()引用所有原始顶点列、附加顶点列以及 聚合后的消息列。 如果顶点未收到任何消息,消息列将为空值。