Groot: 持久化图存储

概述

除了GraphScope支持的内存列式图存储Vineyard外,我们还提供基于磁盘、面向行存储、多版本化的持久化图存储。Vineyard主要专注于对内存中全图分析工作负载的卓越支持,而持久化图存储则旨在更好地支持频繁更新图数据并响应遍历查询的持续图数据管理服务。

该存储是基于流行的RocksDB键值存储构建的分布式图存储系统。它采用面向行的设计来支持对图进行频繁的小规模更新。每行数据都标记有快照ID作为其版本号。查询操作会读取相对于其启动时快照ID的最新数据版本,因此不会被写入操作阻塞。在写入方面,我们在一致性和更高吞吐量之间采取了折中方案。在我们的设计中,同一会话中的写入操作可以分组并作为一个原子单元执行,持久化存储会为每个组分配一个快照ID(即当前时间的低精度时间戳),并按快照ID顺序执行写入组,对于同一快照ID内发生的多个写入组则采用确定性(但任意)顺序执行。这种设计在保持一定程度顺序性和隔离性的同时提供了高写入吞吐量,虽然它提供的一致性弱于数据库中常见的严格快照隔离级别。我们希望这一设计选择能为实际应用场景提供一种有意义的权衡方案。

已知限制

最初,新的持久化存储作为独立于Vineyard的选项提供,它可以接受Gremlin查询进行数据访问。展望未来,我们希望将其发展成适合各类工作负载的集成式混合图存储。

部署Groot

我们使用HelmKubernetes集群上部署Groot。

先决条件

  • Kubernetes 1.21+

  • Helm 3.2.0+

如果您没有Kubernetes集群,可以通过Docker Desktop、Minikube或Kind创建一个本地集群。

如果您没有Kubernetes集群,可以通过Docker Desktopminikubekind创建一个本地集群。

更多详细指南请参考在自托管k8s集群上部署graphscope

安装

从ArtifactHub安装

最新稳定版本的Groot可以通过以下命令从ArtifactHub安装:

helm repo add graphscope https://graphscope.oss-cn-beijing.aliyuncs.com/charts/
helm repo update
helm install demo graphscope/graphscope-store

从本地目录安装

如果您想应用最新更新或修改某些文件,可以克隆GraphScope仓库并通过以下命令从本地目录安装Groot:

cd GraphScope/charts/graphscope-store
helm dependency update  # fetch the dependency charts
helm install demo .

上述命令将使用默认配置部署Groot。安装过程中可配置的项目可在Common Configurations部分找到。

首次启动Groot服务可能需要一些时间,因为需要拉取镜像。您可以使用以下命令检查服务是否可用:

helm test demo

Helm将在控制台打印以下语句,您可以复制并执行以获取连接地址。

您还可以通过以下命令检查部署状态并获取连接地址:

helm status demo

通用配置

名称

描述

默认值

image.registry

镜像仓库

registry.cn-hongkong.aliyuncs.com

image.repository

镜像仓库

graphscope/graphscope-store

image.tag

镜像标签,默认为Chart的版本

“”

auth.username

用户名。如果为空,则表示无需认证

“”

auth.password

密码

“”

store.replicaCount

存储Pod数量

2

dataset.modern

在开始时加载modern graph数据集

false

frontend.replicaCount

前端副本数量

1

frontend.service.type

前端服务的Kubernetes服务类型

NodePort

frontend.query.per.second.limit

前端服务能处理的最大每秒查询量

2147483647 (无限制)

query.execution.timeout.ms

查询的总执行时间

3000000

如果Groot以默认配置启动,那么将会启动两个存储Pod、一个前端Pod和一个协调器Pod。协调器节点的数量固定为1。

使用 --set key=value[,key=value] 命令为 helm install 设置参数,例如:

helm install demo graphscope/graphscope-store \
    --set auth.username=admin,auth.password=123456

上述命令配置了连接集群所需的用户名和密码。

在需要设置大量参数的情况下,使用--set选项可能会变得难以管理。此时,可以通过YAML文件来指定参数,如下例所示:

helm install demo graphscope/graphscope-store -f settings.yaml

settings.yaml 的示例配置如下:

# cat settings.yaml
---
image:
  tag: latest
auth:
  username: admin
  password: 123456

在设置用户名和密码的同时,它将指定要拉取的镜像标签为最新版本。

连接到Groot

安装Groot后,默认会创建一个空图。我们可以执行连接、定义图模型、加载数据,并使用Gremlin查询语言进行查询。

连接

在上一步中,执行通过Helm打印的连接信息获取命令后,所述信息已设置为环境变量。可以使用以下语句来获取并连接到Groot:

import os
import graphscope
node_ip = os.environ["NODE_IP"]
grpc_port = os.environ["GRPC_PORT"]
gremlin_port = os.environ["GREMLIN_PORT"]
grpc_endpoint = f"{node_ip}:{grpc_port}"
gremlin_endpoint = f"{node_ip}:{gremlin_port}"

conn = graphscope.conn(grpc_endpoint, gremlin_endpoint)

如果在安装过程中配置了用户名和密码,则在建立连接时需要提供这些凭据。

conn = graphscope.conn(grpc_endpoint, gremlin_endpoint, username="admin", password="123456")

构建与修改图模型

可以通过conn对象获取图对象。

graph = conn.g()
# Create schema
schema = graph.schema()

使用内置数据集

如果在安装过程中设置了dataset.modern=true,Groot将加载一个简单的示例数据集以便快速入门。

注意

目前暂不支持

自定义模型与数据集

用户还可以自定义模型并加载自己的数据集。

用于定义图模型的常见语句如下:

schema.add_vertex_label('v_label_name').add_primary_key('pk_name', 'type').property('prop_name_1', 'type').property('prop_name_2', 'type')
schema.add_edge_label('e_label_name').source('src_label').destination('dst_label').property('prop_name_3', 'type')
schema.drop('label')
schema.drop('label', 'src_label', 'dst_label')
schema.update()

图模型定义了多个标签,每个标签包含一个标签名称和若干属性(.property())。

其中,点标签可以定义主键(.add_primary_key()),边标签需要定义源标签(.source())和目标标签(.destination())。.drop()用于删除标签,.update()提交事务以应用更改。

这是一个定义相识人群关系的简单模型示例,标签为person -> knows <- person。该模型包含:

person 标签,包含一个主键名为 id,类型为 long,以及一个名为 name 的属性,类型为 strknows 标签,包含一个主键名为 date,类型为 str,其源标签和目标标签均为 person。

schema.add_vertex_label("person").add_primary_key("id", "long").add_property(
        "name", "str"
    )
schema.add_edge_label("knows").source("person").destination("person").add_property(
        "date", "str"
    )
schema.update()

数据查询

Python

利用之前获取的连接信息,我们可以在Python中执行Gremlin查询。

g = conn.gremlin()
print(g.V().count().toList())

或者,我们可以直接从连接信息中获取Gremlin的IP地址和端口,并使用gremlinpython库进行查询。

  1. 安装 gremlinpython

pip install gremlinpython ‑‑user
  1. 复制以下代码并将endpoint设置为先前获取的连接信息:

import os
from gremlin_python.driver.client import Client

endpoint = f"{os.environ['NODE_IP']}:{os.environ['GREMLIN_PORT']}"
graph_url = f"ws://{endpoint}/gremlin"
username = "<username>"
password = "<password>"
client = Client(
    graph_url,
    "g",
    username=username,  # If auth enabled
    password=password,  # If auth enabled
    )
print(client.submit("g.V().limit(2)").all().result())
client.close()

Java

  1. 创建如下目录结构,其中pom.xml和Main.java是文件

gremlin
├── pom.xml
└── src
    ├── main
        ├── java
            └── org
                └── example
                    └── Main.java
  1. 按如下方式配置 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>gremlin</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <packaging>jar</packaging>
    <name>GremlinExample</name>
    <url>https://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>org.apache.tinkerpop</groupId>
            <artifactId>gremlin-driver</artifactId>
            <version>3.6.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <executable>java</executable>
                    <arguments>
                        <argument>-classpath</argument>
                        <classpath/>
                        <argument>org.example.Main</argument>
                    </arguments>
                    <mainClass>org.example.Main</mainClass>
                    <complianceLevel>1.11</complianceLevel>
                    <killAfter>-1</killAfter>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
  1. 按如下方式配置 Main.java

package org.example;

import org.apache.tinkerpop.gremlin.driver.*;

public class Main {

    public static void main(String[] args) {
        Cluster.Builder builder = Cluster.build();
        builder.addContactPoint("127.0.0.1");
        builder.port(8182);
        builder.credentials("username", "password");
        Cluster cluster = builder.create();

        Client client = cluster.connect();
        ResultSet results = client.submit("g.V().limit(3).valueMap()");
        for (Result result : results) {
            System.out.println(result.getObject());
        }
        client.close();
        cluster.close();
    }
}
  1. 执行程序

mvn compile exec:exec

Node.js

  1. 安装 JavaScript 的 gremlin

npm install gremlin
  1. 执行这些代码

const gremlin = require('gremlin');
const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection;
const Graph = gremlin.structure.Graph;

graph_url = `ws://{gremlin_endpoint}/gremlin`
remoteConn = new DriverRemoteConnection(graph_url,{});

const graph = new Graph();
const g = graph.traversal().withRemote(remoteConn);

g.V().limit(2).count().next().
    then(data => {
        console.log(data);
        remoteConn.close();
    }).catch(error => {
        console.log('ERROR', error);
        remoteConn.close();
    });

由于目前我们仅定义了schema,还没有加载任何数据,查询结果将是空的。因此下一步是加载数据。

数据导入

有两种导入数据的方法。一种方法是使用离线导入工具从外部存储(如HDFS)批量导入数据,另一种方法是使用SDK提供的语句进行实时写入。

注意:离线导入将*覆盖导入标签的全部数据。

离线导入

前提条件

解压data_load.tar.gz文件,其中data_load/bin/load_tool.sh是下文将要用到的工具。

tar xzvf data_load.tar.gz

数据格式

源数据需要以特定格式存储在HDFS中。每个文件包含与某类顶点或边标签相关的数据。

以下是关于person顶点标签和knows边标签的数据示例,其中包含person->knows<-person关系。

  • person.csv

id|name
1000|Alice
1001|Bob
  • person_knows_person.csv

person_id|person_id_1|date
1000|1001|20210611151923

数据文件的第一行是描述每个字段键值的标题行。标题行不是必需的。如果数据文件中没有标题行,您需要在数据构建过程中将skip.header设置为true(详情请参阅“构建分区图”中的参数说明)。

其余行是数据记录。每行代表一条记录。数据字段由自定义分隔符分隔(上例中使用的是"|")。在顶点数据文件person.csv中,id字段和name字段分别是顶点类型person的主键和属性。在边数据文件person_knows_person.csv中,person_id字段是源顶点的主键,person_id_1字段是目标顶点的主键,date是边类型knows的属性。

所有数据字段将根据图模式中定义的数据类型进行解析。如果输入数据字段无法正确解析,数据构建过程将失败并返回相应的错误信息。

加载过程

加载过程包含三个步骤:

  1. 分区图通过MapReduce作业从源文件构建,并存储在同一HDFS中

  2. 图分区被并行加载到存储服务器中

  3. 提交到在线服务,使数据准备好处理查询

构建:构建一个分区图

通过运行以下命令的hadoop map-reduce作业来构建数据:

$ ./load_tool.sh build <path/to/config/file>

配置文件应遵循Java java.util.Properties类可识别的格式。示例如下:

split.size=256
separator=\\|
input.path=/tmp/ldbc_sample
output.path=/tmp/data_output
graph.endpoint=1.2.3.4:55555
column.mapping.config={"person_0_0.csv":{"label":"person","propertiesColMap":{"0":"id","1":"name"}},"person_knows_person_0_0.csv":{"label":"knows","srcLabel":"person","dstLabel":"person","srcPkColMap":{"0":"id"},"dstPkColMap":{"1":"id"},"propertiesColMap":{"2":"date"}}}
skip.header=true
load.after.build=true
# This is not required when load.after.build=true
# hadoop.endpoint=127.0.0.1:9000

参数详情如下:

配置键

必填

默认值

描述

split.size

false

256

Hadoop map-reduce输入数据分片大小(单位:MB)

separator

false

\|

用于解析行中每个字段的分隔符

input.path

true

-

输入HDFS目录

output.path

true

-

输出HDFS目录

graph.endpoint

true

-

图存储服务的RPC端点。您可以按照以下文档获取RPC端点:GraphScope Store Service

列映射配置

true

-

以JSON格式描述每个输入文件的映射信息。第一层中的每个键应为可在input.path中找到的文件名,对应的值定义映射信息。对于顶点类型,映射信息应包含:1) 顶点类型的label,2) propertiesColMap,描述从输入字段到图属性的映射,格式为{ columnIdx: "propertyName" }。对于边类型,映射信息应包含:1) 边类型的label,2) 源顶点类型的srcLabel,3) 目标顶点类型的dstLabel,4) srcPkColMap,描述从输入字段到源顶点类型主键图属性的映射,5) dstPkColMap,描述从输入字段到目标顶点类型主键图属性的映射,6) propertiesColMap,描述从输入字段到边类型图属性的映射。

skip.header

false

true

是否跳过输入文件的第一行

load.after.build

false

false

是否立即加载并提交构建好的文件

hadoop.endpoint

false

-

Hadoop集群的端点,格式为:。当load.after.build设置为true时不需要填写

数据构建完成后,您可以在HDFS的output.path中找到输出文件。输出文件包括一个名为META的元数据文件、一个名为_SUCCESS的空文件,以及一些数据文件,每个分区对应一个文件,文件名格式为part-r-xxxxx.sst。输出目录的结构应如下所示:

/tmp/data_output
  |- META
  |- _SUCCESS
  |- part-r-00000.sst
  |- part-r-00001.sst
  |- part-r-00002.sst
  ...

如果 load.after.build=true,则可以跳过步骤2和3。 否则,请继续执行数据加载和提交操作。

2. 加载图分区

现在将离线构建的数据导入图存储中。运行:

$ ./load_data.sh ingest <path/to/config/file>

离线构建的数据只能成功导入一次,否则会出现错误。

3. 提交到存储服务

数据导入图存储后,您需要提交数据加载。在成功提交之前,数据将无法读取。运行:

$ ./load_data.sh commit <path/to/config/file>

注意:后提交的数据将覆盖之前提交的具有相同顶点类型或边关系的数据。

实时写入

Groot图有以下几种实时写入方法:

Python

参考test_store_service.py中的示例。

# Inserts one vertex
def insert_vertex(self, vertex: VertexRecordKey, properties: dict) -> int: pass

# Inserts a list of vertices
def insert_vertices(self, vertices: list) -> int: pass

# Update one vertex to new properties
def update_vertex_properties(self, vertex: VertexRecordKey, properties: dict) -> int: pass

# Delete one vertex
def delete_vertex(self, vertex_pk: VertexRecordKey) -> int: pass

# Delete a list of vertices
def delete_vertices(self, vertex_pks: list) -> int: pass

# Insert one edge
def insert_edge(self, edge: EdgeRecordKey, properties: dict) -> int: pass

# Insert a list of edges
def insert_edges(self, edges: list) -> int: pass

# Update one edge to new properties
def update_edge_properties(self, edge: EdgeRecordKey, properties: dict) -> int: pass

# Delete one edge
def delete_edge(self, edge: EdgeRecordKey) -> int: pass

# Delete a list of edges
def delete_edges(self, edge_pks: list) -> int: pass

# Make sure the snapshot is available
def remote_flush(self, snapshot_id: int): pass

我们使用两个名为VertexRecordKeyEdgeRecordKey的工具类来表示唯一标识记录的键。

class VertexRecordKey:
    """Unique identifier of a vertex.
    The primary key may be a dict, the key is the property name,
    and the value is the data.
    """
    def __init__(self, label, primary_key):
        self.label: str = label
        self.primary_key: dict = primary_key

class EdgeRecordKey:
    """Unique identifier of an edge.
    The `eid` is required in Update and Delete, which is a
    system generated unsigned integer. User need to get that eid
    by other means such as gremlin query.
    """
    def __init__(self, label, src_vertex_key, dst_vertex_key, eid=None):
        self.label: str = label
        self.src_vertex_key: VertexRecordKey = src_vertex_key
        self.dst_vertex_key: VertexRecordKey = dst_vertex_key
        self.eid: int = eid  # Only required in update and delete operation

Java

我们还提供了一个用于实时写入和模式管理的Java SDK。

包含以下API接口:

  • 创建和检查图模式

  • 插入/删除/更新顶点

  • 插入/删除/更新边

  • 通过属性名称清除顶点或边的属性

参考RealtimeWrite.java中的示例。

其他功能

Groot 允许用户从特定偏移量或时间戳重放实时写入记录,这在您想要恢复离线加载完成前的某些记录时非常有用,因为离线加载会覆盖所有记录。

您只能指定offsettimestamp中的一个参数。另一个未使用的参数必须设置为-1。如果不这样做,offset将优先生效。

示例API:

  • Python:

    import time
    import graphscope
    conn = graphscope.conn()
    current_timestamp = int(time.time() * 1000) - 100 * 60 * 1000
    
    r = conn.replay_records(-1, current_timestamp)
    
  • Java

    GrootClient client = GrootClientBuilder.build();
    long timestamp = System.currentTimeMillis();
    client.replayRecords(-1, timestamp);
    

卸载与重新启动

卸载Groot

要卸载/删除demo Groot集群部署,请使用

helm delete demo

该命令会移除与chart关联的所有Kubernetes组件并删除release。

如果集群支持动态供应,默认情况下Groot会创建一组持久卷声明(PVCs)来申请持久卷(PVs),用于存储元数据和图数据。卸载Groot时默认不会删除这些PV。您可以使用以下命令查询PVC和PV。

kubectl get pvc
kubectl get pv
# To query only the PVC belonging to the demo deployment
kubectl get pvc -lapp.kubernetes.io/instance=demo

重启Groot

要在原始PV上重新启动Groot,使用与初始安装相同的命令。此时,Groot可以访问卸载前的数据,所有其他操作与卸载前相同。这可以实现无缝版本更新,或者在使用云服务提供商时,可以按需卸载Groot以释放弹性计算资源,仅保留块存储以节省成本。

# Note that if the node count is configured during installation, it should be exactly the same when reinstalling.
helm install demo graphscope/graphscope-store

销毁Groot

销毁Groot意味着释放Groot使用的所有资源,包括StatefulSets、Services、PVCs和PVs。

helm delete demo
kubectl delete pvc -lapp.kubernetes.io/instance=demo

# If the PV was dynamically provisioned with a PVC, then there is no need to delete the PV explicitly as it will be deleted automatically with the PVC.

# However, if the PV was manually created, then it must be explicitly deleted.

# To delete a PV, you can use the kubectl delete command followed by the PV name:
# kubectl delete pv ${PV_NAME}

开发指南

构建镜像

cd GraphScope/k8s
make graphscope-store VERSION=latest

这将生成一个名为 graphscope/graphscope-store:latest 的镜像。

持久化

Groot将图数据存储在Store Pod的/var/lib/graphscope-store目录中,并将元数据存储在Coordinator Pod的/etc/groot/my.meta目录中。

故障排除

查看日志

您可以使用以下命令查看每个Pod的日志 kubectl logs ${POD_NAME}

通常需要检查Frontend和Store角色的日志。在调试时,往往还需要检查Coordinator的日志。Frontend的日志包含生成逻辑查询计划的Compiler日志,而Store的日志则包含查询引擎执行的日志。例如,

kubectl logs demo-graphscope-store-frontend-0
kubectl logs demo-graphscope-store-store-0

配置日志

Groot在Java部分使用logback作为日志记录库,在Rust部分使用log4rs作为日志记录库。

这两个日志库都支持配置的自动周期性重载,这意味着日志配置文件可以修改,并在短时间内(最多30秒)生效。

容器中日志配置文件的位置为:

  • logback的配置文件位于/usr/local/groot/conf/logback.xml

  • log4rs的配置文件位于/usr/local/groot/conf/log4rs.yml

次级实例

Groot支持在主实例之外开启辅助实例。它利用RocksDB的Secondary Instance功能,既能处理查询请求,又能同步模式与数据更新。

要使用它,只需在helm charts中设置secondary.enabled=true。 同时请记住数据路径、ZK连接字符串以及Kafka端点和主题应与主实例保持一致。 并为每个次级实例使用不同的zk.base.path,以避免在节点发现时相互冲突。

storeGcIntervalMs 控制次级节点执行 try_catch_up_with_primary 调用的频率,默认为 5000 即5秒。

追踪

使用 --set otel.enabled=true 来启用追踪导出功能。

编写高可用性

在多Pod部署模式下使用--set write.ha.enabled=True来开启备份存储Pod。