promptflow.executor.flow_executor 模块#

class promptflow.executor.flow_executor.FlowExecutor(flow: Flow, connections: ConnectionProvider, run_tracker: RunTracker, cache_manager: AbstractCacheManager, loaded_tools: Mapping[str, Callable], *, raise_ex: bool = False, working_dir=None, line_timeout_sec=None, flow_file=None)#

基础类: object

此类用于为不同的输入执行单个流程。

Parameters:
  • flow (Flow) – 要执行的流程。

  • connections (dict) – 用于流程的连接。

  • run_tracker (RunTracker) – 用于流程的运行跟踪器。

  • cache_manager (AbstractCacheManager) – 用于流程的缓存管理器。

  • loaded_tools (Mapping[str, Callable]) – 用于流程的已加载工具。

  • worker_count (可选[int]) – 用于流程的工作线程数。默认值为16。

  • raise_ex (可选[bool]) – 是否抛出异常。默认为 False。

  • working_dir (可选[str]) – 用于流程的工作目录。默认为 None。

  • line_timeout_sec (可选[int]) – 用于流的行超时时间(以秒为单位)。默认值为 LINE_TIMEOUT_SEC。

  • flow_file (可选[路径]) – 用于流程的流程文件。默认为 None。

property aggregation_nodes#

获取流执行器的聚合节点。

Returns:

聚合节点的列表。

Return type:

列表

static apply_inputs_mapping(inputs: Mapping[str, Mapping[str, Any]], inputs_mapping: Mapping[str, str]) Dict[str, Any]#
convert_flow_input_types(inputs: dict) Mapping[str, Any]#

将给定输入字典的输入类型转换为与流程的预期类型匹配。

Parameters:

inputs (dict) – 包含流程输入的字典。

Returns:

包含转换后输入的字典。

Return type:

映射[str, 任意]

classmethod create(flow_file: Path, connections: Union[dict, ConnectionProvider], working_dir: Optional[Path] = None, *, entry: Optional[str] = None, storage: Optional[AbstractRunStorage] = None, raise_ex: bool = True, node_override: Optional[Dict[str, Dict[str, Any]]] = None, line_timeout_sec: Optional[int] = None, init_kwargs: Optional[Dict[str, Any]] = None, **kwargs) FlowExecutor#

创建一个新的FlowExecutor实例。

Parameters:
  • flow_file (Path) – 流程文件的路径。

  • connections (Union[dict, ConnectionProvider]) – 用于流程的连接。

  • working_dir (可选[str]) – 用于流程的工作目录。默认为 None。

  • func (可选[str]) – 如果提供了 .py 文件,则用于流程的函数。默认值为 None。

  • storage (可选[AbstractRunStorage]) – 用于流程的存储。默认为 None。

  • raise_ex (可选[bool]) – 是否抛出异常。默认为 True。

  • node_override (可选[Dict[str, Dict[str, Any]]]) – 用于流程的节点覆盖。默认值为 None。

  • line_timeout_sec (可选[int]) – 用于流的行超时时间(以秒为单位)。默认值为 LINE_TIMEOUT_SEC。

  • init_kwargs (Optional[Dict[str, Any]]) – 可调用类的初始化参数,仅支持灵活流程。

Returns:

FlowExecutor 的新实例。

Return type:

FlowExecutor

enable_streaming_for_llm_flow(stream_required: Callable[[], bool])#

启用连接到输出的LLM节点,以返回由stream_required控制的流式结果。

如果stream_required回调返回True,LLM节点将返回一个字符串生成器。 否则,LLM节点将返回一个字符串。

Parameters:

stream_required (Callable[[], bool]) – 一个不需要参数并返回布尔值的回调函数,用于指示是否应为LLM节点启用流式结果。

Returns:

ensure_flow_is_serializable()#

确保流程是可序列化的。

某些节点可能会返回一个字符串生成器以创建流式输出。 这在将流程部署为Web服务时非常有用。 然而,在交互模式下,执行器假定节点结果是可JSON序列化的。

此方法为流程中的每个节点添加一个包装器,以消耗流输出并将它们合并为一个字符串供执行器使用。

Returns:

exec(inputs: dict, node_concurrency=16) dict#

使用给定的输入执行流程并返回输出。

Parameters:
  • inputs (dict) – 包含流程输入值的字典。

  • node_concurrency (int) – 可以同时执行的最大节点数。

Returns:

包含流程输出值的字典。

Return type:

字典

exec_aggregation(inputs: Mapping[str, Any], aggregation_inputs: Mapping[str, Any], run_id=None, node_concurrency=16) AggregationResult#

执行流程的聚合节点。

Parameters:
  • inputs (Mapping[str, Any]) – 输入名称到其值的映射。

  • aggregation_inputs (Mapping[str, Any]) – 聚合输入名称到其值的映射。

  • run_id (可选[str]) – 当前运行的ID,如果有的话。

  • node_concurrency (int) – 可以同时执行的最大节点数。

Returns:

聚合节点的结果。

Return type:

AggregationResult

Raises:

如果输入或aggregation_inputs无效,则抛出FlowError。

exec_line(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None) LineResult#

执行流程的单个行。

Parameters:
  • inputs (Mapping[str, Any]) – 该行的输入值。

  • index (可选[int]) – 要执行的行的索引。

  • run_id (可选[str]) – 流程运行的ID。

  • validate_inputs (bool) – 是否验证输入值。

  • node_concurrency (int) – 可以同时执行的最大节点数。

  • allow_generator_output (bool) – 是否允许生成器输出。

  • line_timeout_sec (可选[int]) – 等待一行输出的最长时间。

Returns:

执行该行的结果。

Return type:

LineResult

async exec_line_async(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, sync_iterator_to_async: bool = True) LineResult#

执行流程的单个行。

Parameters:
  • inputs (Mapping[str, Any]) – 该行的输入值。

  • index (可选[int]) – 要执行的行的索引。

  • run_id (可选[str]) – 流程运行的ID。

  • validate_inputs (bool) – 是否验证输入值。

  • node_concurrency (int) – 可以同时执行的最大节点数。

  • allow_generator_output (bool) – 是否允许生成器输出。

  • sync_iterator_to_async (bool) – 是否将同步迭代器输出转换为异步迭代器。

Returns:

执行该行的结果。

Return type:

LineResult

get_inputs_definition()#
get_status_summary(run_id: str)#

获取给定运行状态的摘要。

Parameters:

run_id (str) – 获取状态摘要的运行ID。

Returns:

给定运行状态的摘要。

Return type:

字符串

property has_aggregation_node: bool#

检查流程执行器是否有任何聚合节点。

Returns:

如果流执行器至少有一个聚合节点,则为True,否则为False。

Return type:

布尔

classmethod load_and_exec_node(flow_file: Path, node_name: str, *, storage: Optional[AbstractRunStorage] = None, output_sub_dir: Optional[str] = None, flow_inputs: Optional[Mapping[str, Any]] = None, dependency_nodes_outputs: Optional[Mapping[str, Any]] = None, connections: Optional[dict] = None, working_dir: Optional[Path] = None, raise_ex: bool = False)#

从流程中加载并执行单个节点。

Parameters:
  • flow_file (Path) – 流程文件的路径。

  • node_name (str) – 要执行的节点的名称。

  • storage (可选[AbstractRunStorage]) – 用于流程的存储。

  • output_sub_dir (可选[str]) – 用于持久化流程图像的目录。仅保留以保持向后兼容性。

  • flow_inputs (可选[映射[str, 任意]]) – 用于流程的输入。默认为 None。

  • dependency_nodes_outputs (Optional[Mapping[str, Any]) – 依赖节点的输出。默认值为 None。

  • connections (可选[字典]) – 用于流程的连接。默认为 None。

  • working_dir (可选[str]) – 用于流程的工作目录。默认为 None。

  • raise_ex (可选[bool]) – 是否抛出异常。默认为 False。

static update_environment_variables_with_connections(connections: dict)#

使用连接更新环境变量。

Parameters:

connections (dict) – 包含连接信息的字典。

Returns:

包含更新后的环境变量的字典。

Return type:

字典

promptflow.executor.flow_executor.enable_streaming_for_llm_tool(f)#

启用支持流模式的LLM工具。

Parameters:

f (函数) – 要包装的函数。

Returns:

包装后的函数。

Return type:

函数

AzureOpenAI.completion 和 AzureOpenAI.chat 工具支持流式和非流式模式。 默认情况下,流式模式是关闭的。使用此包装器来开启它。

promptflow.executor.flow_executor.execute_flow(flow_file: Path, working_dir: Path, output_dir: Path, connections: dict, inputs: Mapping[str, Any], *, run_id: Optional[str] = None, run_aggregation: bool = True, enable_stream_output: bool = False, allow_generator_output: bool = False, init_kwargs: Optional[dict] = None, **kwargs) LineResult#

执行流程,包括聚合节点。

Parameters:
  • flow_file (Path) – 流程文件的路径。

  • working_dir (Path) – 流程的工作目录。

  • output_dir (Path) – 相对于 working_dir 的相对路径。

  • connections (dict) – 包含连接信息的字典。

  • inputs (Mapping[str, Any]) – 包含流程输入值的字典。

  • enable_stream_output (可选[布尔值]) – 是否允许流(生成器)输出作为流程输出。默认值为 False。

  • run_id (可选[str]) – 运行ID将在操作上下文中设置并用于会话。

  • init_kwargs (dict) – 用于flex flow的初始化参数,仅在flow是可调用类时支持。

  • kwargs (Any) – 其他用于创建流程执行器的关键字参数。

Returns:

执行流程的线路结果。

Return type:

LineResult

promptflow.executor.flow_executor.signal_handler(sig, frame)#

处理进程接收到的终止信号。

目前,只有单节点运行使用此处理程序。我们打印日志并引发一个KeyboardInterrupt,以便外部代码可以捕获此异常并取消正在运行的节点。