Python SDK 参考文档¶
交互式Python SDK参考手册是一份全面的指南,旨在帮助开发者将交互式服务集成到他们的Python应用程序中。该SDK允许用户无缝连接到Interactive,并利用其强大的功能进行图管理、存储过程管理和查询执行。
系统要求.¶
Python >= 3.8
安装与使用¶
注意
如果您希望将安装与本地Python环境隔离,可以考虑使用虚拟环境virtualenv来创建一个新的环境。
pip安装¶
pip3 install gs_interactive
然后导入该包:
import gs_interactive
Setuptools¶
通过Setuptools安装。
python3 setup.py build_proto
python3 setup.py install --user
(或使用 sudo python3 setup.py install 为所有用户安装该软件包)
然后导入该包:
import gs_interactive
测试¶
执行 pytest 来运行测试。
快速开始¶
首先,通过交互式入门指南安装并启动交互式服务,您将获得该服务的所有端点。
You can connect to Interactive service with Interactive SDK, with following environment variables declared.
############################################################################################
export INTERACTIVE_ADMIN_ENDPOINT=http://127.0.0.1:7777
export INTERACTIVE_STORED_PROC_ENDPOINT=http://127.0.0.1:10000
export INTERACTIVE_CYPHER_ENDPOINT=neo4j://127.0.0.1:7687
############################################################################################
注意
如果在部署Interactive时自定义了端口,请记得将默认端口替换为您自定义的端口。
请记得导出这些环境变量。
连接并提交查询¶
在通过Python SDK连接交互式服务并提交查询之前,请确保该服务正在目标图上运行。
gsctl use GRAPH <graph_name>
gsctl service status
现在通过交互式Python SDK提交查询。
from gs_interactive.client.driver import Driver
from gs_interactive.client.session import Session
from gs_interactive.models import *
driver = Driver()
with driver.getNeo4jSession() as session:
result = session.run("MATCH(n) RETURN COUNT(n);")
for record in result:
print(record)
创建新图¶
要创建新图,用户需要指定名称、描述、顶点类型和边类型。 有关图的详细数据模型,请参阅数据模型。
在本示例中,我们将创建一个仅包含一种顶点类型persson和一种名为knows的边类型的简单图。
def create_graph(sess : Session):
# Define the graph schema via a python dict.
test_graph_def = {
"name": "test_graph",
"description": "This is a test graph",
"schema": {
"vertex_types": [
{
"type_name": "person",
"properties": [
{
"property_name": "id",
"property_type": {"primitive_type": "DT_SIGNED_INT64"},
},
{
"property_name": "name",
"property_type": {"string": {"long_text": ""}},
},
{
"property_name": "age",
"property_type": {"primitive_type": "DT_SIGNED_INT32"},
},
],
"primary_keys": ["id"],
}
],
"edge_types": [
{
"type_name": "knows",
"vertex_type_pair_relations": [
{
"source_vertex": "person",
"destination_vertex": "person",
"relation": "MANY_TO_MANY",
}
],
"properties": [
{
"property_name": "weight",
"property_type": {"primitive_type": "DT_DOUBLE"},
}
],
"primary_keys": [],
}
],
},
}
create_graph_request = CreateGraphRequest.from_dict(test_graph_def)
resp = sess.create_graph(create_graph_request)
assert resp.is_ok()
return resp.get_value().graph_id
driver = Driver()
sess = driver.session()
graph_id = create_graph(sess)
print("Created graph, id is ", graph_id)
在上述示例中,使用Python字典定义了一个名为test_graph的图。您也可以通过CreateGraphRequest提供的编程接口来定义图。当调用createGraph方法时,会返回一个表示该图唯一标识符的字符串。
注意
您可能会注意到,我们使用gsctl以YAML格式定义图模式,但在Python代码中切换为使用dict。在不同格式之间转换时可能会遇到挑战。
不过,将YAML转换为Python的dict相当方便。
首先,安装pyYAML
pip3 install pyYAML
然后使用pyYAML将YAML字符串转换为Python字典
import yaml
yaml_string = """
...
"""
python_dict = yaml.safe_load(yaml_string)
print(python_dict)
之后,您可以从Python字典创建CreateGraphRequest。
将数据导入图¶
创建新图后,您可能需要将数据导入到新创建的图中。 有关数据导入的详细配置,请参阅数据导入配置。
例如,您可以将本地CSV文件导入test_graph。请注意,目前仅支持CSV格式文件。test_graph的原始数据可在GraphScope Interactive Github仓库获取,您可以通过以下命令下载这些数据。
wget https://raw.githubusercontent.com/alibaba/GraphScope/main/flex/interactive/examples/modern_graph/person.csv
wget https://raw.githubusercontent.com/alibaba/GraphScope/main/flex/interactive/examples/modern_graph/person_knows_person.csv
现在在Python解释器中运行以下代码,记得将/path/to/person.csv和/path/to/person_knows_person.csv替换为实际的本地路径。
def bulk_loading(sess: Session, graph_id : str):
test_graph_datasource = {
"loading_config":{
"data_source":{
"scheme": "file"
},
"import_option": "init",
"format":{
"type": "csv"
},
},
"vertex_mappings": [
{
"type_name": "person",
"inputs": ["@/path/to/person.csv"],
"column_mappings": [
{"column": {"index": 0, "name": "id"}, "property": "id"},
{"column": {"index": 1, "name": "name"}, "property": "name"},
{"column": {"index": 2, "name": "age"}, "property": "age"},
],
}
],
"edge_mappings": [
{
"type_triplet": {
"edge": "knows",
"source_vertex": "person",
"destination_vertex": "person",
},
"inputs": [
"@/path/to/person_knows_person.csv"
],
"source_vertex_mappings": [
{"column": {"index": 0, "name": "person.id"}, "property": "id"}
],
"destination_vertex_mappings": [
{"column": {"index": 1, "name": "person.id"}, "property": "id"}
],
"column_mappings": [
{"column": {"index": 2, "name": "weight"}, "property": "weight"}
],
}
],
}
bulk_load_request = SchemaMapping.from_dict(test_graph_datasource)
resp = sess.bulk_loading(graph_id, bulk_load_request)
assert resp.is_ok()
job_id = resp.get_value().job_id
print('The bulkloading job id is ', job_id)
# wait until the job has completed successfully.
while True:
resp = sess.get_job(job_id)
assert resp.is_ok()
status = resp.get_value().status
print("job status: ", status)
if status == "SUCCESS":
break
elif status == "FAILED":
raise Exception("job failed")
else:
time.sleep(1)
# We assume you have created the graph, uncomment
# following code if you haven't create the new graph yet.
#
# driver = Driver()
# sess = driver.session()
# graph_id = create_graph(sess)
# print("Created graph, id is ", graph_id)
#
bulk_loading(sess, graph_id)
对于每个顶点/边类型,您需要提供输入数据源和列映射信息。
请记住在本地文件路径的开头添加@。
Session.bulkLoading()会向服务提交一个数据加载作业,我们可以通过Session.getJobStatus()查询作业状态,并等待作业成功完成。
创建存储过程¶
存储过程可以注册到GraphScope Interactive中,用于封装和复用复杂的图操作。Interactive支持将cypher和c++查询作为存储过程。
通过以下代码,您将创建一个名为testProcedure的过程,该过程通过cypher查询定义。
# Create Graph
# ...
# Bulk loading
# ...
proc_name="test_procedure"
create_proc_request = CreateProcedureRequest(
name=proc_name,
description="test procedure",
query="MATCH (n) RETURN COUNT(n);",
type="cypher",
)
resp = sess.create_procedure(graph_id, create_proc_request)
assert resp.is_ok()
print("successfully create procedure: ", proc_name)
当前无法调用该过程,因为交互式服务尚未切换到新创建的modern_graph。我们需要在modern_graph上启动服务。
在新图上启动查询服务¶
虽然Interactive在逻辑和存储层面支持多图操作,但它一次只能服务于一个图。这意味着在任何时刻,只有一张图可提供查询服务。因此我们需要通过以下代码切换到新创建的modern_graph。
resp = sess.start_service(
start_service_request=StartServiceRequest(graph_id=graph_id)
)
assert resp.is_ok()
print("start service ok", resp)
向新图提交查询¶
在新图上启动查询服务后,我们现在可以向modern_graph提交查询请求。
提交Cypher查询¶
query = "MATCH (n) RETURN COUNT(n);"
with driver.getNeo4jSession() as session:
resp = session.run(query)
for record in resp:
print(record)
删除图¶
最后,我们可以删除图数据。与该图绑定的图数据和存储过程也将被删除。 当前交互式服务禁止删除正在服务中使用的图,因此要删除图,请先停止服务。
# stop the service first, note that graph_id is optional.
resp = sess.stop_service(graph_id)
assert resp.is_ok()
print("successfully stopped the service")
resp = sess.delete_graph(graph_id)
assert resp.is_ok()
print("delete graph res: ", resp)
完整示例请参考Python SDK Example
服务API文档¶
交互式SDK中的服务API分为五大类。
图管理API
ProcedureManagementApi
作业管理API
服务管理API
查询服务API
顶点API
EdgeApi
所有URI均相对于${INTERACTIVE_ADMIN_ENDPOINT}
类 |
方法 |
HTTP请求 |
描述 |
|---|---|---|---|
GraphManagementApi |
POST /v1/graph/{graph_id}/dataloading |
||
GraphManagementApi |
POST /v1/graph |
||
GraphManagementApi |
DELETE /v1/graph/{graph_id} |
||
GraphManagementApi |
GET /v1/graph/{graph_id} |
||
GraphManagementApi |
GET /v1/graph/{graph_id}/schema |
||
GraphManagementApi |
GET /v1/graph |
||
GraphManagementApi |
GET /v1/graph/{graph_id}/statistics |
||
JobManagementApi |
DELETE /v1/job/{job_id} |
||
JobManagementApi |
GET /v1/job/{job_id} |
||
JobManagementApi |
GET /v1/job |
||
ProcedureManagementApi |
POST /v1/graph/{graph_id}/procedure |
||
ProcedureManagementApi |
DELETE /v1/graph/{graph_id}/procedure/{procedure_id} |
||
ProcedureManagementApi |
GET /v1/graph/{graph_id}/procedure/{procedure_id} |
||
ProcedureManagementApi |
GET /v1/graph/{graph_id}/procedure |
||
ProcedureManagementApi |
PUT /v1/graph/{graph_id}/procedure/{procedure_id} |
||
ServiceManagementApi |
GET /v1/service/status |
||
ServiceManagementApi |
POST /v1/service/restart |
||
ServiceManagementApi |
POST /v1/service/start |
||
ServiceManagementApi |
POST /v1/service/stop |
||
QueryServiceApi |
POST /v1/graph/{graph_id}/query |
||
QueryServiceApi |
POST /v1/graph/current/query |
||
VertexApi |
POST /v1/graph/{graph_id}/vertex |
向图中添加顶点 |
|
VertexApi |
GET /v1/graph/{graph_id}/vertex |
通过顶点主键获取顶点属性。 |
|
VertexApi |
PUT /v1/graph/{graph_id}/vertex |
更新顶点属性 |
|
EdgeApi |
POST /v1/graph/{graph_id}/edge |
向图中添加边 |
|
EdgeApi |
GET /v1/graph/{graph_id}/edge |
通过源顶点和目标顶点的主键获取边的属性。 |
|
EdgeApi |
PUT /v1/graph/{graph_id}/edge |
更新边的属性 |