def __init__(
    self,
    signature: str | type[Signature],
    tools: list[Callable],
    max_iters: int = 5,
    interpreter: PythonInterpreter | None = None,
):
    """
    Initializes the CodeAct module with the given signature, tools, and maximum number of iterations.

    Args:
        signature (Union[str, Type[Signature]]): The signature of the module.
        tools (list[Callable]): The tool callables to be used. CodeAct only accepts functions and not callable objects.
        max_iters (int): The maximum number of iterations to generate the answer.
        interpreter: PythonInterpreter instance to use. If None, a new one is instantiated.

    Example:
    ```python
    from dspy.predict import CodeAct

    def factorial(n):
        if n == 1:
            return 1
        return n * factorial(n-1)

    act = CodeAct("n->factorial", tools=[factorial])
    act(n=5) # 120
    ```
    """
    self.signature = ensure_signature(signature)
    self.max_iters = max_iters
    self.history = []

    tools = [t if isinstance(t, Tool) else Tool(t) for t in tools]
    if any(not inspect.isfunction(tool.func) for tool in tools):
        raise ValueError("CodeAct only accepts functions and not callable objects.")
    tools = {tool.name: tool for tool in tools}

    instructions = self._build_instructions(self.signature, tools)

    codeact_signature = (
        dspy.Signature({**self.signature.input_fields}, "\n".join(instructions))
        .append("trajectory", dspy.InputField(), type_=str)
        .append(
            "generated_code",
            dspy.OutputField(desc="Python code that when executed, produces output relevant to answering the question"),
            type_=str,
        )
        .append(
            "finished",
            dspy.OutputField(desc="a boolean flag to determine if the process is done"),
            type_=bool,
        )
    )

    extract_signature = dspy.Signature(
        {**self.signature.input_fields, **self.signature.output_fields},
        self.signature.instructions,
    ).append("trajectory", dspy.InputField(), type_=str)

    self.tools: dict[str, Tool] = tools
    self.codeact = dspy.Predict(codeact_signature)
    self.extractor = dspy.ChainOfThought(extract_signature)
    # This will raise an exception if dspy cannot find an available Deno instance.
    self.interpreter = interpreter or PythonInterpreter()
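# Usage sketch: passing a custom interpreter and iteration budget. The import
# path for PythonInterpreter is an assumption; only the `interpreter` and
# `max_iters` parameters are taken from the signature above.
from dspy.predict import CodeAct
from dspy.primitives.python_interpreter import PythonInterpreter

def square(n: int) -> int:
    """Return n squared."""
    return n * n

# Reuse one interpreter instance instead of letting CodeAct create its own.
interpreter = PythonInterpreter()
act = CodeAct("n -> squared", tools=[square], max_iters=3, interpreter=interpreter)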
@with_callbacks
def __call__(self, *args, **kwargs) -> Prediction:
    caller_modules = settings.caller_modules or []
    caller_modules = list(caller_modules)
    caller_modules.append(self)

    with settings.context(caller_modules=caller_modules):
        if settings.track_usage and thread_local_overrides.get().get("usage_tracker") is None:
            with track_usage() as usage_tracker:
                output = self.forward(*args, **kwargs)
            tokens = usage_tracker.get_total_tokens()

            # Some optimizers (e.g., GEPA bootstrap tracing) temporarily patch
            # module.forward to return a tuple: (prediction, trace).
            # When usage tracking is enabled, ensure we attach usage to the
            # prediction object if present.
            prediction_in_output = None
            if isinstance(output, Prediction):
                prediction_in_output = output
            elif isinstance(output, tuple) and len(output) > 0 and isinstance(output[0], Prediction):
                prediction_in_output = output[0]
            if not prediction_in_output:
                raise ValueError("No prediction object found in output to call set_lm_usage on.")
            prediction_in_output.set_lm_usage(tokens)
            return output

        return self.forward(*args, **kwargs)
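# Usage sketch: reading the token usage that __call__ attaches when tracking
# is enabled. `track_usage` as a settings flag and `get_lm_usage()` as the
# Prediction accessor paired with set_lm_usage above are assumptions; check
# your dspy version if they differ.
import dspy

dspy.settings.configure(track_usage=True)
qa = dspy.Predict("question -> answer")
prediction = qa(question="What is the capital of France?")
print(prediction.get_lm_usage())  # Per-model token counts collected by the tracker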
def batch(
    self,
    examples: list[Example],
    num_threads: int | None = None,
    max_errors: int | None = None,
    return_failed_examples: bool = False,
    provide_traceback: bool | None = None,
    disable_progress_bar: bool = False,
) -> list[Example] | tuple[list[Example], list[Example], list[Exception]]:
    """
    Processes a list of dspy.Example instances in parallel using the Parallel module.

    Args:
        examples: List of dspy.Example instances to process.
        num_threads: Number of threads to use for parallel processing.
        max_errors: Maximum number of errors allowed before stopping execution.
            If ``None``, inherits from ``dspy.settings.max_errors``.
        return_failed_examples: Whether to return failed examples and exceptions.
        provide_traceback: Whether to include traceback information in error logs.
        disable_progress_bar: Whether to disable the progress bar.

    Returns:
        List of results, and optionally failed examples and exceptions.
    """
    # Create a list of execution pairs (self, example)
    exec_pairs = [(self, example.inputs()) for example in examples]

    # Create an instance of Parallel
    parallel_executor = Parallel(
        num_threads=num_threads,
        max_errors=max_errors,
        return_failed_examples=return_failed_examples,
        provide_traceback=provide_traceback,
        disable_progress_bar=disable_progress_bar,
    )

    # Execute the forward method of Parallel
    if return_failed_examples:
        results, failed_examples, exceptions = parallel_executor.forward(exec_pairs)
        return results, failed_examples, exceptions
    else:
        results = parallel_executor.forward(exec_pairs)
        return results
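# Usage sketch: batch-processing examples in parallel. The module and field
# names are illustrative; `batch` and its parameters come from the method
# above. Since batch calls `example.inputs()`, each Example must have its
# input keys marked via `with_inputs`.
import dspy

qa = dspy.Predict("question -> answer")
examples = [
    dspy.Example(question="What is 2 + 2?").with_inputs("question"),
    dspy.Example(question="Who wrote Hamlet?").with_inputs("question"),
]
results = qa.batch(examples, num_threads=2)

# With return_failed_examples=True, failures are returned rather than raised.
results, failed, errors = qa.batch(examples, return_failed_examples=True)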
def deepcopy(self):
    """Deep copy the module.

    This is a tweak to the default Python deepcopy that only deep copies `self.parameters()`, and for other
    attributes, we just do the shallow copy.
    """
    try:
        # If the instance itself is copyable, we can just deep copy it.
        # Otherwise we will have to create a new instance and copy over the attributes one by one.
        return copy.deepcopy(self)
    except Exception:
        pass

    # Create an empty instance.
    new_instance = self.__class__.__new__(self.__class__)
    # Set attributes of the copied instance.
    for attr, value in self.__dict__.items():
        if isinstance(value, BaseModule):
            setattr(new_instance, attr, value.deepcopy())
        else:
            try:
                # Try to deep copy the attribute
                setattr(new_instance, attr, copy.deepcopy(value))
            except Exception:
                logging.warning(
                    f"Failed to deep copy attribute '{attr}' of {self.__class__.__name__}, "
                    "falling back to shallow copy or reference copy."
                )
                try:
                    # Fall back to shallow copy if deep copy fails
                    setattr(new_instance, attr, copy.copy(value))
                except Exception:
                    # If even the shallow copy fails, we just copy over the reference.
                    setattr(new_instance, attr, value)

    return new_instance
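# Usage sketch: the copy is a separate instance, so tuning or mutating its
# predictors leaves the original untouched.
import dspy

program = dspy.ChainOfThought("question -> answer")
program_copy = program.deepcopy()
assert program_copy is not program  # Independent instances with copied parameters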
def get_lm(self):
    all_used_lms = [param.lm for _, param in self.named_predictors()]

    if len(set(all_used_lms)) == 1:
        return all_used_lms[0]

    raise ValueError("Multiple LMs are being used in the module. There's no unique LM to return.")
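# Usage sketch: get_lm only succeeds when every predictor shares one LM.
# The model name is illustrative; `set_lm` is assumed to be the companion
# setter on the module.
import dspy

lm = dspy.LM("openai/gpt-4o-mini")
program = dspy.ChainOfThought("question -> answer")
program.set_lm(lm)
assert program.get_lm() is lm  # Raises ValueError if predictors used different LMs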
def load(self, path):
    """Load the saved module. You may also want to check out dspy.load, if you want to
    load an entire program, not just the state for an existing program.

    Args:
        path (str): Path to the saved state file, which should be a .json or a .pkl file
    """
    path = Path(path)

    if path.suffix == ".json":
        with open(path, "rb") as f:
            state = orjson.loads(f.read())
    elif path.suffix == ".pkl":
        with open(path, "rb") as f:
            state = cloudpickle.load(f)
    else:
        raise ValueError(f"`path` must end with `.json` or `.pkl`, but received: {path}")

    dependency_versions = get_dependency_versions()
    saved_dependency_versions = state["metadata"]["dependency_versions"]
    for key, saved_version in saved_dependency_versions.items():
        if dependency_versions[key] != saved_version:
            logger.warning(
                f"There is a mismatch of {key} version between saved model and current environment. "
                f"You saved with `{key}=={saved_version}`, but now you have "
                f"`{key}=={dependency_versions[key]}`. This might cause errors or degraded performance "
                "on the loaded model; consider loading the model in the same environment it was saved in."
            )
    self.load_state(state)
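# Usage sketch: loading saved state into an existing program of the same
# architecture (contrast with dspy.load, which restores a whole program).
# The path is illustrative.
import dspy

program = dspy.ChainOfThought("question -> answer")
program.load("optimized_program.json")  # Or a .pkl file produced by save()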
def map_named_predictors(self, func):
    """Applies a function to all named predictors."""
    for name, predictor in self.named_predictors():
        set_attribute_by_name(self, name, func(predictor))
    return self
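# Usage sketch: assigning one LM to every predictor in place. The function
# must return the (possibly replaced) predictor, since its return value is
# written back via set_attribute_by_name. The model name is illustrative.
import dspy

def use_fast_lm(predictor):
    predictor.lm = dspy.LM("openai/gpt-4o-mini")
    return predictor

program = dspy.ChainOfThought("question -> answer")
program.map_named_predictors(use_fast_lm)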
def named_parameters(self):
    """
    Unlike PyTorch, handles (non-recursive) lists of parameters too.
    """
    import dspy
    from dspy.predict.parameter import Parameter

    visited = set()
    named_parameters = []

    def add_parameter(param_name, param_value):
        if isinstance(param_value, Parameter):
            if id(param_value) not in visited:
                visited.add(id(param_value))
                named_parameters.append((param_name, param_value))
        elif isinstance(param_value, dspy.Module):
            # When a sub-module is pre-compiled, keep it frozen.
            if not getattr(param_value, "_compiled", False):
                for sub_name, param in param_value.named_parameters():
                    add_parameter(f"{param_name}.{sub_name}", param)

    if isinstance(self, Parameter):
        add_parameter("self", self)

    for name, value in self.__dict__.items():
        if isinstance(value, Parameter):
            add_parameter(name, value)
        elif isinstance(value, dspy.Module):
            # When a sub-module is pre-compiled, keep it frozen.
            if not getattr(value, "_compiled", False):
                for sub_name, param in value.named_parameters():
                    add_parameter(f"{name}.{sub_name}", param)
        elif isinstance(value, (list, tuple)):
            for idx, item in enumerate(value):
                add_parameter(f"{name}[{idx}]", item)
        elif isinstance(value, dict):
            for key, item in value.items():
                add_parameter(f"{name}['{key}']", item)

    return named_parameters
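# Usage sketch: listing every parameter with its path. The module layout is
# illustrative; exact names depend on how your sub-modules are structured.
import dspy

class Pipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought("question -> answer")
        self.rankers = [dspy.Predict("answer -> score") for _ in range(2)]

for name, param in Pipeline().named_parameters():
    print(name)  # Dotted paths for sub-modules, indexed paths for list entries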
def named_sub_modules(self, type_=None, skip_compiled=False) -> Generator[tuple[str, "BaseModule"], None, None]:
    """Find all sub-modules in the module, as well as their names.

    Say `self.children[4]['key'].sub_module` is a sub-module. Then the name will be
    `children[4]['key'].sub_module`. But if the sub-module is accessible at different
    paths, only one of the paths will be returned.
    """
    if type_ is None:
        type_ = BaseModule

    queue = deque([("self", self)])
    seen = {id(self)}

    def add_to_queue(name, item):
        if id(item) not in seen:
            seen.add(id(item))
            queue.append((name, item))

    while queue:
        name, item = queue.popleft()

        if isinstance(item, type_):
            yield name, item

        if isinstance(item, BaseModule):
            if skip_compiled and getattr(item, "_compiled", False):
                continue
            for sub_name, sub_item in item.__dict__.items():
                add_to_queue(f"{name}.{sub_name}", sub_item)
        elif isinstance(item, (list, tuple)):
            for i, sub_item in enumerate(item):
                add_to_queue(f"{name}[{i}]", sub_item)
        elif isinstance(item, dict):
            for key, sub_item in item.items():
                add_to_queue(f"{name}[{key}]", sub_item)
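# Usage sketch: filtering the traversal by type to find every Predict inside
# a program, wherever it is nested.
import dspy

program = dspy.ChainOfThought("question -> answer")
for name, sub_module in program.named_sub_modules(type_=dspy.Predict):
    print(name, type(sub_module).__name__)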
def reset_copy(self):
    """Deep copy the module and reset all parameters."""
    new_instance = self.deepcopy()

    for param in new_instance.parameters():
        param.reset()

    return new_instance
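# Usage sketch: obtaining a fresh, un-tuned copy of a program, e.g. before
# re-running an optimizer, while keeping the original's learned state intact.
import dspy

program = dspy.ChainOfThought("question -> answer")
fresh_program = program.reset_copy()  # Same architecture, parameters reset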
def save(self, path, save_program=False, modules_to_serialize=None):
    """Save the module.

    Save the module to a directory or a file. There are two modes:
    - `save_program=False`: Save only the state of the module to a json or pickle file, based on the value of
        the file extension.
    - `save_program=True`: Save the whole module to a directory via cloudpickle, which contains both the state and
        architecture of the model.

    If `save_program=True` and `modules_to_serialize` are provided, it will register those modules for serialization
    with cloudpickle's `register_pickle_by_value`. This causes cloudpickle to serialize the module by value rather
    than by reference, ensuring the module is fully preserved along with the saved program. This is useful when you
    have custom modules that need to be serialized alongside your program. If None, then no modules will be
    registered for serialization.

    We also save the dependency versions, so that the loaded model can check if there is a version mismatch on
    critical dependencies or DSPy version.

    Args:
        path (str): Path to the saved state file, which should be a .json or .pkl file when `save_program=False`,
            and a directory when `save_program=True`.
        save_program (bool): If True, save the whole module to a directory via cloudpickle, otherwise only save
            the state.
        modules_to_serialize (list): A list of modules to serialize with cloudpickle's `register_pickle_by_value`.
            If None, then no modules will be registered for serialization.
    """
    metadata = {}
    metadata["dependency_versions"] = get_dependency_versions()
    path = Path(path)

    if save_program:
        if path.suffix:
            raise ValueError(
                f"`path` must point to a directory without a suffix when `save_program=True`, but received: {path}"
            )
        if path.exists() and not path.is_dir():
            raise NotADirectoryError(f"The path '{path}' exists but is not a directory.")

        if not path.exists():
            # Create the directory (and any parent directories)
            path.mkdir(parents=True)

        try:
            modules_to_serialize = modules_to_serialize or []
            for module in modules_to_serialize:
                cloudpickle.register_pickle_by_value(module)

            with open(path / "program.pkl", "wb") as f:
                cloudpickle.dump(self, f)
        except Exception as e:
            raise RuntimeError(
                f"Saving failed with error: {e}. Please remove the non-picklable attributes from your DSPy program, "
                "or consider using state-only saving by setting `save_program=False`."
            )

        with open(path / "metadata.json", "wb") as f:
            f.write(orjson.dumps(metadata, option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE))

        return

    if path.suffix == ".json":
        state = self.dump_state()
        state["metadata"] = metadata
        try:
            with open(path, "wb") as f:
                f.write(orjson.dumps(state, option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE))
        except Exception as e:
            raise RuntimeError(
                f"Failed to save state to {path} with error: {e}. Your DSPy program may contain "
                "non-JSON-serializable objects; consider saving the state as pickle by using a `path` ending "
                "with `.pkl`, or saving the whole program by setting `save_program=True`."
            )
    elif path.suffix == ".pkl":
        state = self.dump_state(json_mode=False)
        state["metadata"] = metadata
        with open(path, "wb") as f:
            cloudpickle.dump(state, f)
    else:
        raise ValueError(f"`path` must end with `.json` or `.pkl` when `save_program=False`, but received: {path}")
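# Usage sketch: both save modes from the method above. Paths are illustrative.
import dspy

program = dspy.ChainOfThought("question -> answer")

# State-only: the file extension picks the format (.json or .pkl).
program.save("program_state.json")

# Whole program: directory output (no suffix), restored later with dspy.load.
program.save("program_dir", save_program=True)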
import dspy
from dspy.predict import CodeAct

# Define a simple tool function
def factorial(n: int) -> int:
    """Calculate the factorial of a number."""
    if n == 1:
        return 1
    return n * factorial(n - 1)

# Create a CodeAct instance
act = CodeAct("n->factorial_result", tools=[factorial])

# Use the CodeAct instance
result = act(n=5)
print(result)  # Will calculate factorial(5) = 120