promptflow.executor.flow_executor 模块#
- class promptflow.executor.flow_executor.FlowExecutor(flow: Flow, connections: ConnectionProvider, run_tracker: RunTracker, cache_manager: AbstractCacheManager, loaded_tools: Mapping[str, Callable], *, raise_ex: bool = False, working_dir=None, line_timeout_sec=None, flow_file=None)#
基础类:
object
此类用于为不同的输入执行单个流程。
- Parameters:
flow (Flow) – 要执行的流程。
connections (dict) – 用于流程的连接。
run_tracker (RunTracker) – 用于流程的运行跟踪器。
cache_manager (AbstractCacheManager) – 用于流程的缓存管理器。
loaded_tools (Mapping[str, Callable]) – 用于流程的已加载工具。
worker_count (可选[int]) – 用于流程的工作线程数。默认值为16。
raise_ex (可选[bool]) – 是否抛出异常。默认为 False。
working_dir (可选[str]) – 用于流程的工作目录。默认为 None。
line_timeout_sec (可选[int]) – 用于流的行超时时间(以秒为单位)。默认值为 LINE_TIMEOUT_SEC。
flow_file (可选[路径]) – 用于流程的流程文件。默认为 None。
- property aggregation_nodes#
获取流执行器的聚合节点。
- Returns:
聚合节点的列表。
- Return type:
列表
- static apply_inputs_mapping(inputs: Mapping[str, Mapping[str, Any]], inputs_mapping: Mapping[str, str]) Dict[str, Any] #
- convert_flow_input_types(inputs: dict) Mapping[str, Any] #
将给定输入字典的输入类型转换为与流程的预期类型匹配。
- Parameters:
inputs (dict) – 包含流程输入的字典。
- Returns:
包含转换后输入的字典。
- Return type:
映射[str, 任意]
- classmethod create(flow_file: Path, connections: Union[dict, ConnectionProvider], working_dir: Optional[Path] = None, *, entry: Optional[str] = None, storage: Optional[AbstractRunStorage] = None, raise_ex: bool = True, node_override: Optional[Dict[str, Dict[str, Any]]] = None, line_timeout_sec: Optional[int] = None, init_kwargs: Optional[Dict[str, Any]] = None, **kwargs) FlowExecutor #
创建一个新的FlowExecutor实例。
- Parameters:
flow_file (Path) – 流程文件的路径。
connections (Union[dict, ConnectionProvider]) – 用于流程的连接。
working_dir (可选[str]) – 用于流程的工作目录。默认为 None。
func (可选[str]) – 如果提供了 .py 文件,则用于流程的函数。默认值为 None。
storage (可选[AbstractRunStorage]) – 用于流程的存储。默认为 None。
raise_ex (可选[bool]) – 是否抛出异常。默认为 True。
node_override (可选[Dict[str, Dict[str, Any]]]) – 用于流程的节点覆盖。默认值为 None。
line_timeout_sec (可选[int]) – 用于流的行超时时间(以秒为单位)。默认值为 LINE_TIMEOUT_SEC。
init_kwargs (Optional[Dict[str, Any]]) – 可调用类的初始化参数,仅支持灵活流程。
- Returns:
FlowExecutor 的新实例。
- Return type:
- enable_streaming_for_llm_flow(stream_required: Callable[[], bool])#
启用连接到输出的LLM节点,以返回由stream_required控制的流式结果。
如果stream_required回调返回True,LLM节点将返回一个字符串生成器。 否则,LLM节点将返回一个字符串。
- Parameters:
stream_required (Callable[[], bool]) – 一个不需要参数并返回布尔值的回调函数,用于指示是否应为LLM节点启用流式结果。
- Returns:
无
- ensure_flow_is_serializable()#
确保流程是可序列化的。
某些节点可能会返回一个字符串生成器以创建流式输出。 这在将流程部署为Web服务时非常有用。 然而,在交互模式下,执行器假定节点结果是可JSON序列化的。
此方法为流程中的每个节点添加一个包装器,以消耗流输出并将它们合并为一个字符串供执行器使用。
- Returns:
无
- exec(inputs: dict, node_concurrency=16) dict #
使用给定的输入执行流程并返回输出。
- Parameters:
inputs (dict) – 包含流程输入值的字典。
node_concurrency (int) – 可以同时执行的最大节点数。
- Returns:
包含流程输出值的字典。
- Return type:
字典
- exec_aggregation(inputs: Mapping[str, Any], aggregation_inputs: Mapping[str, Any], run_id=None, node_concurrency=16) AggregationResult #
执行流程的聚合节点。
- Parameters:
inputs (Mapping[str, Any]) – 输入名称到其值的映射。
aggregation_inputs (Mapping[str, Any]) – 聚合输入名称到其值的映射。
run_id (可选[str]) – 当前运行的ID,如果有的话。
node_concurrency (int) – 可以同时执行的最大节点数。
- Returns:
聚合节点的结果。
- Return type:
AggregationResult
- Raises:
如果输入或aggregation_inputs无效,则抛出FlowError。
- exec_line(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None) LineResult #
执行流程的单个行。
- Parameters:
inputs (Mapping[str, Any]) – 该行的输入值。
index (可选[int]) – 要执行的行的索引。
run_id (可选[str]) – 流程运行的ID。
validate_inputs (bool) – 是否验证输入值。
node_concurrency (int) – 可以同时执行的最大节点数。
allow_generator_output (bool) – 是否允许生成器输出。
line_timeout_sec (可选[int]) – 等待一行输出的最长时间。
- Returns:
执行该行的结果。
- Return type:
LineResult
- async exec_line_async(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, sync_iterator_to_async: bool = True) LineResult #
执行流程的单个行。
- Parameters:
inputs (Mapping[str, Any]) – 该行的输入值。
index (可选[int]) – 要执行的行的索引。
run_id (可选[str]) – 流程运行的ID。
validate_inputs (bool) – 是否验证输入值。
node_concurrency (int) – 可以同时执行的最大节点数。
allow_generator_output (bool) – 是否允许生成器输出。
sync_iterator_to_async (bool) – 是否将同步迭代器输出转换为异步迭代器。
- Returns:
执行该行的结果。
- Return type:
LineResult
- get_inputs_definition()#
- get_status_summary(run_id: str)#
获取给定运行状态的摘要。
- Parameters:
run_id (str) – 获取状态摘要的运行ID。
- Returns:
给定运行状态的摘要。
- Return type:
字符串
- property has_aggregation_node: bool#
检查流程执行器是否有任何聚合节点。
- Returns:
如果流执行器至少有一个聚合节点,则为True,否则为False。
- Return type:
布尔
- classmethod load_and_exec_node(flow_file: Path, node_name: str, *, storage: Optional[AbstractRunStorage] = None, output_sub_dir: Optional[str] = None, flow_inputs: Optional[Mapping[str, Any]] = None, dependency_nodes_outputs: Optional[Mapping[str, Any]] = None, connections: Optional[dict] = None, working_dir: Optional[Path] = None, raise_ex: bool = False)#
从流程中加载并执行单个节点。
- Parameters:
flow_file (Path) – 流程文件的路径。
node_name (str) – 要执行的节点的名称。
storage (可选[AbstractRunStorage]) – 用于流程的存储。
output_sub_dir (可选[str]) – 用于持久化流程图像的目录。仅保留以保持向后兼容性。
flow_inputs (可选[映射[str, 任意]]) – 用于流程的输入。默认为 None。
dependency_nodes_outputs (Optional[Mapping[str, Any]) – 依赖节点的输出。默认值为 None。
connections (可选[字典]) – 用于流程的连接。默认为 None。
working_dir (可选[str]) – 用于流程的工作目录。默认为 None。
raise_ex (可选[bool]) – 是否抛出异常。默认为 False。
- static update_environment_variables_with_connections(connections: dict)#
使用连接更新环境变量。
- Parameters:
connections (dict) – 包含连接信息的字典。
- Returns:
包含更新后的环境变量的字典。
- Return type:
字典
- promptflow.executor.flow_executor.enable_streaming_for_llm_tool(f)#
启用支持流模式的LLM工具。
- Parameters:
f (函数) – 要包装的函数。
- Returns:
包装后的函数。
- Return type:
函数
AzureOpenAI.completion 和 AzureOpenAI.chat 工具支持流式和非流式模式。 默认情况下,流式模式是关闭的。使用此包装器来开启它。
- promptflow.executor.flow_executor.execute_flow(flow_file: Path, working_dir: Path, output_dir: Path, connections: dict, inputs: Mapping[str, Any], *, run_id: Optional[str] = None, run_aggregation: bool = True, enable_stream_output: bool = False, allow_generator_output: bool = False, init_kwargs: Optional[dict] = None, **kwargs) LineResult #
执行流程,包括聚合节点。
- Parameters:
flow_file (Path) – 流程文件的路径。
working_dir (Path) – 流程的工作目录。
output_dir (Path) – 相对于 working_dir 的相对路径。
connections (dict) – 包含连接信息的字典。
inputs (Mapping[str, Any]) – 包含流程输入值的字典。
enable_stream_output (可选[布尔值]) – 是否允许流(生成器)输出作为流程输出。默认值为 False。
run_id (可选[str]) – 运行ID将在操作上下文中设置并用于会话。
init_kwargs (dict) – 用于flex flow的初始化参数,仅在flow是可调用类时支持。
kwargs (Any) – 其他用于创建流程执行器的关键字参数。
- Returns:
执行流程的线路结果。
- Return type:
LineResult
- promptflow.executor.flow_executor.signal_handler(sig, frame)#
处理进程接收到的终止信号。
目前,只有单节点运行使用此处理程序。我们打印日志并引发一个KeyboardInterrupt,以便外部代码可以捕获此异常并取消正在运行的节点。