dspy.Predict

dspy.Predict(signature: str | type[Signature], callbacks: list[BaseCallback] | None = None, **config)

Bases: Module, Parameter

Basic DSPy module that maps inputs to outputs using a language model.

Parameters:

signature (str | type[Signature], required)
    The input/output signature describing the task.

callbacks (list[BaseCallback] | None, default None)
    An optional list of callbacks for instrumentation.

**config (default {})
    Default keyword arguments forwarded to the underlying language model. These values can be
    overridden for a single call by passing a config dictionary when calling the module. For example:

    predict = dspy.Predict("q -> a", rollout_id=1, temperature=1.0)
    predict(q="What is 1 + 52?", config={"rollout_id": 2, "temperature": 1.0})
Source code in dspy/predict/predict.py
def __init__(self, signature: str | type[Signature], callbacks: list[BaseCallback] | None = None, **config):
    super().__init__(callbacks=callbacks)
    self.stage = random.randbytes(8).hex()
    self.signature = ensure_signature(signature)
    self.config = config
    self.reset()
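
A minimal end-to-end sketch of constructing and calling a Predict module. The model name passed to dspy.LM below is only an illustrative placeholder; substitute whatever provider/model your environment uses.

import dspy

lm = dspy.LM("openai/gpt-4o-mini")  # placeholder model name
dspy.configure(lm=lm)

predict = dspy.Predict("question -> answer")
result = predict(question="What is the capital of France?")
print(result.answer)  # the Prediction's fields mirror the signature's output fields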

Functions

__call__(*args, **kwargs)

Source code in dspy/predict/predict.py
def __call__(self, *args, **kwargs):
    if args:
        raise ValueError(self._get_positional_args_error_message())

    return super().__call__(**kwargs)

acall(*args, **kwargs) async

Source code in dspy/predict/predict.py
async def acall(self, *args, **kwargs):
    if args:
        raise ValueError(self._get_positional_args_error_message())

    return await super().acall(**kwargs)
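
A minimal async sketch, assuming a configured LM (placeholder model name below): acall rejects positional arguments and otherwise awaits the module's aforward path.

import asyncio

import dspy

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))  # placeholder model name
predict = dspy.Predict("question -> answer")

async def main():
    result = await predict.acall(question="What is 2 + 2?")
    print(result.answer)

asyncio.run(main())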

aforward(**kwargs) async

Source code in dspy/predict/predict.py
async def aforward(self, **kwargs):
    lm, config, signature, demos, kwargs = self._forward_preprocess(**kwargs)

    adapter = settings.adapter or ChatAdapter()
    if self._should_stream():
        with settings.context(caller_predict=self):
            completions = await adapter.acall(lm, lm_kwargs=config, signature=signature, demos=demos, inputs=kwargs)
    else:
        with settings.context(send_stream=None):
            completions = await adapter.acall(lm, lm_kwargs=config, signature=signature, demos=demos, inputs=kwargs)

    return self._forward_postprocess(completions, signature, **kwargs)

batch(examples: list[Example], num_threads: int | None = None, max_errors: int | None = None, return_failed_examples: bool = False, provide_traceback: bool | None = None, disable_progress_bar: bool = False) -> list[Example] | tuple[list[Example], list[Example], list[Exception]]

Processes a list of dspy.Example instances in parallel using the Parallel module.

Parameters:

examples (list[Example], required)
    List of dspy.Example instances to process.

num_threads (int | None, default None)
    Number of threads to use for parallel processing.

max_errors (int | None, default None)
    Maximum number of errors allowed before stopping execution. If None, inherits from
    dspy.settings.max_errors.

return_failed_examples (bool, default False)
    Whether to return failed examples and exceptions.

provide_traceback (bool | None, default None)
    Whether to include traceback information in error logs.

disable_progress_bar (bool, default False)
    Whether to disable the progress bar.

Returns:

list[Example] | tuple[list[Example], list[Example], list[Exception]]
    A list of results, and optionally the failed examples and exceptions.

Source code in dspy/primitives/module.py
def batch(
    self,
    examples: list[Example],
    num_threads: int | None = None,
    max_errors: int | None = None,
    return_failed_examples: bool = False,
    provide_traceback: bool | None = None,
    disable_progress_bar: bool = False,
) -> list[Example] | tuple[list[Example], list[Example], list[Exception]]:
    """
    Processes a list of dspy.Example instances in parallel using the Parallel module.

    Args:
        examples: List of dspy.Example instances to process.
        num_threads: Number of threads to use for parallel processing.
        max_errors: Maximum number of errors allowed before stopping execution.
            If ``None``, inherits from ``dspy.settings.max_errors``.
        return_failed_examples: Whether to return failed examples and exceptions.
        provide_traceback: Whether to include traceback information in error logs.
        disable_progress_bar: Whether to display the progress bar.

    Returns:
        List of results, and optionally failed examples and exceptions.
    """
    # Create a list of execution pairs (self, example)
    exec_pairs = [(self, example.inputs()) for example in examples]

    # Create an instance of Parallel
    parallel_executor = Parallel(
        num_threads=num_threads,
        max_errors=max_errors,
        return_failed_examples=return_failed_examples,
        provide_traceback=provide_traceback,
        disable_progress_bar=disable_progress_bar,
    )

    # Execute the forward method of Parallel
    if return_failed_examples:
        results, failed_examples, exceptions = parallel_executor.forward(exec_pairs)
        return results, failed_examples, exceptions
    else:
        results = parallel_executor.forward(exec_pairs)
        return results
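
A minimal sketch of batch(): each dspy.Example must declare its input fields via with_inputs() so that example.inputs() yields the keyword arguments passed to the module. The model name is a placeholder.

import dspy

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))  # placeholder model name
predict = dspy.Predict("question -> answer")

examples = [
    dspy.Example(question="What is 2 + 2?").with_inputs("question"),
    dspy.Example(question="Name a prime number greater than 10.").with_inputs("question"),
]

results = predict.batch(examples, num_threads=2)
for result in results:
    print(result.answer)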

deepcopy()

Deep copy the module.

This is a tweak to the default Python deepcopy that only deep-copies self.parameters(); other attributes are only shallow-copied.

Source code in dspy/primitives/base_module.py
def deepcopy(self):
    """Deep copy the module.

    This is a tweak to the default python deepcopy that only deep copies `self.parameters()`, and for other
    attributes, we just do the shallow copy.
    """
    try:
        # If the instance itself is copyable, we can just deep copy it.
        # Otherwise we will have to create a new instance and copy over the attributes one by one.
        return copy.deepcopy(self)
    except Exception:
        pass

    # Create an empty instance.
    new_instance = self.__class__.__new__(self.__class__)
    # Set attributes of the copied instance.
    for attr, value in self.__dict__.items():
        if isinstance(value, BaseModule):
            setattr(new_instance, attr, value.deepcopy())
        else:
            try:
                # Try to deep copy the attribute
                setattr(new_instance, attr, copy.deepcopy(value))
            except Exception:
                logging.warning(
                    f"Failed to deep copy attribute '{attr}' of {self.__class__.__name__}, "
                    "falling back to shallow copy or reference copy."
                )
                try:
                    # Fallback to shallow copy if deep copy fails
                    setattr(new_instance, attr, copy.copy(value))
                except Exception:
                    # If even the shallow copy fails, we just copy over the reference.
                    setattr(new_instance, attr, value)

    return new_instance
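
A minimal sketch showing that deepcopy() yields an independent copy whose parameters (such as demos) can diverge from the original; no LM call is involved.

import dspy

predict = dspy.Predict("question -> answer")
clone = predict.deepcopy()
clone.demos.append(dspy.Example(question="What is 2 + 2?", answer="4"))

print(len(predict.demos), len(clone.demos))  # 0 1 -- the original's demos are untouched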

dump_state(json_mode=True)

Source code in dspy/predict/predict.py
def dump_state(self, json_mode=True):
    state_keys = ["traces", "train"]
    state = {k: getattr(self, k) for k in state_keys}

    state["demos"] = []
    for demo in self.demos:
        demo = demo.copy()

        for field in demo:
            # FIXME: Saving BaseModels as strings in examples doesn't matter because you never re-access as an object
            demo[field] = serialize_object(demo[field])

        if isinstance(demo, dict) or not json_mode:
            state["demos"].append(demo)
        else:
            state["demos"].append(demo.toDict())

    state["signature"] = self.signature.dump_state()
    state["lm"] = self.lm.dump_state() if self.lm else None
    return state

forward(**kwargs)

Source code in dspy/predict/predict.py
def forward(self, **kwargs):
    lm, config, signature, demos, kwargs = self._forward_preprocess(**kwargs)

    adapter = settings.adapter or ChatAdapter()

    if self._should_stream():
        with settings.context(caller_predict=self):
            completions = adapter(lm, lm_kwargs=config, signature=signature, demos=demos, inputs=kwargs)
    else:
        with settings.context(send_stream=None):
            completions = adapter(lm, lm_kwargs=config, signature=signature, demos=demos, inputs=kwargs)

    return self._forward_postprocess(completions, signature, **kwargs)

get_config()

Source code in dspy/predict/predict.py
def get_config(self):
    return self.config

get_lm()

Source code in dspy/primitives/module.py
def get_lm(self):
    all_used_lms = [param.lm for _, param in self.named_predictors()]

    if len(set(all_used_lms)) == 1:
        return all_used_lms[0]

    raise ValueError("Multiple LMs are being used in the module. There's no unique LM to return.")

inspect_history(n: int = 1)

Source code in dspy/primitives/module.py
def inspect_history(self, n: int = 1):
    return pretty_print_history(self.history, n)
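
A minimal sketch, assuming a configured LM (placeholder model name below): after at least one call, inspect_history pretty-prints the module's most recent LM interactions.

import dspy

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))  # placeholder model name
predict = dspy.Predict("question -> answer")
predict(question="What is 2 + 2?")
predict.inspect_history(n=1)  # prints the last prompt/response exchange recorded for this module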

load(path)

Load the saved module. If you want to load an entire program rather than just the state of an existing program, you may also want to check out dspy.load.

Parameters:

path (str, required)
    Path to the saved state file, which should be a .json or a .pkl file.
Source code in dspy/primitives/base_module.py
def load(self, path):
    """Load the saved module. You may also want to check out dspy.load, if you want to
    load an entire program, not just the state for an existing program.

    Args:
        path (str): Path to the saved state file, which should be a .json or a .pkl file
    """
    path = Path(path)

    if path.suffix == ".json":
        with open(path, "rb") as f:
            state = orjson.loads(f.read())
    elif path.suffix == ".pkl":
        with open(path, "rb") as f:
            state = cloudpickle.load(f)
    else:
        raise ValueError(f"`path` must end with `.json` or `.pkl`, but received: {path}")

    dependency_versions = get_dependency_versions()
    saved_dependency_versions = state["metadata"]["dependency_versions"]
    for key, saved_version in saved_dependency_versions.items():
        if dependency_versions[key] != saved_version:
            logger.warning(
                f"There is a mismatch of {key} version between saved model and current environment. "
                f"You saved with `{key}=={saved_version}`, but now you have "
                f"`{key}=={dependency_versions[key]}`. This might cause errors or performance downgrade "
                "on the loaded model, please consider loading the model in the same environment as the "
                "saving environment."
            )
    self.load_state(state)

load_state(state: dict) -> Predict

Load the saved state of a Predict object.

Parameters:

state (dict, required)
    The saved state of a Predict object.

Returns:

Predict
    Self, to allow method chaining.

Source code in dspy/predict/predict.py
def load_state(self, state: dict) -> "Predict":
    """Load the saved state of a `Predict` object.

    Args:
        state: The saved state of a `Predict` object.

    Returns:
        Self to allow method chaining.
    """
    excluded_keys = ["signature", "extended_signature", "lm"]
    for name, value in state.items():
        # `excluded_keys` are fields that go through special handling.
        if name not in excluded_keys:
            setattr(self, name, value)

    self.signature = self.signature.load_state(state["signature"])
    self.lm = LM(**state["lm"]) if state["lm"] else None

    if "extended_signature" in state:  # legacy, up to and including 2.5, for CoT.
        raise NotImplementedError("Loading extended_signature is no longer supported in DSPy 2.6+")

    return self
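
A minimal sketch of an in-memory state round trip, pairing dump_state() with load_state(); no files and no LM calls are involved.

import dspy

source = dspy.Predict("question -> answer")
state = source.dump_state()

restored = dspy.Predict("question -> answer")
restored.load_state(state)  # returns self, so calls can also be chained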

map_named_predictors(func)

Applies a function to all named predictors.

Source code in dspy/primitives/module.py
def map_named_predictors(self, func):
    """Applies a function to all named predictors."""
    for name, predictor in self.named_predictors():
        set_attribute_by_name(self, name, func(predictor))
    return self

named_parameters()

Unlike PyTorch, this also handles (non-recursive) lists of parameters.

Source code in dspy/primitives/base_module.py
def named_parameters(self):
    """
    Unlike PyTorch, handles (non-recursive) lists of parameters too.
    """

    import dspy
    from dspy.predict.parameter import Parameter

    visited = set()
    named_parameters = []

    def add_parameter(param_name, param_value):
        if isinstance(param_value, Parameter):
            if id(param_value) not in visited:
                visited.add(id(param_value))
                named_parameters.append((param_name, param_value))

        elif isinstance(param_value, dspy.Module):
            # When a sub-module is pre-compiled, keep it frozen.
            if not getattr(param_value, "_compiled", False):
                for sub_name, param in param_value.named_parameters():
                    add_parameter(f"{param_name}.{sub_name}", param)

    if isinstance(self, Parameter):
        add_parameter("self", self)

    for name, value in self.__dict__.items():
        if isinstance(value, Parameter):
            add_parameter(name, value)

        elif isinstance(value, dspy.Module):
            # When a sub-module is pre-compiled, keep it frozen.
            if not getattr(value, "_compiled", False):
                for sub_name, param in value.named_parameters():
                    add_parameter(f"{name}.{sub_name}", param)

        elif isinstance(value, (list, tuple)):
            for idx, item in enumerate(value):
                add_parameter(f"{name}[{idx}]", item)

        elif isinstance(value, dict):
            for key, item in value.items():
                add_parameter(f"{name}['{key}']", item)

    return named_parameters

named_predictors()

Source code in dspy/primitives/module.py
def named_predictors(self):
    from dspy.predict.predict import Predict

    return [(name, param) for name, param in self.named_parameters() if isinstance(param, Predict)]
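
A minimal sketch: named_predictors() filters named_parameters() down to Predict instances, which is how utilities such as set_lm() walk a composite program. QAProgram below is a hypothetical module defined only for illustration.

import dspy

class QAProgram(dspy.Module):
    def __init__(self):
        super().__init__()
        self.generate = dspy.Predict("question -> answer")

    def forward(self, question):
        return self.generate(question=question)

program = QAProgram()
for name, predictor in program.named_predictors():
    print(name, predictor.signature)  # e.g. "generate" and its signature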

named_sub_modules(type_=None, skip_compiled=False) -> Generator[tuple[str, BaseModule], None, None]

Find all sub-modules in the module, as well as their names.

Say self.children[4]['key'].sub_module is a sub-module. Then its name will be children[4]['key'].sub_module. If the sub-module is reachable via multiple paths, only one of the paths is returned.

Source code in dspy/primitives/base_module.py
def named_sub_modules(self, type_=None, skip_compiled=False) -> Generator[tuple[str, "BaseModule"], None, None]:
    """Find all sub-modules in the module, as well as their names.

    Say `self.children[4]['key'].sub_module` is a sub-module. Then the name will be
    `children[4]['key'].sub_module`. But if the sub-module is accessible at different
    paths, only one of the paths will be returned.
    """
    if type_ is None:
        type_ = BaseModule

    queue = deque([("self", self)])
    seen = {id(self)}

    def add_to_queue(name, item):
        if id(item) not in seen:
            seen.add(id(item))
            queue.append((name, item))

    while queue:
        name, item = queue.popleft()

        if isinstance(item, type_):
            yield name, item

        if isinstance(item, BaseModule):
            if skip_compiled and getattr(item, "_compiled", False):
                continue
            for sub_name, sub_item in item.__dict__.items():
                add_to_queue(f"{name}.{sub_name}", sub_item)

        elif isinstance(item, (list, tuple)):
            for i, sub_item in enumerate(item):
                add_to_queue(f"{name}[{i}]", sub_item)

        elif isinstance(item, dict):
            for key, sub_item in item.items():
                add_to_queue(f"{name}[{key}]", sub_item)

parameters()

Source code in dspy/primitives/base_module.py
def parameters(self):
    return [param for _, param in self.named_parameters()]

predictors()

Source code in dspy/primitives/module.py
def predictors(self):
    return [param for _, param in self.named_predictors()]

reset()

Source code in dspy/predict/predict.py
def reset(self):
    self.lm = None
    self.traces = []
    self.train = []
    self.demos = []

reset_copy()

Deep copy the module and reset all parameters.

Source code in dspy/primitives/base_module.py
def reset_copy(self):
    """Deep copy the module and reset all parameters."""
    new_instance = self.deepcopy()

    for param in new_instance.parameters():
        param.reset()

    return new_instance

save(path, save_program=False, modules_to_serialize=None)

Save the module.

Save the module to a directory or a file. There are two modes:
- save_program=False: save only the state of the module to a json or pickle file, depending on the file extension.
- save_program=True: save the whole module to a directory via cloudpickle, which contains both the state and the architecture of the model.

If save_program=True and modules_to_serialize is provided, those modules are registered for serialization with cloudpickle's register_pickle_by_value. This causes cloudpickle to serialize the modules by value rather than by reference, ensuring they are fully preserved along with the saved program. This is useful when you have custom modules that need to be serialized alongside your program. If None, no modules are registered for serialization.

We also save the dependency versions, so that the loaded model can check for a version mismatch on critical dependencies or the DSPy version.

Parameters:

path (str, required)
    Path to the saved state file, which should be a .json or .pkl file when save_program=False,
    and a directory when save_program=True.

save_program (bool, default False)
    If True, save the whole module to a directory via cloudpickle; otherwise only save the state.

modules_to_serialize (list, default None)
    A list of modules to serialize with cloudpickle's register_pickle_by_value. If None, no modules
    are registered for serialization.
Source code in dspy/primitives/base_module.py
def save(self, path, save_program=False, modules_to_serialize=None):
    """Save the module.

    Save the module to a directory or a file. There are two modes:
    - `save_program=False`: Save only the state of the module to a json or pickle file, based on the value of
        the file extension.
    - `save_program=True`: Save the whole module to a directory via cloudpickle, which contains both the state and
        architecture of the model.

    If `save_program=True` and `modules_to_serialize` are provided, it will register those modules for serialization
    with cloudpickle's `register_pickle_by_value`. This causes cloudpickle to serialize the module by value rather
    than by reference, ensuring the module is fully preserved along with the saved program. This is useful
    when you have custom modules that need to be serialized alongside your program. If None, then no modules
    will be registered for serialization.

    We also save the dependency versions, so that the loaded model can check if there is a version mismatch on
    critical dependencies or DSPy version.

    Args:
        path (str): Path to the saved state file, which should be a .json or .pkl file when `save_program=False`,
            and a directory when `save_program=True`.
        save_program (bool): If True, save the whole module to a directory via cloudpickle, otherwise only save
            the state.
        modules_to_serialize (list): A list of modules to serialize with cloudpickle's `register_pickle_by_value`.
            If None, then no modules will be registered for serialization.

    """
    metadata = {}
    metadata["dependency_versions"] = get_dependency_versions()
    path = Path(path)

    if save_program:
        if path.suffix:
            raise ValueError(
                f"`path` must point to a directory without a suffix when `save_program=True`, but received: {path}"
            )
        if path.exists() and not path.is_dir():
            raise NotADirectoryError(f"The path '{path}' exists but is not a directory.")

        if not path.exists():
            # Create the directory (and any parent directories)
            path.mkdir(parents=True)

        try:
            modules_to_serialize = modules_to_serialize or []
            for module in modules_to_serialize:
                cloudpickle.register_pickle_by_value(module)

            with open(path / "program.pkl", "wb") as f:
                cloudpickle.dump(self, f)
        except Exception as e:
            raise RuntimeError(
                f"Saving failed with error: {e}. Please remove the non-picklable attributes from your DSPy program, "
                "or consider using state-only saving by setting `save_program=False`."
            )
        with open(path / "metadata.json", "wb") as f:
            f.write(orjson.dumps(metadata, option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE))

        return

    if path.suffix == ".json":
        state = self.dump_state()
        state["metadata"] = metadata
        try:
            with open(path, "wb") as f:
                f.write(orjson.dumps(state, option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE))
        except Exception as e:
            raise RuntimeError(
                f"Failed to save state to {path} with error: {e}. Your DSPy program may contain non "
                "json-serializable objects, please consider saving the state in .pkl by using `path` ending "
                "with `.pkl`, or saving the whole program by setting `save_program=True`."
            )
    elif path.suffix == ".pkl":
        state = self.dump_state(json_mode=False)
        state["metadata"] = metadata
        with open(path, "wb") as f:
            cloudpickle.dump(state, f)
    else:
        raise ValueError(f"`path` must end with `.json` or `.pkl` when `save_program=False`, but received: {path}")

set_lm(lm)

Source code in dspy/primitives/module.py
def set_lm(self, lm):
    for _, param in self.named_predictors():
        param.lm = lm
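
A minimal sketch: set_lm assigns one LM to every predictor in the module, and get_lm returns it as long as the assignment is unambiguous. The model name is a placeholder.

import dspy

lm = dspy.LM("openai/gpt-4o-mini")  # placeholder model name
predict = dspy.Predict("question -> answer")
predict.set_lm(lm)
assert predict.get_lm() is lm  # a single, unambiguous LM is returned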

update_config(**kwargs)

Source code in dspy/predict/predict.py
def update_config(self, **kwargs):
    self.config = {**self.config, **kwargs}
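
A minimal sketch: update_config merges new defaults into the existing config, and get_config returns the merged dictionary that is forwarded to the LM on each call.

import dspy

predict = dspy.Predict("question -> answer", temperature=0.7)
predict.update_config(max_tokens=256)
print(predict.get_config())  # {'temperature': 0.7, 'max_tokens': 256}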
