介绍

CrewAI Flows 是一个强大的功能,旨在简化和优化AI工作流的创建和管理。Flows 允许开发者高效地组合和协调编码任务和Crews,为构建复杂的AI自动化提供了一个强大的框架。

Flows 允许您创建结构化、事件驱动的工作流。它们提供了一种无缝的方式来连接多个任务、管理状态并控制 AI 应用程序中的执行流程。通过 Flows,您可以轻松设计和实施多步骤流程,充分利用 CrewAI 的功能。

  1. 简化工作流创建: 轻松将多个Crews和任务链接在一起,以创建复杂的AI工作流。

  2. 状态管理: Flows 使得在工作流中的不同任务之间管理和共享状态变得非常容易。

  3. 事件驱动架构: 基于事件驱动模型构建,支持动态和响应式的工作流程。

  4. 灵活的控制流: 在工作流中实现条件逻辑、循环和分支。

入门指南

让我们创建一个简单的流程,在这个流程中,您将在一个任务中使用OpenAI生成一个随机城市,然后在另一个任务中使用该城市生成一个有趣的事实。

Code

from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion


class ExampleFlow(Flow):
    model = "gpt-4o-mini"

    @start()
    def generate_city(self):
        print("Starting flow")

        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": "Return the name of a random city in the world.",
                },
            ],
        )

        random_city = response["choices"][0]["message"]["content"]
        print(f"Random City: {random_city}")

        return random_city

    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        response = completion(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": f"Tell me a fun fact about {random_city}",
                },
            ],
        )

        fun_fact = response["choices"][0]["message"]["content"]
        return fun_fact



flow = ExampleFlow()
result = flow.kickoff()

print(f"Generated fun fact: {result}")

在上面的例子中,我们创建了一个简单的Flow,它使用OpenAI生成一个随机城市,然后生成关于该城市的一个有趣的事实。该Flow由两个任务组成:generate_citygenerate_fun_factgenerate_city 任务是Flow的起点,而 generate_fun_fact 任务监听 generate_city 任务的输出。

当你运行流程时,它将生成一个随机城市,然后生成关于该城市的一个有趣的事实。输出将被打印到控制台。

注意: 确保你已经设置了你的.env文件来存储你的OPENAI_API_KEY。这个密钥对于向OpenAI API进行身份验证请求是必要的。

@start()

@start() 装饰器用于将一个方法标记为流程的起点。当流程启动时,所有用 @start() 装饰的方法都会并行执行。你可以在一个流程中有多个启动方法,它们都会在流程启动时执行。

@listen()

@listen() 装饰器用于将一个方法标记为Flow中另一个任务输出的监听器。使用@listen()装饰的方法将在指定任务发出输出时执行。该方法可以访问它正在监听的任务的输出作为参数。

用法

@listen() 装饰器可以通过多种方式使用:

  1. 通过名称监听方法:你可以将要监听的方法名称作为字符串传递。当该方法完成时,监听器方法将被触发。

    代码
    @listen("generate_city")
    def generate_fun_fact(self, random_city):
        # 实现
    
  2. 直接监听方法:你可以传递方法本身。当该方法完成时,监听方法将被触发。

    代码
    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        # 实现
    

流程输出

访问和处理Flow的输出对于将您的AI工作流集成到更大的应用程序或系统中至关重要。CrewAI Flows提供了直接的机制来检索最终输出、访问中间结果并管理您的Flow的整体状态。

获取最终输出

当你运行一个流程时,最终输出由最后完成的方法决定。kickoff() 方法返回这个最终方法的输出。

以下是访问最终输出的方法:

在这个例子中,second_method 是最后完成的方法,因此它的输出将是流程的最终输出。 kickoff() 方法将返回最终输出,然后将其打印到控制台。

访问和更新状态

除了检索最终输出外,您还可以访问和更新Flow中的状态。状态可用于在Flow中的不同方法之间存储和共享数据。Flow运行后,您可以访问状态以检索在执行期间添加或更新的任何信息。

以下是如何更新和访问状态的示例:

在这个例子中,状态由first_methodsecond_method更新。 在流程运行后,您可以访问最终状态以查看这些方法所做的更新。

通过确保返回最终方法的输出并提供对状态的访问,CrewAI Flows 使得将 AI 工作流的结果集成到更大的应用程序或系统中变得容易,同时在整个流程的执行过程中维护和访问状态。

流程状态管理

有效地管理状态对于构建可靠且可维护的AI工作流程至关重要。CrewAI Flows 提供了强大的机制,用于非结构化和结构化的状态管理,使开发人员能够选择最适合其应用程序需求的方法。

非结构化状态管理

在非结构化状态管理中,所有状态都存储在Flow类的state属性中。 这种方法提供了灵活性,使开发人员能够动态添加或修改状态属性,而无需定义严格的结构。

Code
from crewai.flow.flow import Flow, listen, start

class UntructuredExampleFlow(Flow):

    @start()
    def first_method(self):
        self.state.message = "Hello from structured flow"
        self.state.counter = 0

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = UntructuredExampleFlow()
flow.kickoff()

关键点:

  • 灵活性: 您可以动态地向 self.state 添加属性,而无需预定义的约束。
  • 简单性: 适用于状态结构简单或变化显著的工作流程。

结构化状态管理

结构化状态管理利用预定义的模式来确保工作流中的一致性和类型安全。 通过使用像Pydantic的BaseModel这样的模型,开发者可以定义状态的确切结构,从而在开发环境中实现更好的验证和自动补全。

Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()

关键点:

  • 定义的模式: ExampleState 清晰地概述了状态结构,增强了代码的可读性和可维护性。
  • 类型安全:利用Pydantic确保状态属性遵循指定的类型,减少运行时错误。
  • 自动完成: IDE 可以根据定义的状态模型提供更好的自动完成和错误检查。

在非结构化和结构化状态管理之间做出选择

  • 在以下情况下使用非结构化状态管理:

    • 工作流的状态简单或高度动态。
    • 灵活性优先于严格的状态定义。
    • 需要快速原型设计,而不需要定义模式的开销。
  • 在以下情况下使用结构化状态管理:

    • 工作流需要一个定义良好且一致的状态结构。
    • 类型安全和验证对应用程序的可靠性很重要。
    • 您希望利用IDE功能,如自动完成和类型检查,以提升开发者的体验。

通过提供非结构化和结构化的状态管理选项,CrewAI Flows 使开发者能够构建既灵活又强大的AI工作流,满足广泛的应用需求。

流量控制

条件逻辑: or

Flows 中的 or_ 函数允许您监听多个方法,并在任何指定方法发出输出时触发监听方法。

当你运行这个流程时,logger 方法将会被 start_methodsecond_method 的输出触发。 or_ 函数用于监听多个方法,并在任何指定的方法发出输出时触发监听方法。

条件逻辑: and

Flows中的and_函数允许您监听多个方法,并仅在所有指定方法都发出输出时触发监听方法。

当你运行这个流程时,logger 方法只有在 start_methodsecond_method 都发出输出时才会被触发。 and_ 函数用于监听多个方法,并且只有在所有指定的方法都发出输出时才会触发监听方法。

路由器

@router() 装饰器在 Flows 中允许你根据方法的输出定义条件路由逻辑。 你可以根据方法的输出指定不同的路由,从而动态控制执行流程。

在上述示例中,start_method 生成一个随机布尔值并将其设置在状态中。 second_method 使用 @router() 装饰器根据布尔值定义条件路由逻辑。 如果布尔值为 True,则该方法返回 "success",如果为 False,则返回 "failed"third_methodfourth_method 监听 second_method 的输出并根据返回值执行。

当你运行这个流程时,输出将根据start_method生成的随机布尔值而变化。

向流程中添加团队

在CrewAI中创建具有多个团队的流程非常简单。

你可以通过运行以下命令生成一个新的CrewAI项目,该项目包括创建具有多个crews的流程所需的所有脚手架:

crewai create flow name_of_flow

此命令将生成一个新的CrewAI项目,包含必要的文件夹结构。生成的项目包括一个名为poem_crew的预构建crew,该crew已经可以工作。您可以通过复制、粘贴和编辑此crew来创建其他crew,并将其用作模板。

文件夹结构

运行crewai create flow name_of_flow命令后,您将看到一个类似于以下的文件夹结构:

目录/文件描述
name_of_flow/流程的根目录。
├── crews/包含特定团队的目录。
│ └── poem_crew/“poem_crew”的目录,包含其配置和脚本。
│ ├── config/“poem_crew”的配置文件目录。
│ │ ├── agents.yaml定义“poem_crew”代理的YAML文件。
│ │ └── tasks.yaml定义“poem_crew”任务的YAML文件。
│ ├── poem_crew.py“poem_crew”功能的脚本。
├── tools/用于流程中使用的附加工具的目录。
│ └── custom_tool.py自定义工具实现。
├── main.py运行流程的主脚本。
├── README.md项目描述和说明。
├── pyproject.toml项目依赖和设置的配置文件。
└── .gitignore指定在版本控制中忽略的文件和目录。

组建您的团队

crews文件夹中,您可以定义多个crews。每个crew将拥有自己的文件夹,其中包含配置文件和crew定义文件。例如,poem_crew文件夹包含:

  • config/agents.yaml: 定义团队的代理。
  • config/tasks.yaml: 定义团队的任务。
  • poem_crew.py: 包含crew的定义,包括agents、tasks和crew本身。

你可以复制、粘贴并编辑poem_crew来创建其他团队。

main.py 中连接团队

main.py 文件是您创建流程并将各个部分连接在一起的地方。您可以使用 Flow 类以及装饰器 @start@listen 来定义您的流程,并指定执行的流程。

以下是如何在main.py文件中连接poem_crew的示例:

Code
#!/usr/bin/env python
from random import randint

from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew

class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""

class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})

        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)

def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot()

if __name__ == "__main__":
    kickoff()

在这个例子中,PoemFlow 类定义了一个流程,该流程生成句子计数,使用 PoemCrew 生成一首诗,然后将诗保存到文件中。通过调用 kickoff() 方法来启动流程。

运行流程

(可选)在运行流程之前,您可以通过运行以下命令来安装依赖项:

crewai install

一旦所有依赖项都安装完毕,你需要通过运行以下命令来激活虚拟环境:

source .venv/bin/activate

激活虚拟环境后,您可以通过执行以下命令之一来运行流程:

crewai flow kickoff

uv run kickoff

流程将执行,您应该在控制台中看到输出。

绘制流程图

可视化您的AI工作流程可以提供有关流程结构和执行路径的宝贵见解。CrewAI提供了一个强大的可视化工具,允许您生成流程的交互式图表,使您更容易理解和优化您的AI工作流程。

什么是图表?

CrewAI中的图表是您AI工作流程的图形表示。它们展示了各种任务、它们之间的连接以及数据流。这种可视化有助于理解操作顺序、识别瓶颈,并确保工作流程逻辑符合您的期望。

如何生成图表

CrewAI 提供了两种便捷的方法来生成你的流程图:

选项1:使用plot()方法

如果你直接使用一个流程实例,你可以通过在流程对象上调用plot()方法来生成一个图表。此方法将创建一个包含流程交互图表的HTML文件。

Code
# Assuming you have a flow instance
flow.plot("my_flow_plot")

这将在当前目录中生成一个名为 my_flow_plot.html 的文件。您可以在网页浏览器中打开此文件以查看交互式图表。

选项2:使用命令行

如果您在一个结构化的CrewAI项目中工作,您可以使用命令行生成一个图表。这对于较大的项目特别有用,您可以在其中可视化整个流程设置。

crewai flow plot

此命令将生成一个包含您流程图的HTML文件,类似于plot()方法。该文件将保存在您的项目目录中,您可以在网页浏览器中打开它以探索流程。

理解图表

生成的图表将显示代表流程中任务的节点,有向边表示执行的流程。该图表是交互式的,允许您放大和缩小,并将鼠标悬停在节点上以查看其他详细信息。

通过可视化您的流程,您可以更清晰地理解工作流的结构,从而更容易调试、优化并向他人传达您的AI流程。

结论

绘制你的流程是CrewAI的一个强大功能,它增强了你设计和管理复杂AI工作流的能力。无论你选择使用plot()方法还是命令行,生成图表都将为你提供工作流的可视化表示,有助于开发和演示。

下一步

如果您有兴趣探索更多的流程示例,我们在示例库中有多种推荐。以下是四个具体的流程示例,每个示例都展示了独特的用例,以帮助您将当前的问题类型与特定示例匹配:

  1. 电子邮件自动回复流程:此示例演示了一个无限循环,其中后台作业持续运行以自动回复电子邮件。对于需要重复执行而无需人工干预的任务来说,这是一个很好的用例。查看示例

  2. Lead Score Flow: 此流程展示了如何通过添加人工反馈和使用路由器处理不同的条件分支。这是一个很好的例子,展示了如何将动态决策和人工监督纳入您的工作流程中。查看示例

  3. 写书流程:这个示例在将多个团队串联在一起方面表现出色,其中一个团队的输出被另一个团队使用。具体来说,一个团队负责整本书的大纲,另一个团队根据大纲生成章节。最终,所有内容都被连接起来,生成一本完整的书。这个流程非常适合需要不同任务之间协调的复杂、多步骤过程。查看示例

  4. 会议助手流程: 此流程展示了如何广播一个事件以触发多个后续操作。例如,会议结束后,流程可以更新Trello看板、发送Slack消息并保存结果。这是一个处理单一事件多个结果的绝佳示例,非常适合全面的任务管理和通知系统。查看示例

通过探索这些示例,您可以深入了解如何利用CrewAI Flows应对各种用例,从自动化重复任务到管理具有动态决策和人类反馈的复杂多步骤流程。

此外,请查看我们关于如何在CrewAI中使用流程的YouTube视频!

这个页面有帮助吗?