教程：使用Python开发算法

如果内置算法无法满足需求，用户可以编写自己的算法。GraphScope允许用户以纯Python的方式，基于PIE和Pregel两种编程模型编写算法。

在PIE中编写自定义算法

要在PIE模型中实现一个算法，用户只需实现下面这个类中的方法。

from graphscope.analytical.udf.decorators import pie
from graphscope.framework.app import AppAssets

# Skeleton of a PIE-model algorithm.
# @pie fixes the value types used by the algorithm: vd_type is the type of
# vertex data and md_type the type of message data ("double" here).
# Every hook is a static method that receives the graph fragment (`frag`)
# and the algorithm context (`context`); replace each `pass` with your logic.
# NOTE: the decorator reads this class's source to generate native code, so
# this skeleton uses `#` comments only; whether docstrings or other extra
# statements are accepted is not shown here — TODO confirm before adding them.
@pie(vd_type="double", md_type="double")
class YourAlgorithm(AppAssets):
    # Init: set up the initial state (e.g. initial vertex values) on `frag`.
    @staticmethod
    def Init(frag, context):
        pass

    # PEval: sequential partial evaluation over this fragment.
    @staticmethod
    def PEval(frag, context):
        pass

    # IncEval: sequential incremental evaluation over this fragment;
    # presumably invoked in later rounds after values have changed — confirm.
    @staticmethod
    def IncEval(frag, context):
        pass

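如代码所示，用户需要实现一个用@pie装饰的类，并在其中提供三个顺序的图函数（@pie的vd_type和md_type参数分别指定顶点数据和消息数据的类型）。在该类中，Init用于设置初始状态；PEval是在分区片段（fragment）上进行部分求值（Partial Evaluation）的顺序函数；IncEval则是在分区片段上进行增量求值（Incremental Evaluation）的顺序函数。完整的片段API可以在Cython SDK API文档中找到。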

以单源最短路径（SSSP）为例，用户在PIE模型中自定义的SSSP算法可能如下所示。

from graphscope.analytical.udf.decorators import pie
from graphscope.framework.app import AppAssets

# Single-source shortest path (SSSP) written in the PIE model.
# Vertex values and messages are doubles (vd_type / md_type). The source
# vertex id comes from the "src" config entry, and edge weights are read from
# the third column (index 2) of each edge's data.
# NOTE(review): graphscope, PIEAggregateType and MessageStrategy are not
# imported by this snippet; they are resolved when the decorator translates
# the source — confirm if copying this code elsewhere.
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    # Init: give every vertex of every label the initial distance
    # 1000000000.0, used as an "infinite" distance. Values are registered with
    # PIEAggregateType.kMinAggregate, presumably meaning concurrent updates
    # are merged by keeping the minimum — confirm in the SDK docs.
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            # Register a sync buffer for this vertex label. The
            # kSyncOnOuterVertex strategy presumably synchronizes values of
            # outer (non-owned) vertices between fragments — confirm.
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    # PEval: partial evaluation on this fragment. Only a fragment that holds
    # the source as an inner vertex does any work: it sets the source's
    # distance to 0 and relaxes the source's outgoing edges.
    @staticmethod
    def PEval(frag, context):
        src = int(context.get_config(b"src"))
        # Declare `source` as a Vertex variable for the generated code;
        # get_inner_node below presumably fills it in on a successful lookup.
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        # Search the inner vertices of each label for the source id.
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            # The source is not an inner vertex of this fragment: nothing to do.
            return
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                # use the third column of edge data as the distance between two vertices
                # (the source's distance is 0, so the candidate is the weight alone)
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    # IncEval: for every inner vertex of every label, relax all outgoing
    # edges from its current distance, lowering a neighbour's value only when
    # the new path is shorter.
    @staticmethod
    def IncEval(frag, context):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        # candidate distance via v; weight is edge-data column index 2
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)

如代码所示，用户只需针对单个分片设计和实现顺序算法，而无需考虑分布式环境中的通信与消息传递。借助这种方式，经典的Dijkstra算法及其增量版本即可直接用于在集群上分区存储的大规模图数据。

使用Pregel编写算法

除了基于子图的PIE模型外，GraphScope还支持以顶点为中心的Pregel模型。您可以通过实现下面这个接口来开发Pregel模型中的算法。

from graphscope.analytical.udf.decorators import pregel
from graphscope.framework.app import AppAssets

# Skeleton of a Pregel-model (vertex-centric) algorithm.
# vd_type / md_type set the types of vertex data and message data.
# Hooks operate on a single vertex `v` instead of a whole fragment.
@pregel(vd_type='double', md_type='double')
class YourPregelAlgorithm(AppAssets):

    # Init: set the initial state of vertex `v`.
    @staticmethod
    def Init(v, context):
        pass

    # Compute: per-vertex step that receives the messages delivered to `v`.
    @staticmethod
    def Compute(messages, v, context):
        pass

    # Combine: merge a collection of messages into a single value
    # (for SSSP this is the minimum distance).
    @staticmethod
    def Combine(messages):
        pass

与PIE模型不同，该类的装饰器是@pregel，需要实现的函数定义在顶点上，而非片段上；其中Combine用于将多条消息合并为一条。以SSSP为例，Pregel模型中的算法如下所示。

from graphscope.analytical.udf import pregel
from graphscope.framework.app import AppAssets

# Single-source shortest path (SSSP) written in the Pregel model.
# The decorator assigns the types for vertex data (vd_type) and message data (md_type).
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    # Init: every vertex starts at 1000000000.0, used as an "infinite" distance.
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    # Compute: pick the smallest candidate distance (0 for the source vertex,
    # otherwise the minimum incoming message). If it improves the current
    # value, store it and send new_dist + edge weight to every out-neighbour.
    # The vertex always votes to halt at the end of the step.
    @staticmethod
    def Compute(messages, v, context):
        # NOTE(review): unlike SSSP_PIE.PEval, the "src" config value is not
        # passed through int() before being compared with v.id(); confirm
        # that the two compare as intended after translation.
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    # edge weight is read from edge-data column index 2
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

    # Combine: reduce the given messages to their minimum; with no messages
    # it returns 1000000000.0.
    @staticmethod
    def Combine(messages):
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret

在算法中使用math.h函数

GraphScope支持通过context.math接口，在用户自定义算法中使用math.h中的C函数。例如，

# Excerpt of an Init hook, shown outside its algorithm class for brevity.
# context.math exposes math.h functions and constants: context.math.sin and
# context.math.M_PI become sin and M_PI in the generated C code.
@staticmethod
def Init(v, context):
    v.set_value(context.math.sin(1000000000.0 * context.math.M_PI))

会被翻译为以下高效的C代码：

... Init(...)

    v.set_value(sin(1000000000.0 * M_PI));

运行自定义算法

要运行自定义算法，可以在定义它的位置直接调用。

import graphscope
from graphscope.dataset import load_p2p_network

# Load the p2p network dataset as a graph (generate_eid=False).
g = load_p2p_network(generate_eid=False)

# load my algorithm
# (SSSP_Pregel is the class defined earlier in this tutorial.)
my_app = SSSP_Pregel()

# run my algorithm over a graph and get the result.
# Here `src` is the value read by `context.get_config(b"src")` in the algorithm.
ret = my_app(g, src="6")

在开发和测试完成后，您可能希望将算法保存下来以备将来使用。

# Package the decorated algorithm class as a gar file at this path.
SSSP_Pregel.to_gar("/tmp/my_sssp_pregel.gar")

之后，您可以从gar包中加载自己的算法。

from graphscope.framework.app import load_app

# load my algorithm from a gar package
# (the file written by SSSP_Pregel.to_gar in the previous step)
my_app = load_app("/tmp/my_sssp_pregel.gar")

# run my algorithm over a graph and get the result.
# `g` is the graph loaded earlier with load_p2p_network.
ret = my_app(g, src="6")