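Creating C++ Stored Procedures in GraphScope Interactive

Besides adapting Cypher queries as stored procedures, Interactive also supports implementing stored procedures in C++ by calling the interfaces provided by the Graph Database Engine.

Getting Started

We use an example to show how to create a stored procedure in C++. In this example, we implement a procedure that returns the number of vertices with a given label.

Before proceeding, make sure you have installed and started Interactive as described in the Getting Started guide, and that the environment variables have been exported correctly.

Define a C++ Stored Procedure

A C++ stored procedure should be defined in a single file, for example count_vertices.cc: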

#include "flex/engines/graph_db/app/app_base.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/app_utils.h"

namespace gs {

// A sample app that returns the number of vertices with the specified label.
// Since no write operations are needed, we inherit from ReadAppBase; otherwise
// you could inherit from WriteAppBase.
class CountVertices : public ReadAppBase {
public:
  CountVertices() {}
  /**
   * @brief Query function for query class.
   * @param sess: GraphDBSession The interface where you can visit the graph.
   * @param input: Decoder From where you could deserialize the input
   * parameters.
   * @param output: Encoder To where you should encode the output parameters.
   */
  bool Query(const gs::GraphDBSession &sess, Decoder &input,
             Encoder &output) override {
    // First get the read transaction.
    auto txn = sess.GetReadTransaction();
    // We expect one param of type string from decoder.
    if (input.empty()) {
      return false;
    }
    std::string label_name{input.get_string()};
    const auto &schema = txn.schema();
    if (!schema.has_vertex_label(label_name)) {
      return false; // The requested label doesn't exist.
    }
    auto label_id = schema.get_vertex_label_id(label_name);
    // Look up the number of vertices with this label and encode it as the
    // output.
    output.put_int(txn.GetVertexNum(label_id));
    txn.Commit();
    return true;
  }
};
} // namespace gs

extern "C" {

// Defines how an instance of your procedure is created.
void *CreateApp(gs::GraphDBSession &db) {
  gs::CountVertices *app = new gs::CountVertices();
  return static_cast<void *>(app);
}

// Defines how an instance of your procedure should be deleted.
void DeleteApp(void *app) {
  gs::CountVertices *casted = static_cast<gs::CountVertices *>(app);
  delete casted;
}
}

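Register and Call the Stored Procedure

With the CountVertices procedure defined above, we can create the stored procedure with gsctl, or with the Interactive Python/Java SDK.

gsctl

After deploying Interactive, you can register a C++ stored procedure in the same way as a Cypher stored procedure.

Define the YAML

The YAML definition of a C++ stored procedure differs from that of a Cypher procedure only in the type field, which is cpp instead of cypher. You can include the implementation of the stored procedure directly in the YAML file.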

name: test_procedure
description: "This is a test procedure"
query: |
    #include "flex/engines/graph_db/app/app_base.h"
    #include "flex/engines/graph_db/database/graph_db_session.h"
    #include "flex/utils/app_utils.h"

    namespace gs {

    // A sample app that returns the number of vertices with the specified label.
    // Since no write operations are needed, we inherit from ReadAppBase; otherwise
    // you could inherit from WriteAppBase.
    class CountVertices : public ReadAppBase {
    public:
      CountVertices() {}
      /**
       * @brief Query function for query class.
       * @param sess: GraphDBSession The interface where you can visit the graph.
       * @param input: Decoder From where you could deserialize the input
       * parameters.
       * @param output: Encoder To where you should encode the output parameters.
       */
      bool Query(const gs::GraphDBSession &sess, Decoder &input,
                 Encoder &output) override {
        // First get the read transaction.
        auto txn = sess.GetReadTransaction();
        // We expect one param of type string from decoder.
        if (input.empty()) {
          return false;
        }
        std::string label_name{input.get_string()};
        const auto &schema = txn.schema();
        if (!schema.has_vertex_label(label_name)) {
          return false; // The requested label doesn't exist.
        }
        auto label_id = schema.get_vertex_label_id(label_name);
        // Look up the number of vertices with this label and encode it as the
        // output.
        output.put_int(txn.GetVertexNum(label_id));
        txn.Commit();
        return true;
      }
    };
    } // namespace gs

    extern "C" {

    // Defines how an instance of your procedure is created.
    void *CreateApp(gs::GraphDBSession &db) {
      gs::CountVertices *app = new gs::CountVertices();
      return static_cast<void *>(app);
    }

    // Defines how an instance of your procedure should be deleted.
    void DeleteApp(void *app) {
      gs::CountVertices *casted = static_cast<gs::CountVertices *>(app);
      delete casted;
    }
    }
type: cpp

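Embedding the C++ code in the YAML file is verbose and makes the procedure hard to update whenever the code changes. Fortunately, the procedure implementation can also be uploaded from a file: provide the full path to the C++ file, prefixed with the @ symbol.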

name: test_procedure
description: "This is a test procedure"
query: "@/path/to/procedure.cc"
type: cpp
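
Because the @ form needs the full path to the source file, it can be convenient to generate the definition from a script. The following Python sketch builds the YAML text for a given source file. The helper name `build_procedure_yaml` is hypothetical and not part of gsctl or the SDK; it only formats the four fields shown above and assumes a POSIX-style path without characters that need YAML escaping.

```python
import os


def build_procedure_yaml(name: str, description: str, source_path: str) -> str:
    """Return a stored-procedure definition that references a C++ file by path.

    Hypothetical helper for illustration; it only emits the name, description,
    query and type fields.
    """
    # The @ form expects a full path, so resolve relative paths first.
    full_path = os.path.abspath(source_path)
    return (
        f"name: {name}\n"
        f'description: "{description}"\n'
        f'query: "@{full_path}"\n'
        "type: cpp\n"
    )


if __name__ == "__main__":
    # Print the definition; redirect it to procedure.yaml to use it with gsctl.
    print(build_procedure_yaml("test_procedure",
                               "This is a test procedure",
                               "count_vertices.cc"))
```

Resolving the path in the script keeps the YAML valid regardless of the directory from which gsctl is later invoked.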

Python SDK

After installing the Interactive Python SDK, you can create a stored procedure with the following code.

import os
from gs_interactive.client.driver import Driver
from gs_interactive.models import *

driver = Driver() # connecting to Interactive service, assuming environment variables like INTERACTIVE_ADMIN_ENDPOINT have been correctly exported.
sess = driver.session() # create the session

# Read the content of the procedure <count_vertices.cc>
app_path='/path/to/count_vertices.cc' # Replace the path with the real path to count_vertices.cc
if not os.path.exists(app_path):
    raise Exception("count_vertices.cc not found")
with open(app_path, "r") as f:
    app_content = f.read()

# we try to create the procedure on default graph, you may try to create on your customized graph by changing the graph_id 
graph_id = '1'

create_proc_request = CreateProcedureRequest(
    name="count_vertices",
    description="Count vertices for the specified vertex label name",
    query=app_content,
    type="cpp",
)
resp = sess.create_procedure(graph_id, create_proc_request)
print(resp)
assert resp.is_ok()

For more details on the Python SDK interface, see the Python SDK Procedure API.

Java SDK

After installing the Interactive Java SDK, you can create a stored procedure with the following code.

import com.alibaba.graphscope.interactive.client.Driver;
import com.alibaba.graphscope.interactive.client.Session;
import com.alibaba.graphscope.interactive.models.*;
import com.alibaba.graphscope.interactive.client.common.Result;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.IOException;

public class CreateProcedureTest{
    public static void createProcedure(Session sess, String graphId) {
        CreateProcedureRequest procedure = new CreateProcedureRequest();
        procedure.setName("count_vertices");
        procedure.setDescription("Count vertices for the specified vertex label name");
        String appFilePath = "/path/to/count_vertices.cc"; // Please replace with the real path to count_vertices.cc
        // check file exist
        if (Files.notExists(Paths.get(appFilePath))) {
            throw new RuntimeException("sample app file not exist");
        }
        String appFileContent = "";
        try {
            appFileContent =
                    new String(
                            Files.readAllBytes(Paths.get(appFilePath)));
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (appFileContent.isEmpty()) {
            throw new RuntimeException("sample app content is empty");
        }
        procedure.setQuery(appFileContent);
        procedure.setType(CreateProcedureRequest.TypeEnum.CPP);
        Result<CreateProcedureResponse> resp = sess.createProcedure(graphId, procedure);
        if (!resp.isOk()) {
            throw new RuntimeException("Failed to create procedure: " + resp.getStatusMessage());
        }
    }

    public static void main(String[] args) {
        Driver driver = Driver.connect();
        Session sess = driver.session();
        createProcedure(sess, "1");
        return ;
    }
}

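For more details on the Java SDK interface, see the Java SDK Procedure API.

Create the Stored Procedure

First, switch to the graph on which you want to create the stored procedure. We use the built-in graph as an example. To learn how to create a custom graph, see Using Custom Graph.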

gsctl use GRAPH gs_interactive_default_graph 

Then create the procedure with gsctl:

gsctl create storedproc -f ./procedure.yaml

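This starts a compilation step that turns the C++ code into a dynamic library, which may take a few seconds. Once compilation finishes, the service must be restarted to activate the stored procedure.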

gsctl service restart

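Graph Database Engine

The design of Interactive follows the transaction principle: stored procedures should access the graph through ReadTransaction, InsertTransaction, or UpdateTransaction.

If you want to write stored procedures with the best possible performance, we recommend reading the GraphDB code in detail. If you run into any problems, feel free to contact us by submitting an issue or starting a discussion.

Query Input and Output

Interactive natively supports two protocols for encoding parameters and decoding results: Encoder/Decoder and CypherApp.

Encoder/Decoder

The Encoder/Decoder-based approach provides the best performance, and the serialization and deserialization can be customized by the user. With this protocol, you are responsible for encoding and decoding the parameters yourself. For example, the sample program count_vertices.cc above uses the Decoder to read its input and the Encoder to write its output.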
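
To make the hand-rolled serialization concrete, here is a minimal Python sketch of the request layout implied by the comment in the Java example in this section: a 4-byte length, the label bytes, then a 1-byte procedure index. The little-endian byte order and the helper names `encode_request` and `decode_request` are assumptions made for illustration. They are not part of the Interactive SDK, whose own Encoder and Decoder classes should be used in practice.

```python
import struct
from typing import Tuple


def encode_request(label_name: str, procedure_index: int) -> bytes:
    """Pack a label name and a procedure index into one request buffer.

    Assumed layout (hypothetical, for illustration only):
    4-byte little-endian length | UTF-8 label bytes | 1-byte procedure index
    """
    label_bytes = label_name.encode("utf-8")
    return (
        struct.pack("<i", len(label_bytes))
        + label_bytes
        + struct.pack("<B", procedure_index)
    )


def decode_request(buffer: bytes) -> Tuple[str, int]:
    """Reverse encode_request: read the length, then the label, then the index."""
    (length,) = struct.unpack_from("<i", buffer, 0)
    label = buffer[4:4 + length].decode("utf-8")
    (procedure_index,) = struct.unpack_from("<B", buffer, 4 + length)
    return label, procedure_index


request = encode_request("person", 1)
print(len(request))             # 11 bytes: 4 (length) + 6 (label) + 1 (index)
print(decode_request(request))  # ('person', 1)
```

The length prefix is what lets the receiver know where the string ends and the next field begins, which is why both sides must agree on the field order.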

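The following example shows how to query the count_vertices procedure with the Interactive Java SDK and Python SDK. Note that the service must be running on the graph with ID "1" for the procedure to be callable.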

import com.alibaba.graphscope.interactive.client.Driver;
import com.alibaba.graphscope.interactive.client.Session;
import com.alibaba.graphscope.interactive.models.*;
import com.alibaba.graphscope.interactive.client.common.Result;
import com.alibaba.graphscope.interactive.client.utils.Encoder;
import com.alibaba.graphscope.interactive.client.utils.Decoder;

public class CreateProcedureTest{
    public static void callProcedure(Session sess, String graphId, String procedureName, String labelName) {
        byte[] bytes = new byte[1 + 4 + labelName.length()]; // 1 byte for procedure index, 4 bytes for label length, and labelName.length() bytes for label name
        Encoder encoder = new Encoder(bytes);
        encoder.put_string(labelName);
        encoder.put_byte((byte) 1); // Assume the procedure index is 1
        Result<byte[]> resp = sess.callProcedureRaw(graphId, bytes);
        if (!resp.isOk()) {
            throw new RuntimeException("Failed to call procedure: " + resp.getStatusMessage());
        }
        Decoder decoder = new Decoder(resp.getValue());
        int count = decoder.get_int();
        System.out.println("Count of vertices with label " + labelName + " is " + count); // should be 4
    }

    public static void startServiceOnGraph(Session sess, String graphId) {
        Result<StartServiceResponse> resp = sess.startService(new StartServiceRequest().graphId(graphId));
        if (!resp.isOk()) {
            throw new RuntimeException("Failed to start service: " + resp.getStatusMessage());
        }
    }

    public static void main(String[] args) {
        Driver driver = Driver.connect();
        Session sess = driver.session();
        startServiceOnGraph(sess, "1"); // Procedure is only runnable after service has been switched to graph 1
        callProcedure(sess, "1", "count_vertices", "person"); // count how many vertices are labeled with person.
        return ;
    }
}

from gs_interactive.client.driver import Driver
from gs_interactive.models import *
from gs_interactive.client.utils import *

driver = Driver() # connecting to Interactive service, assuming environment variables like INTERACTIVE_ADMIN_ENDPOINT have been correctly exported.
sess = driver.session() # create the session

# Use an encoder to encode the input request
encoder = Encoder()
encoder.put_string("person") # input label name
encoder.put_byte(1) # procedure id 1
resp = sess.call_procedure_raw(graph_id="1", params=encoder.get_bytes())
assert resp.is_ok()
decoder = Decoder(resp.value)
num = decoder.get_int()
print(f"vertices num: {num}")

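Programming Interface

To write an efficient procedure that meets your needs, it is essential to understand the programming interface and the Interactive storage interface. We recommend reading the Interactive source code; you can also consult the generated API documentation.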