Azure OpenAI 用于大数据
Azure OpenAI 服务可以通过提示完成 API 来解决大量的自然语言任务。为了使您的提示工作流从少量示例扩展到大型示例数据集变得更加容易,我们将 Azure OpenAI 服务与分布式机器学习库 SynapseML 集成在一起。这种集成使得使用 Apache Spark 分布式计算框架处理数百万个提示变得简单。本教程展示了如何使用 Azure OpenAI 在分布式规模上应用大型语言模型。
先决条件
本快速入门的关键先决条件包括一个可用的Azure OpenAI资源,以及一个安装了SynapseML的Apache Spark集群。我们建议创建一个Synapse工作区,但Azure Databricks、HDInsight、Kubernetes上的Spark,甚至带有pyspark包的Python环境也可以工作。
- 一个 Azure OpenAI 资源 – 在创建资源之前,请先申请访问
- 创建一个Synapse工作区
- 创建一个无服务器的 Apache Spark 池
将此指南作为笔记本导入
下一步是将此代码添加到您的Spark集群中。您可以在Spark平台上创建一个笔记本并将代码复制到此笔记本中以运行演示。或者下载笔记本并将其导入Synapse Analytics。
- 下载此演示作为笔记本(选择原始,然后保存文件)
- 导入笔记本。
- 如果您使用的是Synapse Analytics 进入Synapse工作区
- 如果您使用的是Databricks 导入到Databricks工作区
- 如果您使用的是Fabric 导入到Fabric工作区
- 在您的集群上安装SynapseML。请参阅SynapseML网站底部的安装说明。
- 如果您正在使用Fabric,请查看安装指南。这需要在您导入的笔记本顶部粘贴一个额外的单元格。
将您的笔记本连接到集群并跟随操作,编辑和运行单元格。
填写服务信息
接下来,编辑笔记本中的单元格以指向您的服务。特别是设置service_name、deployment_name、location和key变量,使它们与您的OpenAI服务匹配:
from synapse.ml.core.platform import find_secret
# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai-2"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"
key = find_secret(
secret_name="openai-api-key-2", keyvault="mmlspark-build-keys"
) # please replace this line with your key as a string
assert key is not None and service_name is not None
创建一个提示数据集
接下来,创建一个由一系列行组成的数据框,每行一个提示。
你也可以直接从ADLS或其他数据库加载数据。有关加载和准备Spark数据框的更多信息,请参阅Apache Spark数据加载指南。
df = spark.createDataFrame(
[
("Hello my name is",),
("The best code is code thats",),
("SynapseML is ",),
]
).toDF("prompt")
创建OpenAICompletion Apache Spark客户端
要将OpenAI Completion服务应用于您创建的数据框,请创建一个OpenAICompletion对象,该对象作为分布式客户端。服务的参数可以通过单个值设置,也可以通过数据框的列使用OpenAICompletion对象上的适当设置器进行设置。这里我们将maxTokens设置为200。一个标记大约有四个字符,此限制适用于提示和结果的总和。我们还使用数据框中提示列的名称设置了promptCol参数。
from synapse.ml.services.openai import OpenAICompletion
completion = (
OpenAICompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setPromptCol("prompt")
.setErrorCol("error")
.setOutputCol("completions")
)
使用OpenAICompletion客户端转换数据框
在创建数据框和完成客户端之后,您可以转换输入数据集并添加一个名为completions的列,其中包含服务添加的所有信息。为了简化,只选择文本。
from pyspark.sql.functions import col
completed_df = completion.transform(df).cache()
display(
completed_df.select(
col("prompt"),
col("error"),
col("completions.choices.text").getItem(0).alias("text"),
)
)
您的输出应该看起来像这样。完成文本将与示例不同。
| 提示 | 错误 | 文本 |
|---|---|---|
| 你好,我的名字是 | null | Makaveli,我十八岁,我想长大后成为一名说唱歌手,我喜欢写作和制作音乐,我来自加利福尼亚州洛杉矶 |
| 最好的代码是 | null | 易于理解的代码。这是一个主观的陈述,没有明确的答案。 |
| SynapseML 是 | null | 一种能够学习如何预测事件未来结果的机器学习算法。 |
更多使用示例
生成文本嵌入
除了完成文本,我们还可以嵌入文本以供下游算法或向量检索架构使用。创建嵌入允许您从大量文档中搜索和检索文档,并且可以在提示工程不足以完成任务时使用。
有关使用OpenAIEmbedding的更多信息,请参阅我们的嵌入指南。
from synapse.ml.services.openai import OpenAIEmbedding
embedding = (
OpenAIEmbedding()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name_embeddings)
.setCustomServiceName(service_name)
.setTextCol("prompt")
.setErrorCol("error")
.setOutputCol("embeddings")
)
display(embedding.transform(df))
聊天完成
诸如ChatGPT和GPT-4等模型能够理解聊天内容,而不仅仅是单个提示。OpenAIChatCompletion 转换器大规模地展示了这一功能。
from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *
def make_message(role, content):
return Row(role=role, content=content, name=role)
chat_df = spark.createDataFrame(
[
(
[
make_message(
"system", "You are an AI chatbot with red as your favorite color"
),
make_message("user", "Whats your favorite color"),
],
),
(
[
make_message("system", "You are very excited"),
make_message("user", "How are you today"),
],
),
]
).toDF("messages")
chat_completion = (
OpenAIChatCompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMessagesCol("messages")
.setErrorCol("error")
.setOutputCol("chat_completions")
)
display(
chat_completion.transform(chat_df).select(
"messages", "chat_completions.choices.message.content"
)
)
通过请求批处理提高吞吐量
该示例向服务发出了多个请求,每个提示一个请求。要在单个请求中完成多个提示,请使用批处理模式。首先,在OpenAICompletion对象中,不要将Prompt列设置为“Prompt”,而是为BatchPrompt列指定“batchPrompt”。 为此,创建一个每行包含提示列表的数据框。
截至撰写本文时,单个请求中目前有20个提示的限制,以及2048个“令牌”或大约1500个单词的硬性限制。
batch_df = spark.createDataFrame(
[
(["The time has come", "Pleased to", "Today stocks", "Here's to"],),
(["The only thing", "Ask not what", "Every litter", "I am"],),
]
).toDF("batchPrompt")
接下来我们创建OpenAICompletion对象。如果您的列是Array[String]类型,请设置batchPrompt列而不是prompt列。
batch_completion = (
OpenAICompletion()
.setSubscriptionKey(key)
.setDeploymentName(deployment_name)
.setCustomServiceName(service_name)
.setMaxTokens(200)
.setBatchPromptCol("batchPrompt")
.setErrorCol("error")
.setOutputCol("completions")
)
在调用transform时,每行都会发出一个请求。由于单行中有多个提示,每个请求都会发送该行中的所有提示。结果中包含请求中每行的结果。
completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)
使用自动小批量处理器
如果你的数据是列格式的,你可以使用SynapseML的FixedMiniBatcherTransformer将其转置为行格式。
from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI
completed_autobatch_df = (
df.coalesce(
1
) # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
.mlTransform(FixedMiniBatchTransformer(batchSize=4))
.withColumnRenamed("prompt", "batchPrompt")
.mlTransform(batch_completion)
)
display(completed_autobatch_df)
翻译的提示工程
Azure OpenAI 服务可以通过提示工程解决许多不同的自然语言任务。在这里,我们展示了一个语言翻译提示的示例:
translate_df = spark.createDataFrame(
[
("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
(
"French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
),
]
).toDF("prompt")
display(completion.transform(translate_df))
问答提示
在这里,我们提示GPT-3进行常识问答:
qa_df = spark.createDataFrame(
[
(
"Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
)
]
).toDF("prompt")
display(completion.transform(qa_df))