
Router - Load Balancing, Fallbacks

LiteLLM manages:

  • Load-balancing across multiple deployments (e.g. Azure/OpenAI)
  • Prioritizing important requests to ensure they don't fail (i.e. queueing)
  • Basic reliability logic - cooldowns, fallbacks, timeouts and retries (fixed + exponential backoff) across multiple deployments/providers.

In production, litellm supports using Redis to track cooldown servers and usage (managing tpm/rpm limits).
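
A minimal sketch of that production setup (the Redis connection params used here are the same ones listed under the init params at the bottom of this page):

from litellm import Router
import os

# Redis lets multiple Router instances share cooldown + usage (tpm/rpm) state
router = Router(
    model_list=model_list, # assumes a model_list like the ones defined below
    redis_host=os.getenv("REDIS_HOST"),
    redis_password=os.getenv("REDIS_PASSWORD"),
    redis_port=os.getenv("REDIS_PORT"),
)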

info

If you want to load balance across different LLM APIs, use our LiteLLM Proxy Server

Load Balancing

(Thanks to @paulpierre and sweep proxy for their contributions to this implementation) See Code

Quick Start

Loadbalance across multiple azure/bedrock/provider deployments. LiteLLM will handle retrying if a call in one region fails.

from litellm import Router
import os

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias -> loadbalance between models with same `model_name`
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}, {
    "model_name": "gpt-4",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/gpt-4",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "api_version": os.getenv("AZURE_API_VERSION"),
    }
}, {
    "model_name": "gpt-4",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-4",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
},

]

router = Router(model_list=model_list)

# openai.ChatCompletion.create replacement
# requests with model="gpt-3.5-turbo" will pick a deployment where model_name="gpt-3.5-turbo"
response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

# openai.ChatCompletion.create replacement
# requests with model="gpt-4" will pick a deployment where model_name="gpt-4"
response = await router.acompletion(model="gpt-4",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

Available Endpoints

  • router.completion() - chat completions endpoint to call 100+ LLMs
  • router.acompletion() - async chat completion calls
  • router.embedding() - embedding endpoint for Azure, OpenAI, Huggingface endpoints (usage sketched below)
  • router.aembedding() - async embedding calls
  • router.text_completion() - completion calls in the old OpenAI /v1/completions endpoint format
  • router.atext_completion() - async text completion calls
  • router.image_generation() - completion calls in the OpenAI /v1/images/generations endpoint format
  • router.aimage_generation() - async image generation calls
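
For example, a rough sketch of calling the non-chat endpoints (the embedding model name here is an assumption - swap in groups from your own model_list):

from litellm import Router

router = Router(model_list=model_list) # assumes model_list defines "text-embedding-ada-002" and "gpt-3.5-turbo" groups

# embedding call - load balanced the same way as completion()
embedding_response = router.embedding(
    model="text-embedding-ada-002",
    input=["good morning from litellm"],
)

# completion call in the legacy /v1/completions format
text_response = router.text_completion(
    model="gpt-3.5-turbo",
    prompt="Hello, how are you?",
)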

Advanced - Routing Strategies ⭐️

Routing Strategies - Weighted Pick, Rate Limit Aware, Least Busy, Latency Based, Cost Based

The router provides several strategies for routing your calls across multiple deployments:
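
The strategy is chosen with the routing_strategy init param. A minimal sketch (the strategy names are the literals accepted by Router per the init params at the bottom of this page, plus the "usage-based-routing-v2" value used in the next example):

from litellm import Router

# one of: "simple-shuffle", "least-busy", "usage-based-routing" (or "usage-based-routing-v2"),
# "latency-based-routing", "cost-based-routing"
router = Router(model_list=model_list, routing_strategy="latency-based-routing")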

🎉 NEW This is an async implementation of usage-based routing.

Filters out deployments over their tpm/rpm limits - if you pass in the deployment's tpm/rpm limits.

Routes to the deployment with the lowest TPM usage for that minute.

In production, we use Redis to track usage (TPM/RPM) across multiple deployments. This implementation uses async redis calls (redis.incr and redis.mget).

For Azure, you get 6 RPM per 1000 TPM.

from litellm import Router
import os

model_list = [{ # list of model deployments
    "model_name": "gpt-3.5-turbo", # model alias
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "tpm": 100000,
        "rpm": 10000,
    },
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "tpm": 100000,
        "rpm": 1000,
    },
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": { # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
        "tpm": 100000,
        "rpm": 1000,
    },
}]
router = Router(model_list=model_list,
                redis_host=os.environ["REDIS_HOST"],
                redis_password=os.environ["REDIS_PASSWORD"],
                redis_port=os.environ["REDIS_PORT"],
                routing_strategy="usage-based-routing-v2", # 👈 KEY CHANGE
                enable_pre_call_checks=True, # enables router rate limits for concurrent calls
                )

response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

Basic Reliability

Max Parallel Requests (ASYNC)

Used as a semaphore for async requests on the router. Limits the max number of concurrent calls made to a deployment. Useful in traffic-spike scenarios.

If tpm/rpm is set, and no max parallel request limit is given, we use the RPM or calculated RPM (tpm/1000/6) as the max parallel request limit.

from litellm import Router

model_list = [{
    "model_name": "gpt-4",
    "litellm_params": {
        "model": "azure/gpt-4",
        ...
        "max_parallel_requests": 10 # 👈 SET PER-DEPLOYMENT max parallel requests
    }
}]

### OR ###

router = Router(model_list=model_list, default_max_parallel_requests=20) # 👈 SET DEFAULT max parallel requests


# Precedence: deployment max_parallel_requests > router default_max_parallel_requests

See Code
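
If neither is set, the router falls back to the deployment's rpm (or the rpm calculated from tpm), as described above. A rough sketch of that fallback (illustrative values only):

from litellm import Router

model_list = [{
    "model_name": "gpt-4",
    "litellm_params": {
        "model": "azure/gpt-4",
        ...
        "rpm": 600, # 👈 no "max_parallel_requests" -> rpm acts as the cap
    }
}]

# no default_max_parallel_requests either -> the router uses rpm
# (or, if only tpm is set, the calculated rpm: tpm/1000/6)
router = Router(model_list=model_list)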

Timeouts

The timeout set on the router is for the entire call, and is also passed down to the completion() call level.

Global Timeouts

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                timeout=30) # raise a timeout error if a call takes longer than 30s

response = router.completion(model="gpt-3.5-turbo",
                             messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)

Timeouts Per Model

from litellm import Router
import asyncio
import os

model_list = [{
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-v-2",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "timeout": 300, # sets a 5-minute timeout
        "stream_timeout": 30 # sets a 30s timeout for streaming calls
    }
}]

# init router
router = Router(model_list=model_list, routing_strategy="least-busy")
async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response

asyncio.run(router_acompletion())

Cooldowns

Set the limit for how many calls a model is allowed to fail in a minute, before being cooled down for a minute.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                allowed_fails=1, # cooldown model if it fails > 1 call in a minute.
                cooldown_time=100 # cooldown the deployment for 100 seconds if it exceeds the allowed fails
                )

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

Expected Response

No deployments available for selected model, Try again in 60 seconds. Passed model=claude-3-5-sonnet. pre-call-checks=False, allowed_model_region=n/a.

Disable Cooldowns

from litellm import Router 


router = Router(..., disable_cooldowns=True)

Retries

For both async + sync functions, we support retrying failed requests.

For RateLimitError we implement exponential backoff.

For generic errors, we retry immediately.

Here's a quick look at how we can set num_retries = 3:

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3)

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

We also support setting a minimum time to wait before retrying a failed request. This is via the retry_after param.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3, retry_after=5) # waits at least 5s before retrying a failed request

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

[Advanced]: Custom Retries, Cooldowns based on Error Type

  • Use RetryPolicy if you want to set num_retries based on the Exception received
  • Use AllowedFailsPolicy to set a custom number of allowed_fails per minute before cooling down a deployment

See all Exception Types

Example:

retry_policy = RetryPolicy(
    ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
    AuthenticationErrorRetries=0, # run 0 retries for AuthenticationErrorRetries
)

allowed_fails_policy = AllowedFailsPolicy(
    ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
    RateLimitErrorAllowedFails=100, # Allow 100 RateLimitErrors before cooling down a deployment
)

Example Usage

from litellm.router import RetryPolicy, AllowedFailsPolicy
import litellm
import os

retry_policy = RetryPolicy(
    ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
    AuthenticationErrorRetries=0, # run 0 retries for AuthenticationErrorRetries
    BadRequestErrorRetries=1,
    TimeoutErrorRetries=2,
    RateLimitErrorRetries=3,
)

allowed_fails_policy = AllowedFailsPolicy(
    ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
    RateLimitErrorAllowedFails=100, # Allow 100 RateLimitErrors before cooling down a deployment
)

router = litellm.Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo", # openai model name
            "litellm_params": { # params for litellm completion/embedding call
                "model": "azure/chatgpt-v-2",
                "api_key": os.getenv("AZURE_API_KEY"),
                "api_version": os.getenv("AZURE_API_VERSION"),
                "api_base": os.getenv("AZURE_API_BASE"),
            },
        },
        {
            "model_name": "bad-model", # openai model name
            "litellm_params": { # params for litellm completion/embedding call
                "model": "azure/chatgpt-v-2",
                "api_key": "bad-key",
                "api_version": os.getenv("AZURE_API_VERSION"),
                "api_base": os.getenv("AZURE_API_BASE"),
            },
        },
    ],
    retry_policy=retry_policy,
    allowed_fails_policy=allowed_fails_policy,
)

response = await router.acompletion(
    model=model,
    messages=messages,
)

Fallbacks

If a call fails after num_retries, fall back to another model group.

Quick Start

from litellm import Router
import os

router = Router(
    model_list=[
        { # bad model
            "model_name": "bad-model",
            "litellm_params": {
                "model": "openai/my-bad-model",
                "api_key": "my-bad-api-key",
                "mock_response": "Bad call"
            },
        },
        { # good model
            "model_name": "my-good-model",
            "litellm_params": {
                "model": "gpt-4o",
                "api_key": os.getenv("OPENAI_API_KEY"),
                "mock_response": "Good call"
            },
        },
    ],
    fallbacks=[{"bad-model": ["my-good-model"]}] # 👈 KEY CHANGE
)

response = router.completion(
    model="bad-model",
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
    mock_testing_fallbacks=True,
)

If the error is a context window exceeded error, fall back to a larger model group (if given).

Fallbacks are done in-order - ["gpt-3.5-turbo", "gpt-4", "gpt-4-32k"] will try 'gpt-3.5-turbo' first, then 'gpt-4', etc.

You can also set default_fallbacks, in case a specific model group is misconfigured / bad (see the sketch below).
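
A minimal sketch combining in-order fallbacks with default_fallbacks (the model-group names are illustrative):

from litellm import Router

router = Router(
    model_list=model_list, # assumes groups "gpt-3.5-turbo", "gpt-4", "gpt-4-32k", "my-backup-model"
    fallbacks=[{"gpt-3.5-turbo": ["gpt-4", "gpt-4-32k"]}], # tried in-order: 'gpt-4' first, then 'gpt-4-32k'
    default_fallbacks=["my-backup-model"], # used when a group has no matching fallback rule / is misconfigured
)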

There are 3 types of fallbacks:

  • content_policy_fallbacks: For litellm.ContentPolicyViolationError - LiteLLM maps content policy violation errors across providers See Code
  • context_window_fallbacks: For litellm.ContextWindowExceededErrors - LiteLLM maps context window error messages across providers See Code
  • fallbacks: For all other errors - e.g. litellm.RateLimitError

Content Policy Violation Fallback

Key change:

content_policy_fallbacks=[{"claude-2": ["my-fallback-model"]}]

from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "claude-2",
            "litellm_params": {
                "model": "claude-2",
                "api_key": "",
                "mock_response": Exception("content filtering policy"),
            },
        },
        {
            "model_name": "my-fallback-model",
            "litellm_params": {
                "model": "claude-2",
                "api_key": "",
                "mock_response": "This works!",
            },
        },
    ],
    content_policy_fallbacks=[{"claude-2": ["my-fallback-model"]}], # 👈 KEY CHANGE
    # fallbacks=[..], # [OPTIONAL]
    # context_window_fallbacks=[..], # [OPTIONAL]
)

response = router.completion(
    model="claude-2",
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
)

Context Window Exceeded Fallback

Key change:

context_window_fallbacks=[{"claude-2": ["my-fallback-model"]}]

from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "claude-2",
            "litellm_params": {
                "model": "claude-2",
                "api_key": "",
                "mock_response": Exception("prompt is too long"),
            },
        },
        {
            "model_name": "my-fallback-model",
            "litellm_params": {
                "model": "claude-2",
                "api_key": "",
                "mock_response": "This works!",
            },
        },
    ],
    context_window_fallbacks=[{"claude-2": ["my-fallback-model"]}], # 👈 KEY CHANGE
    # fallbacks=[..], # [OPTIONAL]
    # content_policy_fallbacks=[..], # [OPTIONAL]
)

response = router.completion(
    model="claude-2",
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
)

Regular Fallbacks

Key change:

fallbacks=[{"claude-2": ["my-fallback-model"]}]

from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "claude-2",
            "litellm_params": {
                "model": "claude-2",
                "api_key": "",
                "mock_response": Exception("this is a rate limit error"),
            },
        },
        {
            "model_name": "my-fallback-model",
            "litellm_params": {
                "model": "claude-2",
                "api_key": "",
                "mock_response": "This works!",
            },
        },
    ],
    fallbacks=[{"claude-2": ["my-fallback-model"]}], # 👈 KEY CHANGE
    # context_window_fallbacks=[..], # [OPTIONAL]
    # content_policy_fallbacks=[..], # [OPTIONAL]
)

response = router.completion(
    model="claude-2",
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
)

Caching

In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.

In-memory Cache

router = Router(model_list=model_list,
                cache_responses=True)

response = router.completion(model="gpt-3.5-turbo",
                             messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)

Redis Cache

router = Router(model_list=model_list,
                redis_host=os.getenv("REDIS_HOST"),
                redis_password=os.getenv("REDIS_PASSWORD"),
                redis_port=os.getenv("REDIS_PORT"),
                cache_responses=True)

response = router.completion(model="gpt-3.5-turbo",
                             messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)

Pass in Redis URL, additional kwargs

router = Router(model_list=model_list,
                ## CACHING ##
                redis_url=os.getenv("REDIS_URL"),
                cache_kwargs={}, # additional kwargs to pass to RedisCache (see caching.py)
                cache_responses=True)

Pre-Call Checks (Context Window, EU-Regions)

Enable pre-call checks to filter out:

  1. deployments with a context window smaller than the call's messages.
  2. deployments outside of the EU region.

1. Enable pre-call checks

from litellm import Router 
# ...
router = Router(model_list=model_list, enable_pre_call_checks=True) # 👈 Set to True

2. Set Model List

For context window checks on Azure deployments, set the base model. Pick the base model from this list - all Azure models start with azure/.

For 'EU-region' filtering, set the 'region_name' of your deployment.

Note: We automatically infer the region_name for Vertex AI, Bedrock, and IBM WatsonxAI based on your litellm params. For Azure, set litellm.enable_preview_features = True.

See Code

model_list = [
    {
        "model_name": "gpt-3.5-turbo", # model group name
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
            "region_name": "eu", # 👈 SET 'EU' REGION NAME
            "base_model": "azure/gpt-35-turbo", # 👈 (Azure-only) SET BASE MODEL
        },
    },
    {
        "model_name": "gpt-3.5-turbo", # model group name
        "litellm_params": { # params for litellm completion/embedding call
            "model": "gpt-3.5-turbo-1106",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    },
    {
        "model_name": "gemini-pro",
        "litellm_params": {
            "model": "vertex_ai/gemini-pro-1.5",
            "vertex_project": "adroit-crow-1234",
            "vertex_location": "us-east1" # 👈 AUTOMATICALLY INFERS 'region_name'
        }
    }
]

router = Router(model_list=model_list, enable_pre_call_checks=True)

3. Test it!

"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
from litellm import Router
import os

model_list = [
    {
        "model_name": "gpt-3.5-turbo", # model group name
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
            "base_model": "azure/gpt-35-turbo",
        },
        "model_info": {
            "base_model": "azure/gpt-35-turbo",
        }
    },
    {
        "model_name": "gpt-3.5-turbo", # model group name
        "litellm_params": { # params for litellm completion/embedding call
            "model": "gpt-3.5-turbo-1106",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    },
]

router = Router(model_list=model_list, enable_pre_call_checks=True)

text = "What is the meaning of 42?" * 5000

response = router.completion(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": text},
        {"role": "user", "content": "Who was Alexander?"},
    ],
)

print(f"response: {response}")

Caching across model groups

If you want to cache across 2 different model groups (e.g. Azure deployments and OpenAI), use caching groups.

import litellm, asyncio, time, os, traceback
from litellm import Router

# set os env
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""

async def test_acompletion_caching_on_router_caching_groups():
    # tests acompletion + caching on router
    try:
        litellm.set_verbose = True
        model_list = [
            {
                "model_name": "openai-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo-0613",
                    "api_key": os.getenv("OPENAI_API_KEY"),
                },
            },
            {
                "model_name": "azure-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "azure/chatgpt-v-2",
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": os.getenv("AZURE_API_BASE"),
                    "api_version": os.getenv("AZURE_API_VERSION")
                },
            }
        ]

        messages = [
            {"role": "user", "content": f"write a one sentence poem {time.time()}?"}
        ]
        start_time = time.time()
        router = Router(model_list=model_list,
                        cache_responses=True,
                        caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
        response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
        print(f"response1: {response1}")
        await asyncio.sleep(1) # add cache is async, async sleep for cache to get set
        response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
        assert response1.id == response2.id
        assert len(response1.choices[0].message.content) > 0
        assert response1.choices[0].message.content == response2.choices[0].message.content
    except Exception as e:
        traceback.print_exc()

asyncio.run(test_acompletion_caching_on_router_caching_groups())

Alerting 🚨

Send alerts to Slack / your webhook URL for the following events:

  • LLM API Exceptions
  • Slow LLM Responses

Get a Slack webhook URL from https://api.slack.com/messaging/webhooks

Usage

Initialize an AlertingConfig and pass it to litellm.Router. The following code will trigger an alert because api_key="bad_key" is invalid.

from litellm.router import AlertingConfig
import litellm
import os

router = litellm.Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {
                "model": "gpt-3.5-turbo",
                "api_key": "bad_key",
            },
        }
    ],
    alerting_config=AlertingConfig(
        alerting_threshold=10, # threshold for slow / hanging llm responses (in seconds). Defaults to 300 seconds
        webhook_url=os.getenv("SLACK_WEBHOOK_URL") # webhook you want to send alerts to
    ),
)
try:
    await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}],
    )
except:
    pass

Track cost for Azure Deployments

Problem: Azure returns gpt-4 in the response when azure/gpt-4-1106-preview is used. This leads to inaccurate cost tracking.

Solution ✅: Set model_info["base_model"] on your router init so litellm uses the correct model for calculating Azure cost.

Step 1. Router Setup

from litellm import Router
import os

model_list = [
    { # list of model deployments
        "model_name": "gpt-4-preview", # model alias
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2", # actual model name
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "model_info": {
            "base_model": "azure/gpt-4-1106-preview" # azure/gpt-4-1106-preview will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
        }
    },
    {
        "model_name": "gpt-4-32k",
        "litellm_params": { # params for litellm completion/embedding call
            "model": "azure/chatgpt-functioncalling",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "model_info": {
            "base_model": "azure/gpt-4-32k" # azure/gpt-4-32k will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
        }
    }
]

router = Router(model_list=model_list)

Step 2. Access response_cost in a custom callback - litellm calculates the response cost for you

import litellm
from litellm.integrations.custom_logger import CustomLogger

class MyCustomHandler(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Success")
        response_cost = kwargs.get("response_cost")
        print("response_cost=", response_cost)

customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]

# router completion call
response = router.completion(
    model="gpt-4-32k",
    messages=[{"role": "user", "content": "Hi who are you"}]
)

Default litellm.completion/embedding params

You can also set default params for litellm completion/embedding calls. Here's how to do that:

from litellm import Router

fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}

router = Router(model_list=model_list,
                default_litellm_params={"context_window_fallback_dict": fallback_dict})

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

Custom Callbacks - Track API Key, API Endpoint, Model Used

If you need to track the api_key, api endpoint, model, and custom_llm_provider used for each completion call, you can set up a custom callback.

Usage

import litellm
from litellm import Router
from litellm.integrations.custom_logger import CustomLogger

class MyCustomHandler(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Success")
        print("kwargs=", kwargs)
        litellm_params = kwargs.get("litellm_params")
        api_key = litellm_params.get("api_key")
        api_base = litellm_params.get("api_base")
        custom_llm_provider = litellm_params.get("custom_llm_provider")
        response_cost = kwargs.get("response_cost")

        # print the values
        print("api_key=", api_key)
        print("api_base=", api_base)
        print("custom_llm_provider=", custom_llm_provider)
        print("response_cost=", response_cost)

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Failure")
        print("kwargs=", kwargs)

customHandler = MyCustomHandler()

litellm.callbacks = [customHandler]

# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")

# router completion call
response = router.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hi who are you"}]
)

Deploy Router

If you want a server to load balance across different LLM APIs, use our LiteLLM Proxy Server.

Init Params for litellm.Router

def __init__(
    model_list: Optional[list] = None,

    ## CACHING ##
    redis_url: Optional[str] = None,
    redis_host: Optional[str] = None,
    redis_port: Optional[int] = None,
    redis_password: Optional[str] = None,
    cache_responses: Optional[bool] = False,
    cache_kwargs: dict = {}, # additional kwargs to pass to RedisCache (see caching.py)
    caching_groups: Optional[
        List[tuple]
    ] = None, # if you want to cache across model groups
    client_ttl: int = 3600, # ttl for cached clients - will re-initialize after this time in seconds

    ## RELIABILITY ##
    num_retries: int = 0,
    timeout: Optional[float] = None,
    default_litellm_params={}, # default params for Router.chat.completion.create
    fallbacks: Optional[List] = None,
    default_fallbacks: Optional[List] = None,
    allowed_fails: Optional[int] = None, # Number of times a deployment can fail before being added to cooldown
    cooldown_time: float = 1, # (seconds) time to cooldown a deployment after failure
    context_window_fallbacks: Optional[List] = None,
    model_group_alias: Optional[dict] = {},
    retry_after: int = 0, # (min) time to wait before retrying a failed request
    routing_strategy: Literal[
        "simple-shuffle",
        "least-busy",
        "usage-based-routing",
        "latency-based-routing",
        "cost-based-routing",
    ] = "simple-shuffle",

    ## DEBUGGING ##
    set_verbose: bool = False, # set this to True for seeing logs
    debug_level: Literal["DEBUG", "INFO"] = "INFO", # set this to "DEBUG" for detailed debugging
):

Debugging Router

Basic Debugging

Set Router(set_verbose=True)

from litellm import Router

router = Router(
    model_list=model_list,
    set_verbose=True
)

Detailed Debugging

Set Router(set_verbose=True, debug_level="DEBUG")

from litellm import Router

router = Router(
    model_list=model_list,
    set_verbose=True,
    debug_level="DEBUG" # defaults to INFO
)

Very Detailed Debugging

Set litellm.set_verbose=True and Router(set_verbose=True, debug_level="DEBUG")

from litellm import Router
import litellm

litellm.set_verbose = True

router = Router(
    model_list=model_list,
    set_verbose=True,
    debug_level="DEBUG" # defaults to INFO
)