Caching - In-Memory, Redis, s3, Redis Semantic Cache, Disk
Initialize Cache - In-Memory, Redis, s3 Bucket, Redis Semantic, Disk Cache, Qdrant Semantic
Install redis
pip install redis
For the hosted version, you can set up your own Redis DB here: https://app.redislabs.com/
import litellm
from litellm import completion
from litellm.caching.caching import Cache
litellm.cache = Cache(type="redis", host=<host>, port=<port>, password=<password>)
# Make completion calls
response1 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}]
)
response2 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}]
)
# response1 == response2, response 1 is cached
Install boto3
pip install boto3
Set AWS environment variables
AWS_ACCESS_KEY_ID = "AKI*******"
AWS_SECRET_ACCESS_KEY = "WOl*****"
import litellm
from litellm import completion
from litellm.caching.caching import Cache
# pass s3-bucket name
litellm.cache = Cache(type="s3", s3_bucket_name="cache-bucket-litellm", s3_region_name="us-west-2")
# Make completion calls
response1 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}]
)
response2 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke."}]
)
# response1 == response2, response 1 is cached
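If you want to scope cache entries under a key prefix inside the bucket, or point at an S3-compatible store, the constructor also accepts s3_path and s3_endpoint_url (both listed in the Cache initialization parameters below). A minimal sketch; the bucket, prefix, and endpoint values are placeholders:
import litellm
from litellm.caching.caching import Cache

litellm.cache = Cache(
    type="s3",
    s3_bucket_name="cache-bucket-litellm",  # placeholder bucket
    s3_region_name="us-west-2",
    s3_path="litellm-cache",  # placeholder prefix for cache objects
    s3_endpoint_url="https://my-s3-compatible-endpoint.example.com",  # placeholder endpoint
)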
Install redisvl
pip install redisvl==0.0.7
For the hosted version, you can set up your own Redis DB here: https://app.redislabs.com/
import os
import random
import litellm
from litellm import completion
from litellm.caching.caching import Cache
random_number = random.randint(
1, 100000
) # add a random number to ensure the cache is always written to / read from
print("testing semantic caching")
litellm.cache = Cache(
type="redis-semantic",
host=os.environ["REDIS_HOST"],
port=os.environ["REDIS_PORT"],
password=os.environ["REDIS_PASSWORD"],
similarity_threshold=0.8, # similarity threshold for a cache hit: 0 == no similarity, 1 = exact match, 0.5 == 50% similar
redis_semantic_cache_embedding_model="text-embedding-ada-002", # this model is passed to litellm.embedding(); any litellm.embedding() model is supported
)
response1 = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"写一句关于: {random_number} 的诗",
}
],
max_tokens=20,
)
print(f"response1: {response1}")
random_number = random.randint(1, 100000)
response2 = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"写一句关于: {random_number} 的诗",
}
],
max_tokens=20,
)
print(f"response2: {response1}")
assert response1.id == response2.id
# response1 == response2, response 1 is cached
You can set up your own cloud Qdrant cluster by following this: https://qdrant.tech/documentation/quickstart-cloud/
To set up a Qdrant cluster locally, follow this: https://qdrant.tech/documentation/quickstart/
import os
import random
import litellm
from litellm import completion
from litellm.caching.caching import Cache
random_number = random.randint(
1, 100000
) # add a random number to ensure the cache is always written to / read from
print("testing semantic caching")
litellm.cache = Cache(
type="qdrant-semantic",
qdrant_api_base=os.environ["QDRANT_API_BASE"],
qdrant_api_key=os.environ["QDRANT_API_KEY"],
qdrant_collection_name="your_collection_name", # any name for your collection
similarity_threshold=0.7, # similarity threshold for a cache hit: 0 == no similarity, 1 = exact match, 0.5 == 50% similar
qdrant_quantization_config="binary", # can be one of 'binary', 'product', or 'scalar' quantizations supported by qdrant
qdrant_semantic_cache_embedding_model="text-embedding-ada-002", # this model is passed to litellm.embedding(); any litellm.embedding() model is supported
)
response1 = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"写一句关于: {random_number} 的诗",
}
],
max_tokens=20,
)
print(f"response1: {response1}")
random_number = random.randint(1, 100000)
response2 = completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": f"写一句关于: {random_number} 的诗",
}
],
max_tokens=20,
)
print(f"response2: {response2}")
assert response1.id == response2.id
# response1 == response2, response 1 is cached
Quick Start
import litellm
from litellm import completion
from litellm.caching.caching import Cache
litellm.cache = Cache()
# Make completion calls
response1 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "讲个笑话。"}],
caching=True
)
response2 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "讲个笑话。"}],
caching=True
)
# response1 == response2, response 1 is cached
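The in-memory cache also supports expiry via the ttl and default_in_memory_ttl initialization parameters (documented below). A minimal sketch, assuming you want entries to expire after 60 seconds:
import litellm
from litellm import completion
from litellm.caching.caching import Cache

# "local" is the default in-memory cache type; entries expire after 60 seconds
litellm.cache = Cache(type="local", ttl=60)

response = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    caching=True,
)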
Quick Start
Install diskcache:
pip install diskcache
Then you can use the disk cache as follows.
import litellm
from litellm import completion
from litellm.caching.caching import Cache
litellm.cache = Cache(type="disk")
# Make completion calls
response1 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "讲个笑话。"}],
caching=True
)
response2 = completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "讲个笑话。"}],
caching=True
)
# response1 == response2, response 1 is cached
If you run the code twice, response1 will use the cache from the first run that was stored in a cache file.
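By default diskcache chooses where to put the cache file; the disk_cache_dir initialization parameter (listed below) lets you pick the location. A minimal sketch with a placeholder path:
import litellm
from litellm.caching.caching import Cache

# store the cache file under a project-local directory (placeholder path)
litellm.cache = Cache(type="disk", disk_cache_dir="./litellm_disk_cache")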
Switch Cache On / Off Per LiteLLM Call
LiteLLM supports 4 cache-controls:
no-cache: Optional(bool) When True, will not return a cached response, but instead call the actual endpoint.
no-store: Optional(bool) When True, will not cache the response.
ttl: Optional(int) Will cache the response for the user-defined amount of time (in seconds).
s-maxage: Optional(int) Will only accept cached responses that are within the user-defined range (in seconds).
Example usage of no-cache - when True, will not return a cached response
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "你好,你是谁"
}
],
cache={"no-cache": True},
)
Example usage of no-store - when True, will not cache the response.
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "你好,你是谁"
}
],
cache={"no-store": True},
)
Example usage of `ttl` - cache the response for 10 seconds
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "你好,你是谁"
}
],
cache={"ttl": 10},
)
Example usage of `s-maxage` - only accept cached responses that are at most 60 seconds old
response = litellm.completion(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "你好,你是谁"
}
],
cache={"s-maxage": 60},
)
Cache Context Manager - Enable, Disable, Update Cache
Use the context manager to easily enable, disable, and update the litellm cache
Enable Cache
Quick Start Enable
litellm.enable_cache()
Advanced Params
litellm.enable_cache(
type: Optional[Literal["local", "redis", "s3", "disk"]] = "local",
host: Optional[str] = None,
port: Optional[str] = None,
password: Optional[str] = None,
supported_call_types: Optional[
List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
**kwargs,
)
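For example, to enable a Redis-backed cache for chat completion calls only; a sketch, assuming the Redis connection details sit in environment variables as in the earlier examples:
import os
import litellm

litellm.enable_cache(
    type="redis",
    host=os.environ["REDIS_HOST"],
    port=os.environ["REDIS_PORT"],
    password=os.environ["REDIS_PASSWORD"],
    supported_call_types=["completion", "acompletion"],  # only cache chat completion calls
)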
Disable Cache
Switch off the cache
litellm.disable_cache()
Update Cache Params (Redis host, port, etc.)
Update cache params
litellm.update_cache(
type: Optional[Literal["local", "redis", "s3", "disk"]] = "local",
host: Optional[str] = None,
port: Optional[str] = None,
password: Optional[str] = None,
supported_call_types: Optional[
List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
**kwargs,
)
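For example, to repoint an already-enabled cache at a different Redis instance; a sketch with placeholder connection values:
import litellm

litellm.update_cache(
    type="redis",
    host="my-new-redis-host",  # placeholder
    port="6379",
    password="my-new-password",  # placeholder
)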
Custom Cache Keys:
Define a function to return the cache key
# This function takes in *args, **kwargs and returns the key to use for caching
def custom_get_cache_key(*args, **kwargs):
# return the key to use for caching:
key = kwargs.get("model", "") + str(kwargs.get("messages", "")) + str(kwargs.get("temperature", "")) + str(kwargs.get("logit_bias", ""))
print("缓存的键", key)
return key
Set your function as litellm.cache.get_cache_key
import os
import litellm
from litellm.caching.caching import Cache
cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
cache.get_cache_key = custom_get_cache_key # set the get_cache_key function for your cache
litellm.cache = cache # set litellm.cache to your cache
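With the custom key function in place, two calls sharing the same model, messages, temperature, and logit_bias resolve to the same cache key, so the second call is served from the cache. A sketch:
from litellm import completion

response1 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    temperature=0.2,
    caching=True,
)
response2 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    temperature=0.2,
    caching=True,
)
# same custom key for both calls, so response2 comes from the cache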
How to Write Custom add/get Cache Functions
1. Initialize the Cache
from litellm.caching.caching import Cache
cache = Cache()
2. Define custom add/get cache functions
def add_cache(self, result, *args, **kwargs):
    # your logic here
    pass
def get_cache(self, *args, **kwargs):
    # your logic here
    pass
3. Point the cache's add/get functions to your custom functions
cache.add_cache = add_cache
cache.get_cache = get_cache
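Putting the three steps together, here is a minimal sketch backed by a plain dict. The key construction and the assumption that returning None signals a cache miss are illustrative, not LiteLLM internals:
from litellm.caching.caching import Cache

my_store = {}

def add_cache(self, result, *args, **kwargs):
    # build a simple key from the call's kwargs (illustrative)
    key = kwargs.get("model", "") + str(kwargs.get("messages", ""))
    my_store[key] = result

def get_cache(self, *args, **kwargs):
    key = kwargs.get("model", "") + str(kwargs.get("messages", ""))
    return my_store.get(key)  # assumes None is treated as a cache miss

cache = Cache()
cache.add_cache = add_cache
cache.get_cache = get_cache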
Cache Initialization Parameters
def __init__(
self,
type: Optional[Literal["local", "redis", "redis-semantic", "s3", "disk"]] = "local",
supported_call_types: Optional[
List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
ttl: Optional[float] = None,
default_in_memory_ttl: Optional[float] = None,
# redis cache params
host: Optional[str] = None,
port: Optional[str] = None,
password: Optional[str] = None,
namespace: Optional[str] = None,
default_in_redis_ttl: Optional[float] = None,
similarity_threshold: Optional[float] = None,
redis_semantic_cache_use_async=False,
redis_semantic_cache_embedding_model="text-embedding-ada-002",
redis_flush_size=None,
# s3 Bucket, boto3 configuration
s3_bucket_name: Optional[str] = None,
s3_region_name: Optional[str] = None,
s3_api_version: Optional[str] = None,
s3_path: Optional[str] = None, # if you wish to save to a specific path
s3_use_ssl: Optional[bool] = True,
s3_verify: Optional[Union[bool, str]] = None,
s3_endpoint_url: Optional[str] = None,
s3_aws_access_key_id: Optional[str] = None,
s3_aws_secret_access_key: Optional[str] = None,
s3_aws_session_token: Optional[str] = None,
s3_config: Optional[Any] = None,
# disk cache params
disk_cache_dir=None,
# qdrant cache params
qdrant_api_base: Optional[str] = None,
qdrant_api_key: Optional[str] = None,
qdrant_collection_name: Optional[str] = None,
qdrant_quantization_config: Optional[str] = None,
qdrant_semantic_cache_embedding_model="text-embedding-ada-002",
**kwargs
):
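These parameters can be combined. A sketch that initializes a Redis cache with a namespace and a default TTL; the namespace and TTL values are placeholders:
import os
import litellm
from litellm.caching.caching import Cache

litellm.cache = Cache(
    type="redis",
    host=os.environ["REDIS_HOST"],
    port=os.environ["REDIS_PORT"],
    password=os.environ["REDIS_PASSWORD"],
    namespace="my-app",  # placeholder namespace to isolate this app's keys
    ttl=600,  # cache responses for 10 minutes
    supported_call_types=["completion", "acompletion"],  # only cache chat completions
)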
Logging
Cache hits are logged in success events as kwarg["cache_hit"].
Here's an example of accessing it:
import asyncio
import os
import time
import litellm
from litellm.integrations.custom_logger import CustomLogger
from litellm import completion, acompletion, Cache
# create custom callback for success events
class MyCustomHandler(CustomLogger):
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
print(f"Value of Cache hit: {kwargs['cache_hit']}")
async def test_async_completion_azure_caching():
# set custom callback
customHandler_caching = MyCustomHandler()
litellm.callbacks = [customHandler_caching]
# init cache
litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
unique_time = time.time()
response1 = await litellm.acompletion(model="azure/chatgpt-v-2",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1)
print(f"customHandler_caching.states pre-cache hit: {customHandler_caching.states}")
response2 = await litellm.acompletion(model="azure/chatgpt-v-2",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1) # success callbacks are done in parallel
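To actually run the example above, drive the coroutine with an event loop; a sketch, assuming the Redis env vars and the azure/chatgpt-v-2 deployment are configured:
import asyncio

asyncio.run(test_async_completion_azure_caching())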