使用Azure OpenAI和基于GPU的KNN嵌入文本
Azure OpenAI 服务可以通过提示完成 API 来解决大量的自然语言任务。为了使您的提示工作流从少量示例扩展到大型示例数据集变得更加容易,我们将 Azure OpenAI 服务与分布式机器学习库 Spark Rapids ML 集成在一起。这种集成使得使用 Apache Spark 分布式计算框架处理数百万个提示变得容易。本教程展示了如何应用大型语言模型为大型文本数据集生成嵌入。此演示基于带有 NVIDIA GPU 加速 KNN 的“快速入门 - OpenAI 嵌入”笔记本。
注意: 使用演示数据集运行笔记本(步骤4)将生成与基于CPU的“快速入门 - OpenAI嵌入”笔记本相同的结果。要看到GPU加速,您需要针对更大的嵌入运行查询。 例如,运行100K行的数据集将提供6倍的加速,并且在2个节点的NVIDIA T4集群上消耗的内存比AMD Epic(Rome)2个节点的CPU集群少10倍。
步骤1:先决条件
本快速入门的关键先决条件包括一个可用的Azure OpenAI资源,以及一个安装了SynapseML的Apache Spark集群。我们建议创建一个Synapse工作区,但Azure Databricks、HDInsight、Kubernetes上的Spark,甚至带有pyspark包的Python环境也可以工作。
- 一个 Azure OpenAI 资源 – 在创建资源之前,请在此请求访问
- 创建一个Synapse工作区
- 创建一个无服务器的 Apache Spark 池
步骤2:将此指南导入为笔记本
下一步是将此代码添加到您的Spark集群中。您可以在Spark平台上创建一个笔记本并将代码复制到此笔记本中以运行演示。或者下载笔记本并将其导入Synapse Analytics。
- 下载此演示作为笔记本(点击Raw,然后保存文件)
- 将笔记本导入到Synapse工作区,或者如果使用Databricks,则导入到Databricks工作区
- 在您的集群上安装SynapseML。请参阅SynapseML网站底部的安装说明。请注意,这需要在您刚刚导入的笔记本顶部粘贴一个额外的单元格
- 要在Databricks上运行笔记本,请添加相关的初始化脚本(/tools/init_scripts/init-rapidsml-cuda-11.8.sh)
- 将您的笔记本连接到集群,并按照以下步骤操作,编辑并运行下面的单元格。
步骤3:填写您的服务信息
接下来,请编辑笔记本中的单元格以指向您的服务。特别是设置service_name、deployment_name、location和key变量以匹配您的OpenAI服务的相应值。
from synapse.ml.core.platform import find_secret
# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai-2"
deployment_name_embeddings = "text-embedding-ada-002"
key = find_secret(
secret_name="openai-api-key-2", keyvault="mmlspark-build-keys"
) # please replace this with your key as a string
assert key is not None and service_name is not None
步骤4:加载数据
在这个演示中,我们将探索一个关于美食评论的数据集
import pyspark.sql.functions as F
df = (
spark.read.options(inferSchema="True", delimiter=",", header=True)
.csv("wasbs://publicwasb@mmlspark.blob.core.windows.net/fine_food_reviews_1k.csv")
.repartition(5)
)
df = df.withColumn(
"combined",
F.format_string("Title: %s; Content: %s", F.trim(df.Summary), F.trim(df.Text)),
)
display(df)
步骤5:生成嵌入
我们将首先使用SynapseML OpenAIEmbedding客户端为评论生成嵌入。
from synapse.ml.services.openai import OpenAIEmbedding
embedding = (
OpenAIEmbedding()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name_embeddings)
.setCustomServiceName(service_name)
.setTextCol("combined")
.setErrorCol("error")
.setOutputCol("embeddings")
)
completed_df = embedding.transform(df).cache()
display(completed_df)
步骤6:降低嵌入维度以进行可视化
我们使用t-SNE分解将维度降低到2维。
import pandas as pd
from sklearn.manifold import TSNE
import numpy as np
collected = list(completed_df.collect())
matrix = np.array([[r["embeddings"]] for r in collected])[:, 0, :].astype(np.float64)
scores = np.array([[r["Score"]] for r in collected]).reshape(-1)
tsne = TSNE(n_components=2, perplexity=15, random_state=42, init="pca")
vis_dims = tsne.fit_transform(matrix)
vis_dims.shape
步骤7:绘制嵌入
我们现在使用t-SNE将嵌入的维度从1536降低到2。一旦嵌入被降低到二维,我们可以在二维散点图中绘制它们。我们根据星级评分对每个评论进行着色,从红色表示负面评论,到绿色表示正面评论。我们可以在降低到二维的情况下观察到良好的数据分离。
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
colors = ["red", "darkorange", "gold", "turquoise", "darkgreen"]
x = [x for x, y in vis_dims]
y = [y for x, y in vis_dims]
color_indices = scores - 1
colormap = matplotlib.colors.ListedColormap(colors)
plt.scatter(x, y, c=color_indices, cmap=colormap, alpha=0.3)
for score in [0, 1, 2, 3, 4]:
avg_x = np.array(x)[scores - 1 == score].mean()
avg_y = np.array(y)[scores - 1 == score].mean()
color = colors[score]
plt.scatter(avg_x, avg_y, marker="x", color=color, s=100)
plt.title("Amazon ratings visualized in language using t-SNE")
步骤8:构建针对嵌入的查询
注意:文档和查询数据框中ID列的数据类型应相同。对于某些OpenAI模型,用户应使用不同的模型来嵌入文档和查询。这些模型分别由"-doc"和"-query"后缀表示。
from pyspark.ml import PipelineModel
embedding_query = (
OpenAIEmbedding()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name_embeddings)
.setCustomServiceName(service_name)
.setTextCol("query")
.setErrorCol("error")
.setOutputCol("embeddings")
)
query_df = (
spark.createDataFrame(
[
(
0,
"desserts",
),
(
1,
"disgusting",
),
]
)
.toDF("id", "query")
.withColumn("id", F.col("id").cast("int"))
)
embedding_query_df = (
embedding_query.transform(query_df).select("id", "embeddings").cache()
)
步骤9:拟合KNN模型
使用fit方法构建KNN模型
from spark_rapids_ml.knn import NearestNeighbors
rapids_knn = NearestNeighbors(k=10)
rapids_knn.setInputCol("embeddings").setIdCol("id")
rapids_knn_model = rapids_knn.fit(completed_df.select("id", "embeddings"))
步骤10:检索查询结果
找到k个最近的邻居
(_, _, knn_df) = rapids_knn_model.kneighbors(embedding_query_df)
display(knn_df)