Skip to content

异步DSPy编程

DSPy 提供对异步编程的原生支持,让您能够构建更高效和可扩展的应用程序。本指南将带您了解如何在 DSPy 中利用异步功能,涵盖内置模块和自定义实现。

为什么在DSPy中使用异步?

DSPy中的异步编程提供以下优势: - 通过并发操作提升性能 - 更优的资源利用率 - 减少I/O密集型操作的等待时间 - 增强处理多请求的可扩展性

何时使用同步或异步?

在DSPy中选择同步还是异步编程取决于你的具体用例。 以下指南可帮助你做出正确选择:

在以下情况下使用同步编程

  • 你正在探索或原型化新想法
  • 你正在进行研究或实验
  • 您正在构建中小型应用程序
  • 你需要更简单、更直接的代码
  • 您希望更轻松的调试和错误追踪

在以下情况下使用异步编程:

  • 您正在构建一个高吞吐量服务(高QPS)
  • 你正在使用仅支持异步操作的工具
  • 你需要高效处理多个并发请求
  • 您正在构建一个需要高可扩展性的生产服务

重要注意事项

虽然异步编程提供了性能优势,但也带来了一些权衡:

  • 更复杂的错误处理和调试
  • 存在细微、难以追踪的bug的可能性
  • 更复杂的代码结构
  • ipython(Colab、Jupyter lab、Databricks notebooks等)与普通python运行时的代码差异。

我们建议在大多数开发场景中从同步编程开始,仅在明确需要其优势时才切换到异步编程。这种方法使您能够在处理异步编程的额外复杂性之前专注于应用程序的核心逻辑。

异步使用内置模块

大多数DSPy内置模块通过acall()方法支持异步操作。该方法保持与同步__call__方法相同的接口,但以异步方式运行。

以下是一个使用 dspy.Predict 的基础示例:

import dspy
import asyncio
import os

os.environ["OPENAI_API_KEY"] = "your_api_key"

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
predict = dspy.Predict("question->answer")

async def main():
    # Use acall() for async execution
    output = await predict.acall(question="why did a chicken cross the kitchen?")
    print(output)


asyncio.run(main())

使用异步工具

DSPy的Tool类无缝集成异步函数。当你提供一个异步函数给dspy.Tool时,你可以使用acall()来执行它。这对于I/O密集型操作或与外部服务交互特别有用。

import asyncio
import dspy
import os

os.environ["OPENAI_API_KEY"] = "your_api_key"

async def foo(x):
    # Simulate an async operation
    await asyncio.sleep(0.1)
    print(f"I get: {x}")

# Create a tool from the async function
tool = dspy.Tool(foo)

async def main():
    # Execute the tool asynchronously
    await tool.acall(x=2)

asyncio.run(main())

注意:当使用 dspy.ReAct 与工具时,调用 ReAct 实例上的 acall() 方法将自动使用工具的 acall() 方法异步执行所有工具。

创建自定义异步DSPy模块

要创建您自己的异步DSPy模块,请实现aforward()方法而不是forward()。该方法应包含您模块的异步逻辑。以下是一个自定义模块的示例,该模块串联了两个异步操作:

import dspy
import asyncio
import os

os.environ["OPENAI_API_KEY"] = "your_api_key"
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class MyModule(dspy.Module):
    def __init__(self):
        self.predict1 = dspy.ChainOfThought("question->answer")
        self.predict2 = dspy.ChainOfThought("answer->simplified_answer")

    async def aforward(self, question, **kwargs):
        # Execute predictions sequentially but asynchronously
        answer = await self.predict1.acall(question=question)
        return await self.predict2.acall(answer=answer)


async def main():
    mod = MyModule()
    result = await mod.acall(question="Why did a chicken cross the kitchen?")
    print(result)


asyncio.run(main())
优云智算