Ingress: 图算法的增量计算

Ingress是一个用于增量图处理的自动化系统。它能够将批处理的顶点中心算法整体增量化为对应的增量版本,无需用户重新设计逻辑或数据结构。Ingress底层是一个配备了四种不同记忆化策略的自动化增量框架,可支持各类顶点中心计算并优化内存利用率。

动机

当今大多数图计算系统都是为静态图设计的。当输入发生变化时,这些系统必须在新图上重新计算整个算法,这既昂贵又耗时。对于拥有数万亿条边的图(如不断变化的电商关系图)而言,这种情况尤为明显。

为解决这一问题,需要采用增量图计算技术。该技术首先通过批量算法在原始图上计算结果,随后利用增量算法根据输入变化调整旧结果。现实场景中的变更通常规模较小,且旧图计算与新图重新计算之间往往存在重叠部分。这使得增量计算更加高效,因为它减少了不必要的重复计算需求。然而,现有的增量图系统存在局限性,例如需要人工干预和使用不同的记忆化策略,这对非专业用户构成了使用负担。

Ingress的设计

概述

鉴于这些挑战,我们开发了Ingress,这是一个面向顶点的自动化增量图处理系统。Ingress的整体结构如下图所示。给定一个批处理算法A,Ingress会验证A的特性并自动推导出对应的增量算法∆A。它会选择合适的记忆化引擎来记录部分或全部运行时中间状态。当接收到图更新时,Ingress会借助记忆化状态执行∆A来提供更新后的结果。

The Ingress architecture

Ingress架构。

Ingress 具备以下与以往系统不同的特性:

  • (1) Ingress 支持灵活的缓存方案,能够在四种不同的缓存策略下执行增量计算,即从批处理算法推导出增量对应版本。

  • (2) Ingress 将通用的批量顶点中心算法整体增量化为对应的增量版本。无需手动重构数据结构或批量算法的逻辑,提高了易用性。

  • (3) Ingress 还实现了高性能运行时。

消息驱动的差异化

Ingress的图处理增量模型基于消息驱动的差异化机制。 在顶点中心模型中,每个顶点v的(最终)状态由该顶点在迭代计算的不同轮次中接收到的消息决定。基于这一特性,我们可以将寻找批处理顶点中心算法两次运行间差异的问题,转化为识别消息变更的问题。对于增量计算而言,只需获取原始图和更新图上运行时产生差异的那一轮消息,然后在接收到这些变更消息的受影响区域重放计算过程以调整状态即可。之后,下一轮所需的消息变更就能轻松获得,算法只需重复上述操作直到找出并处理所有变更消息。这一机制与变更传播的理念不谋而合。

灵活的备忘录化

一种简单的记忆化策略用于检测无效和缺失消息的方法是记录所有旧消息。通过直接比较新运行中创建的消息与已记忆的消息,可以找出发生变化的那些消息。尽管该解决方案足够通用,可以增量处理所有以顶点为中心的算法,但它通常会导致巨大的内存开销,特别是对于那些需要大量轮次才能收敛的算法。

Ingress提供了一种灵活的备忘录方案,可以在四种不同的备忘录策略下执行增量更新:

  • (1) 无记忆化策略(MF),该策略不记录运行时的旧消息(例如Delta-PageRank、Delta-PHP);

  • (2) 记忆路径策略(MP),仅记录一小部分旧消息(例如SSSP、CC、SSWP);

  • (3) 记忆化顶点策略(MV),用于跟踪顶点在不同步骤间的状态(例如GCN和CommNet);

  • (4) 记忆边策略(ME),保留所有旧消息(例如GraphSAGE、GIN)。

Ingress能够自动为给定的批处理算法选择最优的记忆化策略。有关其如何决定最优记忆化策略的详细信息,请参阅2021年发表的VLDB论文

Ingress API

Ingress遵循著名的以顶点为中心(vertex-centric)模型,为用户提供编写批量顶点中心算法的API接口。在该模型中,顶点状态和边属性的模板类型分别用DW表示。用户应分别通过init_vinit_m接口设置顶点状态和消息的初始值。聚合函数通过aggregate接口实现,该接口仅包含两个输入参数。不过,如果H满足结合律,该接口可扩展为支持不同数量的输入参数。

虽然为了简化起见,aggregate函数仅支持两个输入参数,但我们还提供了另一个接口,可以接收元素向量作为输入。以顶点为中心的模型通过update接口指定更新函数,用于调整顶点状态。此外,API中的generate接口对应传播函数G,负责生成消息。

template <class D, class W> 
interface IteratorKernel {
  virtual void init_m(Vertex v, D m) = 0; 
  virtual void init_v(Vertex v, D d) = 0; 
  virtual D aggregate(D m1, D m2) = 0; 
  virtual D update(D v, D m) = 0;
  virtual D generate(D v, D m, W w) = 0; 
}

使用此API,批量SSSP算法的实现如下:

class SSSPKernel: public IteratorKernel {
  void init_m(Vertex v, double m) { m = DBL_MAX; } 
  void init_v(Vertex v, double d) {
    v.d = ((v.id == source) ? 0 : DBL_MAX); 
  }
  double aggregate(double m1, double m2) { return m1 < m2 ? m1 : m2; } 
  double update(double v, double m) { return aggregate(v, m); }
  double generate(double v, double m, double w) { return v + w; }
}

分布式运行时引擎

Ingress的分布式运行时引擎构建在GraphScope的基础模块之上。它继承了GraphScope的图存储后端和图分区策略,确保了两个系统间的无缝集成。此外,Ingress还包含多个增强功能的新模块,主要包括:

  • 顶点中心编程。 Ingress扩展了块中心编程模型以实现顶点中心编程。具体来说,Ingress在每个工作节点上生成一个新进程来处理分配的子图。它采用CSC/CSR优化的图存储结构,以实现底层图的快速查询处理。对于每个顶点,它会调用用户指定的顶点中心API来执行聚合、更新和生成计算。生成的消息会在每次迭代处理完整子图后批量发送出去。Ingress依赖消息传递接口与其他工作节点进行高效通信。

  • 数据维护。 Ingress 在原始输入图上启动初始批量运行。根据选定的记忆策略(例如仅保留收敛顶点状态的MF策略或保留有效消息的MP策略),它在批量迭代计算过程中保留计算状态。之后,Ingress 准备接受图更新并执行推导出的增量算法来更新状态。图更新可以包括边的插入和删除,以及新添加的顶点和删除的顶点。特别地,没有关联边的变更顶点被编码为仅有一个端点的"虚拟"边。此外,边属性的更改通过删除旧边和插入具有新属性的边来表示。

  • 增量处理。 Ingress从输入图更新中涉及的顶点(称为受影响顶点)开始增量计算。利用消息推导技术,对于每个受影响顶点,Ingress会根据新的边属性和保留状态生成取消消息和补偿消息。这些消息被发送到对应的邻居顶点。只有接收到消息的顶点才会被Ingress激活以执行顶点中心计算,且只有状态发生更新的顶点才能向其邻居传播新消息。该过程持续进行,直到满足收敛条件。