Query Pipeline Chat Engine
By combining a query pipeline with a memory buffer, we can design our own custom chat engine loop.
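At a high level, the loop we build at the end of this notebook boils down to something like the sketch below. This is only a schematic outline; the names mirror the pipeline and memory objects that are constructed step by step in the rest of the notebook.

# rough shape of the custom chat engine loop built in this notebook
while True:
    user_msg = input("User: ")
    chat_history = memory.get()  # pull prior turns from the memory buffer
    response = pipeline.run(
        query_str=user_msg,
        chat_history=chat_history,
        chat_history_str="\n".join(str(m) for m in chat_history),
    )
    memory.put(ChatMessage(role="user", content=user_msg))
    memory.put(response.message)  # the pipeline returns a ChatResponse from the LLM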
In [ ]:
%pip install llama-index-core
%pip install llama-index-llms-openai
%pip install llama-index-embeddings-openai
%pip install llama-index-postprocessor-colbert-rerank
%pip install llama-index-readers-web
In [ ]:
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
Index Construction
As a test, we will index Anthropic's latest documentation on tool/function calling.
In [ ]:
from llama_index.readers.web import BeautifulSoupWebReader

reader = BeautifulSoupWebReader()

documents = reader.load_data(
    ["https://docs.anthropic.com/claude/docs/tool-use"]
)
If you inspect the document text, you'll see there are far too many blank lines, so let's clean it up a bit.
In [ ]:
lines = documents[0].text.split("\n")

# collapse runs of consecutive empty lines into a single empty line
fixed_lines = [lines[0]]
for idx in range(1, len(lines)):
    if lines[idx].strip() == "" and lines[idx - 1].strip() == "":
        continue
    fixed_lines.append(lines[idx])

documents[0].text = "\n".join(fixed_lines)
Now we can create our index using OpenAI embeddings.
In [ ]:
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding

index = VectorStoreIndex.from_documents(
    documents,
    embed_model=OpenAIEmbedding(
        model="text-embedding-3-large", embed_batch_size=256
    ),
)
Query Pipeline Construction
As a demonstration, let's build a robust query pipeline that uses HyDE for retrieval and Colbert for reranking.
In [ ]:
from llama_index.core.query_pipeline import (
    QueryPipeline,
    InputComponent,
    ArgPackComponent,
)
from llama_index.core.prompts import PromptTemplate
from llama_index.llms.openai import OpenAI
from llama_index.postprocessor.colbert_rerank import ColbertRerank

# First, we create an input component to capture the user query
input_component = InputComponent()

# Next, we use the LLM to rewrite a user query
rewrite = (
    "Please write a query to a semantic search engine using the current conversation.\n"
    "\n"
    "\n"
    "{chat_history_str}"
    "\n"
    "\n"
    "Latest message: {query_str}\n"
    'Query:"""\n'
)
rewrite_template = PromptTemplate(rewrite)
llm = OpenAI(
    model="gpt-4-turbo-preview",
    temperature=0.2,
)

# we will retrieve two times, so we need to pack the retrieved nodes into a single list
argpack_component = ArgPackComponent()

# using that, we will retrieve...
retriever = index.as_retriever(similarity_top_k=6)

# then postprocess/rerank with Colbert
reranker = ColbertRerank(top_n=3)
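If you want to inspect what the query-rewrite step produces before wiring up the full pipeline, you can format the prompt and call the LLM directly. This is a quick sanity-check sketch, not part of the pipeline; the chat history string below is a made-up example.

# hypothetical check: run the rewrite prompt in isolation
example_history_str = "user: Hello!\nassistant: Hi! How can I help?"
rewrite_prompt = rewrite_template.format(
    chat_history_str=example_history_str,
    query_str="How does tool use work with Claude-3?",
)
print(llm.complete(rewrite_prompt))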
To synthesize a response using both the chat history and the retrieved nodes, let's create a custom component.
In [ ]:
# then lastly, we need to create a response using the nodes AND chat history
from typing import Any, Dict, List, Optional

from llama_index.core.bridge.pydantic import Field
from llama_index.core.llms import ChatMessage
from llama_index.core.query_pipeline import CustomQueryComponent
from llama_index.core.schema import NodeWithScore

DEFAULT_CONTEXT_PROMPT = (
    "Here is some context that may be relevant:\n"
    "-----\n"
    "{node_context}\n"
    "-----\n"
    "Please write a response to the following question, using the above context:\n"
    "{query_str}\n"
)


class ResponseWithChatHistory(CustomQueryComponent):
    llm: OpenAI = Field(..., description="OpenAI LLM")
    system_prompt: Optional[str] = Field(
        default=None, description="System prompt to use for the LLM"
    )
    context_prompt: str = Field(
        default=DEFAULT_CONTEXT_PROMPT,
        description="Context prompt to use for the LLM",
    )

    def _validate_component_inputs(
        self, input: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate component inputs during run_component."""
        # NOTE: this is OPTIONAL but we show you where to do validation as an example
        return input

    @property
    def _input_keys(self) -> set:
        """Input keys dict."""
        # NOTE: These are required inputs. If you have optional inputs please override
        # `optional_input_keys_dict`
        return {"chat_history", "nodes", "query_str"}

    @property
    def _output_keys(self) -> set:
        return {"response"}

    def _prepare_context(
        self,
        chat_history: List[ChatMessage],
        nodes: List[NodeWithScore],
        query_str: str,
    ) -> List[ChatMessage]:
        node_context = ""
        for idx, node in enumerate(nodes):
            node_text = node.get_content(metadata_mode="llm")
            node_context += f"Context Chunk {idx}:\n{node_text}\n\n"

        formatted_context = self.context_prompt.format(
            node_context=node_context, query_str=query_str
        )
        user_message = ChatMessage(role="user", content=formatted_context)

        chat_history.append(user_message)

        if self.system_prompt is not None:
            chat_history = [
                ChatMessage(role="system", content=self.system_prompt)
            ] + chat_history

        return chat_history

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component."""
        chat_history = kwargs["chat_history"]
        nodes = kwargs["nodes"]
        query_str = kwargs["query_str"]

        prepared_context = self._prepare_context(
            chat_history, nodes, query_str
        )

        response = self.llm.chat(prepared_context)

        return {"response": response}

    async def _arun_component(self, **kwargs: Any) -> Dict[str, Any]:
        """Run the component asynchronously."""
        # NOTE: Optional, but async LLM calls are easy to implement
        chat_history = kwargs["chat_history"]
        nodes = kwargs["nodes"]
        query_str = kwargs["query_str"]

        prepared_context = self._prepare_context(
            chat_history, nodes, query_str
        )

        response = await self.llm.achat(prepared_context)

        return {"response": response}


response_component = ResponseWithChatHistory(
    llm=llm,
    system_prompt=(
        "You are a Q&A system. You will be provided with the previous chat history, "
        "as well as possibly relevant context, to assist in answering a user message."
    ),
)
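Before adding it to the pipeline, you can exercise the component on its own. Below is a minimal sketch, assuming the component can be invoked directly through run_component; the query and chat history are made up for illustration.

# hypothetical standalone check of the custom component
test_nodes = retriever.retrieve("How does tool use work with Claude-3?")
output = response_component.run_component(
    chat_history=[ChatMessage(role="user", content="Hello!")],
    nodes=test_nodes,
    query_str="How does tool use work with Claude-3?",
)
print(output["response"])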
With our modules created, we can link them together in a query pipeline.
In [ ]:
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm,
        "rewrite_retriever": retriever,
        "query_retriever": retriever,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=False,
)

# run both retrievers -- once with the hallucinated query, once with the real query
pipeline.add_link(
    "input", "rewrite_template", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "rewrite_template",
    src_key="chat_history_str",
    dest_key="chat_history_str",
)
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")

# each input to the argpack component needs a dest key -- it can be anything
# then, the argpack component will pack all the inputs into a single list
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")

# reranker needs the packed nodes and the query string
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link(
    "input", "reranker", src_key="query_str", dest_key="query_str"
)

# synthesizer needs the reranked nodes and query str
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link(
    "input", "response_component", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "response_component",
    src_key="chat_history",
    dest_key="chat_history",
)
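With all of the links declared, you can optionally render the pipeline as a graph to double-check the wiring. This is a hedged sketch, assuming pyvis is installed and that the pipeline exposes its underlying networkx graph via the dag attribute.

# optional: visualize the pipeline DAG (assumes `pip install pyvis`)
from pyvis.network import Network

net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(pipeline.dag)
net.show("chat_engine_dag.html")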
Let's test it out to make sure it works!
Running the Pipeline with Memory
The above pipeline uses two inputs: a query string and a chat_history list.
The query string is simply the string input/query.
The chat history is a list of ChatMessage objects. We can use the memory module from llama-index to directly create and manage the memory for us!
In [ ]:
from llama_index.core.memory import ChatMemoryBuffer
pipeline_memory = ChatMemoryBuffer.from_defaults(token_limit=8000)
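The buffer behaves like a token-limited list of ChatMessage objects: put appends a message, get returns the (possibly truncated) history, and reset clears it. A quick illustrative sketch follows; the message is made up, and we reset afterwards so the real chat loop below starts from an empty buffer.

# illustrative only: seed the buffer, read it back, then clear it
from llama_index.core.llms import ChatMessage

pipeline_memory.put(ChatMessage(role="user", content="Hello!"))
print(pipeline_memory.get())
pipeline_memory.reset()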
Let's pre-create a "chat session" and watch it in action.
In [ ]:
user_inputs = [
    "Hello!",
    "How does tool-use work with Claude-3 work?",
    "What models support it?",
    "Thanks, that what I needed to know!",
]

for msg in user_inputs:
    # get memory
    chat_history = pipeline_memory.get()

    # prepare inputs
    chat_history_str = "\n".join([str(x) for x in chat_history])

    # run pipeline
    response = pipeline.run(
        query_str=msg,
        chat_history=chat_history,
        chat_history_str=chat_history_str,
    )

    # update memory
    user_msg = ChatMessage(role="user", content=msg)
    pipeline_memory.put(user_msg)
    print(str(user_msg))

    pipeline_memory.put(response.message)
    print(str(response.message))
    print()
user: Hello!

assistant: Hello! How can I assist you today?

user: How does tool-use work with Claude-3 work?

assistant: Tool use with Claude-3 operates under a framework designed to extend the model's capabilities by integrating it with external data sources and functionalities through user-provided tools. This process involves several key steps and considerations to ensure effective tool integration and utilization. Here's a breakdown of how tool use works with Claude-3:

1. **Tool Specification**: Users define tools in the API request, specifying the tool's name, a detailed description of its purpose and behavior, and an input schema that outlines the expected parameters. This schema is crucial for Claude to understand when and how to use the tool correctly.

2. **Decision to Use a Tool**: When Claude-3 receives a user prompt that may benefit from tool use, it assesses whether any available tools can assist with the query or task. This decision is based on the context provided by the user and the detailed descriptions of the tools.

3. **Tool Use Request Formation**: If Claude decides to use a tool, it constructs a properly formatted tool use request. This includes selecting the appropriate tool(s) and determining the necessary inputs based on the user's prompt and the tool's input schema.

4. **Execution of Tool Code**: The actual execution of the tool code occurs on the client side. The system extracts the tool name and input from Claude's tool use request, runs the tool code, and then returns the results to Claude.

5. **Formulating a Response**: After receiving the tool results, Claude uses this information to formulate its final response to the user's original prompt. This step may involve interpreting the tool's output and integrating it into a coherent and informative answer.

6. **Sequential Tool Use**: Claude generally prefers using one tool at a time, using the output of one tool to inform its next action. This sequential approach helps manage dependencies between tools and simplifies the tool use process.

7. **Error Handling and Retries**: If a tool use request is invalid or missing required parameters, Claude can retry the request with the missing information filled in, based on error responses from the client side. However, after a few failed attempts, Claude may stop trying and apologize to the user.

8. **Debugging and Improvement**: Developers are encouraged to debug unexpected tool use behavior by examining Claude's chain of thought output and refining tool descriptions and schemas for clarity and comprehensiveness.

By adhering to these steps and best practices, developers can effectively integrate and utilize tools with Claude-3, significantly expanding its capabilities beyond its base knowledge. This framework allows for the creation of complex, agentic orchestrations where Claude can perform a wide variety of tasks, from simple data retrieval to more complex problem-solving scenarios.

user: What models support it?

assistant: The tool use feature, as described in the provided context, is supported by Claude-3 models, including specific versions like Claude-3 Opus and Haiku. These models are designed to interact with external client-side tools and functions, allowing for a wide variety of tasks to be performed by equipping Claude with custom tools. The context specifically mentions Claude-3 Opus as being capable of handling more complex tool use scenarios, including managing multiple tools simultaneously and better catching missing arguments. Haiku is mentioned for dealing with more straightforward tools, inferring missing parameters when they are not explicitly given.

user: Thanks, that what I needed to know!

assistant: You're welcome! If you have any more questions or need further assistance, feel free to ask. Happy to help!