2023年2月13日

使用Weaviate与OpenAI向量化模块进行嵌入搜索

本笔记本适用于以下场景:

  • 您的数据未进行向量化处理
  • 您希望对您的数据运行向量搜索
  • 您希望将Weaviate与OpenAI模块(text2vec-openai)结合使用,为您生成向量嵌入。

本笔记本将引导您完成一个简单流程:设置Weaviate实例、连接该实例(使用OpenAI API密钥)、配置数据模式、导入数据(系统会自动为您的数据生成向量嵌入)以及运行语义搜索。

对于希望在安全环境中存储和搜索我们的嵌入向量以支持生产用例(如聊天机器人、主题建模等)的客户来说,这是一个常见需求。

什么是Weaviate

Weaviate 是一个开源的向量搜索引擎,能够将数据对象与其向量一起存储。这使得向量搜索可以与结构化过滤相结合。

Weaviate 采用 KNN 算法创建向量优化索引,使您的查询能够极速运行。了解更多 here

Weaviate让您能够使用最喜爱的机器学习模型,并轻松扩展至数十亿数据对象。

部署选项

无论您的应用场景或生产环境如何,Weaviate都能为您提供合适的解决方案。您可以通过以下方式部署Weaviate:

  • 自托管 - 您可以在本地通过docker部署Weaviate,或部署到任意服务器。
  • SaaS – 你可以使用Weaviate云服务(WCS)来托管你的Weaviate实例。
  • 混合SaaS - 您可以在自己的私有云服务中部署Weaviate。

编程语言

Weaviate提供四种客户端库,可让您从应用程序中进行通信:

此外,Weaviate还提供REST层接口。基本上,您可以使用任何支持REST请求的语言来调用Weaviate。

演示流程

演示流程如下:

  • 前提条件设置: 创建一个Weaviate实例并安装所需的库
  • 连接: 连接到您的Weaviate实例
  • Schema Configuration: Configure the schema of your data
    • 注意: 这里我们可以定义使用哪个OpenAI嵌入模型
    • 注意: 这里我们可以配置需要索引的属性
  • Import data: Load a demo dataset and import it into Weaviate
    • 注意:导入过程将根据schema中的配置自动为您的数据建立索引
    • 注意:您无需显式地对数据进行向量化,Weaviate将与OpenAI通信为您完成此操作
  • Run Queries: Query
    • 注意:您无需显式地对查询进行向量化,Weaviate会与OpenAI通信为您完成此操作

完成本笔记本的学习后,您应该对如何设置和使用向量数据库有了基本理解,可以继续探索利用我们嵌入功能的更复杂用例。

Weaviate中的OpenAI模块

所有Weaviate实例都配备了text2vec-openai模块。

该模块负责在导入(或任何CRUD操作)期间以及运行查询时处理向量化。

无需手动向量化数据

这对你来说是个好消息。使用text2vec-openai你无需手动向量化数据,因为Weaviate会在需要时自动调用OpenAI。

您需要做的就是:

  1. 提供您的OpenAI API密钥 - 当您连接到Weaviate客户端时
  2. 在您的Schema中定义要使用的OpenAI向量化工具

先决条件

在开始这个项目之前,我们需要进行以下设置:

  • 创建一个 Weaviate 实例
  • install libraries
    • weaviate-client
    • datasets
    • apache-beam
  • 获取您的OpenAI API key

===========================================================

创建Weaviate实例

要创建Weaviate实例,我们有两种选择:

  1. (推荐路径) Weaviate Cloud Service - 在云端托管您的Weaviate实例。本教程使用免费沙盒环境就完全足够了。
  2. 使用Docker在本地安装并运行Weaviate。

选项1 – WCS安装步骤

使用Weaviate云服务 (WCS) 创建一个免费的Weaviate集群。

  1. 创建一个免费账户并/或登录到 WCS
  2. create a Weaviate Cluster with the following settings:
    • 沙盒环境: Sandbox Free
    • Weaviate 版本:使用默认(最新)
    • OIDC认证:Disabled
  3. 您的实例应该在一两分钟内准备就绪
  4. 记下Cluster Id。该链接将带您前往集群的完整路径(稍后连接时需要用到)。它应该类似于:https://your-project-name.weaviate.network

选项2 – 使用Docker本地运行Weaviate实例

使用Docker在本地安装并运行Weaviate。

  1. 下载 ./docker-compose.yml 文件
  2. 然后打开终端,导航到存放docker-compose.yml文件的位置,并通过以下命令启动docker:docker-compose up -d
  3. 准备就绪后,您的实例将在 http://localhost:8080 地址可用

注意:要关闭您的docker实例,可以调用:docker-compose down

了解更多

要了解更多关于在Docker中使用Weaviate的信息,请参阅安装文档

# Install the Weaviate client for Python
!pip install weaviate-client>=3.11.0

# Install datasets and apache-beam to load the sample datasets
!pip install datasets apache-beam

===========================================================

准备您的OpenAI API密钥

OpenAI API key 用于在导入时对数据进行向量化处理,以及执行查询操作。

如果您没有OpenAI API密钥,可以从https://beta.openai.com/account/api-keys获取。

获取密钥后,请将其添加到环境变量中,命名为OPENAI_API_KEY

# Export OpenAI API Key
!export OPENAI_API_KEY="your key"
# Test that your OpenAI API key is correctly set as an environment variable
# Note. if you run this notebook locally, you will need to reload your terminal and the notebook for the env variables to be live.
import os

# Note. alternatively you can set a temporary env variable like this:
# os.environ["OPENAI_API_KEY"] = 'your-key-goes-here'

if os.getenv("OPENAI_API_KEY") is not None:
    print ("OPENAI_API_KEY is ready")
else:
    print ("OPENAI_API_KEY environment variable not found")
import weaviate
from datasets import load_dataset
import os

# Connect to your Weaviate instance
client = weaviate.Client(
    url="https://your-wcs-instance-name.weaviate.network/",
    # url="http://localhost:8080/",
    auth_client_secret=weaviate.auth.AuthApiKey(api_key="<YOUR-WEAVIATE-API-KEY>"), # comment out this line if you are not using authentication for your Weaviate instance (i.e. for locally deployed instances)
    additional_headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_API_KEY")
    }
)

# Check if your instance is live and ready
# This should return `True`
client.is_ready()

Schema

在本节中,我们将:

  1. 为您的数据配置数据模式
  2. 选择OpenAI模块

这是第二个也是最后一步,需要OpenAI的特定配置。 完成此步骤后,其余说明将仅涉及Weaviate,因为OpenAI任务将自动处理。

什么是模式(schema)

在Weaviate中,您创建模式(schemas)来捕获您将要搜索的每个实体。

模式(schema)是您告知Weaviate的方式:

  • 应该使用哪种嵌入模型来向量化数据
  • 您的数据构成(属性名称和类型)
  • 哪些属性应该进行向量化和索引

在本教程中,我们将使用一个Articles数据集,其中包含:

  • title
  • content
  • url

我们希望将titlecontent向量化,但不包括url

为了对数据进行向量化和查询,我们将使用text-embedding-3-small

# Clear up the schema, so that we can recreate it
client.schema.delete_all()
client.schema.get()

# Define the Schema object to use `text-embedding-3-small` on `title` and `content`, but skip it for `url`
article_schema = {
    "class": "Article",
    "description": "A collection of articles",
    "vectorizer": "text2vec-openai",
    "moduleConfig": {
        "text2vec-openai": {
          "model": "ada",
          "modelVersion": "002",
          "type": "text"
        }
    },
    "properties": [{
        "name": "title",
        "description": "Title of the article",
        "dataType": ["string"]
    },
    {
        "name": "content",
        "description": "Contents of the article",
        "dataType": ["text"]
    },
    {
        "name": "url",
        "description": "URL to the article",
        "dataType": ["string"],
        "moduleConfig": { "text2vec-openai": { "skip": True } }
    }]
}

# add the Article schema
client.schema.create_class(article_schema)

# get the schema to make sure it worked
client.schema.get()

导入数据

在本节中,我们将:

  1. 加载Simple Wikipedia数据集
  2. 配置Weaviate批量导入(以提高导入效率)
  3. 将数据导入Weaviate

注意:
如前所述,我们无需手动对数据进行向量化处理。
text2vec-openai模块会自动完成这项工作。

### STEP 1 - load the dataset

from datasets import load_dataset
from typing import List, Iterator

# We'll use the datasets library to pull the Simple Wikipedia dataset for embedding
dataset = list(load_dataset("wikipedia", "20220301.simple")["train"])

# For testing, limited to 2.5k articles for demo purposes
dataset = dataset[:2_500]

# Limited to 25k articles for larger demo purposes
# dataset = dataset[:25_000]

# for free OpenAI acounts, you can use 50 objects
# dataset = dataset[:50]
### Step 2 - configure Weaviate Batch, with
# - starting batch size of 100
# - dynamically increase/decrease based on performance
# - add timeout retries if something goes wrong

client.batch.configure(
    batch_size=10, 
    dynamic=True,
    timeout_retries=3,
#   callback=None,
)
### Step 3 - import data

print("Importing Articles")

counter=0

with client.batch as batch:
    for article in dataset:
        if (counter %10 == 0):
            print(f"Import {counter} / {len(dataset)} ")

        properties = {
            "title": article["title"],
            "content": article["text"],
            "url": article["url"]
        }
        
        batch.add_data_object(properties, "Article")
        counter = counter+1

print("Importing Articles complete")       
# Test that all data has loaded – get object count
result = (
    client.query.aggregate("Article")
    .with_fields("meta { count }")
    .do()
)
print("Object count: ", result["data"]["Aggregate"]["Article"], "\n")
# Test one article has worked by checking one object
test_article = (
    client.query
    .get("Article", ["title", "url", "content"])
    .with_limit(1)
    .do()
)["data"]["Get"]["Article"][0]

print(test_article['title'])
print(test_article['url'])
print(test_article['content'])

搜索数据

如上所述,我们将向新索引发起一些查询,并根据与现有向量的接近程度返回结果

def query_weaviate(query, collection_name):
    
    nearText = {
        "concepts": [query],
        "distance": 0.7,
    }

    properties = [
        "title", "content", "url",
        "_additional {certainty distance}"
    ]

    result = (
        client.query
        .get(collection_name, properties)
        .with_near_text(nearText)
        .with_limit(10)
        .do()
    )
    
    # Check for errors
    if ("errors" in result):
        print ("\033[91mYou probably have run out of OpenAI API calls for the current minute – the limit is set at 60 per minute.")
        raise Exception(result["errors"][0]['message'])
    
    return result["data"]["Get"][collection_name]
query_result = query_weaviate("modern art in Europe", "Article")

for i, article in enumerate(query_result):
    print(f"{i+1}. { article['title']} (Score: {round(article['_additional']['certainty'],3) })")
query_result = query_weaviate("Famous battles in Scottish history", "Article")

for i, article in enumerate(query_result):
    print(f"{i+1}. { article['title']} (Score: {round(article['_additional']['certainty'],3) })")

感谢您的关注,您现在已掌握如何建立自己的向量数据库并使用嵌入技术实现各种酷炫功能——尽情探索吧!对于更复杂的应用场景,请继续学习本代码库中的其他实用示例。