ADK 流式快速入门
通过本快速入门指南,您将学习创建一个简单的智能体,并使用ADK Streaming实现与其进行低延迟、双向的语音和视频通信。我们将安装Agent Development Kit,设置一个基础的"Google搜索"智能体,尝试使用adk web工具运行该智能体与Streaming功能,然后讲解如何自行使用ADK Streaming和FastAPI构建一个简单的异步Web应用。
注意:本指南假设您有在Windows、Mac和Linux环境中使用终端的经验。
支持的语音/视频流模型
要在ADK中使用语音/视频流功能,您需要使用支持Live API的Gemini模型。您可以在文档中找到支持Gemini Live API的模型ID:
1. 设置环境 & 安装ADK
创建并激活虚拟环境(推荐):
# Create
python -m venv .venv
# Activate (each new terminal)
# macOS/Linux: source .venv/bin/activate
# Windows CMD: .venv\Scripts\activate.bat
# Windows PowerShell: .venv\Scripts\Activate.ps1
安装ADK:
2. 项目结构
创建以下包含空文件的文件夹结构:
adk-streaming/ # Project folder
└── app/ # the web app folder
├── .env # Gemini API key
└── google_search_agent/ # Agent folder
├── __init__.py # Python package
└── agent.py # Agent definition
agent.py
将以下代码块复制粘贴到agent.py中。
对于model,请按照前面模型部分所述仔细核对模型ID。
from google.adk.agents import Agent
from google.adk.tools import google_search # Import the tool
root_agent = Agent(
# A unique name for the agent.
name="basic_search_agent",
# The Large Language Model (LLM) that agent will use.
model="gemini-2.0-flash-live-001", # Google AI Studio
#model="gemini-2.0-flash-live-preview-04-09" # Vertex AI Studio
# A short description of the agent's purpose.
description="Agent to answer questions using Google Search.",
# Instructions to set the agent's behavior.
instruction="You are an expert researcher. You always stick to the facts.",
# Add google_search tool to perform grounding with Google search.
tools=[google_search]
)
agent.py 是存放所有智能体逻辑的文件,其中必须定义一个 root_agent。
注意你多么轻松地集成了基于Google Search的落地能力。Agent类和google_search工具处理了与LLM的复杂交互以及搜索API的落地实现,让你能专注于智能体的目标和行为。

将以下代码块复制粘贴到 __init__.py 和 main.py 文件中。
3. 设置Gemini API密钥
要运行您的智能体,您需要设置一个Gemini API密钥。
- 从Google AI Studio获取API密钥。
- 在您的
app目录中,创建一个.env文件。 - 将这些行添加到
.env文件中,将YOUR_API_KEY_HERE替换为您的密钥:
.env
4. 使用 adk web 测试智能体
现在可以尝试使用智能体了。运行以下命令启动开发界面。首先,请确保将当前目录设置为app:
然后,运行开发用户界面:
在浏览器中直接打开提供的URL(通常是http://localhost:8000或
http://127.0.0.1:8000)。该连接完全保留在您的本地机器上。选择basic_search_agent。
尝试使用文本
在用户界面中输入以下提示语进行尝试。
- 纽约的天气怎么样?
- 纽约现在几点?
- 巴黎的天气怎么样?
- 巴黎现在几点?
智能体将使用google_search工具获取最新信息来回答这些问题。
尝试语音和视频功能
现在,点击麦克风按钮启用语音输入,并通过语音提出相同的问题。您将实时听到语音回答。
此外,点击摄像头按钮启用视频输入,并提问如"你看到了什么?"。智能体会回答他们在视频输入中看到的内容。
停止工具
在控制台按下Ctrl-C来停止adk web。
关于ADK流式处理的说明
ADK Streaming未来版本将支持以下功能:回调、长运行工具、示例工具以及Shell智能体(例如SequentialAgent)。
5. 构建自定义流式应用(可选)
在上一节中,我们已经验证了基础搜索智能体可以通过adk web工具与ADK Streaming协同工作。本节我们将学习如何使用FastAPI构建支持流式通信的自定义Web应用。
Add static directory under app, and add main.py and index.html as empty files, as in the following structure:
adk-streaming/ # Project folder
└── app/ # the web app folder
├── main.py # FastAPI web app
└── static/ # Static content folder
└── index.html # The web client page
main.py
将以下代码块复制粘贴到 main.py 文件中。
import os
import json
import asyncio
from pathlib import Path
from dotenv import load_dotenv
from google.genai.types import (
Part,
Content,
)
from google.adk.runners import Runner
from google.adk.agents import LiveRequestQueue
from google.adk.agents.run_config import RunConfig
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from google_search_agent.agent import root_agent
#
# ADK Streaming
#
# Load Gemini API Key
load_dotenv()
APP_NAME = "ADK Streaming example"
session_service = InMemorySessionService()
def start_agent_session(session_id: str):
"""Starts an agent session"""
# Create a Session
session = session_service.create_session(
app_name=APP_NAME,
user_id=session_id,
session_id=session_id,
)
# Create a Runner
runner = Runner(
app_name=APP_NAME,
agent=root_agent,
session_service=session_service,
)
# Set response modality = TEXT
run_config = RunConfig(response_modalities=["TEXT"])
# Create a LiveRequestQueue for this session
live_request_queue = LiveRequestQueue()
# Start agent session
live_events = runner.run_live(
session=session,
live_request_queue=live_request_queue,
run_config=run_config,
)
return live_events, live_request_queue
async def agent_to_client_messaging(websocket, live_events):
"""Agent to client communicaation"""
while True:
async for event in live_events:
# turn_complete
if event.turn_complete:
await websocket.send_text(json.dumps({"turn_complete": True}))
print("[TURN COMPLETE]")
if event.interrupted:
await websocket.send_text(json.dumps({"interrupted": True}))
print("[INTERRUPTED]")
# Read the Content and its first Part
part: Part = (
event.content and event.content.parts and event.content.parts[0]
)
if not part or not event.partial:
continue
# Get the text
text = event.content and event.content.parts and event.content.parts[0].text
if not text:
continue
# Send the text to the client
await websocket.send_text(json.dumps({"message": text}))
print(f"[AGENT TO CLIENT]: {text}")
await asyncio.sleep(0)
async def client_to_agent_messaging(websocket, live_request_queue):
"""Client to agent communication"""
while True:
text = await websocket.receive_text()
content = Content(role="user", parts=[Part.from_text(text=text)])
live_request_queue.send_content(content=content)
print(f"[CLIENT TO AGNET]: {text}")
await asyncio.sleep(0)
#
# FastAPI web app
#
app = FastAPI()
STATIC_DIR = Path("static")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
@app.get("/")
async def root():
"""Serves the index.html"""
return FileResponse(os.path.join(STATIC_DIR, "index.html"))
@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: int):
"""Client websocket endpoint"""
# Wait for client connection
await websocket.accept()
print(f"Client #{session_id} connected")
# Start agent session
session_id = str(session_id)
live_events, live_request_queue = start_agent_session(session_id)
# Start tasks
agent_to_client_task = asyncio.create_task(
agent_to_client_messaging(websocket, live_events)
)
client_to_agent_task = asyncio.create_task(
client_to_agent_messaging(websocket, live_request_queue)
)
await asyncio.gather(agent_to_client_task, client_to_agent_task)
# Disconnected
print(f"Client #{session_id} disconnected")
这段代码使用ADK和FastAPI创建了一个实时聊天应用。它设置了一个WebSocket端点,客户端可以连接并与Google搜索智能体进行交互。
关键功能:
- 加载Gemini API密钥。
- 使用ADK管理智能体会话并运行`google_search_agent`。
- `start_agent_session` 初始化一个带有实时请求队列的智能体会话,用于实时通信。
- `agent_to_client_messaging` 异步地将智能体的文本响应和状态更新(轮次完成、中断)流式传输到连接的WebSocket客户端。
- `client_to_agent_messaging` 异步接收来自WebSocket客户端的文本消息,并将其作为用户输入发送给智能体。
- FastAPI 提供静态前端服务,并在 `/ws/{session_id}` 处理 WebSocket 连接。
- 当客户端连接时,它会启动一个智能体会话,并通过WebSockets为客户端和智能体之间的双向通信创建并发任务。
Copy-paste the following code block to the index.html file.
<!doctype html>
<html>
<head>
<title>ADK Streaming Test</title>
</head>
<body>
<h1>ADK Streaming Test</h1>
<div
id="messages"
style="height: 300px; overflow-y: auto; border: 1px solid black"></div>
<br />
<form id="messageForm">
<label for="message">Message:</label>
<input type="text" id="message" name="message" />
<button type="submit" id="sendButton" disabled>Send</button>
</form>
</body>
<script>
// Connect the server with a WebSocket connection
const sessionId = Math.random().toString().substring(10);
const ws_url = "ws://" + window.location.host + "/ws/" + sessionId;
let ws = new WebSocket(ws_url);
// Get DOM elements
const messageForm = document.getElementById("messageForm");
const messageInput = document.getElementById("message");
const messagesDiv = document.getElementById("messages");
let currentMessageId = null;
// WebSocket handlers
function addWebSocketHandlers(ws) {
ws.onopen = function () {
console.log("WebSocket connection opened.");
document.getElementById("sendButton").disabled = false;
document.getElementById("messages").textContent = "Connection opened";
addSubmitHandler(this);
};
ws.onmessage = function (event) {
// Parse the incoming message
const packet = JSON.parse(event.data);
console.log(packet);
// Check if the turn is complete
// if turn complete, add new message
if (packet.turn_complete && packet.turn_complete == true) {
currentMessageId = null;
return;
}
// add a new message for a new turn
if (currentMessageId == null) {
currentMessageId = Math.random().toString(36).substring(7);
const message = document.createElement("p");
message.id = currentMessageId;
// Append the message element to the messagesDiv
messagesDiv.appendChild(message);
}
// Add message text to the existing message element
const message = document.getElementById(currentMessageId);
message.textContent += packet.message;
// Scroll down to the bottom of the messagesDiv
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
// When the connection is closed, try reconnecting
ws.onclose = function () {
console.log("WebSocket connection closed.");
document.getElementById("sendButton").disabled = true;
document.getElementById("messages").textContent = "Connection closed";
setTimeout(function () {
console.log("Reconnecting...");
ws = new WebSocket(ws_url);
addWebSocketHandlers(ws);
}, 5000);
};
ws.onerror = function (e) {
console.log("WebSocket error: ", e);
};
}
addWebSocketHandlers(ws);
// Add submit handler to the form
function addSubmitHandler(ws) {
messageForm.onsubmit = function (e) {
e.preventDefault();
const message = messageInput.value;
if (message) {
const p = document.createElement("p");
p.textContent = "> " + message;
messagesDiv.appendChild(p);
ws.send(message);
messageInput.value = "";
}
return false;
};
}
</script>
</html>
This HTML file sets up a basic webpage with:
- 一个表单(`messageForm`),包含用于输入消息的输入框和一个"发送"按钮。
- JavaScript 能够:
- 连接到位于`wss://[当前主机]/ws/[随机会话ID]`的WebSocket服务器。
- 连接成功后启用"发送"按钮。
- 将从WebSocket接收到的消息追加到`messages` div中,处理流式响应和轮次完成。
- 当表单提交时,将输入框中输入的文本发送到WebSocket服务器。
- 如果WebSocket连接关闭,会尝试重新连接。
6. 与您的流式应用交互
1. 导航到正确的目录:
要有效运行您的智能体,您需要位于应用文件夹 (adk-streaming/app)中
2. 启动Fast API: 运行以下命令以启动CLI界面
3. 访问用户界面: 当UI服务器启动后,终端会显示一个本地URL(例如 http://localhost:8000)。点击此链接在浏览器中打开用户界面。
现在你应该能看到如下用户界面:

尝试提问What is Gemini?。智能体会使用Google搜索来回应您的查询。您会注意到界面以流式文本形式显示智能体的响应。您还可以随时向智能体发送消息,即使智能体仍在响应中。这展示了ADK Streaming的双向通信能力。
相比传统同步网页应用的优势:
- 实时双向通信:无缝交互。
- 响应更迅速、互动性更强:无需等待完整回复或不断刷新。体验如同实时对话。
- 可扩展支持音频、图像和视频流的多模态应用。
恭喜!您已成功使用ADK创建并交互了您的首个流式智能体!
后续步骤
- 添加音频/图像模态:通过Streaming,您还可以使用音频和图像与智能体进行实时通信。我们将在未来添加更多支持多模态的示例。敬请期待!