Skip to main content

自定义回调

info

对于代理 点击这里

回调类

你可以创建一个自定义回调类,以便在 litellm 中发生事件时精确记录这些事件。

import litellm
from litellm.integrations.custom_logger import CustomLogger
from litellm import completion, acompletion

class MyCustomHandler(CustomLogger):
def log_pre_api_call(self, model, messages, kwargs):
print(f"API 调用前")

def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
print(f"API 调用后")

def log_stream_event(self, kwargs, response_obj, start_time, end_time):
print(f"流事件")

def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"成功事件")

def log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"失败事件")

#### 异步 #### - 用于 acompletion/aembeddings

async def async_log_stream_event(self, kwargs, response_obj, start_time, end_time):
print(f"异步流事件")

async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"异步成功事件")

async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"异步失败事件")

customHandler = MyCustomHandler()

litellm.callbacks = [customHandler]

## 同步
response = completion(model="gpt-3.5-turbo", messages=[{ "role": "user", "content": "Hi 👋 - i'm openai"}],
stream=True)
for chunk in response:
continue


## 异步
import asyncio

def async completion():
response = await acompletion(model="gpt-3.5-turbo", messages=[{ "role": "user", "content": "Hi 👋 - i'm openai"}],
stream=True)
async for chunk in response:
continue
asyncio.run(completion())

回调函数

如果你只想在特定事件(例如输入时)记录日志,可以使用回调函数。

你可以设置自定义回调来触发:

  • litellm.input_callback - 在调用 LLM API 之前跟踪输入/转换后的输入
  • litellm.success_callback - 在调用 LLM API 之后跟踪输入/输出
  • litellm.failure_callback - 跟踪 litellm 调用的输入/输出 + 异常

定义自定义回调函数

创建一个接受特定参数的自定义回调函数:

def custom_callback(
kwargs, # 传递给 completion 的 kwargs
completion_response, # completion 的响应
start_time, end_time # 开始/结束时间
):
# 你的自定义代码在这里
print("LITELLM: 在自定义回调函数中")
print("kwargs", kwargs)
print("completion_response", completion_response)
print("start_time", start_time)
print("end_time", end_time)

设置自定义回调函数

import litellm
litellm.success_callback = [custom_callback]

使用你的自定义回调函数

import litellm
from litellm import completion

# 分配自定义回调函数
litellm.success_callback = [custom_callback]

response = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "Hi 👋 - i'm openai"
}
]
)

print(response)

异步回调函数

我们推荐使用自定义日志记录类进行异步处理。

from litellm.integrations.custom_logger import CustomLogger
from litellm import acompletion

class MyCustomHandler(CustomLogger):
#### 异步 ####

async def async_log_stream_event(self, kwargs, response_obj, start_time, end_time):
print(f"异步流事件")

async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"异步成功事件")

async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"异步失败事件")

import asyncio
customHandler = MyCustomHandler()

litellm.callbacks = [customHandler]

def async completion():
response = await acompletion(model="gpt-3.5-turbo", messages=[{ "role": "user", "content": "Hi 👋 - i'm openai"}],
stream=True)
async for chunk in response:
continue
asyncio.run(completion())

函数

如果你只想传递一个异步函数用于日志记录。

LiteLLM 目前仅支持异步 completion/embedding 调用的异步成功回调函数。

import asyncio, litellm 

async def async_test_logging_fn(kwargs, completion_obj, start_time, end_time):
print(f"On Async Success!")

async def test_chat_openai():
try:
# litellm.set_verbose = True
litellm.success_callback = [async_test_logging_fn]
response = await litellm.acompletion(model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": "Hi 👋 - i'm openai"
}],
stream=True)
async for chunk in response:
continue
except Exception as e:
print(e)
pytest.fail(f"An error occurred - {str(e)}")

asyncio.run(test_chat_openai())
info

我们正在积极尝试将此扩展到其他事件类型。如果你需要这个功能,请告诉我们!

kwargs 包含什么?

注意我们向自定义回调传递了一个 kwargs 参数。

def custom_callback(
kwargs, # 完成调用的 kwargs
completion_response, # 完成调用的响应
start_time, end_time # 开始/结束时间
):
# 你的自定义代码在这里
print("LITELLM: 在自定义回调函数中")
print("kwargs", kwargs)
print("completion_response", completion_response)
print("start_time", start_time)
print("end_time", end_time)

这是一个包含所有模型调用细节的字典(我们接收的参数、发送到 http 端点的值、收到的响应、错误时的堆栈跟踪等)。

这些内容都记录在我们的 Logger 的 model_call_details 中。

以下是你在 kwargs 字典中可以预期的确切内容:

### 默认参数 ### 
"model": self.model,
"messages": self.messages,
"optional_params": self.optional_params, # 特定于模型的参数
"litellm_params": self.litellm_params, # 特定于 litellm 的参数(例如传递给完成调用的元数据)
"start_time": self.start_time, # 调用开始时的日期时间对象

### API 调用前参数 ### (通过 kwargs["log_event_type"]="pre_api_call" 检查)
"input" = input # 发送到 LLM API 的确切提示
"api_key" = api_key # 用于该 LLM API 的 API 密钥
"additional_args" = additional_args # 该 API 调用的任何额外细节(例如包含发送的可选参数)

### API 调用后参数 ### (通过 kwargs["log_event_type"]="post_api_call" 检查)
"original_response" = original_response # 收到的原始 http 响应(通过 response.text 保存)

### 成功参数 ### (通过 kwargs["log_event_type"]="successful_api_call" 检查)
"complete_streaming_response" = complete_streaming_response # 完整的流式响应(仅在 `completion(..stream=True)` 时设置)
"end_time" = end_time # 调用完成时的日期时间对象

### 失败参数 ### (通过 kwargs["log_event_type"]="failed_api_call" 检查)
"exception" = exception # 引发的异常
"traceback_exception" = traceback_exception # 通过 `traceback.format_exc()` 生成的堆栈跟踪
"end_time" = end_time # 调用完成时的日期时间对象

缓存命中

缓存命中在成功事件中记录为 kwarg["cache_hit"]

以下是访问它的示例:

import litellm
from litellm.integrations.custom_logger import CustomLogger
from litellm import completion, acompletion, Cache

class MyCustomHandler(CustomLogger):
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
print(f"缓存命中的值: {kwargs['cache_hit']}")

async def test_async_completion_azure_caching():
customHandler_caching = MyCustomHandler()
litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
litellm.callbacks = [customHandler_caching]
unique_time = time.time()
response1 = await litellm.acompletion(model="azure/chatgpt-v-2",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1)
print(f"customHandler_caching.states 缓存命中前: {customHandler_caching.states}")
response2 = await litellm.acompletion(model="azure/chatgpt-v-2",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1) # 成功回调是并行完成的
print(f"customHandler_caching.states 缓存命中后: {customHandler_caching.states}")
assert len(customHandler_caching.errors) == 0
assert len(customHandler_caching.states) == 4 # pre, post, success, success

获取完整的流式响应

LiteLLM 会在最终的流式数据块中将完整的流式响应传递给你,作为自定义回调函数的 kwargs 的一部分。

# litellm.set_verbose = False
def custom_callback(
kwargs, # 完成时的参数
completion_response, # 完成的响应
start_time, end_time # 开始和结束时间
):
# print(f"streaming response: {completion_response}")
if "complete_streaming_response" in kwargs:
print(f"Complete Streaming Response: {kwargs['complete_streaming_response']}")

# 指定自定义回调函数
litellm.success_callback = [custom_callback]

response = completion(model="claude-instant-1", messages=messages, stream=True)
for idx, chunk in enumerate(response):
pass

记录额外的元数据

LiteLLM 在接受完成调用时接受一个元数据字典。你可以通过 completion(..., metadata={"key": "value"}) 将额外的元数据传递到你的完成调用中。

由于这是一个 litellm-specific param,它可以通过 kwargs["litellm_params"] 访问

from litellm import completion
import os, litellm

## 设置环境变量
os.environ["OPENAI_API_KEY"] = "your-api-key"

messages = [{ "content": "Hello, how are you?","role": "user"}]

def custom_callback(
kwargs, # 完成调用的kwargs
completion_response, # 完成调用的响应
start_time, end_time # 开始/结束时间
):
print(kwargs["litellm_params"]["metadata"])


# 分配自定义回调函数
litellm.success_callback = [custom_callback]

response = litellm.completion(model="gpt-3.5-turbo", messages=messages, metadata={"hello": "world"})

示例

自定义回调以跟踪流式传输和非流式传输的成本

默认情况下,响应成本可以通过 kwargs["response_cost"] 在成功时(同步 + 异步)访问


# 第一步:编写你的自定义回调函数
def track_cost_callback(
kwargs, # 完成调用的kwargs
completion_response, # 完成调用的响应
start_time, end_time # 开始/结束时间
):
try:
response_cost = kwargs["response_cost"] # litellm 为你计算响应成本
print("regular response_cost", response_cost)
except:
pass

# 第二步:分配自定义回调函数
litellm.success_callback = [track_cost_callback]

# 第三步:进行 litellm.completion 调用
response = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "Hi 👋 - i'm openai"
}
]
)

print(response)

自定义回调以记录LLM的转换输入

def get_transformed_inputs(
kwargs,
):
params_to_model = kwargs["additional_args"]["complete_input_dict"]
print("params to model", params_to_model)

litellm.input_callback = [get_transformed_inputs]

def test_chat_openai():
try:
response = completion(model="claude-2",
messages=[{
"role": "user",
"content": "Hi 👋 - i'm openai"
}])

print(response)

except Exception as e:
print(e)
pass

输出

params to model {'model': 'claude-2', 'prompt': "\n\nHuman: Hi 👋 - i'm openai\n\nAssistant: ", 'max_tokens_to_sample': 256}

自定义回调以写入Mixpanel

import mixpanel
import litellm
from litellm import completion

def custom_callback(
kwargs, # 完成调用的kwargs
completion_response, # 完成调用的响应
start_time, end_time # 开始/结束时间
):
# 你的自定义代码在这里
mixpanel.track("LLM Response", {"llm_response": completion_response})


# 分配自定义回调函数
litellm.success_callback = [custom_callback]

response = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "Hi 👋 - i'm openai"
}
]
)

print(response)

优云智算