Tutorial: Create a custom search engine and question-answering system

In this tutorial, learn how to index and query large data loaded from a Spark cluster. You set up a Jupyter Notebook that performs the following actions:

  • Load various forms (invoices) into a data frame in an Apache Spark session
  • Analyze them to determine their features
  • Assemble the resulting output into a tabular data structure
  • Write the output to a search index hosted in Azure Cognitive Search
  • Explore and query the content you created

1 - Set up dependencies

We start by importing packages and connecting to the Azure resources used in this workflow.

%pip install openai==0.28.1
from synapse.ml.core.platform import find_secret

cognitive_key = find_secret(
    secret_name="ai-services-api-key", keyvault="mmlspark-build-keys"
)  # Replace the call to find_secret with your key as a python string. e.g. cognitive_key="27snaiw..."
cognitive_location = "eastus"

translator_key = find_secret(
    secret_name="translator-key", keyvault="mmlspark-build-keys"
)  # Replace the call to find_secret with your key as a python string.
translator_location = "eastus"

search_key = find_secret(
    secret_name="azure-search-key", keyvault="mmlspark-build-keys"
)  # Replace the call to find_secret with your key as a python string.
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"

openai_key = find_secret(
    secret_name="openai-api-key-2", keyvault="mmlspark-build-keys"
)  # Replace the call to find_secret with your key as a python string.
openai_service_name = "synapseml-openai-2"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"

2 - Load data into Spark

This code loads a few external files from an Azure storage account used for demo purposes. The files are various invoices, and they're read into a data frame.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (
    spark.read.format("binaryFile")
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache()
)

display(df2)
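
As a quick sanity check, here's a minimal sketch of what blob_to_url does to a single path (the file name in the example is hypothetical):

# A wasbs path has the form wasbs://<container>@<account>/<filepath>;
# blob_to_url rewrites it as a plain https URL.
example = "wasbs://publicwasb@mmlspark.blob.core.windows.net/form_subset/invoice1.pdf"
print(blob_to_url(example))
# https://mmlspark.blob.core.windows.net/publicwasb/form_subset/invoice1.pdf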

3 - Apply form recognition

This code loads the AnalyzeInvoices transformer and passes a reference to the data frame containing the invoices. It calls the prebuilt invoice model of Azure Forms Analyzer.

from synapse.ml.services.form import AnalyzeInvoices

analyzed_df = (
    AnalyzeInvoices()
    .setSubscriptionKey(cognitive_key)
    .setLocation(cognitive_location)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache()
)

display(analyzed_df)
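
To see the nested structure that AnalyzeInvoices returns before it's flattened in the next step, you can inspect the schema; this is purely exploratory:

# The invoice results come back as deeply nested structs whose exact layout
# varies with what the service detects in each document.
analyzed_df.printSchema()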

4 - Simplify form recognition output

This code uses the FormOntologyLearner, a transformer that analyzes the output of Form Recognizer transformers (for Azure AI Document Intelligence) and infers a tabular data structure. The output of AnalyzeInvoices is dynamic and varies based on the features detected in your content.

FormOntologyLearner extends the utility of the AnalyzeInvoices transformer by looking for patterns that can be used to create a tabular data structure. Organizing the output into multiple columns and rows makes for simpler downstream analysis.

from synapse.ml.services.form import FormOntologyLearner

organized_df = (
    FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*")
    .cache()
)

display(organized_df)

With our nice tabular dataframe, we can flatten the nested tables within the forms with some SparkSQL:

from pyspark.sql.functions import explode, col

itemized_df = (
    organized_df.select("*", explode(col("Items")).alias("Item"))
    .drop("Items")
    .select("Item.*", "*")
    .drop("Item")
)

display(itemized_df)
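
Because explode produces one output row per line item, the row count grows accordingly; a quick, optional sanity check:

# Each invoice row fans out into one row per item in its "Items" array.
print(organized_df.count(), "invoices ->", itemized_df.count(), "line items")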

5 - Add translations

This code loads Translate, a transformer that calls the Azure AI Translator service in Azure AI services. The original text, which is in English in the "Description" column, is machine-translated into various languages. All of the output is consolidated into the "output.translations" array.

from synapse.ml.services.translate import Translate

translated_df = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation(translator_location)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache()
)

display(translated_df)
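
If you'd rather have one row per (language, text) pair, you can explode the Translations array; a minimal sketch, assuming each element is a struct with "to" and "text" fields as returned by the Translator API:

from pyspark.sql.functions import explode

# Fan each translation out into its own row with the target language code.
lang_df = translated_df.select(
    "Description", explode(col("Translations")).alias("t")
).select("Description", col("t.to").alias("Language"), col("t.text").alias("Translation"))
display(lang_df)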

6 - Translate products to emoji with OpenAI 🤯

from synapse.ml.services.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split

emoji_template = """
Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma

Two Ducks: 🦆🦆,
Light Bulb: 💡,
Three Peaches: 🍑🍑🍑,
Two kitchen stoves: ♨️♨️,
A red car: 🚗,
A person and a cat: 🧍🐈,
A {Description}: """

prompter = (
    OpenAIPrompt()
    .setSubscriptionKey(openai_key)
    .setDeploymentName(openai_deployment_name)
    .setUrl(openai_url)
    .setMaxTokens(5)
    .setPromptTemplate(emoji_template)
    .setErrorCol("error")
    .setOutputCol("Emoji")
)

emoji_df = (
    prompter.transform(translated_df)
    .withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(emoji_df.select("Description", "Emoji"))

7 - Infer vendor address continent with OpenAI

continent_template = """
Which continent does the following address belong to?

Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica.

Don't respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.

Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""

continent_df = (
    prompter.setOutputCol("Continent")
    .setPromptTemplate(continent_template)
    .transform(emoji_df)
    .withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(continent_df.select("VendorAddress", "Continent"))
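
To eyeball how the answers are distributed across continents, a small optional tally:

# Count line items per inferred continent, including any "None" answers.
display(continent_df.groupBy("Continent").count())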

8 - Create an Azure Search index for the forms

from synapse.ml.services import *
from pyspark.sql.functions import monotonically_increasing_id, lit

(
    continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    )
)
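
To confirm the upload, you can ask the service how many documents the index now holds; a small sketch against the standard $count endpoint, using the same API version as the query below:

import requests

# Returns the document count of the index as plain text.
count_url = "https://{}.search.windows.net/indexes/{}/docs/$count?api-version=2019-05-06".format(
    search_service, search_index
)
print(requests.get(count_url, headers={"api-key": search_key}).text)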

9 - Try out a search query

import requests

search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
    search_service, search_index
)
requests.post(
    search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()
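
The same endpoint accepts richer query bodies. As a sketch, "select" and "top" are standard Azure Cognitive Search parameters for trimming the response; the field names come from the dataframe columns written above:

# Return only two fields from the top 3 matches for "door".
requests.post(
    search_url,
    json={"search": "door", "select": "Description,CustomerName", "top": 3},
    headers={"api-key": search_key},
).json()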

10 - Build a chatbot that can use Azure Search as a tool 🧠🔧

import json
import openai

openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"

chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:

{continent_df.columns}

If you don't know the answer to a question say "I don't know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the problem, please output a json in the form of {{"query": "example_query"}}
"""


def search_query_prompt(question):
    return f"""
Given the search engine above, what would you search for to answer the following question?

Question: "{question}"

Please output a json in the form of {{"query": "example_query"}}
"""


def search_result_prompt(query):
    search_results = requests.post(
        search_url, json={"search": query}, headers={"api-key": search_key}
    ).json()
    return f"""

You previously ran a search for "{query}" which returned the following results:

{search_results}

You should use the results to help you answer questions. If you don't know the answer to a question say "I don't know". Do not lie or hallucinate information. Be brief and mention which query you used to solve the problem.
"""


def prompt_gpt(messages):
    response = openai.ChatCompletion.create(
        engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
    )
    return response["choices"][0]["message"]["content"]


def custom_chatbot(question):
    while True:
        try:
            query = json.loads(
                prompt_gpt(
                    [
                        {"role": "system", "content": chat_context_prompt},
                        {"role": "user", "content": search_query_prompt(question)},
                    ]
                )
            )["query"]

            return prompt_gpt(
                [
                    {"role": "system", "content": chat_context_prompt},
                    {"role": "system", "content": search_result_prompt(query)},
                    {"role": "user", "content": question},
                ]
            )
        except Exception as e:
            raise e

11 - Ask our chatbot a question

custom_chatbot("What did Luke Diaz buy?")
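
Any question answerable from the indexed columns should work; this follow-up is hypothetical, and the answer depends on your data:

# Another example question, grounded in the VendorAddress/Continent columns.
custom_chatbot("Which vendors are located in Europe?")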

12 - A quick double check

display(
    continent_df.where(col("CustomerName") == "Luke Diaz")
    .select("Description")
    .distinct()
)