2024年8月2日

在ChatGPT中使用GCP函数和GPT操作实现GCP BigQuery向量搜索

,

本笔记本提供了逐步指南,介绍如何将Google Cloud BigQuery作为支持向量搜索功能的数据库使用,结合OpenAI嵌入技术,然后在其基础上创建Google Cloud Function以便接入ChatGPT中的自定义GPT。

对于希望在Google云平台(GCP)内建立RAG基础设施,并将其作为端点与其他平台(如ChatGPT)集成的客户来说,这是一个可行的解决方案。

Google Cloud BigQuery 是一个完全托管、无服务器的数据仓库,利用谷歌基础设施的处理能力实现超快速SQL查询。它让开发者能够轻松存储和分析海量数据集。

Google Cloud Functions 是一种轻量级、基于事件的异步计算解决方案,可让您创建小型单一用途的函数来响应云事件,而无需管理服务器或运行时环境。

先决条件:

要运行此手册,您必须拥有:

  • 您有权访问的GCP项目
  • 拥有创建BigQuery数据集和Google云函数权限的GCP用户
  • GCP CLI 已安装并连接
  • OpenAI API key
  • ChatGPT Plus、Teams 或 Enterprise 订阅

架构

以下是该解决方案的架构图,我们将逐步讲解:

bigquery-rag-architecture.png

目录

  1. 环境设置 通过安装和导入所需库并配置GCP设置来完成环境搭建。包括:

  2. Prepare Data 通过嵌入文档以及捕获额外的元数据来准备要上传的数据。我们将使用OpenAI文档的一个子集作为示例数据。

  3. 创建支持向量搜索的BigQuery表
    创建一个BigQuery表并上传我们准备好的数据。包含:

  4. Create GCP Function 使用之前计算的环境变量和gcloud CLI

  5. 在ChatGPT的自定义GPT中输入 对BigQuery中的嵌入数据执行搜索:

! pip install -q google-auth
! pip install -q openai
! pip install -q pandas
! pip install -q google-cloud-functions
! pip install -q python-dotenv
! pip install -q pyperclip
! pip install -q PyPDF2
! pip install -q tiktoken
! pip install -q google-cloud-bigquery
! pip install -q pyyaml
# Standard Libraries
import json  
import os
import csv
import shutil
from itertools import islice
import concurrent.futures
import yaml

# Third-Party Libraries
import pandas as pd
import numpy as np
from PyPDF2 import PdfReader
import tiktoken
from dotenv import load_dotenv
import pyperclip

# OpenAI Libraries
from openai import OpenAI

# Google Cloud Identity and Credentials
from google.auth import default
from google.cloud import bigquery
from google.cloud import functions_v1

配置GCP项目

如果尚未设置,我们将安装GCP命令行工具,验证GCP身份并设置您的默认项目。

# Add gcloud to PATH
os.environ['PATH'] += os.pathsep + os.path.expanduser('~/google-cloud-sdk/bin')

# Verify gcloud is in PATH
! gcloud --version
! gcloud auth application-default login
project_id = "<insert_project_id>"  # Replace with your actual project ID
! gcloud config set project {project_id}
! gcloud services enable cloudfunctions.googleapis.com
! gcloud services enable cloudbuild.googleapis.com
! gcloud services enable bigquery.googleapis.com

配置OpenAI设置

本节将指导您完成为OpenAI设置身份验证的步骤。在开始之前,请确保您已获取OpenAI API密钥。

openai_api_key = os.environ.get("OPENAI_API_KEY", "<your OpenAI API key if not set as an env var>") # Saving this as a variable to reference in function app in later step
openai_client = OpenAI(api_key=openai_api_key)
embeddings_model = "text-embedding-3-small" # We'll use this by default, but you can change to your text-embedding-3-large if desired
from google.auth import default

# Use default credentials
credentials, project_id = default()
region = "us-central1" # e.g: "us-central1"
print("Default Project ID:", project_id)

准备数据

我们将把OpenAI文档中的几页嵌入并存储到oai_docs文件夹中。首先我们会逐个嵌入,添加到CSV文件,然后使用该CSV文件上传到索引。

我们将采用这本指南手册中强调的一些技术。这是一种快速嵌入文本的方法,无需考虑诸如章节等变量,利用我们的视觉模型来描述图像/图表/示意图,对于较长文档则在文本块之间进行重叠处理等。

为了处理超过8191个标记上下文长度的文本文件,我们可以单独使用分块嵌入,或者以某种方式(例如按每个分块的大小加权平均)将它们组合起来。

我们将采用Python官方手册中的一个函数,该函数可将序列分割成多个块。

def batched(iterable, n):
    """Batch data into tuples of length n. The last batch may be shorter."""
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while (batch := tuple(islice(it, n))):
        yield batch

现在我们定义一个函数,将字符串编码为标记(token)并将其分割成块。我们将使用tiktoken,这是OpenAI开发的一个快速开源标记器。

要了解更多关于使用Tiktoken计算令牌的信息,请查看此烹饪书

def chunked_tokens(text, chunk_length, encoding_name='cl100k_base'):
    # Get the encoding object for the specified encoding name. OpenAI's tiktoken library, which is used in this notebook, currently supports two encodings: 'bpe' and 'cl100k_base'. The 'bpe' encoding is used for GPT-3 and earlier models, while 'cl100k_base' is used for newer models like GPT-4.
    encoding = tiktoken.get_encoding(encoding_name)
    # Encode the input text into tokens
    tokens = encoding.encode(text)
    # Create an iterator that yields chunks of tokens of the specified length
    chunks_iterator = batched(tokens, chunk_length)
    # Yield each chunk from the iterator
    yield from chunks_iterator

最后,我们可以编写一个函数来安全处理嵌入请求,即使输入文本超过最大上下文长度,也能通过分块处理输入标记并单独嵌入每个块。average标志可设为True以返回块嵌入的加权平均值,或设为False直接返回未经修改的块嵌入列表。

注意:您还可以采用其他技术,包括:

  • 使用GPT-4o捕获图像/图表描述以进行嵌入
  • 基于段落或章节的分块
  • 为每篇文章添加更具描述性的元数据。
EMBEDDING_CTX_LENGTH = 8191
EMBEDDING_ENCODING='cl100k_base'
def generate_embeddings(text, model):
    # Generate embeddings for the provided text using the specified model
    embeddings_response = openai_client.embeddings.create(model=model, input=text)
    # Extract the embedding data from the response
    embedding = embeddings_response.data[0].embedding
    return embedding

def len_safe_get_embedding(text, model=embeddings_model, max_tokens=EMBEDDING_CTX_LENGTH, encoding_name=EMBEDDING_ENCODING):
    # Initialize lists to store embeddings and corresponding text chunks
    chunk_embeddings = []
    chunk_texts = []
    # Iterate over chunks of tokens from the input text
    for chunk in chunked_tokens(text, chunk_length=max_tokens, encoding_name=encoding_name):
        # Generate embeddings for each chunk and append to the list
        chunk_embeddings.append(generate_embeddings(chunk, model=model))
        # Decode the chunk back to text and append to the list
        chunk_texts.append(tiktoken.get_encoding(encoding_name).decode(chunk))
    # Return the list of chunk embeddings and the corresponding text chunks
    return chunk_embeddings, chunk_texts

接下来,我们可以定义一个辅助函数来捕获文档的额外元数据。在本示例中,我将从一个类别列表中进行选择,以便后续在元数据过滤器中使用

categories = ['authentication','models','techniques','tools','setup','billing_limits','other']

def categorize_text(text, categories):

    # Create a prompt for categorization
    messages = [
        {"role": "system", "content": f"""You are an expert in LLMs, and you will be given text that corresponds to an article in OpenAI's documentation.
         Categorize the document into one of these categories: {', '.join(categories)}. Only respond with the category name and nothing else."""},
        {"role": "user", "content": text}
    ]
    try:
        # Call the OpenAI API to categorize the text
        response = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )

        # Extract the category from the response
        category = response.choices[0].message.content
        return category
    except Exception as e:
        print(f"Error categorizing text: {str(e)}")
        return None

# Example usage

现在,我们可以定义一些辅助函数来处理oai_docs文件夹中的.txt文件。你可以自由地将此应用于自己的数据,它同时支持.txt和.pdf文件。

def extract_text_from_pdf(pdf_path):
    # Initialize the PDF reader
    reader = PdfReader(pdf_path)
    text = ""
    # Iterate through each page in the PDF and extract text
    for page in reader.pages:
        text += page.extract_text()
    return text

def process_file(file_path, idx, categories, embeddings_model):
    file_name = os.path.basename(file_path)
    print(f"Processing file {idx + 1}: {file_name}")
    
    # Read text content from .txt files
    if file_name.endswith('.txt'):
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()
    # Extract text content from .pdf files
    elif file_name.endswith('.pdf'):
        text = extract_text_from_pdf(file_path)
    
    title = file_name
    # Generate embeddings for the title
    title_vectors, title_text = len_safe_get_embedding(title, embeddings_model)
    print(f"Generated title embeddings for {file_name}")
    
    # Generate embeddings for the content
    content_vectors, content_text = len_safe_get_embedding(text, embeddings_model)
    print(f"Generated content embeddings for {file_name}")
    
    category = categorize_text(' '.join(content_text), categories)
    print(f"Categorized {file_name} as {category}")
    
    # Prepare the data to be appended
    data = []
    for i, content_vector in enumerate(content_vectors):
        data.append({
            "id": f"{idx}_{i}",
            "vector_id": f"{idx}_{i}",
            "title": title_text[0],
            "text": content_text[i],
            "title_vector": json.dumps(title_vectors[0]),  # Assuming title is short and has only one chunk
            "content_vector": json.dumps(content_vector),
            "category": category
        })
        print(f"Appended data for chunk {i + 1}/{len(content_vectors)} of {file_name}")
    
    return data

我们现在将使用这个辅助函数来处理我们的OpenAI文档。您可以通过修改下面process_files中的文件夹路径,自由更新以使用您自己的数据。

请注意,这将并发处理所选文件夹中的文档,因此如果使用txt文件,耗时应少于30秒;若使用PDF文件,耗时则会稍长一些。

## Customize the location below if you are using different data besides the OpenAI documentation. Note that if you are using a different dataset, you will need to update the categories list as well.
folder_name = "../../../data/oai_docs"

files = [os.path.join(folder_name, f) for f in os.listdir(folder_name) if f.endswith('.txt') or f.endswith('.pdf')]
data = []

# Process each file concurrently
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(process_file, file_path, idx, categories, embeddings_model): idx for idx, file_path in enumerate(files)}
    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            data.extend(result)
        except Exception as e:
            print(f"Error processing file: {str(e)}")

# Write the data to a CSV file
csv_file = os.path.join("..", "embedded_data.csv")
with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ["id", "vector_id", "title", "text", "title_vector", "content_vector","category"]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    for row in data:
        writer.writerow(row)
        print(f"Wrote row with id {row['id']} to CSV")

# Convert the CSV file to a Dataframe
article_df = pd.read_csv("../embedded_data.csv")
# Read vectors from strings back into a list using json.loads
article_df["title_vector"] = article_df.title_vector.apply(json.loads)
article_df["content_vector"] = article_df.content_vector.apply(json.loads)
article_df["vector_id"] = article_df["vector_id"].apply(str)
article_df["category"] = article_df["category"].apply(str)
article_df.head()

我们现在有一个包含六列的embedded_data.csv文件,可以上传到我们的向量数据库!

使用Vector Search创建BigQuery表

创建BigQuery数据集

我们将利用Google SDK创建一个名为"oai_docs"的数据集,其中包含一个表名为"embedded_data",但您可以随意更改这些变量(您也可以更改区域)。

PS: We won't create a BigQuery index, that could improve the performance of the vector search, because such index requires more than 1k rows in our dataset which we don't have in our example, but feel free to leverage that for your own use-case.

# Create bigquery table

from google.cloud import bigquery
from google.api_core.exceptions import Conflict

# Define the dataset ID (project_id.dataset_id)
raw_dataset_id = 'oai_docs'
dataset_id = project_id + '.' + raw_dataset_id

client = bigquery.Client(credentials=credentials, project=project_id)

# Construct a full Dataset object to send to the API
dataset = bigquery.Dataset(dataset_id)

# Specify the geographic location where the dataset should reside
dataset.location = "US"

# Send the dataset to the API for creation
try:
    dataset = client.create_dataset(dataset, timeout=30)
    print(f"Created dataset {client.project}.{dataset.dataset_id}")
except Conflict:
    print(f"dataset {dataset.dataset_id } already exists")
    
# Read the CSV file, properly handling multiline fields
csv_file_path = "../embedded_data.csv"
df = pd.read_csv(csv_file_path, engine='python', quotechar='"', quoting=1)

# Display the first few rows of the dataframe
df.head()

创建表格并上传数据

我们将创建包含属性名称和类型的表。注意'content_vector'属性,它允许为单行存储一个浮点数向量,我们将用它来进行向量搜索。

这段代码将循环处理我们之前创建的CSV文件,将数据行插入到Bigquery中。 如果您多次运行此代码,将会插入多条相同的记录,这会导致搜索时结果准确性降低(您可以为ID设置唯一性约束或每次清理数据库)。

# Read the CSV file, properly handling multiline fields
dataset_id = project_id + '.' + raw_dataset_id
client = bigquery.Client(credentials=credentials, project=project_id)
csv_file_path = "../embedded_data.csv"
df = pd.read_csv(csv_file_path, engine='python', quotechar='"', quoting=1)

# Preprocess the data to ensure content_vector is correctly formatted
# removing last and first character which are brackets [], comma splitting and converting to float
def preprocess_content_vector(row):
    row['content_vector'] = [float(x) for x in row['content_vector'][1:-1].split(',')]
    return row

# Apply preprocessing to the dataframe
df = df.apply(preprocess_content_vector, axis=1)

# Define the schema of the final table
final_schema = [
    bigquery.SchemaField("id", "STRING"),
    bigquery.SchemaField("vector_id", "STRING"),
    bigquery.SchemaField("title", "STRING"),
    bigquery.SchemaField("text", "STRING"),
    bigquery.SchemaField("title_vector", "STRING"),
    bigquery.SchemaField("content_vector", "FLOAT64", mode="REPEATED"),
    bigquery.SchemaField("category", "STRING"),
]

# Define the final table ID
raw_table_id = 'embedded_data'
final_table_id = f'{dataset_id}.' + raw_table_id

# Create the final table object
final_table = bigquery.Table(final_table_id, schema=final_schema)

# Send the table to the API for creation
final_table = client.create_table(final_table, exists_ok=True)  # API request
print(f"Created final table {project_id}.{final_table.dataset_id}.{final_table.table_id}")

# Convert DataFrame to list of dictionaries for BigQuery insertion
rows_to_insert = df.to_dict(orient='records')

# Upload data to the final table
errors = client.insert_rows_json(f"{final_table.dataset_id}.{final_table.table_id}", rows_to_insert)  # API request

if errors:
    print(f"Encountered errors while inserting rows: {errors}")
else:
    print(f"Successfully loaded data into {dataset_id}:{final_table_id}")

测试搜索

现在数据已经上传,我们将在下面本地测试纯向量相似性搜索和带元数据过滤的功能,以确保其按预期工作。

您可以测试纯向量搜索和元数据过滤。

下面的查询是纯向量搜索,我们没有按类别进行过滤。

query = "What model should I use to embed?"
category = "models"

embedding_query = generate_embeddings(query, embeddings_model)
embedding_query_list = ', '.join(map(str, embedding_query))

query = f"""
WITH search_results AS (
  SELECT query.id AS query_id, base.id AS base_id, distance
  FROM VECTOR_SEARCH(
    TABLE oai_docs.embedded_data, 'content_vector',
    (SELECT ARRAY[{embedding_query_list}] AS content_vector, 'query_vector' AS id),
    top_k => 2, distance_type => 'COSINE', options => '{{"use_brute_force": true}}')
)
SELECT sr.query_id, sr.base_id, sr.distance, ed.text, ed.title
FROM search_results sr
JOIN oai_docs.embedded_data ed ON sr.base_id = ed.id
ORDER BY sr.distance ASC
"""

query_job = client.query(query)
results = query_job.result()  # Wait for the job to complete

for row in results:
    print(f"query_id: {row['query_id']}, base_id: {row['base_id']}, distance: {row['distance']}, text_truncated: {row['text'][0:100]}")

使用元数据过滤执行搜索

元数据过滤可以在向量搜索找到最接近语义结果的基础上,进一步限制具有特定属性的发现。

提供的代码片段展示了如何执行带有元数据过滤的查询:


query = "What model should I use to embed?"
category = "models"

embedding_query = generate_embeddings(query, embeddings_model)
embedding_query_list = ', '.join(map(str, embedding_query))


query = f"""
WITH search_results AS (
  SELECT query.id AS query_id, base.id AS base_id, distance
  FROM VECTOR_SEARCH(
    (SELECT * FROM oai_docs.embedded_data WHERE category = '{category}'), 
    'content_vector',
    (SELECT ARRAY[{embedding_query_list}] AS content_vector, 'query_vector' AS id),
    top_k => 4, distance_type => 'COSINE', options => '{{"use_brute_force": true}}')
)
SELECT sr.query_id, sr.base_id, sr.distance, ed.text, ed.title, ed.category
FROM search_results sr
JOIN oai_docs.embedded_data ed ON sr.base_id = ed.id
ORDER BY sr.distance ASC
"""


query_job = client.query(query)
results = query_job.result()  # Wait for the job to complete

for row in results:
    print(f"category: {row['category']}, title: {row['title']}, base_id: {row['base_id']}, distance: {row['distance']}, text_truncated: {row['text'][0:100]}")

创建GCP函数

导出变量

我们将在此文件夹中的main.py部署该函数(也可在此获取)。

第一步,我们将导出变量以定位目标表/数据集,并使用OpenAI的API生成嵌入向量。

# Create a dictionary to store the environment variables (they were used previously and are just retrieved)
env_variables = {
    'OPENAI_API_KEY': openai_api_key,
    'EMBEDDINGS_MODEL': embeddings_model,
    'PROJECT_ID': project_id,
    'DATASET_ID': raw_dataset_id,
    'TABLE_ID': raw_table_id
}

# Write the environment variables to a YAML file
with open('env.yml', 'w') as yaml_file:
    yaml.dump(env_variables, yaml_file, default_flow_style=False)

print("env.yml file created successfully.")

部署函数

我们现在将为当前项目创建一个名为"openai_docs_search"的Google函数,为此我们将运行以下CLI命令,利用之前创建的环境变量。请注意,此函数可以从任何地方调用而无需身份验证,请不要在生产环境中使用或添加额外的身份验证机制。

! gcloud functions deploy openai_docs_search \
  --runtime python39 \
  --trigger-http \
  --allow-unauthenticated \
  --env-vars-file env.yml

在ChatGPT的自定义GPT中输入

现在我们有了一个可以查询这个向量搜索索引的GCP函数,让我们将其设置为GPT Action!

查看关于GPTs的文档此处以及关于GPT Actions的文档此处。将以下内容用作GPT的指令和GPT Action的OpenAPI规范。

创建OpenAPI规范

以下是一个OpenAPI规范示例。当我们运行下面的代码块时,一个功能规范将被复制到剪贴板,以便粘贴到GPT Action中。

请注意,默认情况下此功能不包含任何身份验证,但您可以按照GCP文档此处的指引为GCP Functions设置身份验证。

spec = f"""
openapi: 3.1.0
info:
  title: OpenAI API documentation search
  description: API to perform a semantic search over OpenAI APIs
  version: 1.0.0
servers:
  - url: https://{region}-{project_id}.cloudfunctions.net
    description: Main (production) server
paths:
  /openai_docs_search:
    post:
      operationId: openai_docs_search
      summary: Perform a search
      description: Returns search results for the given query parameters.
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                query:
                  type: string
                  description: The search query string
                top_k:
                  type: integer
                  description: Number of top results to return. Maximum is 3.
                category:
                  type: string
                  description: The category to filter on, on top of similarity search (used for metadata filtering). Possible values are {categories}.
      responses:
        '200':
          description: A JSON response with the search results
          content:
            application/json:
              schema:
                type: object
                properties:
                  items:
                    type: array
                    items:
                      type: object
                      properties:
                        text:
                          type: string
                          example: "Learn how to turn text into numbers, unlocking use cases like search..."
                        title:
                          type: string
                          example: "embeddings.txt"
                        distance:
                          type: number
                          format: float
                          example: 0.484939891778730
                        category:
                          type: string
                          example: "models"
"""
print(spec)
pyperclip.copy(spec)
print("OpenAPI spec copied to clipboard")

创建GPT指令

请根据需要自由修改指令。查看我们的文档此处获取一些提示工程技巧。

instructions = f'''
You are an OpenAI docs assistant. You have an action in your knowledge base where you can make a POST request to search for information. The POST request should always include: {{
    "query": "<user_query>",
    "k_": <integer>,
    "category": <string, but optional>
}}. Your goal is to assist users by performing searches using this POST request and providing them with relevant information based on the query.

You must only include knowledge you get from your action in your response.
The category must be from the following list: {categories}, which you should determine based on the user's query. If you cannot determine, then do not include the category in the POST request.
'''
pyperclip.copy(instructions)
print("GPT Instructions copied to clipboard")
print(instructions)

回顾

我们现已通过以下步骤成功将GCP BigQuery向量搜索与ChatGPT中的GPT Actions集成:

  1. 使用OpenAI的嵌入技术嵌入文档,同时利用gpt-4o添加一些额外的元数据。
  2. 将数据上传至GCP BigQuery(原始数据和嵌入向量)
  3. 在GCP Functions上创建了一个端点来获取这些数据
  4. 将其整合到一个自定义GPT中。

我们的GPT现在可以检索信息来帮助回答用户查询,使其更加准确并针对我们的数据进行定制。以下是GPT的实际应用示例:

gcp-rag-quickstart-gpt.png