教程:使用FLASH模型以C++开发您的算法¶
我们已在FLASH中实现了70多种图算法,覆盖40多个不同领域的常见问题。这些算法涵盖聚类分析、中心性计算、图遍历、匹配、挖掘等多个领域。我们会持续向FLASH算法库添加更多算法,同时也鼓励用户在内置算法无法满足需求时,使用FLASH模型开发自己的算法。
使用FLASH模型开发算法¶
定义FLASH应用程序¶
FLASH是一种专门为编写分布式图处理算法设计的编程模型。它遵循批量同步并行(BSP)计算范式,提供了定义在VertexSubset结构上的主要功能如VSize、VertexMap和EdgeMap,以及一系列辅助功能如集合操作。FLASH模型具有强大的表达能力,使得编程比现有的以顶点为中心的算法更高级的算法成为可能。
要在FLASH模型中实现您的算法,首先需要定义一个FLASH应用程序,通过用C++实现一个APP类来完成。如果算法的结果是每个顶点上的一个值,则该类如下所示:
template <typename FRAG_T>
class YourApp : public FlashAppBase<FRAG_T, V_TYPE> {
public:
INSTALL_FLASH_WORKER(YourApp<FRAG_T>, V_TYPE, FRAG_T)
using context_t = FlashVertexDataContext<FRAG_T, V_TYPE, V_RES_TYPE>;
bool sync_all_ = false; // true or false
V_RES_TYPE* Res(V_TYPE* v) {
// return the result on the vertex
}
void Run(const fragment_t& graph, const std::shared_ptr<fw_t> fw, ...) {
// conduct processing using the FLASH APIs
}
};
或者,如果算法的结果是单个全局值,则该类看起来像:
template <typename FRAG_T>
class YourApp : public FlashAppBase<FRAG_T, V_TYPE> {
public:
INSTALL_FLASH_WORKER(YourApp<FRAG_T>, V_TYPE, FRAG_T)
using context_t = FlashGlobalDataContext<FRAG_T, V_TYPE, G_RES_TYPE>;
bool sync_all_ = false; // true or false
G_RES_TYPE Res() {
// return the global result
}
void Run(const fragment_t& graph, const std::shared_ptr<fw_t> fw, ...) {
// conduct processing using the FLASH APIs
}
};
实现运行函数¶
之后,用户可以通过在应用的Run函数中调用各种FLASH API来实现分布式图处理算法。以广度优先搜索(BFS)为例,用户自定义的应用可能如下所示,位于文件GraphScope/analytical_engine/apps/flash/traversal/bfs.h中:
template <typename FRAG_T>
class BFSFlash : public FlashAppBase<FRAG_T, BFS_TYPE> {
public:
INSTALL_FLASH_WORKER(BFSFlash<FRAG_T>, BFS_TYPE, FRAG_T)
using context_t = FlashVertexDataContext<FRAG_T, BFS_TYPE, int>;
bool sync_all_ = false;
int* Res(value_t* v) { return &(v->dis); }
void Run(const fragment_t& graph, const std::shared_ptr<fw_t> fw,
oid_t o_source) {
vid_t source = Oid2FlashId(o_source);
DefineMapV(init) { v.dis = (id == source) ? 0 : -1; };
vset_t a = VertexMap(All, CTrueV, init);
DefineFV(filter) { return id == source; };
a = VertexMap(a, filter);
DefineMapE(update) { d.dis = s.dis + 1; };
DefineFV(cond) { return v.dis == -1; };
for (int len = VSize(a), i = 1; len > 0; len = VSize(a), ++i) {
a = EdgeMap(a, ED, CTrueE, update, cond);
}
}
};
如这段代码所示,要实现一个BFS算法,用户只需设计和实现针对VertexSubset的处理操作,而无需考虑分布式环境中的通信、消息传递和图分区问题。这降低了编写各类算法的编程负担。
与GraphScope的Python客户端集成¶
要通过GraphScope的Python客户端运行您的FLASH算法,需要为该算法提供相应的Python函数定义。例如,要集成上述BFS算法,可以在文件GraphScope/python/graphscope/analytical/app/flash/traversal.py中定义如下函数。
@project_to_simple
@not_compatible_for("arrow_property", "dynamic_property")
def bfs(graph, source=1):
"""Evaluate BFS on a graph with flash computation mode.
Args:
graph (:class:`graphscope.Graph`): A simple graph.
source (optional): The source Node. Defaults to 1.
Returns:
:class:`graphscope.framework.context.Context`:
A context with each vertex assigned with the distance from source.
"""
return AppAssets(algo="flash_bfs", context="vertex_data")(graph, source)
此外,需要在GraphScope/coordinator/gscoordinator/builtin/app/.gs_conf.yaml中添加配置信息。然后就可以从Python客户端调用该算法。
- algo: flash_bfs
type: cpp_flash
class_name: gs::BFSFlash
src: apps/flash/traversal/bfs.h
compatible_graph:
- grape::ImmutableEdgecutFragment
- gs::ArrowProjectedFragment
运行FLASH算法¶
要运行您自己的FLASH算法,在安装并部署GAE后,您可以通过GraphScope的客户端触发它。首先,一个属性图将被加载到GraphScope中。
# Import the graphscope module.
import graphscope
graphscope.set_option(show_log=True) # enable logging
# Load p2p network dataset
from graphscope.dataset import load_p2p_network
graph = load_p2p_network(directed=True)
FLASH算法定义在简单图上,这种图只有一种顶点和边,边和顶点最多有一个属性作为其特征。因此,需要通过选择一种顶点/边标签及其最多一个属性,将属性图投影转换为简单图。
simple_graph = graph.project(vertices={"host": []}, edges={"connect": ["dist"]})
接下来,我们可以运行FLASH模型中实现的BFS算法,将上一步得到的投影图作为输入参数,并将源节点ID设置为"1"。该算法将为加载的图生成兼容版本代码,并编译成可执行二进制文件。构建库可能需要一些时间。不过,对于同类型图上的相同算法,此步骤只需执行一次。
context = graphscope.flash.bfs(simple_graph, source=1)
计算完成后,结果分布在集群上的vineyard实例中。返回的对象是一个Context,它提供了多种方法来获取或持久化结果。 在本例中,结果表示从源节点出发的距离。我们使用以下代码获取结果,并显示它们及其顶点ID。
context.to_dataframe(
selector={"id": "v.id", "dist": "r"}, vertex_range={"begin": 1, "end": 10}
).sort_values(by="id")