教程:使用Python开发算法¶
如果内置算法无法满足需求,用户可以编写自己的算法。graphscope允许用户以纯Python方式在PIE和Pregel编程模型中编写算法。
在PIE中编写自定义算法¶
要在PIE模型中实现一个算法,用户只需实现如下所示的类。
from graphscope.analytical.udf.decorators import pie
from graphscope.framework.app import AppAssets
# Template for a PIE algorithm: subclass AppAssets, decorate the class with
# @pie, and fill in the three static methods below.
# `vd_type` / `md_type` give the types of the vertex data and message data.
@pie(vd_type="double", md_type="double")
class YourAlgorithm(AppAssets):
    # Set up the initial state on the fragment.
    @staticmethod
    def Init(frag, context):
        pass

    # Partial evaluation: the sequential algorithm run on the fragment.
    @staticmethod
    def PEval(frag, context):
        pass

    # Incremental evaluation: the sequential algorithm that refines the
    # result on the fragment.
    @staticmethod
    def IncEval(frag, context):
        pass
如代码所示,用户需要实现一个用@pie装饰的类,并提供三个顺序图函数。在该类中,Init是设置初始状态的函数。PEval是用于部分评估的顺序方法,而IncEval是用于在分区片段上进行增量评估的顺序函数。完整的片段API可以在Cython SDK API中找到。
以SSSP为例,用户在PIE模型中自定义的SSSP可能如下所示。
from graphscope.analytical.udf.decorators import pie
from graphscope.framework.app import AppAssets
# Single-source shortest path written against the PIE model.
# `vd_type` / `md_type` give the types of the vertex data and message data.
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    # Give every node of every vertex label a large starting distance and
    # register a sync buffer for that label.
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            # 1000000000.0 is the starting value for nodes not reached yet;
            # values are combined with the "min" aggregate.
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    # Partial evaluation: if the source vertex is an inner node of this
    # fragment, set its distance to 0 and relax its outgoing edges.
    @staticmethod
    def PEval(frag, context):
        # The source id arrives through the `src` config entry.
        src = int(context.get_config(b"src"))
        # Declare `source` as a Vertex variable; it is passed to
        # get_inner_node below (presumably filled in as an output argument
        # -- confirm against the fragment API).
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            # The source is not an inner node of this fragment: nothing to do.
            return
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                # use the third column of edge data as the distance between two vertices
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    # Incremental evaluation: for every inner node, relax each outgoing edge
    # and keep the smaller distance on the neighbor.
    @staticmethod
    def IncEval(frag, context):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        # Candidate distance to `u` going through `v`.
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)
如代码所示,用户只需设计和实现针对单个分片的顺序算法,而无需考虑分布式环境中的通信与消息传递问题。在这种情况下,经典的Dijkstra算法及其增量版本适用于集群上分区存储的大规模图数据。
使用Pregel编写算法¶
除了基于子图的PIE模型外,graphscope还支持以顶点为中心的Pregel模型。您可以通过实现这个接口来开发Pregel模型中的算法。
from graphscope.analytical.udf.decorators import pregel
from graphscope.framework.app import AppAssets
# Template for a Pregel algorithm: subclass AppAssets, decorate the class
# with @pregel, and fill in the static methods below. The methods are defined
# on a vertex `v` rather than on a fragment.
# `vd_type` / `md_type` give the types of the vertex data and message data.
@pregel(vd_type="double", md_type="double")
class YourPregelAlgorithm(AppAssets):
    # Set the initial state of vertex `v`.
    @staticmethod
    def Init(v, context):
        pass

    # Process the incoming `messages` for vertex `v`.
    @staticmethod
    def Compute(messages, v, context):
        pass

    # Reduce a collection of messages to a single value.
    @staticmethod
    def Combine(messages):
        pass
与PIE模型不同,该类的装饰器是@pregel。
需要实现的函数定义在顶点上,而非片段上。
以SSSP为例,Pregel模型中的算法如下所示。
from graphscope.analytical.udf import pregel
from graphscope.framework.app import AppAssets
# decorator, and assign the types for vertex data, message data.
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    # Start every vertex at a large distance (not reached yet).
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    # Take the smallest distance among the incoming messages (or 0 for the
    # source vertex); if it improves on the stored value, store it and send
    # updated distances along every outgoing edge.
    @staticmethod
    def Compute(messages, v, context):
        # The source id arrives through the `src` config entry.
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    # The third column of edge data is used as the edge length.
                    v.send(e.vertex(), new_dist + e.get_int(2))
        # Called unconditionally, whether or not the distance improved.
        v.vote_to_halt()

    # Combine messages by keeping only the minimum distance.
    @staticmethod
    def Combine(messages):
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret
在算法中使用math.h函数¶
GraphScope支持在用户自定义算法中使用math.h中的C函数,通过context.math接口实现。例如,
# `context.math` exposes the math.h functions and constants (here `sin` and
# `M_PI`) inside a user-defined algorithm.
@staticmethod
def Init(v, context):
    v.set_value(context.math.sin(1000000000.0 * context.math.M_PI))
将被翻译为如下高效的C代码
... Init(...)
v.set_value(sin(1000000000.0 * M_PI));
运行自定义算法¶
要运行您自己的算法,可以在定义之后直接调用它。
import graphscope
from graphscope.dataset import load_p2p_network
# Load the graph to run on, using the dataset helper.
g = load_p2p_network(generate_eid=False)
# Instantiate the user-defined algorithm.
my_app = SSSP_Pregel()
# Run the algorithm over the graph and keep the result.
# The `src` keyword argument is the value the algorithm reads back with
# `context.get_config(b"src")`.
ret = my_app(g, src="6")
在开发和测试完成后,您可能希望将其保存以备将来使用。
# Save the algorithm as a gar package at the given path for later reuse.
SSSP_Pregel.to_gar("/tmp/my_sssp_pregel.gar")
之后,您可以从gar包中加载自己的算法。
from graphscope.framework.app import load_app
# Load the algorithm back from the gar package saved earlier.
my_app = load_app("/tmp/my_sssp_pregel.gar")
# Run the loaded algorithm over the graph `g` and keep the result;
# `src` is passed the same way as when calling the class instance directly.
ret = my_app(g, src="6")