Skip to main content
Version: Next

使用LangChain Transformer

LangChain 是一个软件开发框架,旨在简化使用大型语言模型(LLMs)创建应用程序的过程。LangChain 中的链不仅仅是单个 LLM 调用,而是一系列调用的序列(可以是对 LLM 的调用或对其他实用程序的调用),自动执行一系列调用和操作。 为了更容易在大规模数据集上扩展 LangChain 的执行,我们将 LangChain 与分布式机器学习库 SynapseML 集成在一起。这种集成使得使用 Apache Spark 分布式计算框架处理数百万数据变得容易,同时结合 LangChain 框架。

本教程展示了如何大规模应用LangChain进行论文摘要和组织。我们从arxiv链接的表格开始,并应用LangChain Transformer自动提取相应的论文标题、作者、摘要和一些相关工作。

步骤1:先决条件

本快速入门的关键先决条件包括一个可用的Azure OpenAI资源,以及一个安装了SynapseML的Apache Spark集群。我们建议创建一个Synapse工作区,但Azure Databricks、HDInsight、Kubernetes上的Spark,甚至带有pyspark包的Python环境也可以工作。

  1. 一个 Azure OpenAI 资源 – 在创建资源之前,请在此请求访问
  2. 创建一个Synapse工作区
  3. 创建一个无服务器的 Apache Spark 池

步骤2:将此指南导入为笔记本

下一步是将此代码添加到您的Spark集群中。您可以在Spark平台上创建一个笔记本并将代码复制到此笔记本中以运行演示。或者下载笔记本并将其导入Synapse Analytics。

  1. 将笔记本导入Microsoft FabricSynapse Workspace,如果使用Databricks,则导入Databricks Workspace
  2. 在您的集群上安装SynapseML。请参阅SynapseML网站底部的安装说明。请注意,这需要在您刚刚导入的笔记本顶部粘贴一个额外的单元格。
  3. 将您的笔记本连接到集群,并按照以下步骤操作,编辑并运行下面的单元格。
%pip install openai==0.28.1 langchain==0.0.331 pdf2image pdfminer.six unstructured==0.10.24 pytesseract numpy==1.22.4 nltk==3.8.1
import os, openai, langchain, uuid
from langchain.llms import AzureOpenAI, OpenAI
from langchain.agents import load_tools, initialize_agent, AgentType
from langchain.chains import TransformChain, LLMChain, SimpleSequentialChain
from langchain.document_loaders import OnlinePDFLoader
from langchain.tools.bing_search.tool import BingSearchRun, BingSearchAPIWrapper
from langchain.prompts import PromptTemplate
from synapse.ml.services.langchain import LangchainTransformer
from synapse.ml.core.platform import running_on_synapse, find_secret

步骤3:填写服务信息并构建LLM

接下来,请编辑笔记本中的单元格以指向您的服务。特别是设置model_namedeployment_nameopenai_api_baseopen_api_key变量以匹配您的OpenAI服务。请随意将find_secret替换为您的密钥,如下所示

openai_api_key = "99sj2w82o...."

bing_subscription_key = "..."

请注意,您还需要设置您的Bing搜索以获取访问您的Bing搜索订阅密钥

openai_api_key = find_secret(
secret_name="openai-api-key-2", keyvault="mmlspark-build-keys"
)
openai_api_base = "https://synapseml-openai-2.openai.azure.com/"
openai_api_version = "2022-12-01"
openai_api_type = "azure"
deployment_name = "gpt-35-turbo"
bing_search_url = "https://api.bing.microsoft.com/v7.0/search"
bing_subscription_key = find_secret(
secret_name="bing-search-key", keyvault="mmlspark-build-keys"
)

os.environ["BING_SUBSCRIPTION_KEY"] = bing_subscription_key
os.environ["BING_SEARCH_URL"] = bing_search_url
os.environ["OPENAI_API_TYPE"] = openai_api_type
os.environ["OPENAI_API_VERSION"] = openai_api_version
os.environ["OPENAI_API_BASE"] = openai_api_base
os.environ["OPENAI_API_KEY"] = openai_api_key

llm = AzureOpenAI(
deployment_name=deployment_name,
model_name=deployment_name,
temperature=0.1,
verbose=True,
)

第4步:LangChain Transformer的基本用法

创建一个链

我们将从一个简单的链开始演示基本用法,该链为输入单词创建定义

copy_prompt = PromptTemplate(
input_variables=["technology"],
template="Define the following word: {technology}",
)

chain = LLMChain(llm=llm, prompt=copy_prompt)
transformer = (
LangchainTransformer()
.setInputCol("technology")
.setOutputCol("definition")
.setChain(chain)
.setSubscriptionKey(openai_api_key)
.setUrl(openai_api_base)
)

创建数据集并应用链

# construction of test dataframe
df = spark.createDataFrame(
[(0, "docker"), (1, "spark"), (2, "python")], ["label", "technology"]
)
display(transformer.transform(df))

保存和加载LangChain转换器

LangChain Transformers 可以保存和加载。请注意,LangChain 序列化仅适用于没有记忆的链。

temp_dir = "tmp"
if not os.path.exists(temp_dir):
os.mkdir(temp_dir)
path = os.path.join(temp_dir, "langchainTransformer")
transformer.save(path)
loaded = LangchainTransformer.load(path)
display(loaded.transform(df))

步骤5:使用LangChain进行大规模文献综述

创建一个用于论文摘要的顺序链

我们现在将构建一个顺序链,用于从arxiv链接中提取结构化信息。特别是,我们将要求langchain提取论文的标题、作者信息和论文内容的摘要。之后,我们使用一个网络搜索工具来查找第一作者最近撰写的论文。

总结一下,我们的顺序链包含以下步骤:

  1. 转换链: 从arxiv链接中提取论文内容 =>
  2. LLMChain: 总结论文,提取论文标题和作者 =>
  3. 转换链: 用于生成提示 =>
  4. 带有网页搜索工具的代理: 使用网页搜索查找第一作者最近的论文
def paper_content_extraction(inputs: dict) -> dict:
arxiv_link = inputs["arxiv_link"]
loader = OnlinePDFLoader(arxiv_link)
pages = loader.load_and_split()
return {"paper_content": pages[0].page_content + pages[1].page_content}


def prompt_generation(inputs: dict) -> dict:
output = inputs["Output"]
prompt = (
"find the paper title, author, summary in the paper description below, output them. After that, Use websearch to find out 3 recent papers of the first author in the author section below (first author is the first name separated by comma) and list the paper titles in bullet points: <Paper Description Start>\n"
+ output
+ "<Paper Description End>."
)
return {"prompt": prompt}


paper_content_extraction_chain = TransformChain(
input_variables=["arxiv_link"],
output_variables=["paper_content"],
transform=paper_content_extraction,
verbose=False,
)

paper_summarizer_template = """You are a paper summarizer, given the paper content, it is your job to summarize the paper into a short summary, and extract authors and paper title from the paper content.
Here is the paper content:
{paper_content}
Output:
paper title, authors and summary.
"""
prompt = PromptTemplate(
input_variables=["paper_content"], template=paper_summarizer_template
)
summarize_chain = LLMChain(llm=llm, prompt=prompt, verbose=False)

prompt_generation_chain = TransformChain(
input_variables=["Output"],
output_variables=["prompt"],
transform=prompt_generation,
verbose=False,
)

bing = BingSearchAPIWrapper(k=3)
tools = [BingSearchRun(api_wrapper=bing)]
web_search_agent = initialize_agent(
tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=False
)

sequential_chain = SimpleSequentialChain(
chains=[
paper_content_extraction_chain,
summarize_chain,
prompt_generation_chain,
web_search_agent,
]
)

应用LangChain转换器以大规模执行此工作负载

我们现在可以使用LangchainTransformer大规模地使用我们的链

paper_df = spark.createDataFrame(
[
(0, "https://arxiv.org/pdf/2107.13586.pdf"),
(1, "https://arxiv.org/pdf/2101.00190.pdf"),
(2, "https://arxiv.org/pdf/2103.10385.pdf"),
(3, "https://arxiv.org/pdf/2110.07602.pdf"),
],
["label", "arxiv_link"],
)

# construct langchain transformer using the paper summarizer chain define above
paper_info_extractor = (
LangchainTransformer()
.setInputCol("arxiv_link")
.setOutputCol("paper_info")
.setChain(sequential_chain)
.setSubscriptionKey(openai_api_key)
.setUrl(openai_api_base)
)


# extract paper information from arxiv links, the paper information needs to include:
# paper title, paper authors, brief paper summary, and recent papers published by the first author
display(paper_info_extractor.transform(paper_df))