编程模型:PIE

尽管顶点中心编程模型能够表达各种图分析算法,但现有的顺序(单机)图算法需要经过修改才能遵循"像顶点一样思考"的原则。这使得并行图计算成为仅限有经验用户的特权。此外,在许多情况下,使用顶点中心模型的图算法性能并不理想。每个顶点仅掌握其一跳邻居的信息,导致信息在图中的传播速度缓慢且每次只能传播一跳。因此,可能需要多次计算迭代才能将一条信息从源头传播到目的地。

什么是PIE模型?

为了解决上述问题,我们在2017年发表的SIGMOD论文中提出了一种新的编程模型PIE(PEval-IncEval-Assemble)。与以顶点为中心的模型不同,PIE模型能够自动并行化现有的串行图算法,只需进行少量修改。这使得熟悉大学教材中传统图算法的用户能够轻松进行并行图计算,无需将现有图算法重新改写为新模型。

具体来说,在PIE模型中,用户只需提供三个函数,

  • (1) PEval,一个针对给定查询的顺序(单机)函数,在本地分区上计算结果;

  • (2) IncEval,一个顺序增量函数,通过将传入消息视为更新来计算旧输出的变化;以及

  • (3) 整合,收集部分答案,并将它们组合成一个完整的答案。

The PIE model

PIE模型。

PIE工作流程

PIE模型运行在图G上,每个工作节点维护G的一个分区。给定查询时,各工作节点首先对其本地分区执行PEval以并行计算部分答案。随后,各工作节点可能通过同步消息传递与其他节点交换部分结果。在接收消息后,各工作节点增量计算IncEval。该增量步骤会迭代执行直至无法生成更多消息。此时,Assemble将收集部分答案并组装最终结果。通过这种方式,PIE模型能够在不修改现有串行图算法逻辑和工作流的前提下实现并行化。

在该模型中,用户无需熟悉分布式环境下处理大规模图的复杂性。PIE模型基于定点计算,自动将图分析任务并行化分配到工作节点集群中。在单调性条件下,只要提供的三个顺序算法正确,它就能保证收敛并获得准确结果。

以下代码展示了如何在GAE中使用PIE模型表达SSSP算法。请注意,这里我们仅展示主要的计算逻辑。

void PEval(const fragment_t& frag, context_t& ctx,
             message_manager_t& messages) {
  vertex_t source;
  bool native_source = frag.GetInnerVertex(ctx.source_id, source);

  if (native_source) {
      ctx.partial_result[source] = 0;
      auto es = frag.GetOutgoingAdjList(source);
      for (auto& e : es) {
        vertex_t v = e.get_neighbor();
        ctx.partial_result[v] =
            std::min(ctx.partial_result[v], static_cast<double>(e.get_data()));
        if (frag.IsOuterVertex(v)) {
          // put the message to the channel.
          messages.Channels()[0].SyncStateOnOuterVertex<fragment_t, double>(
              frag, v, ctx.partial_result[v]);
        } else {
          ctx.next_modified.Insert(v);
        }
      }
    }
}

void IncEval(const fragment_t& frag, context_t& ctx,
               message_manager_t& messages) {
  auto inner_vertices = frag.InnerVertices();
  // parallel process and reduce the received messages
  messages.ParallelProcess<fragment_t, double>(
        thread_num(), frag, [&ctx](int tid, vertex_t u, double msg) {
          if (ctx.partial_result[u] > msg) {
            atomic_min(ctx.partial_result[u], msg);
            ctx.curr_modified.Insert(u);
          }
        });

  // incremental evaluation.		
  ForEach(ctx.curr_modified, inner_vertices,
            [&frag, &ctx](int tid, vertex_t v) {
              double distv = ctx.partial_result[v];
              auto es = frag.GetOutgoingAdjList(v);
              for (auto& e : es) {
                vertex_t u = e.get_neighbor();
                double ndistu = distv + e.get_data();
                if (ndistu < ctx.partial_result[u]) {
                  atomic_min(ctx.partial_result[u], ndistu);
                  ctx.next_modified.Insert(u);
                }
              }
            });
  
  auto outer_vertices = frag.OuterVertices();
  ForEach(ctx.next_modified, outer_vertices,
          [&channels, &frag, &ctx](int tid, vertex_t v) {
              channels[tid].SyncStateOnOuterVertex<fragment_t, double>(
              frag, v, ctx.partial_result[v]);
           });
}

在上述代码中,给定源顶点source,在PEval函数中,我们首先在源顶点所在的子图(片段)上执行Dijkstra算法以获得部分结果。之后,调用SyncStateOnOuterVertex函数,将部分结果发送到其他片段以触发IncEval函数。

IncEval函数中,每个分片首先通过消息管理器接收消息,然后基于接收到的消息执行增量计算来更新部分结果。如果部分结果被更新,每个分片需要执行SyncStateOnOuterVertex函数将外部顶点的最新部分结果与其他分片同步,以触发下一轮IncEval。有关如何使用PIE模型开发图应用程序的更多细节,请查看以下教程。