Agent Data (Python)
See the Agent Data Overview for concepts, constraints, and environment details.
The Python llama-cloud-services SDK provides AsyncAgentDataClient for working with the Agent Data API. Install it with:

```bash
uv add llama-cloud-services
```
```python
import httpx
import os
from pydantic import BaseModel
from llama_cloud_services.beta.agent_data import AsyncAgentDataClient
from llama_cloud.client import AsyncLlamaCloud


class ExtractedPerson(BaseModel):
    name: str
    age: int
    email: str


project_id = os.getenv("LLAMA_DEPLOY_PROJECT_ID")

# Base URL and API key (if running outside LlamaCloud)
base_url = os.getenv("LLAMA_CLOUD_BASE_URL")
api_key = os.getenv("LLAMA_CLOUD_API_KEY")

# Reusable async HTTP client with optional project scoping
http_client = httpx.AsyncClient(headers={"Project-Id": project_id} if project_id else None)

# Optional: base client for other SDK operations
base_client = AsyncLlamaCloud(
    base_url=base_url,
    token=api_key,
    httpx_client=http_client,
)

# Only set when deployed in LlamaCloud (falls back inside the Agent Data client)
deployment_name = os.getenv("LLAMA_DEPLOY_DEPLOYMENT_NAME")

client = AsyncAgentDataClient(
    type=ExtractedPerson,
    collection="extracted_people",
    # If omitted, uses LLAMA_DEPLOY_DEPLOYMENT_NAME or "_public"
    deployment_name=deployment_name,
    client=base_client,
)
```

Create, Get, Update, Delete
```python
person = ExtractedPerson(name="John Doe", age=30, email="john@example.com")
created = await client.create_item(person)
fetched = await client.get_item(created.id)
updated = await client.update_item(created.id, ExtractedPerson(name="Jane", age=31, email="jane@example.com"))
await client.delete_item(updated.id)
```

Retry behavior: network errors (timeouts, connection errors, retryable HTTP status codes) are retried up to 3 times with exponential backoff.
Notes:

- Updates overwrite the entire data object.
- get_item raises httpx.HTTPStatusError with a 404 status code if the item is not found (a handling sketch follows these notes).
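A minimal error-handling sketch for the 404 case above, relying only on the documented httpx.HTTPStatusError behavior; the get_person_or_none helper name is made up for illustration:

```python
import httpx

# Illustrative helper (not part of the SDK): fetch an item, returning None on 404
# instead of letting httpx.HTTPStatusError propagate.
async def get_person_or_none(item_id: str):
    try:
        return await client.get_item(item_id)
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == 404:
            return None  # no item with this id in the collection
        raise  # other statuses: the client's retries are already exhausted
```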
Delete multiple items that match a filter. Returns the number of deleted items.
```python
deleted_count = await client.delete(
    filter={
        "status": {"eq": "inactive"},
        "age": {"gte": 65},
    },
)
print(deleted_count)
```

You can filter on data fields and on the top-level created_at/updated_at fields. Sort with a comma-separated list of fields; sorting by data fields requires the data. prefix. The default page size is 50 (maximum 1000).
```python
results = await client.search(
    filter={
        # Data fields
        "age": {"gte": 21, "lt": 65},
        "status": {"eq": "active"},
        "tags": {"includes": ["python", "ml"]},
        # Top-level timestamps (ISO strings accepted)
        "created_at": {"gte": "2024-01-01T00:00:00Z"},
    },
    order_by="data.name desc, created_at",
    page_size=50,
    offset=0,
    include_total=True,  # request only on the first page if needed
)

for item in results.items:
    print(item.data)

print(results.has_more, results.total)
```

Sorting:

- Example: "data.name desc, created_at".
- If no ordering is provided, results default to created_at desc.
Pagination:

- Use offset and page_size. The server may return has_more and a next_page_token (the SDK exposes has_more); see the paging sketch after this list.
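A paging sketch under the behavior described above: it relies only on the documented offset, page_size, and has_more fields, and the collect_active helper name is made up for illustration.

```python
# Illustrative helper (name is made up): walk a filtered collection page by page.
async def collect_active(page_size: int = 50) -> list[ExtractedPerson]:
    people: list[ExtractedPerson] = []
    offset = 0
    while True:
        page = await client.search(
            filter={"status": {"eq": "active"}},
            order_by="created_at desc",
            page_size=page_size,
            offset=offset,
        )
        people.extend(item.data for item in page.items)  # .data is the typed model
        if not page.has_more:
            break
        offset += page_size  # advance one page at a time until has_more is False
    return people
```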
Group data by one or more data fields, optionally counting the items in each group and/or returning the first item of each group.
```python
agg = await client.aggregate(
    filter={"status": {"eq": "active"}},
    group_by=["department", "role"],
    count=True,
    first=True,  # return the earliest item per group (by created_at)
    order_by="data.department asc, data.role asc",
    page_size=100,
)
```
```python
for group in agg.items:  # items are groups
    print(group.group_key)  # {"department": "Sales", "role": "AE"}
    print(group.count)  # optional
    print(group.first_item)  # optional dict
```

Details:

- group_by: dotted data paths (e.g. "department", "contact.email").
- count: adds a count to each group.
- first: returns the first data item of each group (earliest created_at).
- order_by: same semantics as search (applies to the group-key expressions).
- Pagination uses offset and page_size, as in search; see the grouped-paging sketch after this list.
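A short sketch of the grouping and paging described above. Only group_by, count, order_by, offset, and page_size come from the documented API; the count_by_department name and the empty-page stop condition are assumptions for illustration.

```python
# Illustrative helper (name is made up): count items per department, page by page.
async def count_by_department(page_size: int = 100) -> dict[str, int]:
    counts: dict[str, int] = {}
    offset = 0
    while True:
        page = await client.aggregate(
            group_by=["department"],  # dotted paths like "contact.email" also work
            count=True,
            order_by="data.department asc",
            page_size=page_size,
            offset=offset,
        )
        if not page.items:
            break  # assumption: an empty page means there are no more groups
        for group in page.items:
            counts[group.group_key["department"]] = group.count
        offset += page_size
    return counts
```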
Use the untyped methods when you want to skip Pydantic validation against your model. The response objects they return carry plain dicts in their .data payloads instead of Pydantic models.
```python
# Get raw item (AgentData object; .data is a dict)
raw_item = await client.untyped_get_item(created.id)
print(raw_item.id, raw_item.deployment_name, raw_item.collection)
print(raw_item.data["name"])  # dict access
```
```python
# Search (raw paginated response)
raw_page = await client.untyped_search(
    filter={"status": {"eq": "active"}},
    order_by="data.name desc, created_at",
    page_size=50,
)
for item in raw_page.items:  # each item is an AgentData object
    print(item.data)  # dict

# Pagination fields match the API
print(raw_page.next_page_token, raw_page.total_size)
```
```python
# Aggregate (raw paginated response)
raw_groups = await client.untyped_aggregate(
    filter={"status": {"eq": "active"}},
    group_by=["department", "role"],
    count=True,
    first=True,
)
for grp in raw_groups.items:  # each item is an AggregateGroup object
    print(grp.group_key, grp.count, grp.first_item)  # first_item is a dict
print(raw_groups.next_page_token, raw_groups.total_size)
```
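If you later need typed access to an untyped result, the raw dict can still be validated through the same Pydantic model. This is plain Pydantic usage (v2's model_validate), not a dedicated SDK feature:

```python
# Validate a raw dict payload with the Pydantic model when typed access is useful.
person = ExtractedPerson.model_validate(raw_item.data)
print(person.name, person.age, person.email)
```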