教程:使用FLASH模型以C++开发您的算法

我们已在FLASH中实现了70多种图算法,覆盖40多个不同领域的常见问题。这些算法涵盖聚类分析、中心性计算、图遍历、匹配、挖掘等多个领域。我们会持续向FLASH算法库添加更多算法,同时也鼓励用户在内置算法无法满足需求时,使用FLASH模型开发自己的算法。

使用FLASH模型开发算法

定义FLASH应用程序

FLASH是一种专门为编写分布式图处理算法设计的编程模型。它遵循批量同步并行(BSP)计算范式,提供了定义在VertexSubset结构上的主要功能如VSizeVertexMapEdgeMap,以及一系列辅助功能如集合操作。FLASH模型具有强大的表达能力,使得编程比现有的以顶点为中心的算法更高级的算法成为可能。

要在FLASH模型中实现您的算法,首先需要定义一个FLASH应用程序,通过用C++实现一个APP类来完成。如果算法的结果是每个顶点上的一个值,则该类如下所示:

template <typename FRAG_T>
class YourApp : public FlashAppBase<FRAG_T, V_TYPE> {
 public:
  INSTALL_FLASH_WORKER(YourApp<FRAG_T>, V_TYPE, FRAG_T)
  using context_t = FlashVertexDataContext<FRAG_T, V_TYPE, V_RES_TYPE>;

  bool sync_all_ = false; // true or false

  V_RES_TYPE* Res(V_TYPE* v) {
    // return the result on the vertex
  }

  void Run(const fragment_t& graph, const std::shared_ptr<fw_t> fw, ...) {
    // conduct processing using the FLASH APIs
  }
};

或者,如果算法的结果是单个全局值,则该类看起来像:

template <typename FRAG_T>
class YourApp : public FlashAppBase<FRAG_T, V_TYPE> {
 public:
  INSTALL_FLASH_WORKER(YourApp<FRAG_T>, V_TYPE, FRAG_T)
  using context_t = FlashGlobalDataContext<FRAG_T, V_TYPE, G_RES_TYPE>;

  bool sync_all_ = false; // true or false

  G_RES_TYPE Res() {
    // return the global result
  }

  void Run(const fragment_t& graph, const std::shared_ptr<fw_t> fw, ...) {
    // conduct processing using the FLASH APIs
  }
};

实现运行函数

之后,用户可以通过在应用的Run函数中调用各种FLASH API来实现分布式图处理算法。以广度优先搜索(BFS)为例,用户自定义的应用可能如下所示,位于文件GraphScope/analytical_engine/apps/flash/traversal/bfs.h中:

template <typename FRAG_T>
class BFSFlash : public FlashAppBase<FRAG_T, BFS_TYPE> {
 public:
  INSTALL_FLASH_WORKER(BFSFlash<FRAG_T>, BFS_TYPE, FRAG_T)
  using context_t = FlashVertexDataContext<FRAG_T, BFS_TYPE, int>;

  bool sync_all_ = false;

  int* Res(value_t* v) { return &(v->dis); }

  void Run(const fragment_t& graph, const std::shared_ptr<fw_t> fw,
           oid_t o_source) {
    vid_t source = Oid2FlashId(o_source);
    DefineMapV(init) { v.dis = (id == source) ? 0 : -1; };
    vset_t a = VertexMap(All, CTrueV, init);

    DefineFV(filter) { return id == source; };
    a = VertexMap(a, filter);

    DefineMapE(update) { d.dis = s.dis + 1; };
    DefineFV(cond) { return v.dis == -1; };
    for (int len = VSize(a), i = 1; len > 0; len = VSize(a), ++i) {
      a = EdgeMap(a, ED, CTrueE, update, cond);
    }
  }
};

如这段代码所示,要实现一个BFS算法,用户只需设计和实现针对VertexSubset的处理操作,而无需考虑分布式环境中的通信、消息传递和图分区问题。这降低了编写各类算法的编程负担。

与GraphScope的Python客户端集成

要通过GraphScope的Python客户端运行您的FLASH算法,需要为该算法提供相应的Python函数定义。例如,要集成上述BFS算法,可以在文件GraphScope/python/graphscope/analytical/app/flash/traversal.py中定义如下函数。

@project_to_simple
@not_compatible_for("arrow_property", "dynamic_property")
def bfs(graph, source=1):
    """Evaluate BFS on a graph with flash computation mode.

    Args:
        graph (:class:`graphscope.Graph`): A simple graph.
        source (optional): The source Node. Defaults to 1.

    Returns:
        :class:`graphscope.framework.context.Context`:
            A context with each vertex assigned with the distance from source.
    """
    return AppAssets(algo="flash_bfs", context="vertex_data")(graph, source)

此外,需要在GraphScope/coordinator/gscoordinator/builtin/app/.gs_conf.yaml中添加配置信息。然后就可以从Python客户端调用该算法。

- algo: flash_bfs
  type: cpp_flash
  class_name: gs::BFSFlash
  src: apps/flash/traversal/bfs.h
  compatible_graph:
    - grape::ImmutableEdgecutFragment
    - gs::ArrowProjectedFragment

运行FLASH算法

要运行您自己的FLASH算法,在安装并部署GAE后,您可以通过GraphScope的客户端触发它。首先,一个属性图将被加载到GraphScope中。

# Import the graphscope module.

import graphscope

graphscope.set_option(show_log=True)  # enable logging
# Load p2p network dataset

from graphscope.dataset import load_p2p_network

graph = load_p2p_network(directed=True)

FLASH算法定义在简单图上,这种图只有一种顶点和边,边和顶点最多有一个属性作为其特征。因此,需要通过选择一种顶点/边标签及其最多一个属性,将属性图投影转换为简单图。

simple_graph = graph.project(vertices={"host": []}, edges={"connect": ["dist"]})

接下来,我们可以运行FLASH模型中实现的BFS算法,将上一步得到的投影图作为输入参数,并将源节点ID设置为"1"。该算法将为加载的图生成兼容版本代码,并编译成可执行二进制文件。构建库可能需要一些时间。不过,对于同类型图上的相同算法,此步骤只需执行一次。

context = graphscope.flash.bfs(simple_graph, source=1)

计算完成后,结果分布在集群上的vineyard实例中。返回的对象是一个Context,它提供了多种方法来获取或持久化结果。 在本例中,结果表示从源节点出发的距离。我们使用以下代码获取结果,并显示它们及其顶点ID。

context.to_dataframe(
	selector={"id": "v.id", "dist": "r"}, vertex_range={"begin": 1, "end": 10}
).sort_values(by="id")