跳到主要内容
Open In Colab

Websockets: 使用websockets进行流式输入和输出

Open on GitHub

本笔记本演示了如何使用 IOStream 类通过websockets流式传输输入和输出。使用websockets可以构建比使用web方法更具响应性的web客户端。主要区别在于websockets允许您推送数据,而使用web方法则需要轮询服务器以获取新响应。

在本指南中,我们将探索IOStream类的能力。它专为增强客户端开发而设计,特别是使用websockets进行输入和输出流的Web客户端。IOStream通过在Web应用程序中启用更动态和交互式的用户体验而脱颖而出。

Websockets技术是这一功能的核心,它通过允许数据实时“推送”到客户端,比传统的网页方法有了显著的进步。这不同于传统的方法,在传统方法中,客户端必须反复“轮询”服务器以检查是否有新的响应。通过使用底层的websockets库,IOStream类促进了服务器和客户端之间的持续、双向通信通道。这确保了更新能够即时接收,无需持续的轮询,从而使网页客户端更加高效和响应迅速。

通过IOStream类利用websockets的真正威力在于其能够创建高度响应的网络客户端。这种响应性对于需要实时数据更新的应用程序,如聊天应用程序,至关重要。通过将IOStream类集成到您的网络应用程序中,您不仅通过即时数据传输增强了用户体验,还通过消除不必要的轮询减少了服务器的负载。

本质上,通过IOStream类使用websockets的转变标志着web客户端开发的显著增强。这种方法不仅简化了客户端和服务器之间的数据交换过程,还为创建更具互动性和吸引力的web应用程序开辟了新的可能性。通过遵循本指南,开发者可以充分利用websockets和IOStream类的全部潜力,突破web客户端响应性和互动性的界限。

要求

Requirements

本笔记本需要一些额外的依赖项,可以通过pip安装:

pip install autogen-agentchat[websockets]~=0.2 fastapi uvicorn

如需更多信息,请参考安装指南

设置您的API端点

config_list_from_json 函数从环境变量或 JSON 文件加载配置列表。

from datetime import datetime
from tempfile import TemporaryDirectory

from websockets.sync.client import connect as ws_connect

import autogen
from autogen.io.websockets import IOWebsockets

config_list = autogen.config_list_from_json(
env_or_file="OAI_CONFIG_LIST",
filter_dict={
"model": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
},
)
tip

了解更多关于为agent配置LLM的信息在这里.

定义 on_connect 函数

一个on_connect函数是利用websockets的应用程序中至关重要的部分,充当事件处理程序,每当新的客户端连接建立时被调用。此函数旨在启动特定于新连接客户端的任何必要设置、通信协议或数据交换程序。本质上,它为后续的交互会话奠定了基础,配置服务器和客户端之间的通信方式以及建立连接后采取的初始操作。现在,让我们深入了解如何定义此函数,特别是在使用AutoGen框架与websockets的背景下。

当客户端连接到websocket服务器时,服务器会自动启动一个IOWebsockets类的新实例。该实例对于管理服务器和客户端之间的数据流至关重要。on_connect函数利用该实例来设置通信协议,定义交互规则,并启动任何必要的初步数据交换或配置,以确保客户端-服务器交互顺利进行。

def on_connect(iostream: IOWebsockets) -> None:
print(f" - on_connect(): Connected to client using IOWebsockets {iostream}", flush=True)

print(" - on_connect(): Receiving message from client.", flush=True)

# 1. Receive Initial Message
initial_msg = iostream.input()

# 2. Instantiate ConversableAgent
agent = autogen.ConversableAgent(
name="chatbot",
system_message="Complete a task given to you and reply TERMINATE when the task is done. If asked about the weather, use tool 'weather_forecast(city)' to get the weather forecast for a city.",
llm_config={
"config_list": autogen.config_list_from_json(
env_or_file="OAI_CONFIG_LIST",
filter_dict={
"model": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
},
),
"stream": True,
},
)

# 3. Define UserProxyAgent
user_proxy = autogen.UserProxyAgent(
name="user_proxy",
system_message="A proxy for the user.",
is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
human_input_mode="NEVER",
max_consecutive_auto_reply=10,
code_execution_config=False,
)

# 4. Define Agent-specific Functions
def weather_forecast(city: str) -> str:
return f"The weather forecast for {city} at {datetime.now()} is sunny."

autogen.register_function(
weather_forecast, caller=agent, executor=user_proxy, description="Weather forecast for a city"
)

# 5. Initiate conversation
print(
f" - on_connect(): Initiating chat with agent {agent} using message '{initial_msg}'",
flush=True,
)
user_proxy.initiate_chat( # noqa: F704
agent,
message=initial_msg,
)

以下是如何定义一个典型的on_connect函数的解释,例如上面的示例中的那个:

  1. 接收初始消息: 建立连接后,立即从客户端接收初始消息。此步骤对理解客户的请求或启动对话流程至关重要。

  2. 实例化 ConversableAgent: 使用特定的系统消息和LLM配置创建一个ConversableAgent的实例。如果需要多个代理,请确保它们不共享相同的llm_config,因为向其中一个添加函数也会尝试将其添加到另一个。

  3. 实例化 UserProxyAgent: 同样地,创建一个 UserProxyAgent 实例,定义其终止条件、人工输入模式以及其他相关参数。不需要定义 llm_config,因为 UserProxyAgent 不使用 LLM。

  4. 定义特定代理的功能:如果您的可对话代理需要执行特定任务,例如在上面的示例中获取天气预报,请在 on_connect 范围内定义这些功能。相应地装饰这些功能,将其与您的代理链接。

  5. 发起对话: 最后,使用 UserProxyAgentinitiate_chat 方法来启动与可对话代理的交互,传递初始消息和缓存机制以提高效率。

使用 Python 客户端测试 websockets 服务器

测试一个on_connect函数与Python客户端涉及模拟客户端-服务器交互,以确保设置、数据交换和通信协议按预期运行。以下是使用Python客户端进行此测试的简要说明:

  1. 启动Websocket服务器: 使用 IOWebsockets.run_server_in_thread method 在一个单独的线程中启动服务器,指定on_connect函数和端口。 此方法返回正在运行的websocket服务器的URI。

  2. 连接到服务器: 使用返回的URI打开与服务器的连接。这模拟了客户端启动连接到您的websocket服务器的过程。

  3. 发送消息到服务器:一旦连接成功,从客户端发送一条消息到服务器。这测试服务器通过已建立的websocket连接接收消息的能力。

  4. 接收和处理消息: 实现一个循环以持续接收来自服务器的消息。根据需要对消息进行解码,并相应地进行处理。此步骤验证服务器响应客户端请求的能力。

该测试场景通过模拟一个真实的消息交换,有效地评估了客户端和服务器之间的交互,使用了on_connect函数。它确保服务器能够处理传入的连接、处理消息并将响应传递回客户端,这些都是基于websocket的稳健应用的关键功能。

with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8765) as uri:
print(f" - test_setup() with websocket server running on {uri}.", flush=True)

with ws_connect(uri) as websocket:
print(f" - Connected to server on {uri}", flush=True)

print(" - Sending message to server.", flush=True)
# websocket.send("2+2=?")
websocket.send("Check out the weather in Paris and write a poem about it.")

while True:
message = websocket.recv()
message = message.decode("utf-8") if isinstance(message, bytes) else message

print(message, end="", flush=True)

if "TERMINATE" in message:
print()
print(" - Received TERMINATE message. Exiting.", flush=True)
break
 - test_setup() with websocket server running on ws://127.0.0.1:8765.
- on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fd65b3c10>
- on_connect(): Receiving message from client.
- Connected to server on ws://127.0.0.1:8765
- Sending message to server.
- on_connect(): Initiating chat with agent <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b909c086290> using message 'Check out the weather in Paris and write a poem about it.'
user_proxy (to chatbot):

Check out the weather in Paris and write a poem about it.

--------------------------------------------------------------------------------

>>>>>>>> USING AUTO REPLY...
chatbot (to user_proxy):


***** Suggested tool call (call_xFFWe52vwdpgZ8xTRV6adBdy): weather_forecast *****
Arguments:
{
"city": "Paris"
}
*********************************************************************************

--------------------------------------------------------------------------------

>>>>>>>> EXECUTING FUNCTION weather_forecast...
user_proxy (to chatbot):

user_proxy (to chatbot):

***** Response from calling tool (call_xFFWe52vwdpgZ8xTRV6adBdy) *****
The weather forecast for Paris at 2024-04-05 12:00:06.206125 is sunny.
**********************************************************************

--------------------------------------------------------------------------------

>>>>>>>> USING AUTO REPLY...
In the heart of France, beneath the sun's warm glow,
Lies the city of Paris, where the Seine waters flow.
Bathed in sunlight, every street and spire,
Illuminated each detail, just like a docile fire.

Once monochromatic cityscape, kissed by the sun's bright light,
Now a kaleidoscope of colors, from morning till the night.
This sun-swept city sparkles, under the azure dome,
Her inhabitants find comfort, for they call this city home.

One can wander in her sunshine, on this perfect weather day,
And feel the warmth it brings, to chase your blues away.
For the weather in Paris, is more than just a forecast,
It is a stage setting for dwellers and tourists amassed.

TERMINATE

chatbot (to user_proxy):

In the heart of France, beneath the sun's warm glow,
Lies the city of Paris, where the Seine waters flow.
Bathed in sunlight, every street and spire,
Illuminated each detail, just like a docile fire.

Once monochromatic cityscape, kissed by the sun's bright light,
Now a kaleidoscope of colors, from morning till the night.
This sun-swept city sparkles, under the azure dome,
Her inhabitants find comfort, for they call this city home.

One can wander in her sunshine, on this perfect weather day,
And feel the warmth it brings, to chase your blues away.
For the weather in Paris, is more than just a forecast,
It is a stage setting for dwellers and tourists amassed.

TERMINATE

- Received TERMINATE message. Exiting.

使用HTML/JS客户端测试在FastAPI服务器中运行的websockets服务器

下面的代码片段概述了在Web环境中测试on_connect函数的方法,使用 FastAPI来提供一个简单的交互式HTML页面。这种方法允许用户通过Web界面发送消息,然后由运行AutoGen框架的服务器通过WebSocket处理。以下是一步步的解释:

  1. FastAPI 应用设置: 代码首先导入必要的库并设置一个 FastAPI 应用。FastAPI 是一个现代、快速的网络框架,用于基于标准 Python 类型提示的 Python 3.7+ 构建 API。

  2. 用于用户交互的HTML模板:HTML模板被定义为一个多行Python字符串,其中包含一个用于消息输入的基本表单和一个用于管理websocket通信的脚本。该模板创建了一个用户界面,可以在此界面向服务器发送消息并动态显示响应。

  3. 运行Websocket服务器: 异步上下文管理器 run_websocket_server 使用指定的 on_connect 函数和端口通过 IOWebsockets.run_server_in_thread 启动websocket服务器。该服务器监听传入的websocket连接。

  4. 用于提供HTML页面的FastAPI路由: 定义了一个FastAPI路由 (@app.get("/")),以向用户提供HTML页面。当用户访问根URL时, 将返回用于websocket聊天的HTML内容,允许他们与websocket服务器进行交互。

  5. 启动FastAPI应用程序: 最后,使用Uvicorn启动FastAPI应用程序,Uvicorn是一个ASGI服务器,配置了应用程序和所需的附加参数。服务器启动后,即可向用户提供交互式的HTML页面。

这种测试方法允许用户与服务器之间进行交互式通信,提供了一种实时展示和评估on_connect函数行为的实用方式。用户可以通过网页发送消息,服务器根据on_connect函数中定义的逻辑处理这些消息,以用户友好的方式展示AutoGen框架处理websocket的能力和响应性。

from contextlib import asynccontextmanager  # noqa: E402
from pathlib import Path # noqa: E402

from fastapi import FastAPI # noqa: E402
from fastapi.responses import HTMLResponse # noqa: E402

PORT = 8000

html = """
<!DOCTYPE html>
<html>
<head>
<title>Autogen websocket test</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""


@asynccontextmanager
async def run_websocket_server(app):
with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8080) as uri:
print(f"Websocket server started at {uri}.", flush=True)

yield


app = FastAPI(lifespan=run_websocket_server)


@app.get("/")
async def get():
return HTMLResponse(html)
import uvicorn  # noqa: E402

config = uvicorn.Config(app)
server = uvicorn.Server(config)
await server.serve() # noqa: F704
INFO:     Started server process [5227]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [5227]
Websocket server started at ws://127.0.0.1:8080.
INFO: 127.0.0.1:42548 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:42548 - "GET /favicon.ico HTTP/1.1" 404 Not Found
- on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc6991420>
- on_connect(): Receiving message from client.
- on_connect(): Initiating chat with agent <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b909c0cab00> using message 'write a poem about lundon'

上述描述的测试设置,利用FastAPI和websockets,不仅为on_connect功能提供了一个强大的测试框架,还为开发现实世界的应用程序奠定了基础。这种方法展示了如何使基于网络的互动变得动态和实时,这是现代应用程序开发的一个关键方面。

例如,此设置可以直接应用或调整以构建交互式聊天应用程序、实时数据仪表板或实时支持系统。websockets的集成使得服务器能够即时向客户端推送更新,这对于依赖及时信息传递的应用程序来说是一个关键特性。例如,基于此框架构建的聊天应用程序可以支持用户之间的即时消息传递,从而提高用户的参与度和满意度。

此外,用于测试的HTML页面的简洁性和交互性反映了用户界面设计如何提供无缝体验。开发人员可以在此基础之上扩展,引入更复杂的元素,如用户身份验证、消息加密和自定义用户交互,从而进一步定制应用程序以满足特定的用例需求。

FastAPI 框架的灵活性,结合 websockets 实现的实时通信,为开发人员提供了一个强大的工具集,用于构建可扩展、高效且高度交互的 Web 应用程序。无论是用于创建协作平台、流媒体服务,还是交互式游戏体验,此测试设置都展示了利用这些技术可以开发的潜在应用。

使用HTML/JS客户端测试WebSocket服务器

下面提供的代码片段是一个示例,展示了如何使用Python内置的http.server模块为on_connect函数创建一个交互式测试环境。此设置允许在Web浏览器中进行实时交互,使开发者能够以更加用户友好和实用的方式测试WebSocket功能。以下是对该代码运行方式及其潜在应用的详细说明:

  1. 提供一个简单的HTML页面: 代码首先定义了一个HTML页面,该页面包括一个用于发送消息的表单和一个用于显示接收消息的列表。JavaScript用于处理表单提交和WebSocket通信。

  2. HTML文件的临时目录:创建了一个临时目录来存储HTML文件。这种方法确保测试环境是干净且隔离的,最大限度地减少与现有文件或配置的冲突。

  3. 自定义HTTP请求处理程序:定义了一个SimpleHTTPRequestHandler的自定义子类来提供HTML文件。此处理程序重写了do_GET方法,将根路径(/)重定向到chat.html页面,确保访问服务器根URL的访客立即看到聊天界面。

  4. 启动Websocket服务器: 同时,使用 IOWebsockets.run_server_in_thread方法在另一个端口上启动了一个Websocket服务器,将之前定义的 on_connect函数作为新连接的回调。

  5. 用于HTML界面的HTTP服务器:实例化一个HTTP服务器来提供HTML聊天界面,使用户能够通过网络浏览器与websocket服务器进行交互。

这个设置展示了将websockets与简单的HTTP服务器集成的实际应用,以创建一个动态和交互式的网络应用程序。通过使用Python的标准库模块,它展示了开发实时应用程序的低门槛,如聊天系统、实时通知或交互式仪表板。

从这段代码示例中,关键点在于如何轻松地利用Python的内置库来原型化和测试复杂的网络功能。对于希望构建实际应用程序的开发者来说,这种方法提供了一种直接的方法来验证和完善websocket通信逻辑,然后再将其集成到更大的框架或系统中。这种测试设置的简单性和易用性使其成为开发各种交互式网络应用程序的绝佳起点。

from http.server import HTTPServer, SimpleHTTPRequestHandler  # noqa: E402

PORT = 8000

html = """
<!DOCTYPE html>
<html>
<head>
<title>Autogen websocket test</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""

with TemporaryDirectory() as temp_dir:
# create a simple HTTP webpage
path = Path(temp_dir) / "chat.html"
with open(path, "w") as f:
f.write(html)

#
class MyRequestHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=temp_dir, **kwargs)

def do_GET(self):
if self.path == "/":
self.path = "/chat.html"
return SimpleHTTPRequestHandler.do_GET(self)

handler = MyRequestHandler

with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8080) as uri:
print(f"Websocket server started at {uri}.", flush=True)

with HTTPServer(("", PORT), handler) as httpd:
print("HTTP server started at http://localhost:" + str(PORT))
try:
httpd.serve_forever()
except KeyboardInterrupt:
print(" - HTTP server stopped.", flush=True)
Websocket server started at ws://127.0.0.1:8080.
HTTP server started at http://localhost:8000
- on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc69937f0>
- on_connect(): Receiving message from client.
- on_connect(): Initiating chat with agent <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b8fc6990310> using message 'write a poem about new york'
- on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc68bdc90>
- on_connect(): Receiving message from client.
- on_connect(): Initiating chat with agent <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b8fc68be170> using message 'check the weather in london and write a poem about it'
- HTTP server stopped.
127.0.0.1 - - [05/Apr/2024 12:01:51] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [05/Apr/2024 12:01:51] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [05/Apr/2024 12:02:27] "GET / HTTP/1.1" 304 -
优云智算