跳转到内容

ADK 流式快速入门

通过本快速入门指南,您将学习创建一个简单的智能体,并使用ADK Streaming实现与其进行低延迟、双向的语音和视频通信。我们将安装Agent Development Kit,设置一个基础的"Google搜索"智能体,尝试使用adk web工具运行该智能体与Streaming功能,然后讲解如何自行使用ADK Streaming和FastAPI构建一个简单的异步Web应用。

注意:本指南假设您有在Windows、Mac和Linux环境中使用终端的经验。

支持的语音/视频流模型

要在ADK中使用语音/视频流功能,您需要使用支持Live API的Gemini模型。您可以在文档中找到支持Gemini Live API的模型ID

1. 设置环境 & 安装ADK

创建并激活虚拟环境(推荐):

# Create
python -m venv .venv
# Activate (each new terminal)
# macOS/Linux: source .venv/bin/activate
# Windows CMD: .venv\Scripts\activate.bat
# Windows PowerShell: .venv\Scripts\Activate.ps1

安装ADK:

pip install google-adk

2. 项目结构

创建以下包含空文件的文件夹结构:

adk-streaming/  # Project folder
└── app/ # the web app folder
    ├── .env # Gemini API key
    └── google_search_agent/ # Agent folder
        ├── __init__.py # Python package
        └── agent.py # Agent definition

agent.py

将以下代码块复制粘贴到agent.py中。

对于model,请按照前面模型部分所述仔细核对模型ID。

from google.adk.agents import Agent
from google.adk.tools import google_search  # Import the tool

root_agent = Agent(
   # A unique name for the agent.
   name="basic_search_agent",
   # The Large Language Model (LLM) that agent will use.
   model="gemini-2.0-flash-live-001", # Google AI Studio
   #model="gemini-2.0-flash-live-preview-04-09" # Vertex AI Studio
   # A short description of the agent's purpose.
   description="Agent to answer questions using Google Search.",
   # Instructions to set the agent's behavior.
   instruction="You are an expert researcher. You always stick to the facts.",
   # Add google_search tool to perform grounding with Google search.
   tools=[google_search]
)

agent.py 是存放所有智能体逻辑的文件,其中必须定义一个 root_agent

注意你多么轻松地集成了基于Google Search的落地能力Agent类和google_search工具处理了与LLM的复杂交互以及搜索API的落地实现,让你能专注于智能体的目标行为

intro_components.png

将以下代码块复制粘贴到 __init__.pymain.py 文件中。

__init__.py
from . import agent

3. 设置Gemini API密钥

要运行您的智能体,您需要设置一个Gemini API密钥。

  1. Google AI Studio获取API密钥。
  2. 在您的app目录中,创建一个.env文件。
  3. 将这些行添加到.env文件中,将YOUR_API_KEY_HERE替换为您的密钥:

.env

GOOGLE_API_KEY=YOUR_API_KEY_HERE # Replace with your API Key
GOOGLE_GENAI_USE_VERTEXAI=0

4. 使用 adk web 测试智能体

现在可以尝试使用智能体了。运行以下命令启动开发界面。首先,请确保将当前目录设置为app

cd app

然后,运行开发用户界面:

adk web

在浏览器中直接打开提供的URL(通常是http://localhost:8000http://127.0.0.1:8000)。该连接完全保留在您的本地机器上。选择basic_search_agent

尝试使用文本

在用户界面中输入以下提示语进行尝试。

  • 纽约的天气怎么样?
  • 纽约现在几点?
  • 巴黎的天气怎么样?
  • 巴黎现在几点?

智能体将使用google_search工具获取最新信息来回答这些问题。

尝试语音和视频功能

现在,点击麦克风按钮启用语音输入,并通过语音提出相同的问题。您将实时听到语音回答。

此外,点击摄像头按钮启用视频输入,并提问如"你看到了什么?"。智能体会回答他们在视频输入中看到的内容。

停止工具

在控制台按下Ctrl-C来停止adk web

关于ADK流式处理的说明

ADK Streaming未来版本将支持以下功能:回调、长运行工具、示例工具以及Shell智能体(例如SequentialAgent)。

5. 构建自定义流式应用(可选)

在上一节中,我们已经验证了基础搜索智能体可以通过adk web工具与ADK Streaming协同工作。本节我们将学习如何使用FastAPI构建支持流式通信的自定义Web应用。

Add static directory under app, and add main.py and index.html as empty files, as in the following structure:

adk-streaming/  # Project folder
└── app/ # the web app folder
    ├── main.py # FastAPI web app
    └── static/ # Static content folder
        └── index.html # The web client page

main.py

将以下代码块复制粘贴到 main.py 文件中。

import os
import json
import asyncio

from pathlib import Path
from dotenv import load_dotenv

from google.genai.types import (
    Part,
    Content,
)

from google.adk.runners import Runner
from google.adk.agents import LiveRequestQueue
from google.adk.agents.run_config import RunConfig
from google.adk.sessions.in_memory_session_service import InMemorySessionService

from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

from google_search_agent.agent import root_agent

#
# ADK Streaming
#

# Load Gemini API Key
load_dotenv()

APP_NAME = "ADK Streaming example"
session_service = InMemorySessionService()


def start_agent_session(session_id: str):
    """Starts an agent session"""

    # Create a Session
    session = session_service.create_session(
        app_name=APP_NAME,
        user_id=session_id,
        session_id=session_id,
    )

    # Create a Runner
    runner = Runner(
        app_name=APP_NAME,
        agent=root_agent,
        session_service=session_service,
    )

    # Set response modality = TEXT
    run_config = RunConfig(response_modalities=["TEXT"])

    # Create a LiveRequestQueue for this session
    live_request_queue = LiveRequestQueue()

    # Start agent session
    live_events = runner.run_live(
        session=session,
        live_request_queue=live_request_queue,
        run_config=run_config,
    )
    return live_events, live_request_queue


async def agent_to_client_messaging(websocket, live_events):
    """Agent to client communicaation"""
    while True:
        async for event in live_events:
            # turn_complete
            if event.turn_complete:
                await websocket.send_text(json.dumps({"turn_complete": True}))
                print("[TURN COMPLETE]")

            if event.interrupted:
                await websocket.send_text(json.dumps({"interrupted": True}))
                print("[INTERRUPTED]")

            # Read the Content and its first Part
            part: Part = (
                event.content and event.content.parts and event.content.parts[0]
            )
            if not part or not event.partial:
                continue

            # Get the text
            text = event.content and event.content.parts and event.content.parts[0].text
            if not text:
                continue

            # Send the text to the client
            await websocket.send_text(json.dumps({"message": text}))
            print(f"[AGENT TO CLIENT]: {text}")
            await asyncio.sleep(0)


async def client_to_agent_messaging(websocket, live_request_queue):
    """Client to agent communication"""
    while True:
        text = await websocket.receive_text()
        content = Content(role="user", parts=[Part.from_text(text=text)])
        live_request_queue.send_content(content=content)
        print(f"[CLIENT TO AGNET]: {text}")
        await asyncio.sleep(0)


#
# FastAPI web app
#

app = FastAPI()

STATIC_DIR = Path("static")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")


@app.get("/")
async def root():
    """Serves the index.html"""
    return FileResponse(os.path.join(STATIC_DIR, "index.html"))


@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: int):
    """Client websocket endpoint"""

    # Wait for client connection
    await websocket.accept()
    print(f"Client #{session_id} connected")

    # Start agent session
    session_id = str(session_id)
    live_events, live_request_queue = start_agent_session(session_id)

    # Start tasks
    agent_to_client_task = asyncio.create_task(
        agent_to_client_messaging(websocket, live_events)
    )
    client_to_agent_task = asyncio.create_task(
        client_to_agent_messaging(websocket, live_request_queue)
    )
    await asyncio.gather(agent_to_client_task, client_to_agent_task)

    # Disconnected
    print(f"Client #{session_id} disconnected")

这段代码使用ADK和FastAPI创建了一个实时聊天应用。它设置了一个WebSocket端点,客户端可以连接并与Google搜索智能体进行交互。

关键功能:

  • 加载Gemini API密钥。
  • 使用ADK管理智能体会话并运行`google_search_agent`。
  • `start_agent_session` 初始化一个带有实时请求队列的智能体会话,用于实时通信。
  • `agent_to_client_messaging` 异步地将智能体的文本响应和状态更新(轮次完成、中断)流式传输到连接的WebSocket客户端。
  • `client_to_agent_messaging` 异步接收来自WebSocket客户端的文本消息,并将其作为用户输入发送给智能体。
  • FastAPI 提供静态前端服务,并在 `/ws/{session_id}` 处理 WebSocket 连接。
  • 当客户端连接时,它会启动一个智能体会话,并通过WebSockets为客户端和智能体之间的双向通信创建并发任务。

Copy-paste the following code block to the index.html file.

index.html
<!doctype html>
<html>
  <head>
    <title>ADK Streaming Test</title>
  </head>

  <body>
    <h1>ADK Streaming Test</h1>
    <div
      id="messages"
      style="height: 300px; overflow-y: auto; border: 1px solid black"></div>
    <br />

    <form id="messageForm">
      <label for="message">Message:</label>
      <input type="text" id="message" name="message" />
      <button type="submit" id="sendButton" disabled>Send</button>
    </form>
  </body>

  <script>
    // Connect the server with a WebSocket connection
    const sessionId = Math.random().toString().substring(10);
    const ws_url = "ws://" + window.location.host + "/ws/" + sessionId;
    let ws = new WebSocket(ws_url);

    // Get DOM elements
    const messageForm = document.getElementById("messageForm");
    const messageInput = document.getElementById("message");
    const messagesDiv = document.getElementById("messages");
    let currentMessageId = null;

    // WebSocket handlers
    function addWebSocketHandlers(ws) {
      ws.onopen = function () {
        console.log("WebSocket connection opened.");
        document.getElementById("sendButton").disabled = false;
        document.getElementById("messages").textContent = "Connection opened";
        addSubmitHandler(this);
      };

      ws.onmessage = function (event) {
        // Parse the incoming message
        const packet = JSON.parse(event.data);
        console.log(packet);

        // Check if the turn is complete
        // if turn complete, add new message
        if (packet.turn_complete && packet.turn_complete == true) {
          currentMessageId = null;
          return;
        }

        // add a new message for a new turn
        if (currentMessageId == null) {
          currentMessageId = Math.random().toString(36).substring(7);
          const message = document.createElement("p");
          message.id = currentMessageId;
          // Append the message element to the messagesDiv
          messagesDiv.appendChild(message);
        }

        // Add message text to the existing message element
        const message = document.getElementById(currentMessageId);
        message.textContent += packet.message;

        // Scroll down to the bottom of the messagesDiv
        messagesDiv.scrollTop = messagesDiv.scrollHeight;
      };

      // When the connection is closed, try reconnecting
      ws.onclose = function () {
        console.log("WebSocket connection closed.");
        document.getElementById("sendButton").disabled = true;
        document.getElementById("messages").textContent = "Connection closed";
        setTimeout(function () {
          console.log("Reconnecting...");
          ws = new WebSocket(ws_url);
          addWebSocketHandlers(ws);
        }, 5000);
      };

      ws.onerror = function (e) {
        console.log("WebSocket error: ", e);
      };
    }
    addWebSocketHandlers(ws);

    // Add submit handler to the form
    function addSubmitHandler(ws) {
      messageForm.onsubmit = function (e) {
        e.preventDefault();
        const message = messageInput.value;
        if (message) {
          const p = document.createElement("p");
          p.textContent = "> " + message;
          messagesDiv.appendChild(p);
          ws.send(message);
          messageInput.value = "";
        }
        return false;
      };
    }
  </script>
</html>

This HTML file sets up a basic webpage with:

  • 一个表单(`messageForm`),包含用于输入消息的输入框和一个"发送"按钮。
  • JavaScript 能够:
  • 连接到位于`wss://[当前主机]/ws/[随机会话ID]`的WebSocket服务器。
  • 连接成功后启用"发送"按钮。
  • 将从WebSocket接收到的消息追加到`messages` div中,处理流式响应和轮次完成。
  • 当表单提交时,将输入框中输入的文本发送到WebSocket服务器。
  • 如果WebSocket连接关闭,会尝试重新连接。

6. 与您的流式应用交互

1. 导航到正确的目录:

要有效运行您的智能体,您需要位于应用文件夹 (adk-streaming/app)

2. 启动Fast API: 运行以下命令以启动CLI界面

uvicorn main:app --reload

3. 访问用户界面: 当UI服务器启动后,终端会显示一个本地URL(例如 http://localhost:8000)。点击此链接在浏览器中打开用户界面。

现在你应该能看到如下用户界面:

ADK Streaming Test

尝试提问What is Gemini?。智能体会使用Google搜索来回应您的查询。您会注意到界面以流式文本形式显示智能体的响应。您还可以随时向智能体发送消息,即使智能体仍在响应中。这展示了ADK Streaming的双向通信能力。

相比传统同步网页应用的优势:

  • 实时双向通信:无缝交互。
  • 响应更迅速、互动性更强:无需等待完整回复或不断刷新。体验如同实时对话。
  • 可扩展支持音频、图像和视频流的多模态应用。

恭喜!您已成功使用ADK创建并交互了您的首个流式智能体!

后续步骤

  • 添加音频/图像模态:通过Streaming,您还可以使用音频和图像与智能体进行实时通信。我们将在未来添加更多支持多模态的示例。敬请期待!