路由器 - 负载均衡、回退策略
LiteLLM 管理:
- 在多个部署之间进行负载均衡(例如 Azure/OpenAI)
- 优先处理重要请求以确保它们不会失败(即排队)
- 基本可靠性逻辑 - 跨多个部署/提供商的冷却时间、回退、超时和重试(固定 + 指数退避)。
在生产环境中,litellm 支持使用 Redis 来跟踪冷却服务器和使用情况(管理 tpm/rpm 限制)。
如果你想要一个在不同的 LLM API 之间进行负载均衡的服务器,请使用我们的 LiteLLM 代理服务器
负载均衡
(感谢 @paulpierre 和 sweep proxy 对这一实现的贡献) 查看代码
快速开始
在多个 azure/bedrock/provider 部署之间进行负载均衡。LiteLLM 将在某个区域的调用失败时处理重试。
- SDK
- PROXY
import os
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias -> loadbalance between models with same `model_name`
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}, {
"model_name": "gpt-4",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/gpt-4",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
}
}, {
"model_name": "gpt-4",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-4",
"api_key": os.getenv("OPENAI_API_KEY"),
}
},
]
router = Router(model_list=model_list)
# openai.ChatCompletion.create 替换
# 请求 model="gpt-3.5-turbo" 将选择 model_name="gpt-3.5-turbo" 的部署
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
# openai.ChatCompletion.create 替换
# 请求 model="gpt-4" 将选择 model_name="gpt-4" 的部署
response = await router.acompletion(model="gpt-4",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
查看详细的代理负载均衡/回退文档 这里
- 使用多个部署设置 model_list
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/<your-deployment-name>
api_base: <your-azure-endpoint>
api_key: <your-azure-api-key>
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/gpt-turbo-small-ca
api_base: https://my-endpoint-canada-berri992.openai.azure.com/
api_key: <your-azure-api-key>
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/gpt-turbo-large
api_base: https://openai-france-1234.openai.azure.com/
api_key: <your-azure-api-key>
- 启动代理
litellm --config /path/to/config.yaml
- 测试它!
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hi there!"}
],
"mock_testing_rate_limit_error": true
}'
可用端点
- router.completion() - 聊天补全端点,可调用 100 多个 LLM
- router.acompletion() - 异步聊天补全调用
- router.embedding() - 嵌入端点,适用于 Azure、OpenAI、Huggingface 端点
- router.aembedding() - 异步嵌入调用
- router.text_completion() - 旧版 OpenAI /v1/completions 端点格式的补全调用
- router.atext_completion() - 异步文本补全调用
- router.image_generation() - OpenAI /v1/images/generations 端点格式的图像生成调用
- router.aimage_generation() - 异步图像生成调用
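下面是一个最小示意(非官方示例;假设使用 OpenAI 的 gpt-3.5-turbo 与 text-embedding-ada-002 部署,密钥来自 OPENAI_API_KEY 环境变量),演示同步与异步端点的调用方式:
import os
import asyncio
from litellm import Router

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {"model": "gpt-3.5-turbo", "api_key": os.getenv("OPENAI_API_KEY")},
    },
    {
        "model_name": "text-embedding-ada-002",
        "litellm_params": {"model": "text-embedding-ada-002", "api_key": os.getenv("OPENAI_API_KEY")},
    },
]
router = Router(model_list=model_list)

# 同步聊天补全 / 嵌入调用
chat = router.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello!"}],
)
emb = router.embedding(model="text-embedding-ada-002", input=["good morning from litellm"])

# 异步调用使用对应的 a* 方法
async def main():
    return await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello!"}],
    )

print(chat, emb, asyncio.run(main()))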
高级 - 路由策略 ⭐️
路由策略 - 加权选择、速率限制感知、最不繁忙、基于延迟、基于成本
路由器提供了多种策略,用于在多个部署之间路由您的调用:
- 基于速率的v2(异步)
- 基于延迟的
- (默认) 加权选择 (异步)
- 基于使用率的限速感知
- Least-Busy
- 自定义路由策略
- 最低成本路由(异步)
🎉 新功能 这是基于使用情况的异步实现。
如果您传入了部署的 tpm/rpm 限制,则会过滤掉超出 tpm/rpm 限制的部署。
路由到当分钟内TPM使用率最低的部署。
在生产环境中,我们使用Redis来跟踪多个部署的使用情况(TPM/RPM)。此实现使用异步redis调用(redis.incr和redis.mget)。
对于Azure,每1000 TPM您可以获得6 RPM
- sdk
- 代理
import os
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
"tpm": 100000,
"rpm": 10000,
},
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
"tpm": 100000,
"rpm": 1000,
},
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
"tpm": 100000,
"rpm": 1000,
},
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="usage-based-routing-v2" # 👈 KEY CHANGE
enable_pre_call_checks=True, # enables router rate limits for concurrent calls
)
response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
1. 在配置中设置策略
model_list:
- model_name: gpt-3.5-turbo # model alias
litellm_params: # params for litellm completion/embedding call
model: azure/chatgpt-v-2 # actual model name
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
tpm: 100000
rpm: 10000
- model_name: gpt-3.5-turbo
litellm_params: # params for litellm completion/embedding call
model: gpt-3.5-turbo
api_key: os.environ/OPENAI_API_KEY
tpm: 100000
rpm: 1000
router_settings:
routing_strategy: usage-based-routing-v2 # 👈 关键变化
redis_host: <your-redis-host>
redis_password: <your-redis-password>
redis_port: <your-redis-port>
enable_pre_call_checks: true
general_settings:
master_key: sk-1234
2. 启动代理
litellm --config /path/to/config.yaml
3. 测试它!
curl --location 'http://localhost:4000/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-1234' \
--data '{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "嘿,最近怎么样?"}]
}'
选择响应时间最低的部署。
它会缓存并根据请求的发送和从部署接收的时间更新部署的响应时间。
from litellm import Router
import asyncio

model_list = [{ ... }]

# init router
router = Router(model_list=model_list,
                routing_strategy="latency-based-routing",  # 👈 set routing strategy
                enable_pre_call_checks=True,  # enables router rate limits for concurrent calls
                )

model = "gpt-3.5-turbo"  # 模型组别名,对应 model_list 中的 model_name
messages = [{"role": "user", "content": "Hey, how's it going?"}]

## CALL 1+2
tasks = []
response = None
final_response = None
for _ in range(2):
    tasks.append(router.acompletion(model=model, messages=messages))
response = await asyncio.gather(*tasks)

if response is not None:
    ## CALL 3
    await asyncio.sleep(1)  # let the cache update happen
    picked_deployment = router.lowestlatency_logger.get_available_deployments(
        model_group=model, healthy_deployments=router.healthy_deployments
    )
    final_response = await router.acompletion(model=model, messages=messages)
    print(f"min deployment id: {picked_deployment}")
    print(f"model id: {final_response._hidden_params['model_id']}")
    assert (
        final_response._hidden_params["model_id"]
        == picked_deployment["model_info"]["id"]
    )
设置时间窗口
设置在计算部署平均延迟时回溯考虑的时间窗口。
在路由器中
router = Router(..., routing_strategy_args={"ttl": 10})
在代理中
router_settings:
routing_strategy_args: {"ttl": 10}
设置最低延迟缓冲区
设置一个缓冲区,在该缓冲区内的部署将被选为调用的候选对象。
例如:
如果你有 5 个部署:
- https://litellm-prod-1.openai.azure.com/: 0.07s
- https://litellm-prod-2.openai.azure.com/: 0.1s
- https://litellm-prod-3.openai.azure.com/: 0.1s
- https://litellm-prod-4.openai.azure.com/: 0.1s
- https://litellm-prod-5.openai.azure.com/: 4.66s
为了防止一开始就使 prod-1 过载,可以设置一个 50% 的缓冲区,使 prod-2、prod-3、prod-4 这些部署也被纳入候选。
在路由器中
router = Router(..., routing_strategy_args={"lowest_latency_buffer": 0.5})
在代理中
router_settings:
routing_strategy_args: {"lowest_latency_buffer": 0.5}
默认策略。根据提供的每分钟请求数 (rpm) 或每分钟令牌数 (tpm) 选择部署。
如果未提供 rpm 或 tpm,则随机选择一个部署。
你还可以设置 weight 参数,以指定各个模型被选中的频率。
- 基于RPM的混排
- 基于权重的混排
LiteLLM 代理 config.yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
rpm: 900
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-functioncalling
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
rpm: 10
Python SDK
import os
import asyncio
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 900, # requests per minute for this API
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 10,
}
},]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
LiteLLM 代理 config.yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
weight: 9
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-functioncalling
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
weight: 1
Python SDK
import os
import asyncio
from litellm import Router
model_list = [{
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": {
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"weight": 9, # pick this 90% of the time
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"weight": 1,
}
}]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
这将路由到该分钟内TPM使用率最低的部署。
在生产环境中,我们使用Redis来跟踪多个部署的TPM/RPM使用情况。
如果你传入了部署的tpm/rpm限制,这还会检查是否超过限制,并过滤掉那些限制将被超出的部署。
对于Azure,你的RPM = TPM/6。
import os
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 10000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 1000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 1000,
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="usage-based-routing"
enable_pre_call_check=True, # enables router rate limits for concurrent calls
)
response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
选择当前正在处理调用数最少的部署。
import os
import asyncio
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}]
# init router
router = Router(model_list=model_list, routing_strategy="least-busy")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
插入自定义路由策略以选择部署
步骤 1. 定义您的自定义路由策略
from typing import Dict, List, Optional, Union

from litellm.router import CustomRoutingStrategyBase
class CustomRoutingStrategy(CustomRoutingStrategyBase):
async def async_get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
根据给定的参数异步获取可用的部署。
参数:
model (str): 模型的名称。
messages (Optional[List[Dict[str, str]]], optional): 给定请求的消息列表。默认为 None。
input (Optional[Union[str, List]], optional): 给定嵌入请求的输入。默认为 None。
specific_deployment (Optional[bool], optional): 是否获取特定部署。默认为 False。
request_kwargs (Optional[Dict], optional): 额外的请求关键字参数。默认为 None。
返回:
返回 litellm.router.model_list 中的一个元素
"""
print("在自定义异步获取可用部署中")
model_list = router.model_list
print("路由器模型列表=", model_list)
for model in model_list:
if isinstance(model, dict):
if model["litellm_params"]["model"] == "openai/very-special-endpoint":
return model
pass
def get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
根据给定的参数同步获取可用的部署。
参数:
model (str): 模型的名称。
messages (Optional[List[Dict[str, str]]], optional): 给定请求的消息列表。默认为 None。
input (Optional[Union[str, List]], optional): 给定嵌入请求的输入。默认为 None。
specific_deployment (Optional[bool], optional): 是否获取特定部署。默认为 False。
request_kwargs (Optional[Dict], optional): 额外的请求关键字参数。默认为 None。
返回:
返回 litellm.router.model_list 中的一个元素
"""
pass
步骤 2. 使用自定义路由策略初始化路由器
from litellm import Router
router = Router(
model_list=[
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/very-special-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/", # 如果你是Krrish,这是我们Railway端点上的OpenAI Endpoint3 :)
"api_key": "fake-key",
},
"model_info": {"id": "very-special-endpoint"},
},
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/fast-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
"api_key": "fake-key",
},
"model_info": {"id": "fast-endpoint"},
},
],
set_verbose=True,
debug_level="DEBUG",
timeout=1,
) # type: ignore
router.set_custom_routing_strategy(CustomRoutingStrategy()) # 👈 在这里设置你的路由策略
步骤 3. 测试你的路由策略。运行 router.acompletion 请求时,应调用到你的自定义路由策略。
for _ in range(10):
response = await router.acompletion(
model="azure-model", messages=[{"role": "user", "content": "hello"}]
)
print(response)
_picked_model_id = response._hidden_params["model_id"]
print("picked model=", _picked_model_id)
基于最低成本选择部署
工作原理:
- 获取所有健康部署
- 选择所有未超出其提供的 rpm/tpm 限制的部署
- 对于每个部署,检查 litellm_param["model"] 是否存在于 litellm_model_cost_map 中
  - 如果部署不存在于 litellm_model_cost_map 中 -> 使用部署成本 = $1
- 选择成本最低的部署
from litellm import Router
import asyncio
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-4"},
"model_info": {"id": "openai-gpt-4"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "groq/llama3-8b-8192"},
"model_info": {"id": "groq-llama"},
},
]
# 初始化路由器
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
print(response._hidden_params["model_id"]) # 预期为groq-llama,因为groq/llama成本最低
return response
asyncio.run(router_acompletion())
使用自定义输入/输出定价
设置 litellm_params["input_cost_per_token"] 和 litellm_params["output_cost_per_token"],以便在路由时使用自定义定价。
from litellm import Router
import asyncio

model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"input_cost_per_token": 0.00003,
"output_cost_per_token": 0.00003,
},
"model_info": {"id": "chatgpt-v-experimental"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-1",
"input_cost_per_token": 0.000000001,
"output_cost_per_token": 0.00000001,
},
"model_info": {"id": "chatgpt-v-1"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-5",
"input_cost_per_token": 10,
"output_cost_per_token": 12,
},
"model_info": {"id": "chatgpt-v-5"},
},
]
# 初始化路由器
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
print(response._hidden_params["model_id"]) # 预期为chatgpt-v-1,因为chatgpt-v-1成本最低
return response
asyncio.run(router_acompletion())
基本可靠性
最大并行请求数(异步)
用于路由器上异步请求的信号量。限制对部署的最大并发调用数。在流量高峰场景中很有用。
如果设置了tpm/rpm,并且没有给出最大并行请求限制,我们使用RPM或计算出的RPM(tpm/1000/6)作为最大并行请求限制。
from litellm import Router
model_list = [{
"model_name": "gpt-4",
"litellm_params": {
"model": "azure/gpt-4",
...
"max_parallel_requests": 10 # 👈 设置每个部署的最大并行请求数
}
}]
### 或者 ###
router = Router(model_list=model_list, default_max_parallel_requests=20) # 👈 设置默认最大并行请求数
# 部署最大并行请求数 > 默认最大并行请求数
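下面是一个简要示意(非官方示例;api_key/api_base 为占位符),对应上文所述:当部署设置了 rpm/tpm 而未显式给出最大并行请求数时,路由器会以 RPM(或由 TPM 推算的值)作为并发上限:
from litellm import Router

model_list = [{
    "model_name": "gpt-4",
    "litellm_params": {
        "model": "azure/gpt-4",
        "api_key": "my-azure-key",        # 占位符,请替换为实际值
        "api_base": "my-azure-endpoint",  # 占位符,请替换为实际值
        "rpm": 10,  # 未设置 max_parallel_requests 时,按上文说明由 rpm/tpm 推导并发上限
    },
}]

router = Router(model_list=model_list)  # 未设置 default_max_parallel_requests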
超时设置
路由器中设置的超时是针对整个调用过程的,并且也会传递到completion()调用级别。
全局超时
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
                timeout=30) # 如果调用时间超过30秒则引发超时错误
每个模型的超时
import os
import asyncio
from litellm import Router
model_list = [{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"timeout": 300 # 设置5分钟的超时
"stream_timeout": 30 # 设置流式调用的30秒超时
}
}]
# 初始化路由器
router = Router(model_list=model_list, routing_strategy="least-busy")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
冷却时间
设置模型在一分钟内允许失败的最大次数,超过该次数后,模型将被冷却一分钟。
- SDK
- 代理
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
allowed_fails=1, # 如果模型在一分钟内失败超过1次,则使其冷却。
cooldown_time=100 # 如果失败次数超过允许的次数,则将部署冷却100秒。
)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# 正常调用
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
设置全局值
router_settings:
allowed_fails: 3 # 如果在一分钟内失败超过3次,则进入冷却模式。
cooldown_time: 30 # (以秒为单位)如果失败次数/分钟 > allowed_fails,冷却模型的时间
默认值:
- allowed_fails: 0
- cooldown_time: 60秒
按模型设置
model_list:
- model_name: fake-openai-endpoint
litellm_params:
model: predibase/llama-3-8b-instruct
api_key: os.environ/PREDIBASE_API_KEY
tenant_id: os.environ/PREDIBASE_TENANT_ID
max_new_tokens: 256
cooldown_time: 0 # 👈 关键更改
预期响应
所选模型无可用部署,请在60秒后重试。传递的模型=claude-3-5-sonnet。预调用检查=False,允许的模型区域=n/a。
禁用冷却时间
- SDK
- 代理
from litellm import Router
router = Router(..., disable_cooldowns=True)
router_settings:
disable_cooldowns: True
重试
对于异步和同步函数,我们都支持重试失败的请求。
对于RateLimitError,我们实现指数退避。
对于通用错误,我们立即重试。
以下是设置 num_retries = 3 的快速示例:
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3)
user_message = "你好,旧金山的天气怎么样?"
messages = [{"content": user_message, "role": "user"}]
# 正常调用
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"响应: {response}")
我们还支持设置在重试失败请求之前等待的最短时间,通过 retry_after 参数实现。
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3, retry_after=5) # 在重试请求之前至少等待5秒
user_message = "你好,旧金山的天气怎么样?"
messages = [{"content": user_message, "role": "user"}]
# 正常调用
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"响应: {response}")
[高级]: 根据错误类型自定义重试和冷却时间
- 如果你想根据收到的异常类型设置 num_retries,使用 RetryPolicy
- 使用 AllowedFailsPolicy 自定义在冷却部署之前每分钟允许的 allowed_fails 次数
- SDK
- 代理
示例:
retry_policy = RetryPolicy(
ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
AuthenticationErrorRetries=0, # run 0 retries for AuthenticationErrorRetries
)
allowed_fails_policy = AllowedFailsPolicy(
ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
RateLimitErrorAllowedFails=100, # Allow 100 RateLimitErrors before cooling down a deployment
)
示例用法
from litellm.router import RetryPolicy, AllowedFailsPolicy
retry_policy = RetryPolicy(
ContentPolicyViolationErrorRetries=3, # run 3 retries for ContentPolicyViolationErrors
AuthenticationErrorRetries=0, # run 0 retries for AuthenticationErrorRetries
BadRequestErrorRetries=1,
TimeoutErrorRetries=2,
RateLimitErrorRetries=3,
)
allowed_fails_policy = AllowedFailsPolicy(
ContentPolicyViolationErrorAllowedFails=1000, # Allow 1000 ContentPolicyViolationError before cooling down a deployment
RateLimitErrorAllowedFails=100, # Allow 100 RateLimitErrors before cooling down a deployment
)
router = litellm.Router(
model_list=[
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
},
{
"model_name": "bad-model", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
},
],
retry_policy=retry_policy,
allowed_fails_policy=allowed_fails_policy,
)
response = await router.acompletion(
model=model,
messages=messages,
)
router_settings:
retry_policy: {
"BadRequestErrorRetries": 3,
"ContentPolicyViolationErrorRetries": 4
}
allowed_fails_policy: {
"ContentPolicyViolationErrorAllowedFails": 1000, # 允许在冷却部署前出现1000次ContentPolicyViolationError
"RateLimitErrorAllowedFails": 100 # 允许在冷却部署前出现100次RateLimitError
}
回退机制
如果调用在尝试 num_retries 次后仍然失败,则回退到另一个模型组。
快速开始
import os
from litellm import Router
router = Router(
model_list=[
{ # 坏模型
"model_name": "bad-model",
"litellm_params": {
"model": "openai/my-bad-model",
"api_key": "my-bad-api-key",
"mock_response": "Bad call"
},
},
{ # 好模型
"model_name": "my-good-model",
"litellm_params": {
"model": "gpt-4o",
"api_key": os.getenv("OPENAI_API_KEY"),
"mock_response": "Good call"
},
},
],
fallbacks=[{"bad-model": ["my-good-model"]}] # 👈 关键更改
)
response = router.completion(
model="bad-model",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
mock_testing_fallbacks=True,
)
如果错误是上下文窗口超出错误,则回退到更大的模型组(如果提供)。
回退按顺序进行 - ["gpt-3.5-turbo", "gpt-4", "gpt-4-32k"],首先尝试 'gpt-3.5-turbo',然后是 'gpt-4',依此类推。
您还可以设置 default_fallbacks,以防某个特定模型组配置错误或表现不佳(见下面的示意)。
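下面是一个简要示意(非官方示例;my-default-model 等别名为假设名称,API key 默认从环境变量读取),展示按顺序回退与 default_fallbacks 的搭配用法:
from litellm import Router

router = Router(
    model_list=[
        {"model_name": "gpt-3.5-turbo", "litellm_params": {"model": "gpt-3.5-turbo"}},
        {"model_name": "gpt-4", "litellm_params": {"model": "gpt-4"}},
        {"model_name": "gpt-4-32k", "litellm_params": {"model": "gpt-4-32k"}},
        {"model_name": "my-default-model", "litellm_params": {"model": "gpt-4o"}},
    ],
    # 按顺序回退:gpt-3.5-turbo 失败后先尝试 gpt-4,再尝试 gpt-4-32k
    fallbacks=[{"gpt-3.5-turbo": ["gpt-4", "gpt-4-32k"]}],
    # 未单独配置回退的模型组出错时,兜底回退到 my-default-model
    default_fallbacks=["my-default-model"],
)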
回退有三种类型:
- content_policy_fallbacks:针对 litellm.ContentPolicyViolationError - LiteLLM 映射了不同提供商的内容政策违规错误 查看代码
- context_window_fallbacks:针对 litellm.ContextWindowExceededErrors - LiteLLM 映射了不同提供商的上下文窗口错误消息 查看代码
- fallbacks:针对所有其他错误,例如 litellm.RateLimitError
内容政策违规回退
关键更改:
content_policy_fallbacks=[{"claude-2": ["my-fallback-model"]}]
- SDK
- PROXY
from litellm import Router
router = Router(
model_list=[
{
"model_name": "claude-2",
"litellm_params": {
"model": "claude-2",
"api_key": "",
"mock_response": Exception("content filtering policy"),
},
},
{
"model_name": "my-fallback-model",
"litellm_params": {
"model": "claude-2",
"api_key": "",
"mock_response": "This works!",
},
},
],
content_policy_fallbacks=[{"claude-2": ["my-fallback-model"]}], # 👈 KEY CHANGE
# fallbacks=[..], # [OPTIONAL]
# context_window_fallbacks=[..], # [OPTIONAL]
)
response = router.completion(
model="claude-2",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
在您的 proxy config.yaml 中只需添加这一行 👇
router_settings:
content_policy_fallbacks: [{"claude-2": ["my-fallback-model"]}]
启动代理
litellm --config /path/to/config.yaml
# 运行在 http://0.0.0.0:4000
上下文窗口超出回退
关键更改:
context_window_fallbacks=[{"claude-2": ["my-fallback-model"]}]
- SDK
- PROXY
from litellm import Router
router = Router(
model_list=[
{
"model_name": "claude-2",
"litellm_params": {
"model": "claude-2",
"api_key": "",
"mock_response": Exception("prompt is too long"),
},
},
{
"model_name": "my-fallback-model",
"litellm_params": {
"model": "claude-2",
"api_key": "",
"mock_response": "This works!",
},
},
],
context_window_fallbacks=[{"claude-2": ["my-fallback-model"]}], # 👈 KEY CHANGE
# fallbacks=[..], # [OPTIONAL]
# content_policy_fallbacks=[..], # [OPTIONAL]
)
response = router.completion(
model="claude-2",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
在您的 proxy config.yaml 中只需添加这一行 👇
router_settings:
context_window_fallbacks: [{"claude-2": ["my-fallback-model"]}]
启动代理
litellm --config /path/to/config.yaml
# 运行在 http://0.0.0.0:4000
常规回退
关键更改:
fallbacks=[{"claude-2": ["my-fallback-model"]}]
- SDK
- PROXY
from litellm import Router
router = Router(
model_list=[
{
"model_name": "claude-2",
"litellm_params": {
"model": "claude-2",
"api_key": "",
"mock_response": Exception("this is a rate limit error"),
},
},
{
"model_name": "my-fallback-model",
"litellm_params": {
"model": "claude-2",
"api_key": "",
"mock_response": "This works!",
},
},
],
fallbacks=[{"claude-2": ["my-fallback-model"]}], # 👈 KEY CHANGE
# context_window_fallbacks=[..], # [OPTIONAL]
# content_policy_fallbacks=[..], # [OPTIONAL]
)
response = router.completion(
model="claude-2",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
在您的 proxy config.yaml 中只需添加这一行 👇
router_settings:
fallbacks: [{"claude-2": ["my-fallback-model"]}]
启动代理
litellm --config /path/to/config.yaml
# 运行在 http://0.0.0.0:4000
缓存
在生产环境中,我们推荐使用Redis缓存。为了在本地快速测试,我们也支持简单的内存缓存。
内存缓存
router = Router(model_list=model_list,
                cache_responses=True)
Redis缓存
router = Router(model_list=model_list,
redis_host=os.getenv("REDIS_HOST"),
redis_password=os.getenv("REDIS_PASSWORD"),
redis_port=os.getenv("REDIS_PORT"),
cache_responses=True)
传递Redis URL,附加参数
router = Router(model_list=model_list,
                ## CACHING ##
                redis_url=os.getenv("REDIS_URL"),
                cache_kwargs={}, # 传递给RedisCache的附加参数(参见caching.py)
cache_responses=True)
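下面是一个简要的验证示意(非官方示例;沿用上文已定义的 model_list,并借用下文 caching groups 测试中的判断方式):对同一请求调用两次,若命中缓存,两次响应的 id 应相同。
from litellm import Router

router = Router(model_list=model_list, cache_responses=True)  # 内存缓存

messages = [{"role": "user", "content": "write a one sentence poem"}]
response1 = router.completion(model="gpt-3.5-turbo", messages=messages, temperature=1)
response2 = router.completion(model="gpt-3.5-turbo", messages=messages, temperature=1)

print(response1.id == response2.id)  # 命中缓存时应为 True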
调用前检查(上下文窗口,欧盟地区)
启用调用前检查以过滤掉:
- 上下文窗口限制小于调用消息的部署。
- 非欧盟地区的部署。
- SDK
- 代理
1. 启用调用前检查
from litellm import Router
# ...
router = Router(model_list=model_list, enable_pre_call_checks=True) # 👈 设置为True
2. 设置模型列表
对于 Azure 部署的上下文窗口检查,请设置基础模型。从此列表中选择基础模型,所有 Azure 基础模型都以 azure/ 开头。
对于'欧盟地区'过滤,设置部署的'region_name'。
注意:我们会根据您的 litellm 参数自动推断 Vertex AI、Bedrock 和 IBM WatsonxAI 的 region_name。对于 Azure,请设置 litellm.enable_preview = True。
model_list = [
{
"model_name": "gpt-3.5-turbo", # 模型组名称
"litellm_params": { # litellm completion/embedding调用的参数
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name": "eu" # 👈 设置'欧盟'地区名称
"base_model": "azure/gpt-35-turbo", # 👈 (仅限Azure) 设置基础模型
},
},
{
"model_name": "gpt-3.5-turbo", # 模型组名称
"litellm_params": { # litellm completion/embedding调用的参数
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "gemini-pro",
"litellm_params: {
"model": "vertex_ai/gemini-pro-1.5",
"vertex_project": "adroit-crow-1234",
"vertex_location": "us-east1" # 👈 自动推断'region_name
}
}
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
3. 测试它!
- 上下文窗口检查
- 欧盟地区检查
"""
- 给定具有不同上下文窗口(4k vs. 16k)的gpt-3.5-turbo模型组
- 发送一个5k的提示
- 断言它工作
"""
from litellm import Router
import os
model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
},
"model_info": {
"base_model": "azure/gpt-35-turbo",
}
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
text = "What is the meaning of 42?" * 5000
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
"""
- 提供两个gpt-3.5-turbo部署,分别在欧洲和非欧洲地区
- 进行一次调用
- 断言它选择了欧洲地区的模型
"""
from litellm import Router
import os
model_list = [
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name": "eu"
},
"model_info": {
"id": "1"
}
},
{
"model_name": "gpt-3.5-turbo", # model group name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"model_info": {
"id": "2"
}
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Who was Alexander?"}],
)
print(f"response: {response}")
print(f"response id: {response._hidden_params['model_id']}")
有关在代理上如何执行此操作的信息,请参见此处
跨模型组缓存
如果你想在两个不同的模型组之间进行缓存(例如,Azure 部署和 OpenAI),请使用缓存组。
import litellm, asyncio, time, os, traceback
from litellm import Router
# set os env
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""
async def test_acompletion_caching_on_router_caching_groups():
# tests acompletion + caching on router
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "openai-gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo-0613",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "azure-gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION")
},
}
]
messages = [
{"role": "user", "content": f"write a one sentence poem {time.time()}?"}
]
start_time = time.time()
router = Router(model_list=model_list,
cache_responses=True,
caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
print(f"response1: {response1}")
await asyncio.sleep(1) # add cache is async, async sleep for cache to get set
response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
assert response1.id == response2.id
assert len(response1.choices[0].message.content) > 0
assert response1.choices[0].message.content == response2.choices[0].message.content
except Exception as e:
traceback.print_exc()
asyncio.run(test_acompletion_caching_on_router_caching_groups())
警报 🚨
为以下事件向 Slack / 你的 Webhook URL 发送警报
- LLM API 异常
- 缓慢的 LLM 响应
从 https://api.slack.com/messaging/webhooks 获取 Slack Webhook URL
使用方法
初始化一个 AlertingConfig 并将其传递给 litellm.Router。以下代码将触发警报,因为 api_key=bad-key 是无效的。
from litellm.router import AlertingConfig
import litellm
import os
router = litellm.Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "bad_key",
},
}
],
alerting_config= AlertingConfig(
alerting_threshold=10, # threshold for slow / hanging llm responses (in seconds). Defaults to 300 seconds
webhook_url= os.getenv("SLACK_WEBHOOK_URL") # webhook you want to send alerts to
),
)
try:
await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
except:
pass
跟踪 Azure 部署的成本
问题:当使用 azure/gpt-4-1106-preview 时,Azure 在响应中返回 gpt-4,这会导致成本跟踪不准确。
解决方案 ✅:在路由器初始化时设置 model_info["base_model"],以便 litellm 使用正确的模型来计算 Azure 成本。
步骤 1. 路由器设置
import os
from litellm import Router
model_list = [
{ # list of model deployments
"model_name": "gpt-4-preview", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"model_info": {
"base_model": "azure/gpt-4-1106-preview" # azure/gpt-4-1106-preview will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
}
},
{
"model_name": "gpt-4-32k",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"model_info": {
"base_model": "azure/gpt-4-32k" # azure/gpt-4-32k will be used for cost tracking, ensure this exists in litellm model_prices_and_context_window.json
}
}
]
router = Router(model_list=model_list)
步骤 2. 在自定义回调中访问 response_cost,litellm 会为你计算响应成本
import litellm
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"成功事件")
response_cost = kwargs.get("response_cost")
print("response_cost=", response_cost)
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
# 路由器完成调用
response = router.completion(
model="gpt-4-32k",
messages=[{ "role": "user", "content": "你好,你是谁?"}]
)
默认的 litellm.completion/embedding 参数
你还可以为litellm的完成/嵌入调用设置默认参数。以下是操作方法:
from litellm import Router
fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}
router = Router(model_list=model_list,
default_litellm_params={"context_window_fallback_dict": fallback_dict})
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# 正常调用
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
自定义回调 - 跟踪API密钥、API端点、使用的模型
如果你需要跟踪每次完成调用所使用的api_key、api端点、模型、自定义llm提供者,你可以设置一个自定义回调
使用方法
import litellm
from litellm import Router
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
print("kwargs=", kwargs)
litellm_params= kwargs.get("litellm_params")
api_key = litellm_params.get("api_key")
api_base = litellm_params.get("api_base")
custom_llm_provider= litellm_params.get("custom_llm_provider")
response_cost = kwargs.get("response_cost")
# 打印值
print("api_key=", api_key)
print("api_base=", api_base)
print("custom_llm_provider=", custom_llm_provider)
print("response_cost=", response_cost)
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Failure")
print("kwargs=")
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
# 初始化路由器
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
# 路由器完成调用
response = router.completion(
model="gpt-3.5-turbo",
messages=[{ "role": "user", "content": "Hi who are you"}]
)
部署路由器
如果你希望在不同的LLM API之间进行负载均衡的服务器,请使用我们的LiteLLM代理服务器
litellm.Router的初始化参数
def __init__(
model_list: Optional[list] = None,
## CACHING ##
redis_url: Optional[str] = None,
redis_host: Optional[str] = None,
redis_port: Optional[int] = None,
redis_password: Optional[str] = None,
cache_responses: Optional[bool] = False,
cache_kwargs: dict = {}, # additional kwargs to pass to RedisCache (see caching.py)
caching_groups: Optional[
List[tuple]
] = None, # if you want to cache across model groups
client_ttl: int = 3600, # ttl for cached clients - will re-initialize after this time in seconds
## RELIABILITY ##
num_retries: int = 0,
timeout: Optional[float] = None,
default_litellm_params={}, # default params for Router.chat.completion.create
fallbacks: Optional[List] = None,
default_fallbacks: Optional[List] = None,
allowed_fails: Optional[int] = None, # Number of times a deployment can fail before being added to cooldown
cooldown_time: float = 1, # (seconds) time to cooldown a deployment after failure
context_window_fallbacks: Optional[List] = None,
model_group_alias: Optional[dict] = {},
retry_after: int = 0, # (min) time to wait before retrying a failed request
routing_strategy: Literal[
"simple-shuffle",
"least-busy",
"usage-based-routing",
"latency-based-routing",
"cost-based-routing",
] = "simple-shuffle",
## DEBUGGING ##
set_verbose: bool = False, # set this to True for seeing logs
debug_level: Literal["DEBUG", "INFO"] = "INFO", # set this to "DEBUG" for detailed debugging
):
调试路由器
基本调试
设置 Router(set_verbose=True)
from litellm import Router
router = Router(
model_list=model_list,
set_verbose=True
)
详细调试
设置 Router(set_verbose=True,debug_level="DEBUG")
from litellm import Router
router = Router(
model_list=model_list,
set_verbose=True,
debug_level="DEBUG" # 默认为INFO
)
非常详细的调试
设置 litellm.set_verbose=True
和 Router(set_verbose=True,debug_level="DEBUG")
from litellm import Router
import litellm
litellm.set_verbose = True
router = Router(
model_list=model_list,
set_verbose=True,
debug_level="DEBUG" # 默认为INFO
)