GraphX 编程指南
概述
GraphX 是 Spark 中用于图形和图并行计算的新组件。在高级别上,GraphX 通过引入一种新的 图 抽象来扩展 Spark RDD :一个带有附加属性的定向多重图,属性附加在每个顶点和边上。为了支持图形计算,GraphX 公开了一组基本操作符(例如 子图 、 连接顶点 和 聚合消息 ),以及 Pregel API 的优化版本。此外,GraphX 还包括一组不断增长的图形 算法 和 构建器 ,以简化图形分析任务。
开始使用
要开始,你首先需要将Spark和GraphX导入到你的项目中,如下所示:
import org.apache.spark._
import org.apache.spark.graphx._
// 为了使一些示例能够工作,我们还需要 RDD
import org.apache.spark.rdd.RDD
如果您不使用Spark shell,您还需要一个
SparkContext
。要了解更多关于如何开始使用Spark的信息,请参考
Spark快速入门指南
。
属性图
属性图
是一种有向多重图,其中每个顶点和边都附加了用户自定义对象。 有向多重图是一个有向图,它可以具有多个共享相同源和目标顶点的平行边。 支持平行边的能力简化了建模场景,在这些场景中,同一对顶点之间可能存在多种关系(例如,同事和朋友)。 每个顶点由一个
唯一的
64 位长标识符 (
VertexId
) 来标识。 GraphX 不对顶点标识符施加任何排序约束。 同样,边也具有相应的源和目标顶点标识符。
属性图是以顶点 (
VD
) 和边 (
ED
) 类型为参数化。它们分别是与每个顶点和边相关联的对象类型。
GraphX优化了顶点和边类型的表示,当它们是基本数据类型时(例如,int、double等…),通过将它们存储在专门的数组中,减少了内存占用。
在某些情况下,可能希望在同一个图中具有不同属性类型的顶点。 这可以通过继承来实现。例如,为了将用户和产品建模为一个 二分图,我们可能会这样做:
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// 图的类型可能是:
var graph: Graph[VertexProperty, String] = null
与RDD一样,属性图是不可变的、分布式的,并且具有容错性。对图的值或结构的更改是通过生成一个具有所需更改的新图来实现的。请注意,原始图的很大一部分(即未受影响的结构、属性和索引)在新图中被重复使用,从而降低了这种本质上是函数数据结构的成本。图被划分在各个执行器之间,使用一系列顶点划分启发式算法。与RDD一样,在发生故障时,图的每个分区可以在不同的机器上重新创建。
从逻辑上讲,属性图对应于一对编码每个顶点和边属性的类型集合 (RDDs)。因此,图类包含成员以访问图的顶点和边:
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
类
VertexRDD[VD]
和
EdgeRDD[ED]
是
RDD[(VertexId, VD)]
和
RDD[Edge[ED]]
的扩展和优化版本。
VertexRDD[VD]
和
EdgeRDD[ED]
提供了围绕图计算构建的附加功能,并利用内部优化。我们在
顶点和边 RDD
的部分中更详细地讨论
VertexRDD
VertexRDD
和
EdgeRDD
EdgeRDD
API,但现在可以将它们视为简单的形式 RDD:
RDD[(VertexId, VD)]
和
RDD[Edge[ED]]
。
示例属性图
假设我们想构建一个包含GraphX项目中各个合作者的属性图。顶点属性可能包含用户名和职业。我们可以用描述合作者之间关系的字符串来注释边:
生成的图形将具有类型签名:
val userGraph: Graph[(String, String), String]
有很多种方法可以从原始文件、RDD 以及甚至合成生成器构建属性图,这些将在 图构建器 部分详细讨论。可能最通用的方法是使用 Graph 对象 。例如,以下代码从一组 RDDs 中构造一个图:
// 假设 SparkContext 已经被构造
val sc: SparkContext
// 为顶点创建一个 RDD
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// 为边创建一个 RDD
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// 定义一个默认用户以防有缺失用户的关系
val defaultUser = ("John Doe", "Missing")
// 构建初始图
val graph = Graph(users, relationships, defaultUser)
在上述示例中,我们使用了
Edge
案例类。边具有一个
srcId
和一个
dstId
,分别对应源和目标顶点标识符。此外,
Edge
类还有一个
attr
成员,用于存储边的属性。
我们可以通过分别使用
graph.vertices
和
graph.edges
成员,将图形分解为各自的顶点和边视图。
val graph: Graph[(String, String), String] // 从上面构建
// 统计所有职位为博士后(postdocs)的用户
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// 统计所有边中 src > dst 的情况
graph.edges.filter(e => e.srcId > e.dstId).count
请注意,
graph.vertices
返回一个VertexRDD[(String, String)]
,它扩展了RDD[(VertexId, (String, String))]
,所以我们使用 Scala 的case
表达式来解构元组。另一方面,graph.edges
返回一个包含Edge[String]
对象的EdgeRDD
。我们也可以使用如下的 case 类类型构造器:
图.边.过滤 { 案例 边(源, 目标, 属性) => 源 > 目标 }.计数
除了属性图的顶点和边的视图外,GraphX 还提供了三元组视图。
三元组视图在逻辑上将顶点和边的属性连接在一起,生成一个
RDD[EdgeTriplet[VD, ED]]
,其中包含
EdgeTriplet
类的实例。这个
连接
可以用以下 SQL 表达式表示:
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
或者图形化表示为:
类
EdgeTriplet
通过添加
srcAttr
和
dstAttr
成员来扩展
Edge
类,这两个成员分别包含源和目标属性。我们可以使用图的三元组视图来渲染描述用户之间关系的字符串集合。
val graph: Graph[(String, String), String] // 从上面构建
// 使用三元组视图创建事实的 RDD。
val facts: RDD[String] =
graph.triplets.map(triplet =>
triplet.srcAttr._1 + " 是 " + triplet.attr + " 的 " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
图形操作符
正如RDD具有基本操作,如
map
、
filter
和
reduceByKey
,属性图也具有一组基本操作,这些操作接受用户定义的函数并生成具有转化后的属性和结构的新图。具有优化实现的核心操作被定义在
Graph
中,而作为核心操作组合表示的便捷操作则在
GraphOps
中定义。然而,感谢Scala隐式,
GraphOps
中的操作符会自动作为
Graph
的成员可用。例如,我们可以通过以下方式计算每个顶点的入度(在
GraphOps
中定义):
val graph: Graph[(String, String), String]
// 使用隐式的 GraphOps.inDegrees 运算符
val inDegrees: VertexRDD[Int] = graph.inDegrees
区分核心图操作和
GraphOps
的原因是能够在未来支持不同的图表示。每种图表示必须提供核心操作的实现,并重用在
GraphOps
中定义的许多有用操作。
操作符汇总列表
以下是对在
Graph
和
GraphOps
中定义的功能的快速总结,但为简化起见,展示为 Graph 的成员。注意某些函数
签名已被简化(例如,默认参数和类型约束已移除),并且一些更高级的功能已被删除,因此请查阅 API 文档以获取官方操作列表。
/** 属性图中功能的总结 */
class Graph[VD, ED] {
// 关于图的信息 ===================================================================
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// 作为集合的图的视图 =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// 缓存图的函数 ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
// 更改分区启发式 ==================================================================
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// 转换顶点和边属性 ==========================================================
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// 修改图结构 ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// 与图连接 RDDs ======================================================================
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// 聚合关于相邻三元组的信息 ================================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]
// 迭代图并行计算 ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// 基本图算法 ========================================================================
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
属性运算符
类似于RDD
map
操作符,属性图包含以下内容:
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
每个操作符都会生成一个新的图,其中顶点或边的属性由用户定义的
map
函数修改。
请注意,在每种情况下,图形结构不受影响。这是这些操作符的一个关键特性,它允许结果图重用原始图的结构索引。以下代码片段在逻辑上是等效的,但第一个片段不保留结构索引,并且不会受益于GraphX系统优化:
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
相反,使用
mapVertices
来保留索引:
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
这些运算符通常用于初始化特定计算的图形或去除不必要的属性。例如,给定一个将出度作为顶点属性的图(我们稍后将说明如何构建这样的图),我们为PageRank进行初始化:
// 给定一个图,其中顶点属性是出度
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// 构造一个图,其中每条边包含权重
// 每个顶点是初始的 PageRank
val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
结构操作符
目前 GraphX 仅支持一组简单的常用结构运算符,我们期望在未来添加更多。以下是基本结构运算符的列表。
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
reverse
操作符返回一个新图,所有边的方向被反转。这在计算逆 PageRank 时可能非常有用。因为反转操作不会修改顶点或边的属性,也不会改变边的数量,所以可以高效地实现,而无需数据移动或复制。
这段
subgraph
操作符接受顶点和边的谓词,并返回仅包含满足顶点谓词(评估为真)的顶点和满足边谓词并连接满足顶点谓词的顶点的边的图。
subgraph
操作符可以用于多种情况,以限制图形到感兴趣的顶点和边或消除断开的链接。例如,在以下代码中,我们移除断开的链接:
// 为顶点创建一个 RDD
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// 为边创建一个 RDD
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// 定义一个默认用户,以防有关系缺少用户
val defaultUser = ("John Doe", "Missing")
// 构建初始图
val graph = Graph(users, relationships, defaultUser)
// 请注意,有一个用户 0(我们没有信息)连接到用户
// 4(peter)和 5(franklin)。
graph.triplets.map(
triplet => triplet.srcAttr._1 + " 是 " + triplet.attr + " 的 " + triplet.dstAttr._1
).collect.foreach(println(_))
// 删除缺失的顶点以及与它们连接的边
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// 有效的子图将通过删除用户 0 来断开用户 4 和 5 的连接
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " 是 " + triplet.attr + " 的 " + triplet.dstAttr._1
).collect.foreach(println(_))
请注意,在上述示例中仅提供了顶点谓词。
subgraph
运算符在未提供顶点或边谓词时默认为true
。
这个
mask
操作符通过返回一个包含输入图中也存在的顶点和边的子图来构造子图。这可以与
subgraph
操作符结合使用,根据另一个相关图中的属性来限制图。例如,我们可能使用缺失顶点的图运行连接组件,然后将答案限制为有效的子图。
// 运行连通分量
val ccGraph = graph.connectedComponents() // 不再包含缺失字段
// 移除缺失的顶点以及与它们相连的边
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "缺失")
// 将答案限制在有效的子图中
val validCCGraph = ccGraph.mask(validGraph)
groupEdges
操作符可以合并多重图中的平行边(即,顶点对之间的重复边)。在许多数值应用中,平行边可以被
相加
(它们的权重合并)成一条边,从而减少图的大小。
连接操作符
在许多情况下,有必要将来自外部集合(RDDs)的数据与图结合。例如,我们可能有额外的用户属性,希望将其与现有图合并,或者我们可能希望从一个图中拉取顶点属性到另一个图中。这些任务可以使用 join 操作符来完成。下面我们列出关键的 join 操作符:
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
(a href="api/scala/org/apache/spark/graphx/GraphOps.html#joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]">
joinVertices
操作符将顶点与输入 RDD 连接,并返回一个新图,顶点属性通过将用户定义的
map
函数应用于连接顶点的结果获得。没有在 RDD 中匹配值的顶点保留其原始值。
注意,如果RDD对于给定的顶点包含多个值,则仅会使用一个。因此建议使用以下方法使输入RDD唯一,这也会 预索引 结果值,从而显著加速后续的连接。
val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
(id, oldCost, extraCost) => oldCost + extraCost)
更一般的
outerJoinVertices
的行为类似于
joinVertices
,但用户定义的
map
函数应用于所有顶点,并且可以更改顶点属性类型。由于并非所有顶点在输入 RDD 中都有匹配值,因此
map
函数采用
Option
类型。例如,我们可以通过用它们的
outDegree
初始化顶点属性来为 PageRank 设置图。
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 // 没有出度即为零出度
}
}
您可能注意到上面示例中使用的多参数列表(例如,
f(a)(b)
)的柯里化函数模式。虽然我们也可以将f(a)(b)
写成f(a,b)
,但这将意味着对b
的类型推断将不依赖于a
。因此,用户需要为用户定义的函数提供类型注释:
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
邻里聚合
许多图分析任务中的一个关键步骤是汇总关于每个顶点邻域的信息。例如,我们可能想知道每个用户有多少个关注者或每个用户关注者的平均年龄。许多迭代图算法(例如,PageRank、Shortest Path 和 connected components)反复汇总邻近顶点的属性(例如,当前的 PageRank 值、到源的最短路径和最小可达顶点 ID)。
为了提高性能,主要的聚合操作符已从
graph.mapReduceTriplets
更改为新的graph.AggregateMessages
。虽然API中的变化相对较小,但我们在下面提供了过渡指南。
汇总消息 (aggregateMessages)
GraphX中的核心聚合操作是
aggregateMessages
。
此操作符将用户定义的
sendMsg
函数应用于图中的每个
边三元组
,
然后使用
mergeMsg
函数在其目标顶点聚合这些消息。
class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}
用户定义的
sendMsg
函数接收一个
EdgeContext
,它暴露了源和目标属性以及边属性和函数 (
sendToSrc
和
sendToDst
) 用于向源和目标属性发送消息。认为
sendMsg
是 map-reduce 中的
map
函数。用户定义的
mergeMsg
函数接收两个目的地相同的消息,并产生一个单一消息。认为
mergeMsg
是 map-reduce 中的
reduce
函数。
aggregateMessages
操作符返回一个
VertexRDD[Msg]
,包含发送给每个顶点的聚合消息(类型为
Msg
)。没有收到消息的顶点不包含在返回的
VertexRDD
VertexRDD
中。
此外,
aggregateMessages
接受一个可选的
tripletsFields
,它指示在
EdgeContext
中访问哪些数据
(即,源顶点属性而不是目标顶点属性)。
tripletsFields
的可选项在
TripletFields
中定义,
默认值是
TripletFields.All
,这表示用户定义的
sendMsg
函数可以访问
EdgeContext
中的任何字段。
tripletFields
参数可用于通知 GraphX 仅需要部分
EdgeContext
,从而允许 GraphX 选择一种优化的连接策略。
例如,如果我们正在计算每个用户的关注者的平均年龄,我们只需要
源字段,因此我们将使用
TripletFields.Src
来指示我们
只需要源字段。
在早期版本的 GraphX 中,我们使用字节码检查来推断
TripletFields
,然而我们发现字节码检查略显不可靠,因此选择了更明确的用户控制。
在下面的例子中,我们使用
aggregateMessages
操作符来计算每个用户较资深关注者的平均年龄。
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// 创建一个图,"age"作为顶点属性。
// 这里我们使用随机图以简化问题。
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// 计算较年长的追随者数量及其总年龄
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // 映射函数
if (triplet.srcAttr > triplet.dstAttr) {
// 向目标顶点发送包含计数器和年龄的消息
triplet.sendToDst((1, triplet.srcAttr))
}
},
// 添加计数器和年龄
(a, b) => (a._1 + b._1, a._2 + b._2) // 减少函数
)
// 将总年龄除以较年长的追随者数量以获得较年长追随者的平均年龄
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) =>
value match { case (count, totalAge) => totalAge / count } )
// 显示结果
avgAgeOfOlderFollowers.collect.foreach(println(_))
当消息(和消息的总和)具有恒定大小时,
aggregateMessages
操作能够达到最佳性能(例如,使用浮点数和加法,而不是列表和连接)。
Map Reduce 三元组转换指南(遗留版)
在早期的GraphX版本中,邻域聚合是通过使用
mapReduceTriplets
操作符来实现的:
class Graph[VD, ED] {
def mapReduceTriplets[Msg](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
reduce: (Msg, Msg) => Msg)
: VertexRDD[Msg]
}
这个
mapReduceTriplets
操作符接受一个用户定义的映射函数,该函数应用于每个三元组,并可以生成
消息
,这些消息使用用户定义的
reduce
函数进行聚合。然而,我们发现返回的迭代器的使用成本较高,并且抑制了我们应用额外优化的能力(例如,本地顶点重编号)。在
aggregateMessages
中,我们引入了 EdgeContext,它暴露了三元组字段,并且还提供了显式发送消息到源顶点和目标顶点的功能。此外,我们移除了字节码检查,而是要求用户指明在三元组中实际上需要哪些字段。
以下代码块使用
mapReduceTriplets
:
val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
可以使用
aggregateMessages
重新编写为:
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
计算学位信息
一个常见的聚合任务是计算每个顶点的度:与每个顶点相邻的边的数量。在有向图的上下文中,通常需要知道每个顶点的入度、出度和总度。
GraphOps
类包含一组运算符,用于计算每个顶点的度。例如,在下面我们计算最大入度、出度和总度:
// 定义一个减少操作来计算最高度顶点
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// 计算最大度数
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
收集邻居
在某些情况下,通过收集每个顶点的邻近顶点及其属性来表达计算可能更容易。这可以通过使用
collectNeighborIds
和
collectNeighbors
操作符轻松实现。
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
这些操作符可能非常昂贵,因为它们会复制信息并需要大量的通信。如果可能,请尝试直接使用
aggregateMessages
操作符来表达相同的计算。
缓存与取消缓存
在Spark中,RDD默认情况下不会保留在内存中。为了避免重新计算,必须在多次使用时显式地缓存它们(请参阅
Spark编程指南
)。GraphX中的图行为也一样。
在多次使用图时,请确保首先调用
Graph.cache()
。
在迭代计算中, 取消缓存 对于最佳性能也是必要的。默认情况下,缓存的 RDD 和图形将保留在内存中,直到内存压力迫使它们按照 LRU 顺序被逐出。对于迭代计算,来自前几次迭代的中间结果将填满缓存。尽管它们最终会被逐出,但存储在内存中的不必要数据会减慢垃圾回收的速度。尽早取消缓存中间结果会更高效,只要它们不再需要。这涉及到在每次迭代中物化(缓存和强制)一个图或 RDD,取消缓存所有其他数据集,并仅在未来的迭代中使用物化的数据集。然而,由于图是由多个 RDD 组成的,因此正确取消其持久性可能会很困难。 对于迭代计算,我们建议使用 Pregel API,它可以正确取消中间结果的持久性。
Pregel API
图本质上是递归数据结构,因为顶点的属性依赖于其邻居的属性,而邻居的属性又依赖于 它们 的邻居的属性。因此,许多重要的图算法迭代地重新计算每个顶点的属性,直到达到固定点条件。已经提出了一系列图并行抽象来表达这些迭代算法。GraphX 暴露了 Pregel API 的一种变体。
在高层次上,GraphX中的Pregel运算符是一种批量同步并行消息抽象, 受限于图的拓扑结构 。Pregel运算符执行一系列超级步骤,在这些步骤中,顶点接收来自上一个超级步骤的 输入消息 的 总和 ,计算顶点属性的新值,然后在下一个超级步骤中向邻近的顶点发送消息。与Pregel不同,消息是作为边三元组的函数并行计算的,消息计算可以访问源顶点和目标顶点的属性。在一个超级步骤中,未接收到消息的顶点会被跳过。当没有剩余消息时,Pregel运算符终止迭代并返回最终图。
注意,与更标准的 Pregel 实现不同,GraphX 中的顶点只能向邻近顶点发送消息,消息的构造是通过用户定义的消息函数并行完成的。这些限制使得 GraphX 内部能够进行额外的优化。
以下是 Pregel 运算符 的类型签名以及其实现的 概述 (注意:为了避免由于长继承链导致的 stackOverflowError,pregel 支持通过将“spark.graphx.pregel.checkpointInterval”设置为正数(例如 10)定期对图和消息进行检查点。并使用 SparkContext.setCheckpointDir(directory: String) 设置检查点目录):
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// 在每个顶点接收初始消息
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// 计算消息
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// 循环直到没有消息剩余或达到最大迭代次数
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// 接收消息并更新顶点。
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// 发送新消息,跳过没有一方接收到消息的边。我们必须缓存
// 消息,以便在下一行可以物化,从而允许我们取消缓存上一次的
// 迭代。
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}
请注意,Pregel 接受两个参数列表(即,
graph.pregel(list1)(list2)
)。第一个参数列表包含配置参数,包括初始消息、最大迭代次数以及发送消息的边的方向(默认为沿着出边)。第二个参数列表包含用户定义的接收消息的函数(顶点程序
vprog
)、计算消息的函数(
sendMsg
)和合并消息的函数
mergeMsg
。
我们可以使用Pregel运算符来表达计算,例如在下面的例子中进行单源最短路径。
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// 一个包含距离的边属性的图
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attrtoDouble)
val sourceId: VertexId = 42 // 最终源
// 初始化图,使得除了根以外的所有顶点的距离为无穷大。
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // 顶点程序
triplet => { // 发送消息
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // 合并消息
)
println(sssp.vertices.collect.mkString("\n"))
图形构建器
GraphX 提供几种从 RDD 或磁盘上的顶点和边集合构建图的方法。默认情况下,没有一个图构建器会重新划分图的边;相反,边将保持在其默认分区中(例如,它们在 HDFS 中的原始块)。
Graph.groupEdges
需要图被重新划分,因为它假设相同的边会在同一个分区中共存,因此你必须在调用
Graph.partitionBy
之前调用
groupEdges
。
object GraphLoader {
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
: Graph[Int, Int]
}
GraphLoader.edgeListFile
提供了一种从磁盘上的边列表加载图形的方法。它解析以下形式的邻接列表(源顶点ID,目标顶点ID)对,跳过以
#
开头的注释行:
# 这是一个注释
2 1
4 1
1 2
它从指定的边创建一个
Graph
,自动创建边提到的任何顶点。所有顶点和边的属性默认为 1。
canonicalOrientation
参数允许将边重新定向到正方向 (
srcId < dstId
),这在
连接组件
算法中是必要的。
minEdgePartitions
参数指定生成的最小边分区数;如果,例如,HDFS 文件有更多块,则可能会有更多的边分区。
object Graph {
def apply[VD, ED](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
def fromEdges[VD, ED](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
}
Graph.apply
允许从顶点和边的 RDD 中创建图。重复的顶点会被任意选择,边 RDD 中找到但在顶点 RDD 中不存在的顶点将被赋予默认属性。
Graph.fromEdges
允许仅通过边的 RDD 来创建图,自动创建边中提到的任何顶点并为它们赋予默认值。
Graph.fromEdgeTuples
允许仅从一个边元组的 RDD 创建图,给边赋值为 1,并自动创建被边提及的任何顶点并赋予它们默认值。它还支持边的去重;要去重,请将
Some
的
PartitionStrategy
作为
uniqueEdges
参数传递(例如,
uniqueEdges = Some(PartitionStrategy.RandomVertexCut)
)。一个分区策略是必要的,以便将相同的边放置在同一分区上,以便它们可以去重。
顶点和边 RDDs
GraphX 暴露了存储在图中的顶点和边的
RDD
视图。然而,由于 GraphX 以优化的数据结构来维护顶点和边,这些数据结构提供了额外的功能,因此顶点和边分别返回为
VertexRDD
VertexRDD
和
EdgeRDD
EdgeRDD
。在这一部分,我们回顾这些类型的一些额外有用功能。请注意,这只是一个不完整的列表,官方操作列表请参考 API 文档。
顶点RDDs
VertexRDD[A]
扩展了
RDD[(VertexId, A)]
,并增加了额外的约束,即每个
VertexId
只出现
一次
。此外,
VertexRDD[A]
表示一个具有属性类型为
A
的
顶点集合
。在内部,这通过将顶点属性存储在可重用的哈希映射数据结构中实现。因此,如果两个
VertexRDD
是从同一个基础
VertexRDD
VertexRDD
派生的(例如,通过
filter
或
mapValues
),则可以在不进行哈希评估的情况下以恒定时间连接它们。为了利用这个索引数据结构,
VertexRDD
VertexRDD
提供了以下额外功能:
class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// 过滤顶点集合但保留内部索引
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// 转换值而不改变id(保留内部索引)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// 仅显示根据它们的 VertexId 唯一的顶点
def minus(other: RDD[(VertexId, VD)])
// 从这个集合中移除出现在其他集合中的顶点
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// 使用利用内部索引加速连接的连接操作(显著加快)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// 使用此 RDD 上的索引加速输入 RDD 的 `reduceByKey` 操作。
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
请注意,例如,
filter
操作符返回一个
VertexRDD
VertexRDD
。Filter 实际上是使用
BitSet
实现的,从而重用索引并保持与其他
VertexRDD
进行快速连接的能力。同样,
mapValues
操作符不允许
map
函数更改
VertexId
,从而使得同样的
HashMap
数据结构可以被重用。
leftJoin
和
innerJoin
都能够辨识何时连接两个来自同一
HashMap
的
VertexRDD
,并通过线性扫描而不是代价高昂的点查找来实现连接。
这个
aggregateUsingIndex
操作符对于高效构建新的
VertexRDD
VertexRDD
非常有用,它是基于一个
RDD[(VertexId, A)]
。 从概念上讲,如果我已经构建了一个
VertexRDD[B]
,它是某个
RDD[(VertexId, A)]
中顶点的超集,那么我可以重用该索引来对
RDD[(VertexId, A)]
进行聚合然后随后索引。 例如:
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// rddB 中应该有 200 条记录
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// setB 中应该有 100 条记录
setB.count
// A 和 B 的连接现在应该很快!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
边RDD
扩展了
RDD[Edge[ED]]
的
EdgeRDD[ED]
将边按照使用
PartitionStrategy
中定义的各种分区策略之一进行分块组织。在每个分区内,边属性和邻接结构是分开存储的,这样在更改属性值时可以实现最大程度的重用。
EdgeRDD
EdgeRDD
暴露的三个附加功能是:
// 转换边属性,同时保持结构
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// 反转边,重用属性和结构
def reverse: EdgeRDD[ED]
// 联接两个使用相同分区策略的 `EdgeRDD`。
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
在大多数应用中,我们发现对
EdgeRDD
EdgeRDD
的操作是通过图形操作符完成的,或者依赖于在基础
RDD
类中定义的操作。
优化表示
虽然对GraphX在分布式图表示中使用的优化进行详细描述超出了本指南的范围,但对高层次的理解可能有助于可扩展算法的设计以及API的最佳使用。GraphX采用了顶点切割方法进行分布式图的分区:
与其沿着边划分图,GraphX 更倾向于沿着顶点划分图,这可以减少通信和存储开销。从逻辑上讲,这对应于将边分配给机器,并允许顶点跨多个机器。边的具体分配方法取决于
PartitionStrategy
,并且各种启发式方法之间会有一些权衡。用户可以通过使用
Graph.partitionBy
操作符来选择不同的策略重新划分图。默认的划分策略是使用图构造时提供的边的初始划分。然而,用户可以轻松切换到 2D 划分或 GraphX 中包含的其他启发式方法。
一旦边被划分,图并行计算的关键挑战是高效地将顶点属性与边连接起来。由于现实世界中的图通常具有比顶点更多的边,我们将顶点属性移到边上。由于并非所有的分区都包含与所有顶点相邻的边,我们内部维护一个路由表,以识别在实现诸如
triplets
和
aggregateMessages
等操作所需的连接时,应该在哪里广播顶点。
图算法
GraphX 包含一组图算法以简化分析任务。这些算法包含在
org.apache.spark.graphx.lib
包中,可以通过
Graph
直接作为方法访问,方法是通过
GraphOps
。本节描述了这些算法及其使用方法。
页面排名
PageRank 衡量图中每个顶点的重要性,假设从 u 到 v 的边表示 u 对 v 的重要性 endorsement。例如,如果一个 Twitter 用户被许多人关注,该用户将被高度排名。
GraphX 提供了 PageRank 的静态和动态实现,作为
PageRank
对象
的方法。静态 PageRank 运行固定次数的迭代,而动态 PageRank 则运行直到排名收敛(即,停止改变超过指定的容差)。
GraphOps
允许直接调用这些算法,作为
Graph
上的方法。
GraphX 还包括一个示例社交网络数据集,我们可以在其上运行 PageRank。用户集在
data/graphx/users.txt
中给出,用户之间的关系集在
data/graphx/followers.txt
中给出。我们按照以下方式计算每个用户的 PageRank:
import org.apache.spark.graphx.GraphLoader
// 将边加载为图
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// 运行 PageRank
val ranks = graph.pageRank(0.0001).vertices
// 将排名与用户名连接
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// 打印结果
println(ranksByUsername.collect().mkString("\n"))
连通组件
连通分量算法用图中编号最低的顶点的ID标记每个连通分量。例如,在社交网络中,连通分量可以近似表示聚类。GraphX 包含实现该算法的
ConnectedComponents
对象
,我们从
PageRank 部分
计算示例社交网络数据集的连通分量,如下所示:
import org.apache.spark.graphx.GraphLoader
// 按照PageRank示例加载图
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// 查找连通分量
val cc = graph.connectedComponents().vertices
// 将连通分量与用户名连接
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// 打印结果
println(ccByUsername.collect().mkString("\n"))
三角形计数
当一个顶点有两个相邻的顶点并且它们之间有一条边时,它是三角形的一部分。GraphX 在
TriangleCount
对象
中实现了一个三角形计数算法,该算法确定通过每个顶点的三角形数量,从而提供聚类的度量。我们计算来自
PageRank 部分
的社交网络数据集的三角形计数。
请注意,
TriangleCount
要求边以规范方向排列 (
srcId < dstId
),并且图必须使用
Graph.partitionBy
进行分区。
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// 以标准顺序加载边并为三角计数分区图
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
.partitionBy(PartitionStrategy.RandomVertexCut)
// 为每个顶点查找三角计数
val triCounts = graph.triangleCount().vertices
// 将三角计数与用户名进行连接
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
(username, tc)
}
// 打印结果
println(triCountByUsername.collect().mkString("\n"))
示例
假设我想从一些文本文件构建一个图,将图限制在重要的关系和用户上,在子图上运行页面排名,最后返回与顶级用户相关的属性。我可以仅用几行代码在GraphX中完成所有这些:
import org.apache.spark.graphx.GraphLoader
// 加载我的用户数据并解析为用户ID和属性列表的元组
val users = (sc.textFile("data/graphx/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// 解析已经是用户ID -> 用户ID格式的边数据
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// 附加用户属性
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// 一些用户可能没有属性,因此我们将它们设置为空
case (uid, deg, None) => Array.empty[String]
}
// 将图限制为具有用户名和名称的用户
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// 计算 PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// 获取排名前的用户的属性
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))