DSPy 速查表
本页面将包含常用模式的代码片段。
DSPy 程序
强制生成新的语言模型输出
DSPy 缓存 LM 调用。提供一个唯一的 rollout_id 并设置非零的 temperature(例如 1.0)以绕过现有缓存条目,同时仍然缓存新结果:
predict = dspy.Predict("question -> answer")

# Pair a unique rollout_id with a non-zero temperature so this call skips
# any existing cache entry while the new result is still cached.
fresh_rollout_config = {"rollout_id": 1, "temperature": 1.0}
predict(question="1+1", config=fresh_rollout_config)
dspy.Signature
class BasicQA(dspy.Signature):
    """Answer questions with short factoid answers."""

    # Input field: the question text, annotated as str.
    question: str = dspy.InputField()
    # Output field: the answer, annotated as str; `desc` carries a length hint.
    answer: str = dspy.OutputField(desc="often between 1 and 5 words")
dspy.ChainOfThought
# Wrap the BasicQA signature in a ChainOfThought module.
generate_answer = dspy.ChainOfThought(BasicQA)
# Call the predictor on a particular input (only `question` is passed).
question='What is the color of the sky?'
pred = generate_answer(question=question)
dspy.ProgramOfThought
# Wrap the BasicQA signature in a ProgramOfThought module.
pot = dspy.ProgramOfThought(BasicQA)

question = (
    "Sarah has 5 apples. She buys 7 more apples from the store. "
    "How many apples does Sarah have now?"
)
result = pot(question=question)

# Show the input question next to the module's final answer.
print(f"Question: {question}")
print(f"Final Predicted Answer (after ProgramOfThought process): {result.answer}")
dspy.ReAct
# ReAct takes a list of tools in addition to the signature. This example
# needs no external tool, so an empty list is passed explicitly.
# NOTE(review): confirm against the installed DSPy version that `tools=[]`
# is accepted.
react_module = dspy.ReAct(BasicQA, tools=[])

question = (
    "Sarah has 5 apples. She buys 7 more apples from the store. "
    "How many apples does Sarah have now?"
)
result = react_module(question=question)

# Show the input question next to the module's final answer.
print(f"Question: {question}")
print(f"Final Predicted Answer (after ReAct process): {result.answer}")
dspy.Retrieve
# Register a ColBERTv2 endpoint as the retrieval model in DSPy settings.
colbertv2_wiki17_abstracts = dspy.ColBERTv2(
    url="http://20.102.90.50:2017/wiki17_abstracts",
)
dspy.settings.configure(rm=colbertv2_wiki17_abstracts)

# Define the Retrieve module with k=3.
retriever = dspy.Retrieve(k=3)

query = "When was the first FIFA World Cup held?"

# Call the retriever on a particular query.
topK_passages = retriever(query).passages

# Print each passage with a 1-based rank prefix.
for rank, passage in enumerate(topK_passages, start=1):
    print(f"{rank}]", passage, "\n")
dspy.CodeAct
from dspy import CodeAct
def factorial(n):
    """Calculate factorial of n"""
    # Negative input has no factorial; fail fast instead of recursing until
    # the interpreter raises RecursionError.
    if n < 0:
        raise ValueError("factorial is not defined for negative numbers")
    # 0! and 1! are both 1. The previous `n == 1` check never terminated
    # for n == 0.
    if n <= 1:
        return 1
    return n * factorial(n - 1)
# Build a CodeAct module for the "n->factorial" signature and expose the
# local `factorial` function to it as a tool.
act = CodeAct("n->factorial", tools=[factorial])
# Run the module with n=5.
result = act(n=5)
result # Returns 120
dspy.Parallel
import dspy
parallel = dspy.Parallel(num_threads=2)
predict = dspy.Predict("question -> answer")

# Each work item pairs the module with an Example whose `question` field is
# marked as an input.
work_items = [
    (predict, dspy.Example(question=text).with_inputs("question"))
    for text in ("1+1", "2+2")
]
result = parallel(work_items)
result
DSPy 指标
函数作为指标
要创建自定义指标,你可以创建一个返回数字或布尔值的函数:
def parse_integer_answer(answer, only_first_line=True):
    """Extract an integer from free-form answer text.

    Takes the last whitespace-separated token that contains a digit, keeps
    the part before its first '.', drops every non-digit character, and
    converts the remainder to int.

    Args:
        answer: Text to parse.
        only_first_line: When true, only the first line of the stripped
            text is considered.

    Returns:
        The parsed integer, or 0 when no token contains a digit or the
        remaining digits cannot be converted.
    """
    try:
        text = answer.strip().split('\n')[0] if only_first_line else answer
        # Find the last token that has a number in it.
        numeric_tokens = [tok for tok in text.split() if any(ch.isdigit() for ch in tok)]
        integer_part = numeric_tokens[-1].split('.')[0]
        return int(''.join(ch for ch in integer_part if ch.isdigit()))
    except (ValueError, IndexError):
        return 0
# Metric Function
def gsm8k_metric(gold, pred, trace=None) -> int:
    """Compare the integers parsed from the gold and predicted answers.

    Both `gold.answer` and `pred.answer` are converted to str and parsed
    with `parse_integer_answer`. `trace` is accepted but not used.
    """
    expected = parse_integer_answer(str(gold.answer))
    predicted = parse_integer_answer(str(pred.answer))
    return int(expected) == int(predicted)
LLM作为评判者
class FactJudge(dspy.Signature):
    """Judge if the answer is factually correct based on the context."""

    # Three inputs: the supporting context, the question, and the answer
    # under review.
    context = dspy.InputField(desc="Context for the prediction")
    question = dspy.InputField(desc="Question to be answered")
    answer = dspy.InputField(desc="Answer for the question")
    # Single output, annotated as bool: the judge's verdict.
    factually_correct: bool = dspy.OutputField(desc="Is the answer factually correct based on the context?")
# LLM judge built from the FactJudge signature.
judge = dspy.ChainOfThought(FactJudge)


def factuality_metric(example, pred, trace=None):
    """Return the judge's verdict on whether `pred.answer` is factual.

    Args:
        example: Object providing `context` and `question`.
        pred: Prediction providing `answer`.
        trace: Accepted so the metric can also be called with three
            arguments; not used here.

    Returns:
        The `factually_correct` field of the judge's output.
    """
    factual = judge(context=example.context, question=example.question, answer=pred.answer)
    return factual.factually_correct
DSPy 评估
from dspy.evaluate import Evaluate
# Build the evaluator once, then apply it to the program under test.
evaluate_program = Evaluate(
    devset=devset,
    metric=your_defined_metric,
    num_threads=NUM_THREADS,
    display_progress=True,
    display_table=num_rows_to_display,
)
evaluate_program(your_dspy_program)
DSPy 优化器
LabeledFewShot
from dspy.teleprompt import LabeledFewShot
# Create the optimizer with k=8, then compile the program on the train set.
labeled_fewshot_optimizer = LabeledFewShot(k=8)
your_dspy_program_compiled = labeled_fewshot_optimizer.compile(
    student=your_dspy_program,
    trainset=trainset,
)
BootstrapFewShot
from dspy.teleprompt import BootstrapFewShot
# Configure the bootstrapping optimizer, then compile the student program.
fewshot_optimizer = BootstrapFewShot(
    metric=your_defined_metric,
    max_bootstrapped_demos=4,
    max_labeled_demos=16,
    max_rounds=1,
    max_errors=10,
)
your_dspy_program_compiled = fewshot_optimizer.compile(
    student=your_dspy_program,
    trainset=trainset,
)
使用另一个LM进行编译,在teacher_settings中指定
from dspy.teleprompt import BootstrapFewShot
# Same optimizer as the plain BootstrapFewShot example, with a different LM
# supplied through `teacher_settings`.
fewshot_optimizer = BootstrapFewShot(
    metric=your_defined_metric,
    max_bootstrapped_demos=4,
    max_labeled_demos=16,
    max_rounds=1,
    max_errors=10,
    teacher_settings={"lm": gpt4},
)
your_dspy_program_compiled = fewshot_optimizer.compile(
    student=your_dspy_program,
    trainset=trainset,
)
编译已编译的程序 - 引导已引导的程序
# Re-run the BootstrapFewShot optimizer from this section, using the already
# compiled program as the teacher. The original snippet called an undefined
# `teleprompter`.
your_dspy_program_compiledx2 = fewshot_optimizer.compile(
    your_dspy_program,
    teacher=your_dspy_program_compiled,
    trainset=trainset,
)
保存/加载已编译的程序
BootstrapFewShotWithRandomSearch
关于BootstrapFewShotWithRandomSearch的详细文档请参见此处。
from dspy.teleprompt import BootstrapFewShotWithRandomSearch
# Configure the random-search optimizer, then compile with separate train
# and validation sets.
fewshot_optimizer = BootstrapFewShotWithRandomSearch(
    metric=your_defined_metric,
    max_bootstrapped_demos=2,
    num_candidate_programs=8,
    num_threads=NUM_THREADS,
)
your_dspy_program_compiled = fewshot_optimizer.compile(
    student=your_dspy_program,
    trainset=trainset,
    valset=devset,
)
其他自定义配置与自定义 BootstrapFewShot 优化器类似。
Ensemble
from dspy.teleprompt import BootstrapFewShotWithRandomSearch
from dspy.teleprompt.ensemble import Ensemble
# Step 1: compile with random search so candidate programs are available.
fewshot_optimizer = BootstrapFewShotWithRandomSearch(
    metric=your_defined_metric,
    max_bootstrapped_demos=2,
    num_candidate_programs=8,
    num_threads=NUM_THREADS,
)
your_dspy_program_compiled = fewshot_optimizer.compile(
    student=your_dspy_program,
    trainset=trainset,
    valset=devset,
)

# Step 2: ensemble the first three candidates, reduced with dspy.majority.
ensemble_optimizer = Ensemble(reduce_fn=dspy.majority)
# NOTE(review): assumes the last element of each candidate entry is the
# program itself; confirm against the installed DSPy version.
programs = [candidate[-1] for candidate in your_dspy_program_compiled.candidate_programs]
your_dspy_program_compiled_ensemble = ensemble_optimizer.compile(programs[:3])
BootstrapFinetune
from dspy.teleprompt import BootstrapFewShotWithRandomSearch, BootstrapFinetune
# Compile program on current dspy.settings.lm
fewshot_optimizer = BootstrapFewShotWithRandomSearch(
    metric=your_defined_metric,
    max_bootstrapped_demos=2,
    num_threads=NUM_THREADS,
)
# Use the optimizer created just above. The original snippet called an
# undefined `tp`.
your_dspy_program_compiled = fewshot_optimizer.compile(
    your_dspy_program,
    trainset=trainset[:some_num],
    valset=trainset[some_num:],
)

# Configure model to finetune
config = dict(target=model_to_finetune, epochs=2, bf16=True, bsize=6, accumsteps=2, lr=5e-5)

# Compile program on BootstrapFinetune
finetune_optimizer = BootstrapFinetune(metric=your_defined_metric)
finetune_program = finetune_optimizer.compile(
    your_dspy_program,
    trainset=some_new_dataset_for_finetuning_model,
    **config,
)

# NOTE(review): this rebinding discards the result of the compile call
# above; it is kept as in the original snippet.
finetune_program = your_dspy_program

# Load program and activate model's parameters in program before evaluation
ckpt_path = "saved_checkpoint_path_from_finetuning"
LM = dspy.HFModel(checkpoint=ckpt_path, model=model_to_finetune)

# Point every predictor at the finetuned LM.
for p in finetune_program.predictors():
    p.lm = LM
    p.activated = False
COPRO
关于COPRO的详细文档可以查看这里。
from dspy.teleprompt import COPRO
# Keyword arguments passed to compile() as `eval_kwargs`.
eval_kwargs = {"num_threads": 16, "display_progress": True, "display_table": 0}

copro_teleprompter = COPRO(
    prompt_model=model_to_generate_prompts,
    metric=your_defined_metric,
    breadth=num_new_prompts_generated,
    depth=times_to_generate_prompts,
    init_temperature=prompt_generation_temperature,
    verbose=False,
)
compiled_program_optimized_signature = copro_teleprompter.compile(
    your_dspy_program,
    trainset=trainset,
    eval_kwargs=eval_kwargs,
)
MIPROv2
注意:详细文档可查看此处。MIPROv2是MIPRO的最新扩展版本,包含以下更新:(1) 指令提议的改进和(2) 通过小批量处理实现更高效的搜索。
使用MIPROv2进行优化
下面的示例展示了如何使用 auto=light 进行简单的开箱即用运行,它会为你配置许多超参数并执行一次轻量级优化。你也可以设置 auto=medium 或 auto=heavy 来执行更长时间的优化运行。更详细的 MIPROv2 文档(见此处)还介绍了如何手动设置超参数。
# Import the optimizer
from dspy.teleprompt import MIPROv2
# Initialize optimizer
teleprompter = MIPROv2(
    metric=gsm8k_metric,
    auto="light",  # Can choose between light, medium, and heavy optimization runs
)

# Optimize program (few-shot: bootstrapped and labeled demos enabled)
print("Optimizing program with MIPRO...")
optimized_program = teleprompter.compile(
    program.deepcopy(),
    trainset=trainset,
    max_bootstrapped_demos=3,
    max_labeled_demos=4,
)

# Save optimized program for future use
optimized_program.save("mipro_optimized")

# Evaluate optimized program
print("Evaluate optimized program...")
evaluate(optimized_program, devset=devset[:])
仅使用MIPROv2优化指令(零样本)
# Import the optimizer
from dspy.teleprompt import MIPROv2
# Initialize optimizer
teleprompter = MIPROv2(
    metric=gsm8k_metric,
    auto="light",  # Can choose between light, medium, and heavy optimization runs
)

# Optimize program (zero-shot: both demo counts are set to 0)
print("Optimizing program with MIPRO...")
optimized_program = teleprompter.compile(
    program.deepcopy(),
    trainset=trainset,
    max_bootstrapped_demos=0,
    max_labeled_demos=0,
)

# Save optimized program for future use
optimized_program.save("mipro_optimized")

# Evaluate optimized program
print("Evaluate optimized program...")
evaluate(optimized_program, devset=devset[:])
KNNFewShot
from sentence_transformers import SentenceTransformer
from dspy import Embedder
from dspy.teleprompt import KNNFewShot
from dspy import ChainOfThought
# Wrap the sentence-transformer's `encode` method in a DSPy Embedder and use
# it as the vectorizer for KNNFewShot.
sentence_encoder = SentenceTransformer("all-MiniLM-L6-v2")
knn_vectorizer = Embedder(sentence_encoder.encode)
knn_optimizer = KNNFewShot(k=3, trainset=trainset, vectorizer=knn_vectorizer)

qa_compiled = knn_optimizer.compile(student=ChainOfThought("question -> answer"))
BootstrapFewShotWithOptuna
from dspy.teleprompt import BootstrapFewShotWithOptuna
# Configure the Optuna-backed optimizer, then compile with separate train
# and validation sets.
fewshot_optuna_optimizer = BootstrapFewShotWithOptuna(
    metric=your_defined_metric,
    max_bootstrapped_demos=2,
    num_candidate_programs=8,
    num_threads=NUM_THREADS,
)
your_dspy_program_compiled = fewshot_optuna_optimizer.compile(
    student=your_dspy_program,
    trainset=trainset,
    valset=devset,
)
其他自定义配置类似于自定义dspy.BootstrapFewShot优化器。
SIMBA
SIMBA,全称为随机内省小批量上升法,是一种提示优化器,能够接受任意的dspy程序,并通过一系列小批量处理逐步改进提示指令或少量示例。
from dspy.teleprompt import SIMBA
# Configure SIMBA, then compile the student program on the train set.
simba = SIMBA(
    metric=your_defined_metric,
    max_steps=12,
    max_demos=10,
)
optimized_program = simba.compile(
    student=your_dspy_program,
    trainset=trainset,
)
DSPy 工具与实用程序
dspy.Tool
import dspy
def search_web(query: str) -> str:
    """Search the web for information"""
    # Placeholder implementation: echoes the query back in a fixed template.
    return "Search results for: {}".format(query)
# Wrap the plain function in a dspy.Tool and call it with keyword arguments.
tool = dspy.Tool(search_web)
result = tool(query="Python programming")
dspy.streamify
import dspy
import asyncio
predict = dspy.Predict("question->answer")

# Listen on the `answer` field of the signature.
answer_listener = dspy.streaming.StreamListener(signature_field_name="answer")
stream_predict = dspy.streamify(predict, stream_listeners=[answer_listener])


async def read_output_stream():
    """Run the streamified predictor and print every chunk it yields."""
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")
    async for chunk in output_stream:
        print(chunk)


asyncio.run(read_output_stream())
dspy.asyncify
import asyncio

import dspy

# Wrap the module with dspy.asyncify, then drive the wrapped call to
# completion with asyncio.run. The original snippet used asyncio without
# importing it.
dspy_program = dspy.ChainOfThought("question -> answer")
dspy_program = dspy.asyncify(dspy_program)
asyncio.run(dspy_program(question="What is DSPy"))
追踪使用情况
import dspy
# Turn on usage tracking before running any module.
dspy.settings.configure(track_usage=True)

usage_probe = dspy.ChainOfThought(BasicQA)
result = usage_probe(question="What is 2+2?")
# Read the usage information from the returned prediction.
print(f"Token usage: {result.get_lm_usage()}")
dspy.configure_cache
import dspy
# Configure cache settings: both the disk cache and the memory cache flags
# are set to False here.
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)
DSPy Refine 和 BestOfN
dspy.Suggest 和 dspy.Assert 在 DSPy 2.6 中被 dspy.Refine 和 dspy.BestOfN 取代。
最佳N选
使用不同的rollout ID(绕过缓存)运行模块最多N次,并返回由reward_fn定义的最佳预测,或第一个通过threshold的预测。
import dspy
# Module that BestOfN will run repeatedly.
qa = dspy.ChainOfThought("question -> answer")
def one_word_answer(args, pred):
    """Reward 1.0 when `pred.answer` is exactly one word, else 0.0.

    `args` is accepted as the first reward-function argument but is not
    used here.
    """
    # Count whitespace-separated words. `len(pred.answer)` counts characters,
    # so a one-word answer such as "Brussels" would never score 1.0.
    return 1.0 if len(pred.answer.split()) == 1 else 0.0
# Wrap `qa` in BestOfN with N=3, using one_word_answer as the reward
# function and a threshold of 1.0.
best_of_3 = dspy.BestOfN(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0)
best_of_3(question="What is the capital of Belgium?").answer
# Brussels
精炼
使用不同的 rollout ID(绕过缓存)运行模块最多 N 次,并返回由 reward_fn 定义的最佳预测,或第一个通过 threshold 的预测。在每次尝试(最后一次除外)之后,Refine 会自动生成关于模块表现的详细反馈,并将这些反馈作为后续运行的提示,从而形成一个迭代优化过程。
import dspy
# Module that Refine will run repeatedly.
qa = dspy.ChainOfThought("question -> answer")
def one_word_answer(args, pred):
    """Reward 1.0 when `pred.answer` is exactly one word, else 0.0.

    `args` is accepted as the first reward-function argument but is not
    used here.
    """
    # Count whitespace-separated words. `len(pred.answer)` counts characters,
    # so a one-word answer such as "Brussels" would never score 1.0.
    return 1.0 if len(pred.answer.split()) == 1 else 0.0
# Wrap `qa` in Refine with N=3, using one_word_answer as the reward function
# and a threshold of 1.0. The variable is named `refine` rather than
# `best_of_3` because this is a Refine module, not BestOfN.
refine = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0)
refine(question="What is the capital of Belgium?").answer
# Brussels
错误处理
默认情况下,Refine 会尝试运行模块最多 N 次,直到满足阈值。如果模块在某次尝试中出错,它会继续重试,直到失败次数达到 N 次。你可以将 fail_count 设置为小于 N 的数字来改变此行为。
# Same Refine setup, with fail_count=1.
refine = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0, fail_count=1)
...
refine(question="What is the capital of Belgium?")
# If we encounter just one failed attempt, the module will raise an error.
如果你想在没有错误处理的情况下运行模块最多N次,可以设置fail_count为N。这是默认行为。