
Caching - In-Memory, Redis, s3, Redis Semantic Cache, Disk


info

Initialize Cache - In Memory, Redis, s3 Bucket, Redis Semantic, Disk Cache, Qdrant Semantic

Install redis

pip install redis

For the hosted version, you can set up your own Redis DB here: https://app.redislabs.com/

import litellm
from litellm import completion
from litellm.caching.caching import Cache

litellm.cache = Cache(type="redis", host=<host>, port=<port>, password=<password>)

# Make completion calls
response1 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}]
)
response2 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}]
)

# response1 == response2, response 1 is cached
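If you'd rather not hard-code connection details in place of the <host> placeholders above, the same initialization can read them from environment variables. A minimal sketch, assuming REDIS_HOST, REDIS_PORT and REDIS_PASSWORD are exported (the same variable names used by the semantic-cache example further down this page):

import os

import litellm
from litellm.caching.caching import Cache

# assumes REDIS_HOST, REDIS_PORT, REDIS_PASSWORD are set in the environment
litellm.cache = Cache(
    type="redis",
    host=os.environ["REDIS_HOST"],
    port=os.environ["REDIS_PORT"],
    password=os.environ["REDIS_PASSWORD"],
)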

Install boto3

pip install boto3

Set AWS environment variables

AWS_ACCESS_KEY_ID = "AKI*******"
AWS_SECRET_ACCESS_KEY = "WOl*****"

import litellm
from litellm import completion
from litellm.caching.caching import Cache

# pass s3 bucket name
litellm.cache = Cache(type="s3", s3_bucket_name="cache-bucket-litellm", s3_region_name="us-west-2")

# Make completion calls
response1 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}]
)
response2 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}]
)

# response1 == response2, response 1 is cached
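The cache initializer (see "Cache Initialization Parameters" below) also accepts s3_path, for saving entries under a specific prefix, and s3_endpoint_url, for S3-compatible stores. A minimal sketch; the prefix here is an arbitrary example:

import litellm
from litellm.caching.caching import Cache

litellm.cache = Cache(
    type="s3",
    s3_bucket_name="cache-bucket-litellm",
    s3_region_name="us-west-2",
    s3_path="litellm-cache",  # example prefix: cache objects are written under this path
)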

Install redis

pip install redisvl==0.0.7

For the hosted version, you can set up your own Redis DB here: https://app.redislabs.com/

import os
import random

import litellm
from litellm import completion
from litellm.caching.caching import Cache

random_number = random.randint(
    1, 100000
)  # add a random number to ensure the cache is always written to / read from

print("testing semantic caching")
litellm.cache = Cache(
    type="redis-semantic",
    host=os.environ["REDIS_HOST"],
    port=os.environ["REDIS_PORT"],
    password=os.environ["REDIS_PASSWORD"],
    similarity_threshold=0.8,  # similarity threshold for cache hits, 0 == no similarity, 1 = exact matches, 0.5 == 50% similarity
    redis_semantic_cache_embedding_model="text-embedding-ada-002",  # this model is passed to litellm.embedding(), any litellm.embedding() model is supported here
)
response1 = completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": f"write a one sentence poem about: {random_number}",
        }
    ],
    max_tokens=20,
)
print(f"response1: {response1}")

random_number = random.randint(1, 100000)

response2 = completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": f"write a one sentence poem about: {random_number}",
        }
    ],
    max_tokens=20,
)
print(f"response2: {response2}")
assert response1.id == response2.id
# response1 == response2, response 1 is cached

You can set up your own cloud Qdrant cluster by following these steps: https://qdrant.tech/documentation/quickstart-cloud/

To set up a Qdrant cluster locally, follow these steps: https://qdrant.tech/documentation/quickstart/

import os
import random

import litellm
from litellm import completion
from litellm.caching.caching import Cache

random_number = random.randint(
    1, 100000
)  # add a random number to ensure the cache is always written to / read from

print("testing semantic caching")
litellm.cache = Cache(
    type="qdrant-semantic",
    qdrant_api_base=os.environ["QDRANT_API_BASE"],
    qdrant_api_key=os.environ["QDRANT_API_KEY"],
    qdrant_collection_name="your_collection_name",  # any name for your collection
    similarity_threshold=0.7,  # similarity threshold for cache hits, 0 == no similarity, 1 = exact matches, 0.5 == 50% similarity
    qdrant_quantization_config="binary",  # can be one of 'binary', 'product' or 'scalar' quantizations, as supported by qdrant
    qdrant_semantic_cache_embedding_model="text-embedding-ada-002",  # this model is passed to litellm.embedding(), any litellm.embedding() model is supported here
)

response1 = completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": f"write a one sentence poem about: {random_number}",
        }
    ],
    max_tokens=20,
)
print(f"response1: {response1}")

random_number = random.randint(1, 100000)

response2 = completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": f"write a one sentence poem about: {random_number}",
        }
    ],
    max_tokens=20,
)
print(f"response2: {response2}")
assert response1.id == response2.id
# response1 == response2, response 1 is cached

Quick Start

import litellm
from litellm import completion
from litellm.caching.caching import Cache
litellm.cache = Cache()

# Make completion calls
response1 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    caching=True
)
response2 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    caching=True
)

# response1 == response2, response 1 is cached

Quick Start

Install diskcache:

pip install diskcache

Then you can use the disk cache as follows.

import litellm
from litellm import completion
from litellm.caching.caching import Cache
litellm.cache = Cache(type="disk")

# Make completion calls
response1 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    caching=True
)
response2 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    caching=True
)

# response1 == response2, response 1 is cached

If you run the code twice, response1 will use the cache from the first run, stored in a cache file.
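If you want the cache file somewhere specific, the initializer's disk_cache_dir parameter (documented under "Cache Initialization Parameters" below) pins the cache to a directory of your choosing. A minimal sketch; the path is an example:

import litellm
from litellm.caching.caching import Cache

# cache entries persist across runs inside this directory (example path)
litellm.cache = Cache(type="disk", disk_cache_dir="/tmp/litellm-disk-cache")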

Switch Cache On / Off Per LiteLLM Call

LiteLLM supports 4 cache-controls:

  • no-cache: Optional(bool) When True, will not return a cached response; the actual endpoint is called instead.
  • no-store: Optional(bool) When True, will not cache the response.
  • ttl: Optional(int) - will cache the response for the user-defined amount of time (in seconds).
  • s-maxage: Optional(int) will only accept cached responses that are within the user-defined age range (in seconds).

Let us know if you need more.

Example usage of no-cache - when True, a cached response will not be returned

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": "hello who are you"
        }
    ],
    cache={"no-cache": True},
)

Example usage of no-store - when True, the response will not be cached.

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": "hello who are you"
        }
    ],
    cache={"no-store": True},
)
Example usage of `ttl` - cache the response for 10 seconds

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": "hello who are you"
        }
    ],
    cache={"ttl": 10},
)
Example usage of `s-maxage` - only accept cached responses that are at most 60 seconds old

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": "hello who are you"
        }
    ],
    cache={"s-maxage": 60},
)
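Since cache takes a plain dict, the controls can presumably be combined in a single call. A sketch (not verified against the library) that stores the response for 10 minutes but only accepts hits younger than 60 seconds:

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "hello who are you"}],
    cache={"ttl": 600, "s-maxage": 60},  # assumed: ttl governs storage, s-maxage governs reads
)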

Cache Context Manager - Enable, Disable, Update Cache

Use the context manager to easily enable, disable, and update the litellm cache

Enable Cache

Quick Start Enable

litellm.enable_cache()

Advanced Params

litellm.enable_cache(
    type: Optional[Literal["local", "redis", "s3", "disk"]] = "local",
    host: Optional[str] = None,
    port: Optional[str] = None,
    password: Optional[str] = None,
    supported_call_types: Optional[
        List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
    ] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
    **kwargs,
)
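For example, pointing the helper at a Redis instance. A minimal sketch, reusing the REDIS_* environment variables from the examples above:

import os
import litellm

litellm.enable_cache(
    type="redis",
    host=os.environ["REDIS_HOST"],
    port=os.environ["REDIS_PORT"],
    password=os.environ["REDIS_PASSWORD"],
)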

Disable Cache

Switch caching off

litellm.disable_cache()

Update Cache Params (Redis Host, Port, etc.)

Update cache params

litellm.update_cache(
    type: Optional[Literal["local", "redis", "s3", "disk"]] = "local",
    host: Optional[str] = None,
    port: Optional[str] = None,
    password: Optional[str] = None,
    supported_call_types: Optional[
        List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
    ] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
    **kwargs,
)
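For example, switching a running process from the default local cache over to Redis. A minimal sketch using the parameters from the signature above:

import os
import litellm

litellm.update_cache(
    type="redis",
    host=os.environ["REDIS_HOST"],
    port=os.environ["REDIS_PORT"],
    password=os.environ["REDIS_PASSWORD"],
)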

Custom Cache Keys:

Define a function to return a cache key

# this function takes in *args, **kwargs and returns the key you want to use for caching
def custom_get_cache_key(*args, **kwargs):
    # return the key to use for your cache:
    key = kwargs.get("model", "") + str(kwargs.get("messages", "")) + str(kwargs.get("temperature", "")) + str(kwargs.get("logit_bias", ""))
    print("key for cache", key)
    return key

Set your function as litellm.cache.get_cache_key

import os

import litellm
from litellm.caching.caching import Cache

cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])

cache.get_cache_key = custom_get_cache_key  # set get_cache_key function for your cache

litellm.cache = cache  # set litellm.cache to your cache
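With the custom key in place, two calls that agree on model, messages, temperature, and logit_bias resolve to the same cache entry. A sketch, following the Quick Start pattern above:

from litellm import completion

response1 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    temperature=0.2,
    caching=True,
)
response2 = completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a joke."}],
    temperature=0.2,
    caching=True,
)
# both calls produce the same key via custom_get_cache_key, so response2 should be served from the cache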

How to write custom add/get cache functions

1. Init Cache

from litellm.caching.caching import Cache
cache = Cache()

2. Define custom add/get cache functions

def add_cache(self, result, *args, **kwargs):
    # your logic for adding `result` to the cache
    ...

def get_cache(self, *args, **kwargs):
    # your logic for reading from the cache
    ...

3. Point the cache's add/get functions to your custom add/get functions

cache.add_cache = add_cache
cache.get_cache = get_cache

Cache Initialization Parameters

def __init__(
    self,
    type: Optional[Literal["local", "redis", "redis-semantic", "s3", "disk"]] = "local",
    supported_call_types: Optional[
        List[Literal["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"]]
    ] = ["completion", "acompletion", "embedding", "aembedding", "atranscription", "transcription"],
    ttl: Optional[float] = None,
    default_in_memory_ttl: Optional[float] = None,

    # redis cache params
    host: Optional[str] = None,
    port: Optional[str] = None,
    password: Optional[str] = None,
    namespace: Optional[str] = None,
    default_in_redis_ttl: Optional[float] = None,
    similarity_threshold: Optional[float] = None,
    redis_semantic_cache_use_async=False,
    redis_semantic_cache_embedding_model="text-embedding-ada-002",
    redis_flush_size=None,

    # s3 Bucket, boto3 configuration
    s3_bucket_name: Optional[str] = None,
    s3_region_name: Optional[str] = None,
    s3_api_version: Optional[str] = None,
    s3_path: Optional[str] = None,  # if you wish to save to a particular path
    s3_use_ssl: Optional[bool] = True,
    s3_verify: Optional[Union[bool, str]] = None,
    s3_endpoint_url: Optional[str] = None,
    s3_aws_access_key_id: Optional[str] = None,
    s3_aws_secret_access_key: Optional[str] = None,
    s3_aws_session_token: Optional[str] = None,
    s3_config: Optional[Any] = None,

    # disk cache params
    disk_cache_dir=None,

    # qdrant cache params
    qdrant_api_base: Optional[str] = None,
    qdrant_api_key: Optional[str] = None,
    qdrant_collection_name: Optional[str] = None,
    qdrant_quantization_config: Optional[str] = None,
    qdrant_semantic_cache_embedding_model="text-embedding-ada-002",

    **kwargs
):
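For instance, a local cache that only applies to (a)completion calls and expires entries after five minutes. A minimal sketch combining parameters from the signature above:

import litellm
from litellm.caching.caching import Cache

litellm.cache = Cache(
    type="local",
    supported_call_types=["completion", "acompletion"],  # only cache chat completions
    ttl=300,  # expire entries after 5 minutes
)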

Logging

Cache hits are logged in success events as kwarg["cache_hit"].

Here's an example of accessing it:

import asyncio
import os
import time

import litellm
from litellm.integrations.custom_logger import CustomLogger
from litellm import completion, acompletion, Cache

# create custom callback for success events
class MyCustomHandler(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        print("On Success")
        print(f"Value of Cache hit: {kwargs['cache_hit']}")

async def test_async_completion_azure_caching():
    # set custom callback
    customHandler_caching = MyCustomHandler()
    litellm.callbacks = [customHandler_caching]

    # init cache
    litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
    unique_time = time.time()
    response1 = await litellm.acompletion(model="azure/chatgpt-v-2",
                            messages=[{
                                "role": "user",
                                "content": f"Hi 👋 - i'm async azure {unique_time}"
                            }],
                            caching=True)
    await asyncio.sleep(1)
    # MyCustomHandler above doesn't track .states; uncomment if your handler does:
    # print(f"customHandler_caching.states pre-cache hit: {customHandler_caching.states}")
    response2 = await litellm.acompletion(model="azure/chatgpt-v-2",
                            messages=[{
                                "role": "user",
                                "content": f"Hi 👋 - i'm async azure {unique_time}"
                            }],
                            caching=True)
    await asyncio.sleep(1)  # success callbacks are done in parallel
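To exercise the example, run the coroutine with asyncio; on the second acompletion call the handler should print cache_hit as True. A quick sketch, assuming the snippet above is in the same file:

asyncio.run(test_async_completion_azure_caching())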