基于类的流程#

实验性功能

这是一个实验性功能,可能会随时更改。了解更多更多

当用户需要在多轮流程运行期间在内存中持久化对象(如连接)时,他们可以编写一个可调用类作为流程入口,并将持久化参数放在__init__方法中。

如果用户需要在批量运行输出上记录指标,他们可以添加一个__aggregate__方法,它将在批量运行完成后被调度。 __aggregate__方法应仅包含1个参数,即批量运行结果的列表。

详情请参见连接支持聚合支持

类作为流程#

假设我们有一个文件 flow_entry.py:

class Reply(TypedDict):
    output: str

class MyFlow:
    def __init__(self, model_config: AzureOpenAIModelConfiguration, flow_config: dict):
      """Flow initialization logic goes here."""
      self.model_config = model_config
      self.flow_config = flow_config

    def __call__(question: str) -> Reply:
      """Flow execution logic goes here."""
      return Reply(output=output)

    def __aggregate__(self, line_results: List[str]) -> dict:
      """Aggregation logic goes here. Return key-value pair as metrics."""
      return {"key": val}

流程测试#

使用原始代码进行测试#

由于flow的定义是函数/可调用类。我们建议用户像运行其他脚本一样直接运行它:

class MyFlow:
    pass
if __name__ == "__main__":
    flow = MyFlow(model_config, flow_config)
    output = flow(question)
    metrics = flow.__aggregate__([output])
    # check metrics here

通过函数调用测试#

还支持将您的类入口转换为流,并利用提示流的能力进行测试。

您可以使用以下CLI进行测试:

# flow entry syntax: path.to.module:ClassName
pf flow test --flow flow_entry:MyFlow --inputs question="What's the capital of France?" --init init.json

注意: 目前此命令将在您的工作目录中生成一个flow.flex.yaml文件。该文件将成为流程的入口。

查看完整示例:basic-chat

与流程聊天#

支持在CLI中与流程聊天:

pf flow test --flow flow_entry:MyFlow --inputs inputs.json --init init.json --ui

查看这里获取更多信息。

批量运行#

用户也可以批量运行一个流程。

pf run create --flow "path.to.module:ClassName" --data "./data.jsonl"
# user can also directly use entry in `flow` param for batch run
pf.run(flow="path.to.module:ClassName", init="./init.jsonl", data="./data.jsonl")

或者直接运行导入的流程类或流程实例。

from promptflow.core import AzureOpenAIModelConfiguration


class MyFlow:
    pass

config = AzureOpenAIModelConfiguration(
  azure_deployment="my_deployment",
  # connection and api_key configs are exclusive
  connection="my_aoai_connection",
  api_key="actual_key",
)
pf.run(flow=MyFlow, init={"model_config": config, "flow_config": {}}, data="./data.jsonl")
# or
flow_obj = MyFlow(model_config=config, flow_config={})
pf.run(flow=flow_obj, data="./data.jsonl")

了解更多关于此主题的信息,请访问运行和评估流程

定义一个流程 yaml#

用户可以手动编写一个名为flow.flex.yaml的YAML文件,或者将一个函数/可调用条目保存到YAML文件中。 这对于高级场景(如部署或在云中运行)是必需的。 一个流程YAML可能如下所示:

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
entry: path.to.module:ClassName

使用YAML进行批量运行#

用户可以批量运行一个流程。流程初始化函数的参数由init参数支持。

用户需要编写一个JSON文件作为init的值,因为在命令行中编写模型配置很困难。

{
    "model_config": {
        "azure_endpoint": "my_endpoint",
        "azure_deployment": "my_deployment",
        "api_key": "actual_api_key"
    },
    "flow_config": {}
}
pf run create --flow "./flow.flex.yaml" --data "./data.jsonl" --init init.json
pf = PFClient()

config = AzureOpenAIModelConfiguration(
  azure_deployment="my_deployment",
  api_key="actual_key"
)
# if init's value is not json serializable, raise user error
pf.run(flow="./flow.flex.yaml", init={"model_config": config, "flow_config": {}}, data="./data.jsonl")

# when submit to cloud, user can only use connection
# in runtime executor will resolve connection in AzureOpenAIModelConfiguration and set connection's fields to ModelConfig: equal to original ModelConfiguration.from_connection()
config = AzureOpenAIModelConfiguration(
  azure_deployment="my_embedding_deployment",
  connection="my-aoai-connection",
) 
pfazure.run(flow="./flow.flex.yaml", init={"model_config": config, "flow_config": {}}, data="./data.jsonl")

部署流程#

用户可以提供一个流程。流程初始化函数的参数由init参数支持。 流程应在YAML中具有完整的初始化/输入/输出规范,以确保可以生成服务swagger。

用户需要编写一个JSON文件作为init的值,因为在命令行中编写模型配置很困难。

{
    "model_config": {
        "azure_endpoint": "my_endpoint",
        "azure_deployment": "my_deployment",
        "api_key": "actual_api_key"
    },
    "flow_config": {}
}
# user can only pass model config by file 
pf flow serve --source "./"  --port 8088 --host localhost --init path/to/init.json

了解更多:部署流程

聚合支持#

引入聚合支持以帮助用户计算指标。

class MyFlow:
    def __call__(text: str) -> str:
      """Flow execution logic goes here."""
      pass

    # will only execute once after batch run finished.
    # the processed_results will be list of __call__'s output and we will log the return value as metrics automatically.
    def __aggregate__(self, processed_results: List[str]) -> dict:
        for element in processed_results:
            # If __call__'s output is primitive type, element will be primitive type.
            # If __call__'s output is dataclass, element will be a dictionary, but can access it's attribute with `element.attribute_name`
            # For other cases, it's recommended to access by key `element["attribute_name"]`

注意:

聚合支持有几个限制:

  • 聚合函数只会在批量运行时执行。

  • 仅支持1个硬编码的 __aggregate__ 函数。

  • 在执行时,__aggregate__ 只会传递 1 个位置参数。

  • 聚合函数的输入将是流程运行的输出列表。

    • processed_results中传递的每个元素在__aggregate__函数中传递的与每行的__call__返回的对象不同。

    • 重构的元素是一个支持一层属性访问的字典。但建议通过键访问它们。请参阅上面的示例以了解用法。

  • 如果聚合函数接受超过1个参数,在提交阶段会引发错误。

下一步#