graphframes 包

子包

目录

graphframes.GraphFrame(v: DataFrame, e: DataFrame)[源代码]

表示一个以数据帧形式存储顶点和边的图。

Parameters:
  • vDataFrame 包含顶点信息。 必须包含一个名为“id”的列,用于存储唯一的顶点标识符。

  • eDataFrame 存储边信息。 必须包含两列 "src" 和 "dst",分别存储边的源顶点ID和目标顶点ID。

>>> localVertices = [(1,"A"), (2,"B"), (3, "C")]
>>> localEdges = [(1,2,"love"), (2,1,"hate"), (2,3,"follow")]
>>> v = spark.createDataFrame(localVertices, ["id", "name"])
>>> e = spark.createDataFrame(localEdges, ["src", "dst", "action"])
>>> g = GraphFrame(v, e)
目标节点: str = 'dst'
: str = 'edge'
标识符 str = 'id'
源节点 str = 'src'
aggregateMessages(aggCol: list[Column | str] | Column, sendToSrc: list[Column | str] | Column | str | None = None, sendToDst: list[Column | str] | Column | str | None = None, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[来源]

聚合来自邻居的消息。

在指定消息和聚合函数时,用户可以通过graphframes.lib.AggregateMessages中的静态方法引用列。

更多详情请参阅 Scala 文档。

警告!此方法的结果是持久化的DataFrame对象!用户应处理取消持久化以避免可能的内存泄漏!

Parameters:
  • aggCol – 请求的聚合输出,可以是 pyspark.sql.Column 的集合或 SQL 表达式字符串

  • sendToSrc – 发送至每个三元组源顶点的消息,可以是 pyspark.sql.Column 集合或 SQL 表达式字符串(默认值:None)

  • sendToDst – 发送到每个三元组目标顶点的消息,可以是 pyspark.sql.Column 的集合或 SQL 表达式字符串(默认值:None)

  • intermediate_storage_level – 将用于中间结果和输出的中间存储级别。

Returns:

持久化的DataFrame,包含顶点ID列和聚合结果消息列。 生成的消息列名称基于所提供aggCol的别名!

as_undirected() GraphFrame[源代码]

通过确保所有有向边都是双向的,将有向图转换为无向图。对于每个有向边 (src, dst),都会添加一个对应的边 (dst, src)。

Returns:

一个表示无向图的新 GraphFrame。

广度优先搜索(fromExpr: str, toExpr: str, edgeFilter: str | None = None, maxPathLength: int = 10) DataFrame[源代码]

广度优先搜索(BFS)。

更多详情请参阅 Scala 文档。

Returns:

对于每对匹配顶点之间的最短路径,DataFrame 中会有一行数据。

缓存() GraphFrame[源代码]

使用默认存储级别持久化图的顶点和边的数据框表示。

connectedComponents(algorithm: str = 'graphframes', checkpointInterval: int = 2, broadcastThreshold: int = 1000000, useLabelsAsComponents: bool = False, use_local_checkpoints: bool = False, max_iter: int = 31, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[源代码]

计算图的连通分量。

更多详情请参阅 Scala 文档。

Parameters:
  • algorithm – 使用的连通分量算法(默认:"graphframes") 支持的算法包括 "graphframes" 和 "graphx"。

  • checkpointInterval – 以迭代次数为单位的检查点间隔(默认值:2)

  • broadcastThreshold – 传播组件分配时的广播阈值 (默认值:1000000)。传递-1将禁用手动广播并 允许AQE处理倾斜连接。此模式速度更快 且推荐使用。在GraphFrames的未来版本中 默认值可能会更改为-1。

  • useLabelsAsComponents – 如果为True,则使用顶点标签作为组件,否则将使用长整型数值

  • use_local_checkpoints – 是否应使用本地检查点,默认为 false; 本地检查点速度更快且无需设置持久化的 checkpointDir; 另一方面,本地检查点可靠性较低,且要求执行器具有足够大的本地磁盘。

  • storage_level – 中间和最终数据帧的存储级别。

Returns:

带有新顶点列“component”的DataFrame

属性 degrees: DataFrame
图中每个顶点的度数,返回为一个包含两列的Data框:
  • “id”:顶点的ID

  • ‘degree’(整数)顶点的度数

请注意,结果中不会返回边数为0的顶点。

Returns:

带有新顶点列“degree”的DataFrame

检测循环(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[源代码]

查找图中的所有循环。

Rocha–Thatte 循环检测算法的一种实现。 Rocha, Rodrigo Caetano 与 Bhalchandra D. Thatte 合著。《大规模稀疏图中的分布式循环检测》。 巴西运筹学研讨会论文集 (SBPO’15) (2015): 1-11.

返回一个包含唯一循环的DataFrame。

Parameters:
  • checkpoint_interval – Pregel 检查点间隔,默认为 2

  • use_local_checkpoints – 是否应使用本地检查点而非 checkpointDir

Storage_level:

中间结果和输出DataFrame的存储级别

Returns:

包含所有循环的持久化DataFrame

dropIsolatedVertices() GraphFrame[源代码]

移除孤立顶点,即不包含在任何边中的顶点。

Returns:

带有筛选顶点的GraphFrame。

属性 edges DataFrame

DataFrame 存储边信息的数据框,其中包含唯一列“src”和“dst”,分别存储边的源顶点ID和目标顶点ID。

filterEdges(condition: str | Column) GraphFrame[源代码]

根据表达式过滤边,保留所有顶点。

Parameters:

condition – 用于描述过滤条件的字符串或列表达式。

Returns:

带有筛选边的GraphFrame。

filterVertices(condition: str | Column) GraphFrame[源代码]

根据表达式筛选顶点,移除包含任何被丢弃顶点的边。

Parameters:

condition – 用于描述过滤条件的字符串或列表达式。

Returns:

带有筛选顶点和边的GraphFrame。

查找(pattern: str) DataFrame[来源]

模式发现。

更多详情请参阅 Scala 文档。

Parameters:

模式 – 描述要搜索的图模式的字符串。

Returns:

为每个找到的图案实例提供一个数据行的DataFrame

属性 inDegrees DataFrame
图中每个顶点的入度,返回为一个包含两列的DataFrame:
  • “id”:顶点的ID

  • “inDegree”(整数)存储顶点的入度

请注意,结果中不会返回入度为0的顶点。

Returns:

带有新顶点列“inDegree”的DataFrame

k_core(checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[来源]

k-核是指每个顶点至少具有度k的最大子图。 k-核度量是基于节点度数及其邻居度数来衡量节点在网络中中心性的指标。 具有较高k-核值的节点被认为在网络中更具中心性和影响力。

该实现基于以下论文中描述的算法: Mandal, Aritra, and Mohammad Al Hasan. “A distributed k-core decomposition algorithm on spark.” 2017 IEEE International Conference on Big Data (Big Data). IEEE, 2017.

Parameters:
  • checkpoint_interval – Pregel checkpoint interval, default is 2

  • use_local_checkpoints – should local checkpoints be used instead of checkpointDir

  • storage_level – 用于存储中间结果和输出DataFrame的存储级别

Returns:

持久化的数据帧,包含ID和k核值(列“kcore”)

标签传播(maxIter: int, algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[源代码]

运行静态标签传播以检测网络中的社区。

更多详情请参阅 Scala 文档。

Parameters:
  • maxIter – 要执行的迭代次数

  • 算法 – 使用的实现方式,可选值为“graphframes”和“graphx”; “graphx”适用于中小型图且速度更快, “graphframes”所需内存更少

  • use_local_checkpoints – should local checkpoints be used, default false; local checkpoints are faster and does not require to set a persistent checkpointDir; from the other side, local checkpoints are less reliable and require executors to have big enough local disks.

  • storage_level – storage level for both intermediate and final dataframes.

Checkpoint_interval:

中间结果应多久进行一次检查点保存; 此处使用较大值可能因算法的迭代性质而导致逻辑计划急剧增长。

Returns:

持久化的 DataFrame 带有新的顶点列“label”

最大独立集(seed: int = 42, checkpoint_interval: int = 2, use_local_checkpoints: bool = False, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) DataFrame[源代码]

此方法实现了一种在图中寻找最大独立集(MIS)的分布式算法。

一个MIS(最大独立集)是一组顶点的集合,其中集合内的任意两个顶点都不相邻(即集合中任意两个顶点之间没有边),并且该集合是最大的,意味着向集合中添加任何其他顶点都会违反独立性属性。请注意,此实现找到的是一个最大(但不一定是全局最大)独立集;也就是说,它确保无法向集合中添加更多顶点,但不能保证该集合在图的所有可能独立集中具有最大可能的顶点数量。

这里实现的算法基于以下论文:Ghaffari, Mohsen. 《一种改进的分布式最大独立集算法》。第二十七届ACM-SIAM离散算法研讨会论文集。工业与应用数学学会,2016年。

注意:这是一个随机化、非确定性的算法。由于 Apache Spark 的工作方式,即使提供了固定的随机种子,不同运行之间的结果也可能有所不同。

Parameters:
  • seed – 算法中用于打破平局的随机种子(默认值:42)

  • checkpoint_interval – 以迭代次数为单位的检查点间隔(默认值:2)

  • use_local_checkpoints – 是否使用本地检查点(默认:False); 本地检查点速度更快且无需设置持久化检查点目录;然而,它们可靠性较低, 且要求执行器具有足够的本地磁盘空间。

  • storage_level – 用于中间和最终DataFrames的存储级别 (默认:MEMORY_AND_DISK_DESER)

Returns:

带有新顶点列“selected”的DataFrame,其中“true”表示该顶点是最大独立集的一部分

属性 节点 DataFrame

顶点的别名。

属性 outDegrees: DataFrame
图中每个顶点的出度,返回为一个包含两列的Data帧:
  • “id”:顶点的ID

  • “outDegree”(整数)存储顶点的出度

请注意,出边数为0的顶点不会在结果中返回。

Returns:

带有新顶点列“outDegree”的DataFrame

页面排名算法(resetProbability: float = 0.15, sourceId: Any | None = None, maxIter: int | None = None, tol: float | None = None) GraphFrame[源代码]

在图结构上运行PageRank算法。 注意:必须设置fixed_num_iter或tolerance中的其中一个参数。

更多详情请参阅 Scala 文档。

Parameters:
  • resetProbability – 重置到随机顶点的概率。

  • sourceId – (可选) 个性化PageRank的源顶点。

  • maxIter – 如果设置此参数,算法将运行固定的迭代次数。如果设置了 tol 参数,则不能设置此参数。

  • tol – 如果设置,算法将运行至达到给定容差。 如果设置了 numIter 参数,则可能无法设置此项。

Returns:

GraphFrame 包含新的顶点列“pagerank”和新的边列“weight”

parallelPersonalizedPageRank(resetProbability: float = 0.15, sourceIds: list[Any] | None = None, maxIter: int | None = None) GraphFrame[来源]

在图上运行个性化PageRank算法,从提供的源列表并行执行固定次数的迭代。

更多详情请参阅 Scala 文档。

Parameters:
  • resetProbability – 重置到随机顶点的概率

  • sourceIds – 个性化PageRank的源顶点

  • maxIter – 此算法运行的固定迭代次数

Returns:

GraphFrame 包含新的顶点列“pageranks”和新的边列“weight”

持久化(storageLevel: StorageLevel = StorageLevel(False, True, False, False, 1)) GraphFrame[源代码]

使用给定的存储级别持久化图的顶点和边的数据框表示。

幂迭代聚类(k: int, maxIter: int, weightCol: str | None = None) DataFrame[来源]

幂迭代聚类(PIC),由Lin和Cohen开发的可扩展图聚类算法。 从摘要中可知:PIC通过对数据的归一化成对相似度矩阵进行截断幂迭代,找到数据集的极低维嵌入。

Parameters:
  • k – 要创建的聚类数量

  • maxIter – 最大迭代次数的参数(>= 0)

  • weightCol – 可选的权重列名称,如未提供则默认使用1.0

Returns:

带有新列“cluster”的DataFrame

属性 pregel Pregel

获取用于运行pregel的graphframes.classic.pregel.Pregel 或 :class`graphframes.connect.graphframes_client.Pregel` 对象。

更多详情请参阅 graphframes.lib.Pregel

最短路径(landmarks: list[str | int], algorithm: str = 'graphx', use_local_checkpoints: bool = False, checkpoint_interval: int = 2, storage_level: StorageLevel = StorageLevel(True, True, False, True, 1), is_directed: bool = True) DataFrame[来源]

从图中的一组地标顶点运行最短路径算法。

更多详情请参阅 Scala 文档。

Parameters:
  • 地标 – 一个或多个地标的集合

  • algorithm – implementation to use, posible values are “graphframes” and “graphx”; “graphx” is faster for small-medium sized graphs, “graphframes” requires less amount of memory

  • use_local_checkpoints – should local checkpoints be used, default false; local checkpoints are faster and does not require to set a persistent checkpointDir; from the other side, local checkpoints are less reliable and require executors to have big enough local disks.

  • checkpoint_interval – 中间结果应多久进行一次检查点保存; 由于算法的迭代性质,此处使用较大值可能导致逻辑计划急剧增长。

  • storage_level – storage level for both intermediate and final dataframes.

  • is_directed – 算法应查找有向路径还是任意路径。

Returns:

持久化 DataFrame,包含新的顶点列“距离”

stronglyConnectedComponents(maxIter: int) DataFrame[源代码]

在此图上运行强连通分量算法。

更多详情请参阅 Scala 文档。

Parameters:

maxIter – 运行的迭代次数

Returns:

带有新顶点列“component”的DataFrame

svdPlusPlus(rank: int = 10, maxIter: int = 2, minValue: float = 0.0, maxValue: float = 5.0, gamma1: float = 0.007, gamma2: float = 0.007, gamma6: float = 0.005, gamma7: float = 0.015) tuple[DataFrame, float][源代码]

运行SVD++算法。

更多详情请参阅 Scala 文档。

Returns:

包含存储学习模型的新顶点列的DataFrame元组,以及损失值

triangleCount(storage_level: StorageLevel) DataFrame[来源]

计算此图中通过每个顶点的三角形数量。 此实现基于计算顶点邻域的交集。它需要收集每个顶点的完整邻域信息。 在具有幂律度分布的图(存在少量极高度数顶点的图)上,可能因内存错误而失败。 对于这种情况,可考虑采用边采样方法来获取三角形的近似计数。

Parameters:

storage_level – 用于中间和最终数据帧的存储级别。

Returns:

带有新顶点列“count”的DataFrame

属性 三元组: DataFrame

图中所有边的三元组(源顶点)-[边]->(目标顶点)。

返回为一个包含三列的 DataFrame
  • “src”: 源顶点,其模式与‘vertices’匹配

  • “edge”: 与‘edges’模式匹配的边

  • ‘dst’: 目标顶点,其模式与‘vertices’匹配

Returns:

包含列 'src'、'edge' 和 'dst' 的 DataFrame

类型度(edge_type_col: str, edge_types: List[Any] | None = None) DataFrame[来源]

每条边类型下每个顶点的总度数(包括入度和出度),返回为一个包含两列的DataFrame:

  • “id”:顶点的ID

  • “degrees”:一个结构体,每个边类型对应一个字段,存储总度数计数

Parameters:
  • edge_type_col – 边数据框中包含边类型信息的列名

  • edge_types – 可选的边类型值列表。如果为None,边类型将

自动发现。 :return: 包含列“id”和“degrees”(结构类型)的DataFrame

类型入度(edge_type_col: str, edge_types: List[Any] | None = None) DataFrame[源代码]
每个顶点按边类型的入度,返回为一个包含两列的DataFrame:
  • “id”:顶点的ID

  • “inDegrees”:一个结构体,每个边类型对应一个字段,存储入度计数

Parameters:
  • edge_type_col – 边数据框中包含边类型信息的列名

  • edge_types – 可选的边类型值列表。如果为None,边类型将

自动发现。 :return: 包含列“id”和“inDegrees”(结构类型)的DataFrame

类型出度(edge_type_col: str, edge_types: List[Any] | None = None) DataFrame[源代码]
每个顶点按边类型的出度,返回为一个包含两列的DataFrame:
  • “id”:顶点的ID

  • “outDegrees”:一个结构体,包含每个边类型的字段,用于存储出度计数

Parameters:
  • edge_type_col – 边数据框中包含边类型信息的列名

  • edge_types – 可选的边类型值列表。如果为None,边类型将会

自动发现。 :return: 包含列“id”和“outDegrees”(结构类型)的DataFrame

取消持久化(blocking: bool = False) GraphFrame[源代码]

将图的顶点和边的数据框表示标记为非持久化, 并从内存和磁盘中移除其所有数据块。

验证(check_vertices: bool = True, intermediate_storage_level: StorageLevel = StorageLevel(True, True, False, True, 1)) None[来源]

通过检查顶点和边来验证图的一致性和完整性。

Parameters:
  • check_vertices – 一个标志,用于指示是否应执行额外的顶点一致性检查。 如果为true,该方法将验证顶点DataFrame中的所有顶点是否都在边DataFrame中有所体现,反之亦然。在大型图上运行速度较慢。

  • intermediate_storage_level – 在验证过程中持久化中间DataFrame计算时使用的存储级别。

Returns:

单元,因为该方法执行验证检查并在验证失败时抛出异常。

引发

ValueError – 如果图中存在任何不一致之处,例如重复顶点、边与顶点DataFrame之间的顶点不匹配或缺少连接。

属性 vertices: DataFrame

DataFrame 保存顶点信息,其中包含用于顶点ID的唯一列“id”。