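Create C++ Stored Procedures in GraphScope Interactive
In addition to adapting Cypher queries as stored procedures, Interactive also supports implementing stored procedures in C++ by calling the interfaces provided by the Graph Database Engine.
Quick Start
We use an example to demonstrate how to create a stored procedure in C++. In this example, we implement a procedure that returns the number of vertices with a given label.
Before continuing, make sure you have installed and started Interactive as described in the Getting Started guide, and that the required environment variables have been exported correctly.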
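A missing export usually shows up only when the SDK first tries to connect. The sketch below is a small pre-flight check you could run beforehand. It is an assumption-laden sketch: INTERACTIVE_ADMIN_ENDPOINT is the only variable this guide names, so the list of required variables is illustrative and should be extended for your deployment; missing_env_vars is a hypothetical helper, not part of the SDK.

```python
import os

# Assumed list of required variables; this guide only names
# INTERACTIVE_ADMIN_ENDPOINT, so extend the tuple for your deployment.
REQUIRED_VARS = ("INTERACTIVE_ADMIN_ENDPOINT",)


def missing_env_vars(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

For example, calling `missing_env_vars()` before `Driver()` and raising an error when the returned list is non-empty gives a clearer message than a failed connection.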
Define a C++ Stored Procedure
A C++ stored procedure should be defined in a source file, for example count_vertices.cc.
#include "flex/engines/graph_db/app/app_base.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/utils/app_utils.h"
namespace gs {
// A sample app that returns the number of vertices with the specified label.
// Since no write operations are needed, it inherits from ReadAppBase;
// otherwise it could inherit from WriteAppBase.
class CountVertices : public ReadAppBase {
 public:
  CountVertices() {}
  /**
   * @brief Query function for the query class.
   * @param sess: GraphDBSession, the interface through which you visit the graph.
   * @param input: Decoder from which you deserialize the input parameters.
   * @param output: Encoder into which you encode the output.
   */
  bool Query(const gs::GraphDBSession &sess, Decoder &input,
             Encoder &output) override {
    // First, get a read transaction.
    auto txn = sess.GetReadTransaction();
    // We expect one string parameter (the vertex label name) from the decoder.
    if (input.empty()) {
      return false;
    }
    std::string label_name{input.get_string()};
    const auto &schema = txn.schema();
    if (!schema.has_vertex_label(label_name)) {
      return false;  // The requested label doesn't exist.
    }
    auto label_id = schema.get_vertex_label_id(label_name);
    // Vertices are indexed internally per label, so the count can be read
    // directly instead of being accumulated.
    output.put_int(txn.GetVertexNum(label_id));
    txn.Commit();
    return true;
  }
};
}  // namespace gs
extern "C" {
// Defines how an instance of your procedure is created.
void *CreateApp(gs::GraphDBSession &db) {
  gs::CountVertices *app = new gs::CountVertices();
  return static_cast<void *>(app);
}
// Defines how an instance of your procedure is deleted.
void DeleteApp(void *app) {
  gs::CountVertices *casted = static_cast<gs::CountVertices *>(app);
  delete casted;
}
}
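To make the contract of Query concrete without a running engine, here is a rough Python model of what CountVertices does with its bytes. It is not the engine API: HYPOTHETICAL_GRAPH is an in-memory stand-in for txn.schema() and txn.GetVertexNum(), the wire layout (a 4-byte little-endian length prefix before the string, a 4-byte int result) is an assumption based on the SDK examples in this guide, and the model assumes the engine has already consumed the trailing procedure-index byte.

```python
import struct

# Hypothetical stand-in for the graph: vertex label -> vertex count.
# The real procedure obtains these via txn.schema() and txn.GetVertexNum().
HYPOTHETICAL_GRAPH = {"person": 4}


def count_vertices_model(args: bytes):
    """Model of CountVertices::Query; returns (ok, output_bytes)."""
    if not args:
        return False, b""  # mirrors `if (input.empty()) return false;`
    (length,) = struct.unpack_from("<i", args, 0)  # assumed 4-byte LE length prefix
    label = args[4:4 + length].decode("utf-8")
    if label not in HYPOTHETICAL_GRAPH:
        return False, b""  # mirrors the has_vertex_label() check
    # Mirrors output.put_int(txn.GetVertexNum(label_id)).
    return True, struct.pack("<i", HYPOTHETICAL_GRAPH[label])


ok, out = count_vertices_model(struct.pack("<i", 6) + b"person")
print(ok, struct.unpack("<i", out)[0])  # True 4
```

Returning false from Query is how the procedure signals a bad request (no parameter, or an unknown label); the model keeps that distinction explicit as the first tuple element.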
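Register and Call the Stored Procedure
With the CountVertices procedure defined above, you can create the stored procedure with gsctl or with the Interactive Python/Java SDK.
gsctl
Once Interactive is deployed, you can register a C++ stored procedure in the same way as a Cypher stored procedure.
Define the YAML
The YAML for a C++ stored procedure differs from that of a Cypher procedure only in the type field: cpp instead of cypher. You can include the procedure implementation directly in the YAML file.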
name: test_procedure
description: "This is a test procedure"
query: |
  #include "flex/engines/graph_db/app/app_base.h"
  #include "flex/engines/graph_db/database/graph_db_session.h"
  #include "flex/utils/app_utils.h"
  namespace gs {
  // A sample app that returns the number of vertices with the specified label.
  // Since no write operations are needed, it inherits from ReadAppBase;
  // otherwise it could inherit from WriteAppBase.
  class CountVertices : public ReadAppBase {
   public:
    CountVertices() {}
    /**
     * @brief Query function for the query class.
     * @param sess: GraphDBSession, the interface through which you visit the graph.
     * @param input: Decoder from which you deserialize the input parameters.
     * @param output: Encoder into which you encode the output.
     */
    bool Query(const gs::GraphDBSession &sess, Decoder &input,
               Encoder &output) override {
      // First, get a read transaction.
      auto txn = sess.GetReadTransaction();
      // We expect one string parameter (the vertex label name) from the decoder.
      if (input.empty()) {
        return false;
      }
      std::string label_name{input.get_string()};
      const auto &schema = txn.schema();
      if (!schema.has_vertex_label(label_name)) {
        return false;  // The requested label doesn't exist.
      }
      auto label_id = schema.get_vertex_label_id(label_name);
      // Vertices are indexed internally per label, so the count can be read
      // directly instead of being accumulated.
      output.put_int(txn.GetVertexNum(label_id));
      txn.Commit();
      return true;
    }
  };
  }  // namespace gs
  extern "C" {
  // Defines how an instance of your procedure is created.
  void *CreateApp(gs::GraphDBSession &db) {
    gs::CountVertices *app = new gs::CountVertices();
    return static_cast<void *>(app);
  }
  // Defines how an instance of your procedure is deleted.
  void DeleteApp(void *app) {
    gs::CountVertices *casted = static_cast<gs::CountVertices *>(app);
    delete casted;
  }
  }
type: cpp
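Inlining the source only works if every line of the C++ code is indented under `query: |`, which is easy to break when pasting. A minimal sketch of generating such a YAML from a source string is shown below; make_cpp_procedure_yaml is a hypothetical helper, and it assumes the description contains no double quotes and that the first non-empty source line is not itself indented (YAML infers the block's indentation from that line).

```python
def make_cpp_procedure_yaml(name, description, cpp_source):
    """Build a stored-procedure YAML with the C++ source inlined as a literal block."""
    # Assumes `description` contains no double quotes (no escaping is done).
    lines = [
        f"name: {name}",
        f'description: "{description}"',
        "query: |",
    ]
    # Every non-empty line of a YAML literal block (|) must be indented
    # deeper than its key; empty lines may stay empty.
    for src_line in cpp_source.splitlines():
        lines.append("  " + src_line if src_line else "")
    lines.append("type: cpp")
    return "\n".join(lines) + "\n"
```

Writing the returned string to procedure.yaml produces a file in the same shape as the example above.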
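Embedding the C++ code in the YAML can be verbose and hard to maintain, especially when the code needs to change. Alternatively, you can upload the implementation from a file: set query to the full path of the C++ file, prefixed with the @ symbol.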
name: test_procedure
description: "This is a test procedure"
query: "@/path/to/procedure.cc"
type: cpp
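The @ prefix means the query value names a file rather than holding the source itself. The helper below is a hypothetical illustration of those semantics only: resolve_query is not part of gsctl, and this guide does not specify where the expansion actually takes place.

```python
from pathlib import Path


def resolve_query(query: str) -> str:
    """Hypothetical helper: expand an "@<path>" query into the file's contents."""
    if query.startswith("@"):
        path = Path(query[1:])
        if not path.is_file():
            raise FileNotFoundError(f"procedure source not found: {path}")
        return path.read_text(encoding="utf-8")
    return query  # Already inline source; use it unchanged.
```

Keeping the source in its own file also lets you compile-check it with your usual C++ tooling before registering it.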
Python SDK
After installing the Interactive Python SDK, you can create the stored procedure with the following code.
import os
from gs_interactive.client.driver import Driver
from gs_interactive.models import *
driver = Driver()  # Connect to the Interactive service, assuming environment variables such as INTERACTIVE_ADMIN_ENDPOINT have been exported.
sess = driver.session()  # Create a session.
# Read the content of the procedure source count_vertices.cc.
app_path = '/path/to/count_vertices.cc'  # Replace with the real path to count_vertices.cc.
if not os.path.exists(app_path):
    raise FileNotFoundError(f"count_vertices.cc not found: {app_path}")
with open(app_path, "r") as f:
    app_content = f.read()
# Create the procedure on the default graph; change graph_id to target your own graph.
graph_id = '1'
create_proc_request = CreateProcedureRequest(
    name="count_vertices",
    description="Count vertices for the specified vertex label name",
    query=app_content,
    type="cpp",
)
resp = sess.create_procedure(graph_id, create_proc_request)
print(resp)
assert resp.is_ok()
For more details on the Python SDK interface, see Python SDK Procedure API.
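Because the source is compiled on the server, a missing entry point only surfaces after the upload. A cheap client-side heuristic, sketched below, checks that the text defines the extern "C" functions CreateApp and DeleteApp used in the example. It is a rough textual check, not a compiler, and the assumption that the engine requires exactly these two names is based on the example rather than on a documented contract; check_procedure_source is a hypothetical helper.

```python
import re

# Entry points exported via extern "C" in the example procedure; assumed
# (from the example) to be what the engine looks up in the compiled library.
REQUIRED_SYMBOLS = ("CreateApp", "DeleteApp")


def check_procedure_source(source: str) -> list:
    """Return the required entry points that do not appear to be defined."""
    missing = []
    for sym in REQUIRED_SYMBOLS:
        # Rough textual check: the symbol name followed by "(" somewhere.
        if not re.search(rf"\b{sym}\s*\(", source):
            missing.append(sym)
    if 'extern "C"' not in source:
        missing.append('extern "C" block')
    return missing
```

Running it on app_content before calling sess.create_procedure lets you fail fast with a readable list of what is missing.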
Java SDK
After installing the Interactive Java SDK, you can create the stored procedure with the following code.
import com.alibaba.graphscope.interactive.client.Driver;
import com.alibaba.graphscope.interactive.client.Session;
import com.alibaba.graphscope.interactive.client.common.Result;
import com.alibaba.graphscope.interactive.models.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
public class CreateProcedureTest {
    public static void createProcedure(Session sess, String graphId) {
        CreateProcedureRequest procedure = new CreateProcedureRequest();
        procedure.setName("count_vertices");
        procedure.setDescription("Count vertices for the specified vertex label name");
        String appFilePath = "/path/to/count_vertices.cc"; // Replace with the real path to count_vertices.cc
        // Check that the source file exists.
        if (Files.notExists(Paths.get(appFilePath))) {
            throw new RuntimeException("sample app file does not exist: " + appFilePath);
        }
        String appFileContent;
        try {
            appFileContent = new String(Files.readAllBytes(Paths.get(appFilePath)));
        } catch (IOException e) {
            throw new RuntimeException("failed to read sample app file: " + appFilePath, e);
        }
        if (appFileContent.isEmpty()) {
            throw new RuntimeException("sample app content is empty");
        }
        procedure.setQuery(appFileContent);
        procedure.setType(CreateProcedureRequest.TypeEnum.CPP);
        Result<CreateProcedureResponse> resp = sess.createProcedure(graphId, procedure);
        if (!resp.isOk()) {
            throw new RuntimeException("Failed to create procedure: " + resp.getStatusMessage());
        }
    }
    public static void main(String[] args) {
        Driver driver = Driver.connect();
        Session sess = driver.session();
        createProcedure(sess, "1");
    }
}
For more details on the Java SDK interface, see Java SDK Procedure API.
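Create the Stored Procedure with gsctl
First, switch to the graph on which you want to create the stored procedure. We use the builtin graph as an example. To learn how to create a custom graph, see Using Custom Graph.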
gsctl use GRAPH gs_interactive_default_graph
Then create the procedure with gsctl:
gsctl create storedproc -f ./procedure.yaml
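This compiles the C++ code into a dynamic library, which may take a few seconds. After compilation finishes, you must restart the service to activate the stored procedure.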
gsctl service restart
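The three gsctl steps above must run in order, with the restart last. A minimal sketch that scripts them is shown below; deploy_cpp_procedure and its dry_run flag are hypothetical, the commands themselves are exactly those in this section, and it assumes `gsctl create storedproc` returns only after compilation has finished (check this for your version before relying on it).

```python
import subprocess


def deploy_cpp_procedure(graph, yaml_path, dry_run=False):
    """Run the gsctl steps from this section in order; return the commands."""
    commands = [
        ["gsctl", "use", "GRAPH", graph],
        ["gsctl", "create", "storedproc", "-f", yaml_path],
        # The procedure is compiled into a dynamic library; a restart activates it.
        ["gsctl", "service", "restart"],
    ]
    if not dry_run:
        for cmd in commands:
            subprocess.run(cmd, check=True)  # Stop at the first failing step.
    return commands
```

For example, `deploy_cpp_procedure("gs_interactive_default_graph", "./procedure.yaml")` performs the same steps as the commands above.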
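Graph Database Engine
Interactive is designed around transactions: users should access the graph through a ReadTransaction, InsertTransaction, or UpdateTransaction.
If you want to write stored procedures with optimal performance, we recommend reading the GraphDB code in detail. If you run into any problems, feel free to contact us by opening an issue or starting a discussion.
Query Input and Output
Interactive natively supports two protocols for encoding parameters and decoding results: Encoder/Decoder and CypherApp.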
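Encoder/Decoder
The Encoder/Decoder-based approach offers the best performance, and the serialization/deserialization format is entirely user-defined.
With this protocol, you are responsible for encoding and decoding the parameters yourself.
For example, the sample procedure count_vertices.cc above uses a Decoder to read its input and an Encoder to write its output.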
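Since both sides must agree on the byte layout, it can help to see one written out. The classes below are a sketch of an Encoder/Decoder pair, not the SDK classes: the layout (int as 4 bytes, string as a 4-byte length prefix followed by raw bytes, byte as 1 byte) follows the buffer-size comment in the Java example below, while little-endian byte order and UTF-8 string encoding are assumptions. SketchEncoder and SketchDecoder are hypothetical names.

```python
import struct


class SketchEncoder:
    """Appends values to a byte buffer (layout assumed, see lead-in)."""

    def __init__(self):
        self._buf = bytearray()

    def put_int(self, v):
        self._buf += struct.pack("<i", v)  # 4-byte little-endian int (assumed)

    def put_byte(self, v):
        self._buf += struct.pack("<B", v)  # single unsigned byte

    def put_string(self, s):
        data = s.encode("utf-8")  # UTF-8 assumed
        self.put_int(len(data))  # length prefix, then raw bytes
        self._buf += data

    def get_bytes(self):
        return bytes(self._buf)


class SketchDecoder:
    """Reads values back in the same order they were written."""

    def __init__(self, data):
        self._data = data
        self._pos = 0

    def empty(self):
        return self._pos >= len(self._data)

    def get_int(self):
        (v,) = struct.unpack_from("<i", self._data, self._pos)
        self._pos += 4
        return v

    def get_byte(self):
        v = self._data[self._pos]
        self._pos += 1
        return v

    def get_string(self):
        n = self.get_int()
        s = self._data[self._pos:self._pos + n].decode("utf-8")
        self._pos += n
        return s
```

The key property is symmetry: the decoder must read fields in exactly the order the encoder wrote them, which is why the C++ procedure calls get_string() before anything else.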
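The following examples show how to call the count_vertices procedure with the Interactive Java SDK and Python SDK. Note that the service must be switched to the graph with ID "1" for the procedure to be callable.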
import com.alibaba.graphscope.interactive.client.Driver;
import com.alibaba.graphscope.interactive.client.Session;
import com.alibaba.graphscope.interactive.client.common.Result;
import com.alibaba.graphscope.interactive.client.utils.Decoder;
import com.alibaba.graphscope.interactive.client.utils.Encoder;
import com.alibaba.graphscope.interactive.models.*;
public class CallProcedureTest {
    public static void callProcedure(Session sess, String graphId, String procedureName, String labelName) {
        // 1 byte for the procedure index, 4 bytes for the label length, and
        // labelName.length() bytes for the label name (assumes an ASCII label).
        byte[] bytes = new byte[1 + 4 + labelName.length()];
        Encoder encoder = new Encoder(bytes);
        encoder.put_string(labelName);
        encoder.put_byte((byte) 1); // Assume the procedure index is 1
        // procedureName is not sent: callProcedureRaw only receives the encoded bytes.
        Result<byte[]> resp = sess.callProcedureRaw(graphId, bytes);
        if (!resp.isOk()) {
            throw new RuntimeException("Failed to call procedure: " + resp.getStatusMessage());
        }
        Decoder decoder = new Decoder(resp.getValue());
        int count = decoder.get_int();
        System.out.println("Count of vertices with label " + labelName + " is " + count); // should be 4
    }
    public static void startServiceOnGraph(Session sess, String graphId) {
        Result<StartServiceResponse> resp = sess.startService(new StartServiceRequest().graphId(graphId));
        if (!resp.isOk()) {
            throw new RuntimeException("Failed to start service: " + resp.getStatusMessage());
        }
    }
    public static void main(String[] args) {
        Driver driver = Driver.connect();
        Session sess = driver.session();
        startServiceOnGraph(sess, "1"); // The procedure is only callable after the service has switched to graph 1
        callProcedure(sess, "1", "count_vertices", "person"); // Count the vertices labeled person
    }
}
from gs_interactive.client.driver import Driver
from gs_interactive.models import *
from gs_interactive.client.utils import *
driver = Driver() # connecting to Interactive service, assuming environment variables like INTERACTIVE_ADMIN_ENDPOINT have been correctly exported.
sess = driver.session() # create the session
# Use an Encoder to encode the input request
encoder = Encoder()
encoder.put_string("person") # input label name
encoder.put_byte(1) # procedure id 1
resp = sess.call_procedure_raw(graph_id="1", params=encoder.get_bytes())
assert resp.is_ok()
decoder = Decoder(resp.value)
num = decoder.get_int()
print(f"vertices num: {num}")
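The Java example sizes its buffer as 1 + 4 + labelName.length(), which counts characters rather than encoded bytes. Assuming the SDK encodes strings as UTF-8 (an assumption; check the SDK source), the two agree only for ASCII labels. The arithmetic can be checked quickly; request_size and java_buffer_size are hypothetical helpers.

```python
def request_size(label: str) -> int:
    """Bytes for put_string(label) plus put_byte(proc_index), assuming a
    4-byte length prefix and UTF-8 string encoding."""
    return 4 + len(label.encode("utf-8")) + 1


def java_buffer_size(label: str) -> int:
    """The sizing used in the Java example: 1 + 4 + labelName.length()."""
    # len() counts code points; for text like this it matches Java's
    # String.length(), which counts UTF-16 units.
    return 1 + 4 + len(label)


print(request_size("person"), java_buffer_size("person"))  # 11 11
print(request_size("caf\u00e9"), java_buffer_size("caf\u00e9"))  # 10 9
```

The Python SDK example sidesteps the issue because its Encoder grows its own buffer instead of relying on a precomputed size.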
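Programming Interface
To build an efficient procedure that meets your needs, it is essential to understand the programming interface and the Interactive storage interface. We recommend reading the Interactive source code; you can also consult the generated API documentation.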