camel.storages 包

本页内容

camel.storages 包#

子包#

模块内容#

class camel.storages.BaseGraphStorage[来源]#

基类: ABC

图存储系统的抽象基类。

abstract add_triplet(subj: str, obj: str, rel: str) None[来源]#

在数据库中的两个实体之间添加关系(三元组)。

Parameters:
  • subj (str) – 主体实体的标识符。

  • obj (str) – 对象实体的标识符。

  • rel (str) – 主体与对象之间的关系。

abstract delete_triplet(subj: str, obj: str, rel: str) None[来源]#

从图中删除一个特定的三元组,包含主体、客体和关系。

Parameters:
  • subj (str) – 主体实体的标识符。

  • obj (str) – 对象实体的标识符。

  • rel (str) – 主体与对象之间的关系。

abstract property get_client: Any#

获取底层图存储客户端。

abstract property get_schema: str#

获取图存储的架构

abstract property get_structured_schema: Dict[str, Any]#

获取图存储的结构化模式

abstract query(query: str, params: Dict[str, Any] | None = None) List[Dict[str, Any]][来源]#

使用语句和参数查询图存储。

Parameters:
  • query (str) – 要执行的查询语句。

  • params (Optional[Dict[str, Any]]) – 用于查询的参数字典。默认为None

Returns:

字典列表,每个

字典代表查询结果中的一行数据。

Return type:

List[Dict[str, Any]]

abstract refresh_schema() None[来源]#

刷新图形模式信息。

class camel.storages.BaseKeyValueStorage[来源]#

基类: ABC

一个用于键值存储系统的抽象基类。提供保存、加载和清除数据记录的一致接口,且不会丢失任何信息。

一个抽象基类,旨在作为各种键值存储系统的基础。该类主要通过Python字典进行交互。

这个类旨在被多种键值存储实现继承,包括但不限于JSON文件存储、NoSQL数据库(如MongoDB和Redis)以及内存中的Python字典。

abstract clear() None[来源]#

从键值存储系统中移除所有记录。

abstract load() List[Dict[str, Any]][来源]#

从键值存储系统中加载所有存储的记录。

Returns:

一个字典列表,其中每个字典

代表一条存储的记录。

Return type:

List[Dict[str, Any]]

abstract save(records: List[Dict[str, Any]]) None[来源]#

将一批记录保存到键值存储系统中。

Parameters:

records (List[Dict[str, Any]]) – 一个字典列表,其中每个字典代表要存储的唯一记录。

class camel.storages.BaseVectorStorage[来源]#

基类: ABC

向量存储系统的抽象基类。

abstract add(records: List[VectorRecord], **kwargs: Any) None[来源]#

将向量记录列表保存到存储中。

Parameters:
  • records (List[VectorRecord]) – 要保存的向量记录列表。

  • **kwargs (Any) – 额外的关键字参数。

Raises:

RuntimeError – 如果在保存过程中出现错误。

abstract clear() None[来源]#

从存储中移除所有向量。

abstract property client: Any#

提供对底层向量数据库客户端的访问。

abstract delete(ids: List[str], **kwargs: Any) None[来源]#

从存储中删除由ID标识的向量列表。

Parameters:
  • ids (List[str]) – 待删除向量的唯一标识符列表。

  • **kwargs (Any) – 额外的关键字参数。

Raises:

RuntimeError – 如果在删除过程中出现错误。

get_payloads_by_vector(vector: List[float], top_k: int) List[Dict[str, Any]][来源]#

返回与给定向量最接近的前k个向量记录的有效载荷。

此函数是BaseVectorStorage.query的一个封装。

Parameters:
  • vector (List[float]) – 搜索向量。

  • top_k (int) – 返回最相似向量的数量。

Returns:

检索到的向量载荷列表

基于与查询向量的相似性从存储中获取。

Return type:

列表[列表[字典[str, 任意类型]]]

abstract load() None[来源]#

加载托管在云服务上的集合。

abstract query(query: VectorDBQuery, **kwargs: Any) List[VectorDBQueryResult][来源]#

基于提供的查询在存储中搜索相似向量。

Parameters:
  • query (VectorDBQuery) – 包含搜索向量和要检索的相似向量数量的查询对象。

  • **kwargs (Any) – 额外的关键字参数。

Returns:

从存储中检索到的向量列表

基于与查询向量的相似性。

Return type:

列表[VectorDBQueryResult]

abstract status() VectorDBStatus[来源]#

返回向量数据库的状态。

Returns:

向量数据库状态。

Return type:

VectorDBStatus

class camel.storages.InMemoryKeyValueStorage[来源]#

基类: BaseKeyValueStorage

BaseKeyValueStorage的具体实现,使用内存中的列表。非常适合临时存储用途,因为当程序结束时数据将会丢失。

clear() None[来源]#

从键值存储系统中移除所有记录。

load() List[Dict[str, Any]][来源]#

从键值存储系统中加载所有存储的记录。

Returns:

一个字典列表,其中每个字典

代表一条存储的记录。

Return type:

List[Dict[str, Any]]

save(records: List[Dict[str, Any]]) None[来源]#

将一批记录保存到键值存储系统中。

Parameters:

records (List[Dict[str, Any]]) – 一个字典列表,其中每个字典代表要存储的唯一记录。

class camel.storages.JsonStorage(path: Path | None = None)[来源]#

基类: BaseKeyValueStorage

BaseKeyValueStorage的具体实现,使用JSON文件。允许以人类可读的格式持久化存储记录。

Parameters:

path (Path, optional) – 目标JSON文件的路径。如果为None, 将使用默认路径./chat_history.json。 (默认值: None)

clear() None[来源]#

从键值存储系统中移除所有记录。

load() List[Dict[str, Any]][来源]#

从键值存储系统中加载所有存储的记录。

Returns:

一个字典列表,其中每个字典

代表一条存储的记录。

Return type:

List[Dict[str, Any]]

save(records: List[Dict[str, Any]]) None[来源]#

将一批记录保存到键值存储系统中。

Parameters:

records (List[Dict[str, Any]]) – 一个字典列表,其中每个字典代表要存储的唯一记录。

class camel.storages.Mem0Storage(agent_id: str, api_key: str | None = None, user_id: str | None = None, metadata: Dict[str, Any] | None = None)[来源]#

基类: BaseKeyValueStorage

BaseKeyValueStorage的具体实现,使用Mem0作为后端。该存储系统利用Mem0的文本功能来存储、搜索和管理带上下文的文本。

Parameters:
  • agent_id (str) - 默认关联记忆的代理ID。

  • api_key (str, optional) – 用于身份验证的API密钥。如果未提供,将尝试从环境变量MEM0_API_KEY获取(默认: None)。

  • user_id (str, optional) – 默认关联记忆的用户ID (默认值: None).

  • metadata (Dict[str, Any], optional) - 默认包含在所有记忆中的元数据(默认:None)。

参考文献

https://docs.mem0.ai

clear() None[来源]#

从Mem0存储系统中移除所有记录。

load() List[Dict[str, Any]][来源]#

从Mem0存储系统加载所有存储的记录。

Returns:

一个字典列表,其中每个字典

代表一条存储的记录。

Return type:

List[Dict[str, Any]]

save(records: List[Dict[str, Any]]) None[来源]#

将一批记录保存到Mem0存储系统。

Parameters:

records (List[Dict[str, Any]]) – 一个字典列表,其中每个字典代表要存储的唯一记录。

class camel.storages.MilvusStorage(vector_dim: int, url_and_api_key: Tuple[str, str], collection_name: str | None = None, **kwargs: Any)[来源]#

基础类: BaseVectorStorage

一个实现BaseVectorStorage的类,用于与云原生向量搜索引擎Milvus进行交互。

关于Milvus的详细信息可在以下链接获取: Milvus

Parameters:
  • vector_dim (int) – 存储向量的维度。

  • url_and_api_key (Tuple[str, str]) – 包含用于连接远程Milvus实例的URL和API密钥的元组。 URL对应Milvus的uri概念,通常是"endpoint:port"。 API密钥对应Milvus的token概念,对于自托管实例是"username:pwd", 对于Zilliz Cloud(全托管Milvus)则是API密钥。

  • collection_name (Optional[str], optional) – Milvus中集合的名称。如果未提供,则设置为当前时间的ISO格式。(默认: None)

  • **kwargs (Any) – 用于初始化MilvusClient的额外关键字参数。

Raises:

ImportError – 如果未安装pymilvus包。

add(records: List[VectorRecord], **kwargs) None[来源]#

将向量列表添加到指定的集合中。

Parameters:
  • records (List[VectorRecord]) - 要添加的向量列表。

  • **kwargs (Any) – 传递给插入操作的额外关键字参数。

Raises:

RuntimeError - 如果在添加过程中出现错误。

clear() None[来源]#

从Milvus集合中移除所有向量。此方法会删除现有集合并使用相同的模式重新创建它,从而有效移除所有存储的向量。

property client: Any#

提供对Milvus客户端的直接访问。此属性允许直接与Milvus客户端交互,用于执行MilvusStorage类未涵盖的操作。

Returns:

Milvus客户端实例。

Return type:

任何

delete(ids: List[str], **kwargs: Any) None[来源]#

从存储中删除由ID标识的向量列表。如果不确定ID,可以先查询集合以获取相应的数据。

Parameters:
  • ids (List[str]) – 待删除向量的唯一标识符列表。

  • **kwargs (Any) – 传递给删除操作的额外关键字参数。

Raises:

RuntimeError – 如果在删除过程中出现错误。

load() None[来源]#

加载托管在云服务上的集合。

query(query: VectorDBQuery, **kwargs: Any) List[VectorDBQueryResult][来源]#

基于提供的查询在存储中搜索相似向量。

Parameters:
  • query (VectorDBQuery) – 包含搜索向量和要检索的相似向量数量的查询对象。

  • **kwargs (Any) – 传递给搜索的额外关键字参数。

Returns:

从存储中检索到的向量列表

基于与查询向量的相似性。

Return type:

列表[VectorDBQueryResult]

status() VectorDBStatus[来源]#

获取Milvus集合的当前状态。此方法提供有关集合的信息,包括其向量维度和存储的向量总数。

Returns:

一个包含有关

集合状态信息的对象。

Return type:

VectorDBStatus

class camel.storages.NebulaGraph(host, username, password, space, port=9669, timeout=10000)[来源]#

基类:BaseGraphStorage

add_graph_elements(graph_elements: List[GraphElement]) None[来源]#

向图中添加图形元素(节点和关系)。

Parameters:

graph_elements (List[GraphElement]) – 包含节点和关系的图元素列表。

add_node(node_id: str, tag_name: str, time_label: str | None = None) None[来源]#

添加一个具有指定标签和属性的节点。

Parameters:
  • node_id (str) - 节点的ID。

  • tag_name (str) – 节点的标签名称。

  • time_label (str, optional) – 用于设置节点时间标签属性的特定时间戳。如果未提供,则不会添加时间戳。(默认: None)

add_triplet(subj: str, obj: str, rel: str, time_label: str | None = None) None[来源]#

在Nebula Graph数据库中的两个实体之间添加关系(三元组)。

Parameters:
  • subj (str) – 主体实体的标识符。

  • obj (str) – 对象实体的标识符。

  • rel (str) – 主体与对象之间的关系。

  • time_label (str, optional) – 用于设置关系时间标签属性的特定时间戳。如果未提供,则不会添加时间戳。(默认: None)

Raises:
  • ValueError - 如果time_label格式无效。

  • Exception - 如果创建关系失败时抛出。

delete_entity(entity_id: str) None[来源]#

从图中删除一个实体(顶点)。

Parameters:

entity_id (str) – 要删除的实体的标识符。

delete_triplet(subj: str, obj: str, rel: str) None[来源]#

从Nebula Graph数据库中删除特定的三元组(两个实体之间的关系)。

Parameters:
  • subj (str) – 主体实体的标识符。

  • obj (str) – 对象实体的标识符。

  • rel (str) – 主体与对象之间的关系。

ensure_edge_type_exists(edge_type: str, time_label: str | None = None) None[来源]#

确保指定的边类型存在于NebulaGraph数据库中。如果边类型已存在,该方法不执行任何操作。

Parameters:
  • edge_type (str) - 要创建的边类型的名称。

  • time_label (str, optional) - 用于设置时间标签属性默认值的特定时间戳。如果未提供,则不会添加时间戳。(默认值: None)

Raises:

异常 – 如果边类型创建在多次重试尝试后失败,将抛出带有错误消息的异常。

ensure_tag_exists(tag_name: str, time_label: str | None = None) None[来源]#

确保在NebulaGraph数据库中创建一个标签。如果该标签已存在,则不执行任何操作。

Parameters:
  • tag_name (str) - 要创建的标签名称。

  • time_label (str, optional) – 用于设置为时间标签属性默认值的特定时间戳。如果未提供,则不会添加时间戳。(默认值: None)

Raises:

异常 - 如果标签创建在重试后仍然失败,将抛出带有错误信息的异常。

property get_client: Any#

获取底层图存储客户端。

get_indexes()[来源]#

从数据库中获取标签索引。

Returns:

标签索引名称列表。

Return type:

List[str]

get_node_properties() Tuple[List[str], List[Dict[str, Any]]][来源]#

从图中检索节点属性。

Returns:

一个元组,其中第一个

元素是节点模式属性的列表,第二个元素是表示节点结构的字典列表。

Return type:

元组[列表[str], 列表[字典[str, 任意]]]

get_relationship_properties() Tuple[List[str], List[Dict[str, Any]]][来源]#

从图中检索关系(边)的属性。

Returns:

一个元组,其中第一个

元素是关系模式属性的列表,第二个元素是表示关系结构的字典列表。

Return type:

元组[列表[str], 列表[字典[str, 任意]]]

get_relationship_types() List[str][来源]#

从图中检索关系类型。

Returns:

关系(边)类型名称列表。

Return type:

List[str]

get_schema()[来源]#

生成一个描述节点和关系属性及关系的模式字符串。

Returns:

描述模式的字符串。

Return type:

字符串

property get_structured_schema: Dict[str, Any]#

生成一个由节点和关系属性、关系以及包含时间戳的元数据组成的结构化模式。

Returns:

表示结构化模式的字典。

Return type:

字典[字符串, 任意类型]

query(query: str) ResultSet[来源]#

在图存储上执行查询。

Parameters:

query (str) – 要执行的类Cypher查询语句。

Returns:

查询执行的结果集。

Return type:

结果集

Raises:

ValueError – 如果查询执行失败。

refresh_schema() None[来源]#

通过获取最新的架构详情来刷新架构。

class camel.storages.Neo4jGraph(url: str, username: str, password: str, database: str = 'neo4j', timeout: float | None = None, truncate: bool = False)[来源]#

基类:BaseGraphStorage

提供连接到Neo4j数据库以进行各种图操作。

关于Neo4j的详细信息可在以下位置获取: Neo4j https://neo4j.com/docs/getting-started

本模块参考了Langchian和Llamaindex的工作。

Parameters:
  • url (str) – Neo4j数据库服务器的URL。

  • username (str) - 用于数据库认证的用户名。

  • password (str) - 用于数据库认证的密码。

  • database (str) – 要连接的数据库名称。默认为neo4j

  • timeout (Optional[float]) - 事务超时时间,单位为秒。 可用于终止长时间运行的查询。默认为None

  • truncate (bool) – 一个标志,用于指示是否从结果中移除元素数量超过LIST_LIMIT的列表。默认为False

add_graph_elements(graph_elements: List[GraphElement], include_source: bool = False, base_entity_label: bool = False) None[来源]#

将来自GraphElement对象列表的节点和关系添加到图存储中。

Parameters:
  • graph_elements (List[GraphElement]) – 包含要添加到图中的节点和关系的GraphElement对象列表。每个GraphElement应封装图的部分结构,包括节点、关系以及源元素信息。

  • include_source (bool, optional) – 如果为True,会存储源元素并通过MENTIONS关系将其链接到图中的节点。这对于追溯数据来源很有用。如果源元素元数据中有id属性,则基于该属性合并源元素;否则会计算page_content的MD5哈希值用于合并过程。默认为False

  • base_entity_label (bool, optional) - 如果为True,每个新创建的节点会获得一个额外的BASE_ENTITY_LABEL标签,该标签会被索引并能提高导入速度和性能。默认为False

add_triplet(subj: str, obj: str, rel: str, timestamp: str | None = None) None[来源]#

在数据库中的两个实体之间添加带有时间戳的关系(三元组)。

Parameters:
  • subj (str) – 主体实体的标识符。

  • obj (str) – 对象实体的标识符。

  • rel (str) – 主体与对象之间的关系。

  • timestamp (Optional[str]) – 该关系的时间戳。默认为None。

common_neighbour_aware_random_walk(graph_name: str, sampling_ratio: float, start_node_ids: List[int], node_label_stratification: bool = False, relationship_weight_property: str | None = None) Dict[str, Any][来源]#

运行Common Neighbour Aware Random Walk (CNARW)采样算法。

Parameters:
  • graph_name (str) – 图目录中原图的名称。

  • sampling_ratio (float) - 原始图中要被采样的节点比例。

  • start_node_ids (List[int]) – 原始图中随机游走采样起始的初始节点集ID列表。

  • node_label_stratification (bool, optional) - 如果为true,则保留原始图的节点标签分布。默认为False

  • relationship_weight_property (Optional[str], optional) – 用作权重的 关系属性名称。如果未指定, 算法将按未加权方式运行。默认为 None

Returns:

包含CNARW采样结果的字典

采样。

Return type:

字典[字符串, 任意类型]

delete_triplet(subj: str, obj: str, rel: str) None[来源]#

从图中删除一个特定的三元组,包含主体、客体和关系。

Parameters:
  • subj (str) – 主体实体的标识符。

  • obj (str) – 对象实体的标识符。

  • rel (str) – 主体与对象之间的关系。

property get_client: Any#

获取底层图存储客户端。

property get_schema: str#

获取Neo4jGraph存储的模式结构。

Parameters:

refresh (bool) – 一个标志,指示是否强制从Neo4jGraph存储中刷新架构,无论其是否已被缓存。默认为False

Returns:

Neo4jGraph存储的架构。

Return type:

字符串

property get_structured_schema: Dict[str, Any]#

返回该图的结构化模式

Returns:

该图的结构化模式。

Return type:

字典[字符串, 任意类型]

get_triplet(subj: str | None = None, obj: str | None = None, rel: str | None = None) List[Dict[str, Any]][来源]#

查询三元组信息。如果未指定subj、obj或rel参数,则返回所有匹配的三元组。

Parameters:
  • subj (可选[str]) – 主体节点的ID。 如果为None,则匹配任意主体节点。 (默认值: None)

  • obj (Optional[str]) – 对象节点的ID。 如果为None,则匹配任何对象节点。 (default: None)

  • rel (可选[str]) - 关系类型。 如果为None,则匹配任何关系类型。 (默认值: None)

Returns:

匹配的三元组列表,

每个包含subj、obj、rel和时间戳。

Return type:

List[Dict[str, Any]]

query(query: str, params: Dict[str, Any] | None = None) List[Dict[str, Any]][来源]#

在数据库中执行Neo4j Cypher声明式查询。

Parameters:
  • query (str) – 要执行的Cypher查询语句。

  • params (Optional[Dict[str, Any]]) – 用于查询的参数字典。默认为None

Returns:

字典列表,每个

字典代表Cypher查询结果的一行。

Return type:

List[Dict[str, Any]]

Raises:

ValueError - 如果执行的Cypher查询语法无效。

random_walk_with_restarts(graph_name: str, sampling_ratio: float, start_node_ids: List[int], restart_probability: float = 0.1, node_label_stratification: bool = False, relationship_weight_property: str | None = None) Dict[str, Any][来源]#

运行带重启的随机游走(RWR)采样算法。

Parameters:
  • graph_name (str) – 图目录中原图的名称。

  • sampling_ratio (float) - 原始图中要被采样的节点比例。

  • start_node_ids (List[int]) – 原始图中随机游走采样起始的初始节点集ID列表。

  • restart_probability (float, optional) – 采样随机游走从起始节点之一重新开始的概率。默认为 0.1

  • node_label_stratification (bool, optional) - 如果为true,则保留原始图的节点标签分布。默认为False

  • relationship_weight_property (Optional[str], optional) – 用作权重的 关系属性名称。如果未指定, 算法将按未加权方式运行。默认为 None

Returns:

包含RWR采样结果的字典。

Return type:

字典[字符串, 任意类型]

refresh_schema() None[来源]#

通过查询数据库中的节点属性、关系属性和关系来刷新Neo4j图模式信息。

class camel.storages.QdrantStorage(vector_dim: int, collection_name: str | None = None, url_and_api_key: Tuple[str, str] | None = None, path: str | None = None, distance: VectorDistance = VectorDistance.COSINE, delete_collection_on_del: bool = False, **kwargs: Any)[来源]#

基础类: BaseVectorStorage

一个实现BaseVectorStorage用于与向量搜索引擎Qdrant交互的类。

关于Qdrant的详细信息可在以下链接获取: Qdrant

Parameters:
  • vector_dim (int) – 存储向量的维度。

  • collection_name (可选[str], optional) – Qdrant中集合的名称。如果未提供,则设置为当前时间的ISO格式。(默认: None)

  • url_and_api_key (Optional[Tuple[str, str]], optional) – 包含用于连接远程Qdrant实例的URL和API密钥的元组。 (默认值: None)

  • path (可选[str], 可选) – 用于初始化本地Qdrant客户端的目录路径。(默认: None)

  • distance (VectorDistance, optional) – 用于向量比较的距离度量标准(默认:VectorDistance.COSINE

  • delete_collection_on_del (bool, optional) – 标志位,用于确定在对象销毁时是否应删除集合。 (默认值: False)

  • **kwargs (Any) – 用于初始化QdrantClient的额外关键字参数。

注意事项

  • 如果提供了url_and_api_key,它将优先使用,客户端将尝试通过URL端点连接到远程Qdrant实例。

  • 如果未提供url_and_api_key但给出了path,客户端将使用本地路径来初始化Qdrant。

  • 如果既没有提供url_and_api_key也没有提供path,客户端将使用内存存储(“:memory:”)进行初始化。

add(records: List[VectorRecord], **kwargs) None[来源]#

将向量列表添加到指定的集合中。

Parameters:
  • vectors (List[VectorRecord]) - 要添加的向量列表。

  • **kwargs (Any) – 额外的关键字参数。

Raises:

RuntimeError - 如果在添加过程中出现错误。

clear() None[来源]#

从存储中移除所有向量。

property client: QdrantClient#

提供对底层向量数据库客户端的访问。

close_client(**kwargs)[来源]#

关闭与Qdrant存储的客户端连接。

delete(ids: List[str] | None = None, payload_filter: Dict[str, Any] | None = None, **kwargs: Any) None[来源]#

根据ID或payload过滤器从集合中删除点。

Parameters:
  • ids (Optional[List[str]], optional) – 待删除向量的唯一标识符列表。

  • payload_filter (可选[Dict[str, Any]], 可选) - 用于 删除符合特定条件点的负载过滤器。如果 提供了ids,除非明确组合使用, 否则payload_filter将被忽略。

  • **kwargs (Any) – 传递给QdrantClient.delete的额外关键字参数。

示例

>>> # Delete points with IDs "1", "2", and "3"
>>> storage.delete(ids=["1", "2", "3"])
>>> # Delete points with payload filter
>>> storage.delete(payload_filter={"name": "Alice"})
Raises:
  • ValueError – 如果既没有提供ids也没有提供payload_filter

  • RuntimeError – 如果在删除过程中出现错误。

注意事项

  • 如果提供了ids参数,将直接删除这些ID对应的点

    此时会忽略payload_filter参数。

  • 如果未提供ids但提供了payload_filter,则

    符合payload_filter的点将被删除。

delete_collection() None[来源]#

删除Qdrant存储中的整个集合。

load() None[来源]#

加载托管在云服务上的集合。

query(query: VectorDBQuery, filter_conditions: Dict[str, Any] | None = None, **kwargs: Any) List[VectorDBQueryResult][来源]#

基于提供的查询在存储中搜索相似向量。

Parameters:
  • query (VectorDBQuery) – 包含搜索向量和要检索的相似向量数量的查询对象。

  • filter_conditions (Optional[Dict[str, Any]], optional) – 一个字典,用于指定筛选查询结果的条件。

  • **kwargs (Any) – 额外的关键字参数。

Returns:

从存储中检索到的向量列表

基于与查询向量的相似性。

Return type:

列表[VectorDBQueryResult]

status() VectorDBStatus[来源]#

返回向量数据库的状态。

Returns:

向量数据库状态。

Return type:

VectorDBStatus

update_payload(ids: List[str], payload: Dict[str, Any], **kwargs: Any) None[来源]#

更新由ID标识的向量的有效载荷。

Parameters:
  • ids (List[str]) – 待更新向量的唯一标识符列表。

  • payload (Dict[str, Any]) – 待更新的payload列表。

  • **kwargs (Any) – 额外的关键字参数。

Raises:

RuntimeError – 如果在更新过程中出现错误。

class camel.storages.RedisStorage(sid: str, url: str = 'redis://localhost:6379', loop: AbstractEventLoop | None = None, **kwargs)[来源]#

基类: BaseKeyValueStorage

一个基于Redis后端的BaseCacheStorage具体实现。这适用于需要持久化和高可用性的分布式缓存系统。

clear() None[来源]#

从键值存储系统中移除所有记录。

property client: Redis | None#

返回Redis客户端实例。

Returns:

Redis客户端实例。

Return type:

redis.asyncio.Redis

async close() None[来源]#

异步关闭Redis客户端。

load() List[Dict[str, Any]][来源]#

从键值存储系统中加载所有存储的记录。

Returns:

一个字典列表,其中每个字典

代表一条存储的记录。

Return type:

List[Dict[str, Any]]

save(records: List[Dict[str, Any]], expire: int | None = None) None[来源]#

将一批记录保存到键值存储系统中。

class camel.storages.TiDBStorage(vector_dim: int, collection_name: str | None = None, url_and_api_key: Tuple[str, str] | str | None = None, **kwargs: Any)[来源]#

基础类: BaseVectorStorage

一个实现BaseVectorStorage用于与TiDB交互的实现。

关于TiDB的详细信息可在以下链接获取: TiDB Vector Search

Parameters:
  • vector_dim (int) – 存储向量的维度。

  • url_and_api_key (Optional[Union[Tuple[str, str], str]]) –

    一个元组

    包含用于连接TiDB集群的数据库URL和API密钥。URL格式应为:

    "mysql+pymysql://<username>:<password>@<host>:<port>/<db_name>"。 TiDB不会使用API密钥,但保留此定义以实现接口兼容性。

  • collection_name (可选[str]) – 集合名称。 该集合名称将作为TiDB中的表名使用。如果未提供,则设置为当前时间的iso格式。

  • **kwargs (Any) – 用于初始化TiDB连接的其他关键字参数。

Raises:

ImportError – 如果未安装 pytidb 包。

add(records: List[VectorRecord], **kwargs) None[来源]#

将向量列表添加到指定表中。

Parameters:
  • records (List[VectorRecord]) - 要添加的向量列表。

  • **kwargs (Any) – 传递给插入操作的额外关键字参数。

Raises:

RuntimeError - 如果在添加过程中出现错误。

clear() None[来源]#

从TiDB表中移除所有向量。此方法会删除现有表,然后使用相同的模式重新创建它,以有效移除所有存储的向量。

property client: TiDBClient#

提供对TiDB客户端的直接访问。

Returns:

TiDB客户端实例。

Return type:

任何

delete(ids: List[str], **kwargs: Any) None[来源]#

从存储中删除由ID标识的向量列表。

Parameters:
  • ids (List[str]) – 待删除向量的唯一标识符列表。

  • **kwargs (Any) – 传递给删除操作的额外关键字参数。

Raises:

RuntimeError – 如果在删除过程中出现错误。

load() None[来源]#

加载托管在云服务上的集合。

query(query: VectorDBQuery, **kwargs: Any) List[VectorDBQueryResult][来源]#

基于提供的查询在存储中搜索相似向量。

Parameters:
  • query (VectorDBQuery) – 包含搜索向量和要检索的相似向量数量的查询对象。

  • **kwargs (Any) – 传递给搜索的额外关键字参数。

Returns:

从存储中检索到的向量列表

基于与查询向量的相似性。

Return type:

列表[VectorDBQueryResult]

status() VectorDBStatus[来源]#

获取TiDB表的当前状态。

Returns:

一个包含有关

表状态信息的对象。

Return type:

VectorDBStatus

class camel.storages.VectorDBQuery(query_vector: List[float], top_k: int)[来源]#

基类:BaseModel

表示对向量数据库的查询。

query_vector#

查询向量的数值表示。

Type:

浮点数列表

top_k#

从数据库中检索的相似向量的最大数量。(默认值: 1)

Type:

整数,可选

model_config: ClassVar[ConfigDict] = {}#

模型的配置,应该是一个符合[ConfigDict][pydantic.config.ConfigDict]的字典。

query_vector: List[float]#

查询向量的数值表示。

top_k: int#

从数据库中检索的顶部相似向量的数量。

class camel.storages.VectorDBQueryResult(*, record: VectorRecord, similarity: float)[来源]#

基类:BaseModel

封装了对向量数据库进行查询的结果。

record#

目标向量记录。

Type:

VectorRecord

similarity#

查询向量与记录之间的相似度得分。

Type:

浮点数

classmethod create(similarity: float, vector: List[float], id: str, payload: Dict[str, Any] | None = None) VectorDBQueryResult[来源]#

一个用于构建VectorDBQueryResult实例的类方法。

model_config: ClassVar[ConfigDict] = {}#

模型的配置,应该是一个符合[ConfigDict][pydantic.config.ConfigDict]的字典。

record: VectorRecord#
similarity: float#
class camel.storages.VectorRecord(*, vector: ~typing.List[float], id: str = <factory>, payload: ~typing.Dict[str, ~typing.Any] | None = None)[来源]#

基类:BaseModel

封装了关于向量唯一标识符及其有效载荷的信息,主要用作保存到向量存储时的数据传输对象。

vector#

向量的数值表示。

Type:

浮点数列表

id#

向量的唯一标识符。如果未提供,将分配一个随机uuid。

Type:

字符串, 可选

payload#

与向量相关的任何额外元数据或信息。(默认: None)

Type:

可选[字典[str, 任意类型]], 可选

id: str#
model_config: ClassVar[ConfigDict] = {}#

模型的配置,应该是一个符合[ConfigDict][pydantic.config.ConfigDict]的字典。

payload: Dict[str, Any] | None#
vector: List[float]#