跳转到内容

智能体数据 (Python)

See the Agent Data Overview for concepts, constraints, and environment details.

Terminal window
uv add llama-cloud-services

Python llama-cloud-services SDK 提供了 AsyncAgentDataClient 用于处理 Agent 数据 API。

import httpx
import os
from pydantic import BaseModel
from llama_cloud_services.beta.agent_data import AsyncAgentDataClient
from llama_cloud.client import AsyncLlamaCloud
class ExtractedPerson(BaseModel):
name: str
age: int
email: str
project_id = os.getenv("LLAMA_DEPLOY_PROJECT_ID")
# Base URL and API key (if running outside LlamaCloud)
base_url = os.getenv("LLAMA_CLOUD_BASE_URL")
api_key = os.getenv("LLAMA_CLOUD_API_KEY")
# Reusable async HTTP client with optional project scoping
http_client = httpx.AsyncClient(headers={"Project-Id": project_id} if project_id else None)
# Optional: base client for other SDK operations
base_client = AsyncLlamaCloud(
base_url=base_url,
token=api_key,
httpx_client=http_client,
)
# Only set when deployed in LlamaCloud (falls back inside the Agent Data client)
deployment_name = os.getenv("LLAMA_DEPLOY_DEPLOYMENT_NAME")
client = AsyncAgentDataClient(
type=ExtractedPerson,
collection="extracted_people",
# If omitted, uses LLAMA_DEPLOY_DEPLOYMENT_NAME or "_public"
deployment_name=deployment_name,
client=base_client,
)

创建、获取、更新、删除

Section titled “Create, Get, Update, Delete”
person = ExtractedPerson(name="John Doe", age=30, email="john@example.com")
created = await client.create_item(person)
fetched = await client.get_item(created.id)
updated = await client.update_item(created.id, ExtractedPerson(name="Jane", age=31, email="jane@example.com"))
await client.delete_item(updated.id)

重试行为:网络错误(超时、连接错误、可重试的HTTP状态码)最多重试3次,采用指数退避策略。

备注:

  • 更新操作会覆盖整个 data 对象。
  • get_item 如果未找到则抛出状态码为404的httpx.HTTPStatusError异常。

删除与筛选条件匹配的多个项目。返回已删除项目的数量。

deleted_count = await client.delete(
filter={
"status": {"eq": "inactive"},
"age": {"gte": 65},
}
)
print(deleted_count)

您可以通过data字段以及created_at/updated_at(顶级字段)进行筛选。使用逗号分隔的字段列表进行排序;按数据字段排序时需要data.前缀。默认页面大小为50(最大1000)。

results = await client.search(
filter={
# Data fields
"age": {"gte": 21, "lt": 65},
"status": {"eq": "active"},
"tags": {"includes": ["python", "ml"]},
# Top-level timestamps (ISO strings accepted)
"created_at": {"gte": "2024-01-01T00:00:00Z"},
},
order_by="data.name desc, created_at",
page_size=50,
offset=0,
include_total=True, # request only on the first page if needed
)
for item in results.items:
print(item.data)
print(results.has_more, results.total)

排序:

  • 示例:"data.name desc, created_at"
  • 如果未提供排序方式,结果默认为 created_at desc

分页:

  • 使用 offsetpage_size。服务器可能返回 has_more 和一个 next_page_token(SDK 暴露 has_more)。

按一个或多个 data 字段对数据进行分组,可选择统计每组的项目数量,和/或获取每组的第一个项目。

agg = await client.aggregate(
filter={"status": {"eq": "active"}},
group_by=["department", "role"],
count=True,
first=True, # return the earliest item per group (by created_at)
order_by="data.department asc, data.role asc",
page_size=100,
)
for group in agg.items: # items are groups
print(group.group_key) # {"department": "Sales", "role": "AE"}
print(group.count) # optional
print(group.first_item) # optional dict

详情:

  • group_by: 点式数据路径(例如 "department", "contact.email")。
  • count: 为每个分组添加一个 count
  • first: 返回每个组的首个data项(最早的created_at)。
  • order_byorder_by: 使用与搜索相同的语义(适用于分组键表达式)。
  • 分页使用 offsetpage_size,与搜索类似。

当您希望跳过模型的Pydantic验证时,请使用非类型化方法。这些方法返回的响应对象中,.data有效载荷是普通的dict而非Pydantic模型。

# Get raw item (AgentData object; .data is a dict)
raw_item = await client.untyped_get_item(created.id)
print(raw_item.id, raw_item.deployment_name, raw_item.collection)
print(raw_item.data["name"]) # dict access
# Search (raw paginated response)
raw_page = await client.untyped_search(
filter={"status": {"eq": "active"}},
order_by="data.name desc, created_at",
page_size=50,
)
for item in raw_page.items: # each item is an AgentData object
print(item.data) # dict
# Pagination fields match the API
print(raw_page.next_page_token, raw_page.total_size)
# Aggregate (raw paginated response)
raw_groups = await client.untyped_aggregate(
filter={"status": {"eq": "active"}},
group_by=["department", "role"],
count=True,
first=True,
)
for grp in raw_groups.items: # each item is an AggregateGroup object
print(grp.group_key, grp.count, grp.first_item) # first_item is a dict
print(raw_groups.next_page_token, raw_groups.total_size)