
Multi-Instance TPM/RPM Test (litellm.Router)

Test whether the TPM/RPM limits you define are respected across multiple instances of the Router object.

In our test:

  • Max RPM per deployment = 100 requests / minute
  • Max throughput / minute on the router = 200 requests / minute (2 deployments)
  • Load we'll send through the router = 600 requests / minute
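
So we'll send 600 requests per minute, but expect only ~200 requests per minute to succeed.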
info

If you don't want to call a real LLM API endpoint, you can set up a fake OpenAI server. See the code below.

Code

Let's hit the router with 600 requests per minute.

Copy this script 👇. Save it as test_loadtest_router.py and run it with python3 test_loadtest_router.py.

from litellm import Router 
import litellm
litellm.suppress_debug_info = True
litellm.set_verbose = False
import logging
logging.basicConfig(level=logging.CRITICAL)
import os, random, uuid, time, asyncio

# Model list for OpenAI and Anthropic models
model_list = [
    {
        "model_name": "fake-openai-endpoint",
        "litellm_params": {
            "model": "gpt-3.5-turbo",
            "api_key": "my-fake-key",
            "api_base": "http://0.0.0.0:8080",
            "rpm": 100
        },
    },
    {
        "model_name": "fake-openai-endpoint",
        "litellm_params": {
            "model": "gpt-3.5-turbo",
            "api_key": "my-fake-key",
            "api_base": "http://0.0.0.0:8081",
            "rpm": 100
        },
    },
]

router_1 = Router(model_list=model_list, num_retries=0, enable_pre_call_checks=True, routing_strategy="usage-based-routing-v2", redis_host=os.getenv("REDIS_HOST"), redis_port=os.getenv("REDIS_PORT"), redis_password=os.getenv("REDIS_PASSWORD"))
router_2 = Router(model_list=model_list, num_retries=0, routing_strategy="usage-based-routing-v2", enable_pre_call_checks=True, redis_host=os.getenv("REDIS_HOST"), redis_port=os.getenv("REDIS_PORT"), redis_password=os.getenv("REDIS_PASSWORD"))



async def router_completion_non_streaming():
    try:
        client: Router = random.sample([router_1, router_2], 1)[0]  # randomly pick b/w clients
        # print(f"client={client}")
        response = await client.acompletion(
            model="fake-openai-endpoint",  # [CHANGE THIS] (if you call it something else on your proxy)
            messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
        )
        return response
    except Exception as e:
        # print(e)
        return None

async def loadtest_fn():
    start = time.time()
    n = 600  # Number of concurrent tasks
    tasks = [router_completion_non_streaming() for _ in range(n)]
    chat_completions = await asyncio.gather(*tasks)
    successful_completions = [c for c in chat_completions if c is not None]
    print(n, time.time() - start, len(successful_completions))

def get_utc_datetime():
    import datetime as dt
    from datetime import datetime

    if hasattr(dt, "UTC"):
        return datetime.now(dt.UTC)  # type: ignore
    else:
        return datetime.utcnow()  # type: ignore


# Run the event loop to execute the async function
async def parent_fn():
    for _ in range(10):
        dt = get_utc_datetime()
        current_minute = dt.strftime("%H-%M")
        print(f"triggered new batch - {current_minute}")
        await loadtest_fn()
        await asyncio.sleep(10)

asyncio.run(parent_fn())
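
With 2 deployments at 100 RPM each, roughly 200 of the 600 requests in each batch should succeed. If you'd rather check that programmatically than read the printed output, here's a minimal sketch you could append to test_loadtest_router.py (loadtest_fn_with_result and check_throughput are hypothetical helpers, not part of the script above):

# Hypothetical helpers - return the success count instead of printing it,
# so it can be compared against the expected ~200 requests/minute ceiling.
async def loadtest_fn_with_result(n: int = 600) -> int:
    tasks = [router_completion_non_streaming() for _ in range(n)]
    chat_completions = await asyncio.gather(*tasks)
    return len([c for c in chat_completions if c is not None])

async def check_throughput():
    successes = await loadtest_fn_with_result()
    # 2 deployments x 100 RPM => roughly 200 of the 600 requests should succeed.
    print(f"successes={successes} (expected ~200 given the rpm limits above)")

# asyncio.run(check_throughput())  # run this instead of parent_fn() for a single batch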

Multi-Instance TPM/RPM Load Test (Proxy)

Test whether the TPM/RPM limits you define are respected across multiple instances.

The quickest way to do this is by testing the proxy. The proxy uses the router under the hood, so if you're using either of them, this test should work for you.

In our test:

  • Max RPM per deployment = 100 requests / minute
  • Max throughput / minute on the proxy = 200 requests / minute (2 deployments)
  • Load we'll send to the proxy = 600 requests / minute

So we'll send 600 requests per minute, but expect only 200 requests per minute to succeed.

info

If you don't want to call a real LLM API endpoint, you can set up a fake OpenAI server. See the code below.

1. Setup config

model_list:
- litellm_params:
    api_base: http://0.0.0.0:8080
    api_key: my-fake-key
    model: openai/my-fake-model
    rpm: 100
  model_name: fake-openai-endpoint
- litellm_params:
    api_base: http://0.0.0.0:8081
    api_key: my-fake-key
    model: openai/my-fake-model-2
    rpm: 100
  model_name: fake-openai-endpoint
router_settings:
  num_retries: 0
  enable_pre_call_checks: true
  redis_host: os.environ/REDIS_HOST ## 👈 IMPORTANT! Setup the proxy w/ redis
  redis_password: os.environ/REDIS_PASSWORD
  redis_port: os.environ/REDIS_PORT
  routing_strategy: usage-based-routing-v2
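
The os.environ/<VARIABLE> syntax tells the proxy to read the value from that environment variable at startup, so REDIS_HOST, REDIS_PORT, and REDIS_PASSWORD must be set in the shell that launches each instance. Both instances need to point at the same Redis so TPM/RPM usage is shared between them.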

2. Start 2 instances of the proxy

Instance 1

litellm --config /path/to/config.yaml --port 4000

## RUNNING on http://0.0.0.0:4000

Instance 2

litellm --config /path/to/config.yaml --port 4001

## RUNNING on http://0.0.0.0:4001
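
Optionally, confirm both instances are reachable before starting the load test. A minimal sketch, assuming the proxy is serving the OpenAI-compatible /v1/models route and accepts the same key you'll use in the test script (check_proxies is a hypothetical helper, not part of the docs' scripts):

import asyncio
from openai import AsyncOpenAI

async def check_proxies():
    # Query the OpenAI-compatible models endpoint on each proxy instance.
    for base_url in ["http://0.0.0.0:4000", "http://0.0.0.0:4001"]:
        client = AsyncOpenAI(api_key="sk-1234", base_url=base_url)  # [CHANGE THIS] key if needed
        models = await client.models.list()
        print(base_url, [m.id for m in models.data])

asyncio.run(check_proxies())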

3. Run the test

Let's hit the proxy with 600 requests per minute.

Copy this script 👇. Save it as test_loadtest_proxy.py and run it with python3 test_loadtest_proxy.py.

from openai import AsyncOpenAI, AsyncAzureOpenAI
import random, uuid
import time, asyncio, litellm
# import logging
# logging.basicConfig(level=logging.DEBUG)
#### LITELLM PROXY ####
litellm_client = AsyncOpenAI(
    api_key="sk-1234",              # [CHANGE THIS]
    base_url="http://0.0.0.0:4000"
)
litellm_client_2 = AsyncOpenAI(
    api_key="sk-1234",              # [CHANGE THIS]
    base_url="http://0.0.0.0:4001"
)

async def proxy_completion_non_streaming():
    try:
        client = random.sample([litellm_client, litellm_client_2], 1)[0]  # randomly pick b/w clients
        # print(f"client={client}")
        response = await client.chat.completions.create(
            model="fake-openai-endpoint",  # [CHANGE THIS] (if you call it something else on your proxy)
            messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
        )
        return response
    except Exception as e:
        # print(e)
        return None

async def loadtest_fn():
    start = time.time()
    n = 600  # Number of concurrent tasks
    tasks = [proxy_completion_non_streaming() for _ in range(n)]
    chat_completions = await asyncio.gather(*tasks)
    successful_completions = [c for c in chat_completions if c is not None]
    print(n, time.time() - start, len(successful_completions))

def get_utc_datetime():
    import datetime as dt
    from datetime import datetime

    if hasattr(dt, "UTC"):
        return datetime.now(dt.UTC)  # type: ignore
    else:
        return datetime.utcnow()  # type: ignore


# Run the event loop to execute the async function
async def parent_fn():
    for _ in range(10):
        dt = get_utc_datetime()
        current_minute = dt.strftime("%H-%M")
        print(f"triggered new batch - {current_minute}")
        await loadtest_fn()
        await asyncio.sleep(10)

asyncio.run(parent_fn())
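
Each batch prints three values: the number of requests sent, the elapsed time in seconds, and the number of successful completions. With the limits above, roughly 200 of the 600 requests in each batch should succeed; the rest are rejected by the rate limits, caught as exceptions, and returned as None.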

Extra - Setup Fake OpenAI Server

Let's setup a fake OpenAI server with a rate limit of 100 requests per minute.

Let's call our file fake_openai_server.py.

# import sys, os
# sys.path.insert(
#     0, os.path.abspath("../")
# )  # Adds the parent directory to the system path
from fastapi import FastAPI, Request, status, HTTPException, Depends
from fastapi.responses import StreamingResponse
from fastapi.security import OAuth2PasswordBearer
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi import FastAPI, Request, HTTPException, UploadFile, File
import httpx, os, json
from openai import AsyncOpenAI
from typing import Optional
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse


class ProxyException(Exception):
    # NOTE: DO NOT MODIFY THIS
    # This is used to map exactly to OPENAI Exceptions
    def __init__(
        self,
        message: str,
        type: str,
        param: Optional[str],
        code: Optional[int],
    ):
        self.message = message
        self.type = type
        self.param = param
        self.code = code

    def to_dict(self) -> dict:
        """Converts the ProxyException instance to a dictionary."""
        return {
            "message": self.message,
            "type": self.type,
            "param": self.param,
            "code": self.code,
        }


limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter

@app.exception_handler(RateLimitExceeded)
async def _rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded):
    return JSONResponse(status_code=429,
                        content={"detail": "Rate Limited!"})

app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# for completion
@app.post("/chat/completions")
@app.post("/v1/chat/completions")
@limiter.limit("100/minute")
async def completion(request: Request):
    # raise HTTPException(status_code=429, detail="Rate Limited!")
    return {
        "id": "chatcmpl-123",
        "object": "chat.completion",
        "created": 1677652288,
        "model": None,
        "system_fingerprint": "fp_44709d6fcb",
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "\n\nHello there, how may I assist you today?",
            },
            "logprobs": None,
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": 9,
            "completion_tokens": 12,
            "total_tokens": 21
        }
    }

if __name__ == "__main__":
    import socket
    import uvicorn
    port = 8080
    while True:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        result = sock.connect_ex(('0.0.0.0', port))
        if result != 0:
            print(f"Port {port} is available, starting server...")
            break
        else:
            port += 1

    uvicorn.run(app, host="0.0.0.0", port=port)
python3 fake_openai_server.py
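
Since the script binds to the first free port starting at 8080, run it a second time in another terminal to get a second fake endpoint on 8081, matching the two api_base values used in the router and proxy configs above.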