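Motif finding

Motif finding refers to searching for structural patterns in a graph. For an example of real-world use, see the Motif Finding Tutorial.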

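GraphFrame motif finding uses a simple Domain-Specific Language (DSL) for expressing structural queries. For example, graph.find("(a)-[e1]->(b); (b)-[e2]->(a)") or graph.find("(a)<-[e]->(b)") will search for pairs of vertices a, b connected by edges in both directions. It will return a DataFrame of all such structures in the graph, with a column for each named element (vertex or edge) in the motif. In this case, the returned columns will be "a, b, e1, e2".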
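To make the semantics of the bidirectional pattern concrete, the following is a minimal plain-Python sketch of what such a query computes on a toy edge list. It does not use GraphFrames or Spark, and it is not how the library evaluates motifs; the edge data and the helper name find_bidirectional are made up for illustration.

```python
# Toy directed edge list of (src, dst) pairs; the data is made up for illustration.
edges = [("a", "b"), ("b", "a"), ("b", "c"), ("c", "b"), ("a", "c")]


def find_bidirectional(edges):
    """Return one row per match of "(a)-[e1]->(b); (b)-[e2]->(a)"."""
    edge_set = set(edges)
    rows = []
    for src, dst in edges:
        # e1 is src->dst; a match also requires the reverse edge e2 = dst->src.
        if (dst, src) in edge_set:
            rows.append({"a": src, "b": dst, "e1": (src, dst), "e2": (dst, src)})
    return rows


rows = find_bidirectional(edges)
for row in rows:
    print(row)
```

Each row has one entry per named element of the pattern, mirroring the columns described above. In this sketch every mutually connected pair appears twice, once for each orientation of (a, b).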

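The DSL for expressing structural patterns:

Restrictions:

More complex queries, such as queries that operate on vertex or edge attributes, can be expressed by applying filters to the result DataFrame.

Motif finding can return duplicate rows. For example, the query "(u)-[]->()" returns one result for each matching edge, even if those edges share the same vertex u.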
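The duplicate-row behavior can be sketched in plain Python, without Spark. The edge list below is made up, and the code only mimics the row-per-edge semantics described above rather than calling GraphFrames.

```python
# Toy directed edge list of (src, dst) pairs; the data is made up for illustration.
edges = [("u1", "v1"), ("u1", "v2"), ("u2", "v1")]

# "(u)-[]->()" names only the source vertex, so each row keeps just u,
# but there is still one row per matching edge.
rows = [{"u": src} for src, _dst in edges]

# Deduplicate the vertex values while preserving first-seen order.
unique_u = list(dict.fromkeys(row["u"] for row in rows))

print(rows)
print(unique_u)
```

On an actual result DataFrame, the analogous step would be a call such as distinct() or dropDuplicates(), both standard Spark DataFrame methods.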

Python API

For API details, see graphframes.GraphFrame.find.

from graphframes.examples import Graphs

g = Graphs(spark).friends()  # Get example graph

# Search for pairs of vertices with edges in both directions between them
motifs = g.find("(a)-[e1]->(b); (b)-[e2]->(a)")
motifs.show()

# More complex queries can be expressed by applying filters
motifs.filter("b.age > 30").show()

Scala API

For API details, see org.graphframes.GraphFrame.

import org.apache.spark.sql.DataFrame
import org.graphframes.{examples,GraphFrame}

val g: GraphFrame = examples.Graphs.friends  // get example graph

// Search for pairs of vertices with edges in both directions between them.
val motifs: DataFrame = g.find("(a)-[e1]->(b); (b)-[e2]->(a)")
motifs.show()

// More complex queries can be expressed by applying filters.
motifs.filter("b.age > 30").show()

Examples

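Many motif queries are stateless and simple to express, as in the examples above. The next examples demonstrate more complex queries that carry state along a path in the motif. These queries can be expressed by combining GraphFrame motif finding with filters on the result, where the filters use sequence operations to construct a series of DataFrame Columns.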

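For example, suppose one wishes to identify a chain of 4 vertices with some property defined by a sequence of functions. That is, among chains of 4 vertices a->b->c->d, identify the subset of chains matching this complex filter: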

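The code snippets below demonstrate this process, where we identify chains of 4 vertices such that at least 2 of the 3 edges are "friend" relationships. In this example, the state is the current count of "friend" edges; in general, it could be any DataFrame Column.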
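The same fold over a sequence of edges can be sketched in plain Python, with ordinary integers standing in for the DataFrame Column that carries the state. The chain rows and the helper names sum_friends and friend_count are hypothetical; this is an analogue for intuition, not GraphFrames code.

```python
from functools import reduce

# Hypothetical chain rows: each maps an edge name to its relationship label.
chains = [
    {"ab": "friend", "bc": "follow", "cd": "friend"},
    {"ab": "follow", "bc": "follow", "cd": "friend"},
    {"ab": "friend", "bc": "friend", "cd": "friend"},
]


def sum_friends(cnt, relationship):
    """Update the state: add 1 when the next edge is a "friend" edge."""
    return cnt + 1 if relationship == "friend" else cnt


def friend_count(chain):
    """Fold sum_friends over the 3 edges, starting from the initial state 0."""
    return reduce(lambda cnt, e: sum_friends(cnt, chain[e]), ["ab", "bc", "cd"], 0)


counts = [friend_count(chain) for chain in chains]

# Keep only the chains whose final state is at least 2.
selected = [chain for chain in chains if friend_count(chain) >= 2]

print(counts)
print(selected)
```

The structure is the same in the Spark versions: an initial state, an update function applied once per edge, and a filter on the final state. The difference is that the Spark fold builds a single Column expression that is evaluated per row, instead of computing an integer directly.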

Python API

from functools import reduce  # reduce is not a builtin in Python 3

from pyspark.sql.functions import col, lit, when
from graphframes.examples import Graphs


g = Graphs(spark).friends()  # Get example graph

chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

# Query on sequence, with state (cnt)
# (a) Define method for updating state given the next element of the motif
def sumFriends(cnt, relationship):
    return when(relationship == "friend", cnt + 1).otherwise(cnt)

# (b) Use sequence operation to apply method to sequence of elements in motif
# In this case, the elements are the 3 edges
condition =\
  reduce(lambda cnt,e: sumFriends(cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))

# (c) Apply filter to DataFrame
chainWith2Friends2 = chain4.where(condition >= 2)
chainWith2Friends2.show()

Scala API

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, when}
import org.graphframes.{examples,GraphFrame}

val g: GraphFrame = examples.Graphs.friends  // get example graph

// Find chains of 4 vertices.
val chain4: DataFrame = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
chain4.show()

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd")
  .foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
chainWith2Friends2.show()