dspy.CodeAct

dspy.CodeAct(signature: str | type[Signature], tools: list[Callable], max_iters: int = 5, interpreter: PythonInterpreter | None = None)

Bases: ReAct, ProgramOfThought

CodeAct is a module that uses a code interpreter and predefined tools to solve a problem.

Initializes the CodeAct class with the given signature, tools, iteration limit, and interpreter.

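Parameters:

Name Type Description Default
signature Union[str, Type[Signature]]

The signature of the module.

required
tools list[Callable]

The tool callables to be used. CodeAct only accepts functions and not callable objects.

required
max_iters int

The maximum number of iterations to generate the answer.

5
interpreter PythonInterpreter | None

The PythonInterpreter instance to use. If None, a new one is instantiated.

None

Example: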

from dspy.predict import CodeAct
def factorial(n):
    if n == 1:
        return 1
    return n * factorial(n-1)

act = CodeAct("n->factorial", tools=[factorial])
act(n=5) # 120

Source code in dspy/predict/code_act.py
def __init__(self, signature: str | type[Signature], tools: list[Callable], max_iters: int = 5, interpreter: PythonInterpreter | None = None):
    """
    Initializes the CodeAct class with the given signature, tools, iteration limit, and interpreter.

    Args:
        signature (Union[str, Type[Signature]]): The signature of the module.
        tools (list[Callable]): The tool callables to be used. CodeAct only accepts functions and not callable objects.
        max_iters (int): The maximum number of iterations to generate the answer.
        interpreter: PythonInterpreter instance to use. If None, a new one is instantiated.
    Example:
        ```python
        from dspy.predict import CodeAct
        def factorial(n):
            if n == 1:
                return 1
            return n * factorial(n-1)

        act = CodeAct("n->factorial", tools=[factorial])
        act(n=5) # 120
        ```
    """
    self.signature = ensure_signature(signature)
    self.max_iters = max_iters
    self.history = []

    tools = [t if isinstance(t, Tool) else Tool(t) for t in tools]
    if any(
        not inspect.isfunction(tool.func) for tool in tools
    ):
        raise ValueError("CodeAct only accepts functions and not callable objects.")
    tools = {tool.name: tool for tool in tools}

    instructions = self._build_instructions(self.signature, tools)

    codeact_signature = (
        dspy.Signature({**self.signature.input_fields}, "\n".join(instructions))
        .append("trajectory", dspy.InputField(), type_=str)
        .append("generated_code", dspy.OutputField(desc="Python code that when executed, produces output relevant to answering the question"), type_=str)
        .append("finished", dspy.OutputField(desc="a boolean flag to determine if the process is done"), type_=bool)
    )

    extract_signature = dspy.Signature(
        {**self.signature.input_fields, **self.signature.output_fields},
        self.signature.instructions,
    ).append("trajectory", dspy.InputField(), type_=str)

    self.tools: dict[str, Tool] = tools
    self.codeact = dspy.Predict(codeact_signature)
    self.extractor = dspy.ChainOfThought(extract_signature)
    # For now, this raises an exception if DSPy cannot find an available Deno instance.
    self.interpreter = interpreter or PythonInterpreter()

Functions

__call__(*args, **kwargs) -> Prediction

Source code in dspy/primitives/module.py
@with_callbacks
def __call__(self, *args, **kwargs) -> Prediction:
    caller_modules = settings.caller_modules or []
    caller_modules = list(caller_modules)
    caller_modules.append(self)

    with settings.context(caller_modules=caller_modules):
        if settings.track_usage and thread_local_overrides.get().get("usage_tracker") is None:
            with track_usage() as usage_tracker:
                output = self.forward(*args, **kwargs)
            tokens = usage_tracker.get_total_tokens()

            # Some optimizers (e.g., GEPA bootstrap tracing) temporarily patch
            # module.forward to return a tuple: (prediction, trace).
            # When usage tracking is enabled, ensure we attach usage to the
            # prediction object if present.
            prediction_in_output = None
            if isinstance(output, Prediction):
                prediction_in_output = output
            elif isinstance(output, tuple) and len(output) > 0 and isinstance(output[0], Prediction):
                prediction_in_output = output[0]
            if not prediction_in_output:
                raise ValueError("No prediction object found in output to call set_lm_usage on.")

            prediction_in_output.set_lm_usage(tokens)
            return output

        return self.forward(*args, **kwargs)

batch(examples: list[Example], num_threads: int | None = None, max_errors: int | None = None, return_failed_examples: bool = False, provide_traceback: bool | None = None, disable_progress_bar: bool = False) -> list[Example] | tuple[list[Example], list[Example], list[Exception]]

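Processes a list of dspy.Example instances in parallel using the Parallel module.

Parameters:

Name Type Description Default
examples list[Example]

List of dspy.Example instances to process.

required
num_threads int | None

Number of threads to use for parallel processing.

None
max_errors int | None

Maximum number of errors allowed before stopping execution. If None, inherits from dspy.settings.max_errors.

None
return_failed_examples bool

Whether to return failed examples and exceptions.

False
provide_traceback bool | None

Whether to include traceback information in error logs.

None
disable_progress_bar bool

Whether to disable the progress bar.

False

Returns:

Type Description
list[Example] | tuple[list[Example], list[Example], list[Exception]]

List of results, and optionally failed examples and exceptions.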

Source code in dspy/primitives/module.py
def batch(
    self,
    examples: list[Example],
    num_threads: int | None = None,
    max_errors: int | None = None,
    return_failed_examples: bool = False,
    provide_traceback: bool | None = None,
    disable_progress_bar: bool = False,
) -> list[Example] | tuple[list[Example], list[Example], list[Exception]]:
    """
    Processes a list of dspy.Example instances in parallel using the Parallel module.

    Args:
        examples: List of dspy.Example instances to process.
        num_threads: Number of threads to use for parallel processing.
        max_errors: Maximum number of errors allowed before stopping execution.
            If ``None``, inherits from ``dspy.settings.max_errors``.
        return_failed_examples: Whether to return failed examples and exceptions.
        provide_traceback: Whether to include traceback information in error logs.
        disable_progress_bar: Whether to disable the progress bar.

    Returns:
        List of results, and optionally failed examples and exceptions.
    """
    # Create a list of execution pairs (self, example)
    exec_pairs = [(self, example.inputs()) for example in examples]

    # Create an instance of Parallel
    parallel_executor = Parallel(
        num_threads=num_threads,
        max_errors=max_errors,
        return_failed_examples=return_failed_examples,
        provide_traceback=provide_traceback,
        disable_progress_bar=disable_progress_bar,
    )

    # Execute the forward method of Parallel
    if return_failed_examples:
        results, failed_examples, exceptions = parallel_executor.forward(exec_pairs)
        return results, failed_examples, exceptions
    else:
        results = parallel_executor.forward(exec_pairs)
        return results
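
The shape of the return value is easier to see in a standalone sketch. The code below does not use DSPy: `run_batch` is a hypothetical stand-in built on the standard library's thread pool, and it simply drops failed items from the results list, which may differ from how DSPy's Parallel represents failures.

```python
from concurrent.futures import ThreadPoolExecutor

def run_batch(fn, inputs, num_threads=4, return_failed=False):
    """Hypothetical stand-in for batch(): apply fn to every input on a thread pool."""
    def safe_call(item):
        # Capture the exception instead of letting one failure stop the batch.
        try:
            return fn(**item), None
        except Exception as exc:
            return None, exc

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        outcomes = list(pool.map(safe_call, inputs))  # map preserves input order

    results = [value for value, exc in outcomes if exc is None]
    if not return_failed:
        return results
    failed = [item for item, (_, exc) in zip(inputs, outcomes) if exc is not None]
    errors = [exc for _, exc in outcomes if exc is not None]
    return results, failed, errors

def reciprocal(x):
    return 1 / x

results, failed, errors = run_batch(
    reciprocal, [{"x": 1}, {"x": 0}, {"x": 4}], return_failed=True
)
print(results, failed, errors)
```

With `return_failed=False` the caller receives a single list; with `return_failed=True` it receives a three-part tuple, mirroring the two return shapes in the signature above.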

deepcopy()

Deep copy the module.

This is a tweak to the default Python deepcopy that only deep copies self.parameters(); other attributes are only shallow copied.

Source code in dspy/primitives/base_module.py
def deepcopy(self):
    """Deep copy the module.

    This is a tweak to the default python deepcopy that only deep copies `self.parameters()`, and for other
    attributes, we just do the shallow copy.
    """
    try:
        # If the instance itself is copyable, we can just deep copy it.
        # Otherwise we will have to create a new instance and copy over the attributes one by one.
        return copy.deepcopy(self)
    except Exception:
        pass

    # Create an empty instance.
    new_instance = self.__class__.__new__(self.__class__)
    # Set attributes of the copied instance.
    for attr, value in self.__dict__.items():
        if isinstance(value, BaseModule):
            setattr(new_instance, attr, value.deepcopy())
        else:
            try:
                # Try to deep copy the attribute
                setattr(new_instance, attr, copy.deepcopy(value))
            except Exception:
                logging.warning(
                    f"Failed to deep copy attribute '{attr}' of {self.__class__.__name__}, "
                    "falling back to shallow copy or reference copy."
                )
                try:
                    # Fallback to shallow copy if deep copy fails
                    setattr(new_instance, attr, copy.copy(value))
                except Exception:
                    # If even the shallow copy fails, we just copy over the reference.
                    setattr(new_instance, attr, value)

    return new_instance
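
The per-attribute fallback chain (deep copy, then shallow copy, then sharing the reference) can be tried in isolation. This is a minimal sketch with a hypothetical helper name, `copy_with_fallback`, and it uses a `threading.Lock` as an example of a value that cannot be copied.

```python
import copy
import threading

def copy_with_fallback(value):
    """Try a deep copy, then a shallow copy, then fall back to the same reference."""
    for strategy in (copy.deepcopy, copy.copy):
        try:
            return strategy(value)
        except Exception:
            continue
    return value

nested = {"weights": [1, 2, 3]}
lock = threading.Lock()  # locks support neither deep nor shallow copying

nested_copy = copy_with_fallback(nested)
lock_copy = copy_with_fallback(lock)

print(nested_copy["weights"] is nested["weights"])
print(lock_copy is lock)
```

The practical consequence is that a "deep copied" module may still share uncopyable attributes with the original.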

dump_state(json_mode=True)

Source code in dspy/primitives/base_module.py
def dump_state(self, json_mode=True):
    return {name: param.dump_state(json_mode=json_mode) for name, param in self.named_parameters()}

get_lm()

Source code in dspy/primitives/module.py
def get_lm(self):
    all_used_lms = [param.lm for _, param in self.named_predictors()]

    if len(set(all_used_lms)) == 1:
        return all_used_lms[0]

    raise ValueError("Multiple LMs are being used in the module. There's no unique LM to return.")

inspect_history(n: int = 1)

Source code in dspy/primitives/module.py
def inspect_history(self, n: int = 1):
    return pretty_print_history(self.history, n)

load(path)

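Load the saved module. You may also want to check out dspy.load if you want to load an entire program, not just the state for an existing program.

Parameters:

Name Type Description Default
path str

Path to the saved state file, which should be a .json or .pkl file.

required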
Source code in dspy/primitives/base_module.py
def load(self, path):
    """Load the saved module. You may also want to check out dspy.load, if you want to
    load an entire program, not just the state for an existing program.

    Args:
        path (str): Path to the saved state file, which should be a .json or a .pkl file
    """
    path = Path(path)

    if path.suffix == ".json":
        with open(path, "rb") as f:
            state = orjson.loads(f.read())
    elif path.suffix == ".pkl":
        with open(path, "rb") as f:
            state = cloudpickle.load(f)
    else:
        raise ValueError(f"`path` must end with `.json` or `.pkl`, but received: {path}")

    dependency_versions = get_dependency_versions()
    saved_dependency_versions = state["metadata"]["dependency_versions"]
    for key, saved_version in saved_dependency_versions.items():
        if dependency_versions[key] != saved_version:
            logger.warning(
                f"There is a mismatch of {key} version between saved model and current environment. "
                f"You saved with `{key}=={saved_version}`, but now you have "
                f"`{key}=={dependency_versions[key]}`. This might cause errors or performance downgrade "
                "on the loaded model, please consider loading the model in the same environment as the "
                "saving environment."
            )
    self.load_state(state)
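
The version check in the listing above amounts to comparing two dictionaries. A minimal sketch, with a hypothetical helper `find_version_mismatches` and illustrative version numbers; unlike the listing, it uses `dict.get`, so a dependency missing from the current environment is reported rather than raising a KeyError.

```python
def find_version_mismatches(saved: dict, current: dict) -> list:
    """Return one message per dependency whose version differs from the saved one."""
    messages = []
    for key, saved_version in saved.items():
        current_version = current.get(key)
        if current_version != saved_version:
            messages.append(f"{key}: saved with {saved_version}, now {current_version}")
    return messages

# Illustrative version numbers only.
saved_versions = {"python": "3.10", "dspy": "2.6.0"}
current_versions = {"python": "3.10", "dspy": "2.6.1"}

warnings_to_log = find_version_mismatches(saved_versions, current_versions)
print(warnings_to_log)
```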

load_state(state)

Source code in dspy/primitives/base_module.py
def load_state(self, state):
    for name, param in self.named_parameters():
        param.load_state(state[name])

map_named_predictors(func)

Applies a function to all named predictors.

Source code in dspy/primitives/module.py
def map_named_predictors(self, func):
    """Applies a function to all named predictors."""
    for name, predictor in self.named_predictors():
        set_attribute_by_name(self, name, func(predictor))
    return self

named_parameters()

Unlike PyTorch, handles (non-recursive) lists of parameters too.

Source code in dspy/primitives/base_module.py
def named_parameters(self):
    """
    Unlike PyTorch, handles (non-recursive) lists of parameters too.
    """

    import dspy
    from dspy.predict.parameter import Parameter

    visited = set()
    named_parameters = []

    def add_parameter(param_name, param_value):
        if isinstance(param_value, Parameter):
            if id(param_value) not in visited:
                visited.add(id(param_value))
                named_parameters.append((param_name, param_value))

        elif isinstance(param_value, dspy.Module):
            # When a sub-module is pre-compiled, keep it frozen.
            if not getattr(param_value, "_compiled", False):
                for sub_name, param in param_value.named_parameters():
                    add_parameter(f"{param_name}.{sub_name}", param)

    if isinstance(self, Parameter):
        add_parameter("self", self)

    for name, value in self.__dict__.items():
        if isinstance(value, Parameter):
            add_parameter(name, value)

        elif isinstance(value, dspy.Module):
            # When a sub-module is pre-compiled, keep it frozen.
            if not getattr(value, "_compiled", False):
                for sub_name, param in value.named_parameters():
                    add_parameter(f"{name}.{sub_name}", param)

        elif isinstance(value, (list, tuple)):
            for idx, item in enumerate(value):
                add_parameter(f"{name}[{idx}]", item)

        elif isinstance(value, dict):
            for key, item in value.items():
                add_parameter(f"{name}['{key}']", item)

    return named_parameters
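
The naming scheme for parameters held in lists and dicts, and the deduplication by object identity, can be seen in a stripped-down sketch. `Param`, `Holder`, and `name_parameters` are hypothetical stand-ins, and the recursion into sub-modules is omitted.

```python
class Param:
    """Toy stand-in for a parameter such as a predictor."""

class Holder:
    def __init__(self):
        shared = Param()
        self.single = shared
        self.alias = shared              # same object reachable under a second name
        self.items = [Param(), Param()]
        self.lookup = {"a": Param()}

def name_parameters(obj):
    visited, named = set(), []

    def add(name, value):
        # Record each parameter object once, under the first name that reaches it.
        if isinstance(value, Param) and id(value) not in visited:
            visited.add(id(value))
            named.append(name)

    for name, value in vars(obj).items():
        if isinstance(value, (list, tuple)):
            for idx, item in enumerate(value):
                add(f"{name}[{idx}]", item)
        elif isinstance(value, dict):
            for key, item in value.items():
                add(f"{name}['{key}']", item)
        else:
            add(name, value)
    return named

names = name_parameters(Holder())
print(names)
```

Here `alias` does not appear in the output because it points at the object already recorded as `single`.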

named_predictors()

Source code in dspy/primitives/module.py
def named_predictors(self):
    from dspy.predict.predict import Predict

    return [(name, param) for name, param in self.named_parameters() if isinstance(param, Predict)]

named_sub_modules(type_=None, skip_compiled=False) -> Generator[tuple[str, BaseModule], None, None]

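Find all sub-modules in the module, as well as their names.

Say self.children[4]['key'].sub_module is a sub-module. Then the name will be children[4]['key'].sub_module. But if the sub-module is accessible at different paths, only one of the paths will be returned.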

Source code in dspy/primitives/base_module.py
def named_sub_modules(self, type_=None, skip_compiled=False) -> Generator[tuple[str, "BaseModule"], None, None]:
    """Find all sub-modules in the module, as well as their names.

    Say `self.children[4]['key'].sub_module` is a sub-module. Then the name will be
    `children[4]['key'].sub_module`. But if the sub-module is accessible at different
    paths, only one of the paths will be returned.
    """
    if type_ is None:
        type_ = BaseModule

    queue = deque([("self", self)])
    seen = {id(self)}

    def add_to_queue(name, item):
        if id(item) not in seen:
            seen.add(id(item))
            queue.append((name, item))

    while queue:
        name, item = queue.popleft()

        if isinstance(item, type_):
            yield name, item

        if isinstance(item, BaseModule):
            if skip_compiled and getattr(item, "_compiled", False):
                continue
            for sub_name, sub_item in item.__dict__.items():
                add_to_queue(f"{name}.{sub_name}", sub_item)

        elif isinstance(item, (list, tuple)):
            for i, sub_item in enumerate(item):
                add_to_queue(f"{name}[{i}]", sub_item)

        elif isinstance(item, dict):
            for key, sub_item in item.items():
                add_to_queue(f"{name}[{key}]", sub_item)

parameters()

Source code in dspy/primitives/base_module.py
def parameters(self):
    return [param for _, param in self.named_parameters()]

predictors()

Source code in dspy/primitives/module.py
def predictors(self):
    return [param for _, param in self.named_predictors()]

reset_copy()

Deep copy the module and reset all parameters.

Source code in dspy/primitives/base_module.py
def reset_copy(self):
    """Deep copy the module and reset all parameters."""
    new_instance = self.deepcopy()

    for param in new_instance.parameters():
        param.reset()

    return new_instance

save(path, save_program=False, modules_to_serialize=None)

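Save the module.

Save the module to a directory or a file. There are two modes:
- save_program=False: Save only the state of the module to a json or pickle file, based on the file extension.
- save_program=True: Save the whole module to a directory via cloudpickle, which contains both the state and architecture of the model.

If save_program=True and modules_to_serialize is provided, those modules are registered for serialization with cloudpickle's register_pickle_by_value. This causes cloudpickle to serialize the module by value rather than by reference, ensuring the module is fully preserved along with the saved program. This is useful when you have custom modules that need to be serialized alongside your program. If None, no modules are registered for serialization.

We also save the dependency versions, so that the loaded model can check whether there is a version mismatch on critical dependencies or the DSPy version.

Parameters:

Name Type Description Default
path str

Path to the saved state file, which should be a .json or .pkl file when save_program=False, and a directory when save_program=True.

required
save_program bool

If True, save the whole module to a directory via cloudpickle, otherwise only save the state.

False
modules_to_serialize list

A list of modules to serialize with cloudpickle's register_pickle_by_value. If None, no modules are registered for serialization.

None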
Source code in dspy/primitives/base_module.py
def save(self, path, save_program=False, modules_to_serialize=None):
    """Save the module.

    Save the module to a directory or a file. There are two modes:
    - `save_program=False`: Save only the state of the module to a json or pickle file, based on the value of
        the file extension.
    - `save_program=True`: Save the whole module to a directory via cloudpickle, which contains both the state and
        architecture of the model.

    If `save_program=True` and `modules_to_serialize` are provided, it will register those modules for serialization
    with cloudpickle's `register_pickle_by_value`. This causes cloudpickle to serialize the module by value rather
    than by reference, ensuring the module is fully preserved along with the saved program. This is useful
    when you have custom modules that need to be serialized alongside your program. If None, then no modules
    will be registered for serialization.

    We also save the dependency versions, so that the loaded model can check if there is a version mismatch on
    critical dependencies or DSPy version.

    Args:
        path (str): Path to the saved state file, which should be a .json or .pkl file when `save_program=False`,
            and a directory when `save_program=True`.
        save_program (bool): If True, save the whole module to a directory via cloudpickle, otherwise only save
            the state.
        modules_to_serialize (list): A list of modules to serialize with cloudpickle's `register_pickle_by_value`.
            If None, then no modules will be registered for serialization.

    """
    metadata = {}
    metadata["dependency_versions"] = get_dependency_versions()
    path = Path(path)

    if save_program:
        if path.suffix:
            raise ValueError(
                f"`path` must point to a directory without a suffix when `save_program=True`, but received: {path}"
            )
        if path.exists() and not path.is_dir():
            raise NotADirectoryError(f"The path '{path}' exists but is not a directory.")

        if not path.exists():
            # Create the directory (and any parent directories)
            path.mkdir(parents=True)

        try:
            modules_to_serialize = modules_to_serialize or []
            for module in modules_to_serialize:
                cloudpickle.register_pickle_by_value(module)

            with open(path / "program.pkl", "wb") as f:
                cloudpickle.dump(self, f)
        except Exception as e:
            raise RuntimeError(
                f"Saving failed with error: {e}. Please remove the non-picklable attributes from your DSPy program, "
                "or consider using state-only saving by setting `save_program=False`."
            )
        with open(path / "metadata.json", "wb") as f:
            f.write(orjson.dumps(metadata, option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE))

        return

    if path.suffix == ".json":
        state = self.dump_state()
        state["metadata"] = metadata
        try:
            with open(path, "wb") as f:
                f.write(orjson.dumps(state, option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE))
        except Exception as e:
            raise RuntimeError(
                f"Failed to save state to {path} with error: {e}. Your DSPy program may contain non "
                "json-serializable objects, please consider saving the state in .pkl by using `path` ending "
                "with `.pkl`, or saving the whole program by setting `save_program=True`."
            )
    elif path.suffix == ".pkl":
        state = self.dump_state(json_mode=False)
        state["metadata"] = metadata
        with open(path, "wb") as f:
            cloudpickle.dump(state, f)
    else:
        raise ValueError(f"`path` must end with `.json` or `.pkl` when `save_program=False`, but received: {path}")
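
The state-only mode picks the file format from the path suffix. The sketch below reproduces that dispatch with the standard library's `json` and `pickle` standing in for orjson and cloudpickle; the helper names and the sample state are hypothetical.

```python
import json
import pickle
import tempfile
from pathlib import Path

def save_state(state: dict, path: Path) -> None:
    """Write state as JSON or pickle, chosen by the file suffix."""
    if path.suffix == ".json":
        path.write_text(json.dumps(state, indent=2))
    elif path.suffix == ".pkl":
        path.write_bytes(pickle.dumps(state))
    else:
        raise ValueError(f"`path` must end with `.json` or `.pkl`, but received: {path}")

def load_state(path: Path) -> dict:
    """Read state back using the same suffix dispatch."""
    if path.suffix == ".json":
        return json.loads(path.read_text())
    if path.suffix == ".pkl":
        return pickle.loads(path.read_bytes())
    raise ValueError(f"`path` must end with `.json` or `.pkl`, but received: {path}")

# Illustrative state; real module state is produced by dump_state().
state = {
    "predictor": {"demos": []},
    "metadata": {"dependency_versions": {"example_pkg": "1.0"}},
}

round_trips = {}
with tempfile.TemporaryDirectory() as tmp:
    for name in ("state.json", "state.pkl"):
        target = Path(tmp) / name
        save_state(state, target)
        round_trips[name] = load_state(target)

print(round_trips["state.json"] == state, round_trips["state.pkl"] == state)
```

JSON keeps the file human-readable but only handles JSON-serializable values, which is why the error message in the listing above suggests `.pkl` when serialization fails.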

set_lm(lm)

Source code in dspy/primitives/module.py
def set_lm(self, lm):
    for _, param in self.named_predictors():
        param.lm = lm

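CodeAct

CodeAct is a DSPy module that combines code generation with tool execution to solve problems. It generates Python code snippets that use the provided tools and the Python standard library to accomplish the task.

Basic Usage

Here is a simple example of using CodeAct: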

import dspy
from dspy.predict import CodeAct

# Assumes a language model has already been configured, e.g. via dspy.configure(lm=...)

# Define a simple tool function
def factorial(n: int) -> int:
    """Calculate the factorial of a number."""
    if n <= 1:
        return 1
    return n * factorial(n-1)

# Create a CodeAct instance
act = CodeAct("n->factorial_result", tools=[factorial])

# Use the CodeAct instance
result = act(n=5)
print(result.factorial_result)  # The model should arrive at factorial(5) = 120

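How It Works

CodeAct operates iteratively:

  1. Receives the input parameters and the available tools
  2. Generates Python code snippets that use these tools
  3. Executes the code in a Python sandbox
  4. Collects the output and determines whether the task is complete
  5. Answers the original question based on the collected information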
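
The five steps can be sketched as a plain loop. This is not DSPy's implementation: `codeact_loop`, `run_snippet`, `fake_generate`, and `fake_extract` are hypothetical names, the two stubs stand in for the language-model calls, and `exec` stands in for the sandbox only to keep the example self-contained.

```python
import contextlib
import io

def factorial(n: int) -> int:
    """Tool made available to the generated code."""
    return 1 if n <= 1 else n * factorial(n - 1)

def run_snippet(code: str, tools: dict) -> str:
    """Run code with the tools in scope and capture stdout.

    Plain exec() is NOT a sandbox; it only stands in for one here.
    """
    buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(buffer):
            exec(code, dict(tools))
    except Exception as exc:
        return f"Error: {exc!r}"
    return buffer.getvalue().strip()

def codeact_loop(inputs, tools, generate, extract, max_iters=5):
    trajectory = {}  # step 1: inputs and tools arrive as arguments
    for idx in range(max_iters):
        code, finished = generate(inputs, trajectory)  # step 2
        trajectory[f"generated_code_{idx}"] = code
        trajectory[f"code_output_{idx}"] = run_snippet(code, tools)  # step 3
        if finished:  # step 4
            break
    return extract(inputs, trajectory)  # step 5

# Hypothetical stubs standing in for the language-model calls.
def fake_generate(inputs, trajectory):
    return f"print(factorial({inputs['n']}))", True

def fake_extract(inputs, trajectory):
    return int(trajectory["code_output_0"])

answer = codeact_loop({"n": 5}, {"factorial": factorial}, fake_generate, fake_extract)
print(answer)
```

The `max_iters` bound guarantees termination even if the generator never reports that it is finished; the extractor then answers from whatever the trajectory contains.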

⚠️ Limitations

Only accepts pure functions as tools (no callable objects)

The following example does not work because it uses a callable object.

# ❌ NG
class Add():
    def __call__(self, a: int, b: int):
        return a + b

dspy.CodeAct("question -> answer", tools=[Add()])
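
The constructor rejects such objects with an `inspect.isfunction` check, which is true for `def` functions but false for instances that merely define `__call__`. A standalone sketch of that distinction, where `is_plain_function` is a hypothetical helper rather than part of DSPy:

```python
import inspect

class Add:
    def __call__(self, a: int, b: int) -> int:
        return a + b

def add(a: int, b: int) -> int:
    return a + b

def is_plain_function(obj) -> bool:
    """Hypothetical helper mirroring the inspect.isfunction check."""
    return inspect.isfunction(obj)

checks = {
    "function": is_plain_function(add),
    "callable object": is_plain_function(Add()),
}
print(checks)
```

Both `add` and `Add()` are callable, so `callable()` alone would not tell them apart.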

External libraries cannot be used

The following example does not work because it uses the external library numpy.

# ❌ NG
import numpy as np

def exp(i: int):
    return np.exp(i)

dspy.CodeAct("question -> answer", tools=[exp])
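
One possible way around this, assuming the sandbox can import standard-library modules as the overview suggests, is to rewrite the tool on top of the standard library. The sketch below only shows the rewritten function; whether it runs inside the CodeAct sandbox is an assumption and has not been verified here.

```python
def exp(i: int) -> float:
    """Return e raised to the power i, using only the standard library."""
    import math  # imported inside the function so the body is self-contained
    return math.exp(i)

print(exp(0))
```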

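All dependent functions need to be passed to CodeAct

Functions that depend on other functions or classes not passed to CodeAct cannot be used. The following example does not work because the tool functions depend on functions or classes that are not passed to CodeAct, such as Profile or parent_function.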

# ❌ NG
from pydantic import BaseModel

class Profile(BaseModel):
    name: str
    age: int

def age(profile: Profile):
    return profile.age

def parent_function():
    print("Hi!")

def child_function():
    parent_function()

dspy.CodeAct("question -> answer", tools=[age, child_function])

In contrast, the following example works because all required tool functions are passed to CodeAct:

# ✅ OK

def parent_function():
    print("Hi!")

def child_function():
    parent_function()

dspy.CodeAct("question -> answer", tools=[parent_function, child_function])
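
A rough pre-flight check for this limitation can be built from a function's code object. The helper below, `missing_dependencies`, is hypothetical and not part of DSPy. It lists global names a tool references that are neither builtins nor other tools in the list. It is only a heuristic: attribute names also appear in `co_names`, so it can report false positives.

```python
import builtins

def missing_dependencies(tools):
    """Map each tool name to referenced names that are neither builtins nor tools."""
    tool_names = {tool.__name__ for tool in tools}
    missing = {}
    for tool in tools:
        unresolved = [
            name
            for name in tool.__code__.co_names
            if name not in tool_names and not hasattr(builtins, name)
        ]
        if unresolved:
            missing[tool.__name__] = unresolved
    return missing

def parent_function():
    print("Hi!")

def child_function():
    parent_function()

print(missing_dependencies([child_function]))
print(missing_dependencies([parent_function, child_function]))
```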