Recommended: Set up MLflow Tracing to understand what's happening under the hood.
MLflow DSPy Integration
MLflow is an LLMOps tool that natively integrates with DSPy and offers explainability and experiment tracking. In this tutorial, you can use MLflow to visualize prompts and optimization progress as traces to better understand DSPy's behavior. You can set up MLflow easily by following the four steps below.
- Install MLflow
%pip install mlflow>=2.20
- Start the MLflow UI in a separate terminal
mlflow ui --port 5000
- Connect the notebook to MLflow
import mlflow
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("DSPy")
- Enable tracing.
mlflow.dspy.autolog()
To learn more about the integration, visit the MLflow DSPy Documentation.
In this tutorial, we'll demonstrate the new experimental dspy.SIMBA prompt optimizer, which tends to be quite powerful for larger LLMs and harder tasks. Using it, we'll take our agent from 35% accuracy to 60%.
In [1]:
import dspy
import ujson
import random
gpt4o = dspy.LM("openai/gpt-4o", temperature=0.7)
dspy.configure(lm=gpt4o)
Let's now download the data.
In [2]:
from dspy.utils import download
download("https://huggingface.co/datasets/bytedance-research/ToolHop/resolve/main/data/ToolHop.json")
data = ujson.load(open("ToolHop.json"))
random.Random(0).shuffle(data)
Downloading 'ToolHop.json'...
Next, let's prepare a cleaned set of examples. The ToolHop task is interesting in that the agent gets a unique set of tools (functions) to use separately for each request. Thus, it needs to learn, in general, how to use any such tools effectively in practice.
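As a quick sanity check, here is a minimal sketch (not part of the original notebook) that peeks at one raw datapoint; it only assumes the "question", "answer", and "functions" keys used in the next cell.

# A minimal sketch: inspect one raw ToolHop datapoint.
# Assumes only the "question", "answer", and "functions" keys used below.
sample = data[0]
print(sample["question"])
print(sample["answer"])
print(f"This request ships with {len(sample['functions'])} tool definitions.")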
In [3]:
import re
import inspect

examples = []
fns2code = {}

def finish(answer: str):
    """Conclude the trajectory and return the final answer."""
    return answer

for datapoint in data:
    func_dict = {}
    for func_code in datapoint["functions"]:
        cleaned_code = func_code.rsplit("\n\n# Example usage", 1)[0]
        fn_name = re.search(r"^\s*def\s+([a-zA-Z0-9_]+)\s*\(", cleaned_code)
        fn_name = fn_name.group(1) if fn_name else None

        if not fn_name:
            continue

        local_vars = {}
        exec(cleaned_code, {}, local_vars)
        fn_obj = local_vars.get(fn_name)

        if callable(fn_obj):
            func_dict[fn_name] = fn_obj
            assert fn_obj not in fns2code, f"Duplicate function found: {fn_name}"
            fns2code[fn_obj] = (fn_name, cleaned_code)

    func_dict["finish"] = finish

    example = dspy.Example(question=datapoint["question"], answer=datapoint["answer"], functions=func_dict)
    examples.append(example.with_inputs("question", "functions"))

trainset, devset, testset = examples[:100], examples[100:400], examples[400:]
And let's define some helpers for the task. Here, we'll define the metric, which will be (much) stricter than in the original paper: we expect the prediction to match the gold answer exactly (after normalization). We'll also be strict in a second way: we only allow the agent to take five steps in total, to allow for efficient deployment.
In [4]:
from func_timeout import func_set_timeout

def wrap_function_with_timeout(fn):
    @func_set_timeout(10)
    def wrapper(*args, **kwargs):
        try:
            return {"return_value": fn(*args, **kwargs), "errors": None}
        except Exception as e:
            return {"return_value": None, "errors": str(e)}

    return wrapper

def fn_metadata(func):
    signature = inspect.signature(func)
    docstring = inspect.getdoc(func) or "No docstring."
    return dict(function_name=func.__name__, arguments=str(signature), docstring=docstring)

def metric(example, pred, trace=None):
    gold = str(example.answer).rstrip(".0").replace(",", "").lower()
    pred = str(pred.answer).rstrip(".0").replace(",", "").lower()
    return pred == gold  # stricter than the original paper's metric!

evaluate = dspy.Evaluate(devset=devset, metric=metric, num_threads=24, display_progress=True, display_table=0, max_errors=999)
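To make the normalization concrete, here is a small illustrative check (not from the original notebook) of how the metric's string cleanup treats a gold answer and a prediction before the exact-match comparison.

# Illustrative only: the normalization applied inside `metric`.
# Note that str.rstrip(".0") strips any trailing '.' and '0' characters.
gold = str("1,234.0").rstrip(".0").replace(",", "").lower()  # -> "1234"
pred = str("1234").rstrip(".0").replace(",", "").lower()     # -> "1234"
print(gold == pred)  # True: this pair counts as an exact match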
Now, let's define the agent! The core of our agent is a ReAct loop, in which the model sees the trajectory so far and the set of functions it can invoke, and decides which tool to call next.
To keep the final agent fast, we'll limit it to max_steps of 5 steps. We'll also run each function call with a timeout.
In [5]:
class Agent(dspy.Module):
    def __init__(self, max_steps=5):
        self.max_steps = max_steps
        instructions = "For the final answer, produce short (not full sentence) answers in which you format dates as YYYY-MM-DD, names as Firstname Lastname, and numbers without leading 0s."
        signature = dspy.Signature('question, trajectory, functions -> next_selected_fn, args: dict[str, Any]', instructions)
        self.react = dspy.ChainOfThought(signature)

    def forward(self, question, functions):
        tools = {fn_name: fn_metadata(fn) for fn_name, fn in functions.items()}
        trajectory = []

        for _ in range(self.max_steps):
            pred = self.react(question=question, trajectory=trajectory, functions=tools)
            selected_fn = pred.next_selected_fn.strip('"').strip("'")
            fn_output = wrap_function_with_timeout(functions[selected_fn])(**pred.args)
            trajectory.append(dict(reasoning=pred.reasoning, selected_fn=selected_fn, args=pred.args, **fn_output))

            if selected_fn == "finish":
                break

        return dspy.Prediction(answer=fn_output.get("return_value", ''), trajectory=trajectory)
Out of the box, let's evaluate our GPT-4o-powered agent on the development set.
In [6]:
agent = Agent()
evaluate(agent)
2025/03/23 21:46:10 INFO dspy.evaluate.evaluate: Average Metric: 105.0 / 300 (35.0%)
Out[6]:
35.0
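To see what a single rollout looks like, here is a minimal sketch (using only names defined above) that runs the agent on one devset example and prints its trajectory.

# A minimal sketch: one rollout of the unoptimized agent.
example = devset[0]
pred = agent(question=example.question, functions=example.functions)
print("Predicted:", pred.answer)
print("Gold:     ", example.answer)
for step in pred.trajectory:
    # Each step records the model's reasoning, the chosen tool, its args,
    # and the tool's return value or error (see `forward` above).
    print(step["selected_fn"], step["args"], step["errors"])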
Now, let's optimize the agent using dspy.SIMBA, which stands for Stochastic Introspective Mini-Batch Ascent. This prompt optimizer accepts an arbitrary DSPy program, like our agent here, and proceeds over a sequence of mini-batches, seeking to make incremental improvements to the prompt instructions or few-shot examples.
In [ ]:
simba = dspy.SIMBA(metric=metric, max_steps=12, max_demos=10)
optimized_agent = simba.compile(agent, trainset=trainset, seed=6793115)
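Once the compile call finishes, you may want to peek at what SIMBA actually changed. The sketch below is an aside under stated assumptions (not from the original notebook): it relies on dspy.Module.named_predictors() and on each predictor exposing signature.instructions and demos, which is where prompt instructions and few-shot examples live in DSPy.

# A hedged sketch: inspect the optimized instructions and demos.
for name, predictor in optimized_agent.named_predictors():
    print(f"=== {name} ===")
    print(predictor.signature.instructions)
    print(f"{len(predictor.demos)} few-shot demo(s) attached.")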
Having completed this optimization, let's now evaluate our agent again. We see a large jump to 60% accuracy, a 71% relative gain.
In [8]:
evaluate(optimized_agent)
2025/03/23 21:46:21 INFO dspy.evaluate.evaluate: Average Metric: 182.0 / 300 (60.7%)
Out[8]:
60.67
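If you want to keep the optimized prompts around for deployment, one option is DSPy's standard module serialization; the sketch below assumes that save/load API, and the file name is arbitrary.

# A sketch: persist the optimized agent's state (instructions and demos)
# and reload it into a fresh Agent instance.
optimized_agent.save("optimized_toolhop_agent.json")

loaded_agent = Agent()
loaded_agent.load("optimized_toolhop_agent.json")
evaluate(loaded_agent)  # should reproduce the optimized score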