任务概述

在CrewAI框架中,Task是由Agent完成的特定任务。

任务提供了执行所需的所有必要细节,例如描述、负责的代理、所需的工具等,从而促进了各种复杂性的操作。

CrewAI中的任务可以是协作的,需要多个代理共同工作。这是通过任务属性进行管理,并由Crew的流程进行协调,从而提高团队合作和效率。

任务执行流程

任务可以通过两种方式执行:

  • Sequential: 任务按照它们定义的顺序执行
  • 层次化: 任务根据代理的角色和专业知识进行分配

执行流程在创建团队时定义:

Code
crew = Crew(
    agents=[agent1, agent2],
    tasks=[task1, task2],
    process=Process.sequential  # or Process.hierarchical
)

任务属性

属性参数类型描述
描述descriptionstr对任务内容的清晰、简洁的陈述。
预期输出expected_outputstr任务完成时的详细描述。
名称 (可选)nameOptional[str]任务的名称标识符。
代理 (可选)agentOptional[BaseAgent]负责执行任务的代理。
工具 (可选)toolsList[BaseTool]代理在此任务中被限制使用的工具/资源。
上下文 (可选)contextOptional[List["Task"]]其他任务的输出将用作此任务的上下文。
异步执行 (可选)async_executionOptional[bool]任务是否应异步执行。默认为 False。
配置 (可选)configOptional[Dict[str, Any]]任务特定的配置参数。
输出文件 (可选)output_fileOptional[str]用于存储任务输出的文件路径。
输出 JSON (可选)output_jsonOptional[Type[BaseModel]]用于结构化 JSON 输出的 Pydantic 模型。
输出 Pydantic (可选)output_pydanticOptional[Type[BaseModel]]用于任务输出的 Pydantic 模型。
回调 (可选)callbackOptional[Any]任务完成后要执行的函数/对象。

创建任务

在CrewAI中有两种创建任务的方式:使用YAML配置(推荐)直接在代码中定义

使用YAML配置提供了一种更清晰、更易于维护的方式来定义任务。我们强烈建议在您的CrewAI项目中使用这种方法来定义任务。

在按照安装部分创建了您的CrewAI项目后,导航到src/latest_ai_development/config/tasks.yaml文件并修改模板以匹配您的特定任务需求。

在运行crew时,您的YAML文件中的变量(如{topic})将被输入值替换:

Code
crew.kickoff(inputs={'topic': 'AI Agents'})

以下是如何使用YAML配置任务的示例:

tasks.yaml
research_task:
  description: >
    Conduct a thorough research about {topic}
    Make sure you find any interesting and relevant information given
    the current year is 2024.
  expected_output: >
    A list with 10 bullet points of the most relevant information about {topic}
  agent: researcher

reporting_task:
  description: >
    Review the context you got and expand each topic into a full section for a report.
    Make sure the report is detailed and contains any and all relevant information.
  expected_output: >
    A fully fledge reports with the mains topics, each with a full section of information.
    Formatted as markdown without '```'
  agent: reporting_analyst
  output_file: report.md

要在你的代码中使用这个YAML配置,创建一个继承自CrewBase的crew类:

crew.py
# src/latest_ai_development/crew.py

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai_tools import SerperDevTool

@CrewBase
class LatestAiDevelopmentCrew():
  """LatestAiDevelopment crew"""

  @agent
  def researcher(self) -> Agent:
    return Agent(
      config=self.agents_config['researcher'],
      verbose=True,
      tools=[SerperDevTool()]
    )

  @agent
  def reporting_analyst(self) -> Agent:
    return Agent(
      config=self.agents_config['reporting_analyst'],
      verbose=True
    )

  @task
  def research_task(self) -> Task:
    return Task(
      config=self.tasks_config['research_task']
    )

  @task
  def reporting_task(self) -> Task:
    return Task(
      config=self.tasks_config['reporting_task']
    )

  @crew
  def crew(self) -> Crew:
    return Crew(
      agents=[
        self.researcher(),
        self.reporting_analyst()
      ],
      tasks=[
        self.research_task(),
        self.reporting_task()
      ],
      process=Process.sequential
    )

您在YAML文件中使用的名称(agents.yamltasks.yaml)应与Python代码中的方法名称匹配。

直接代码定义(替代方案)

或者,您可以直接在代码中定义任务,而无需使用YAML配置:

task.py
from crewai import Task

research_task = Task(
    description="""
        Conduct a thorough research about AI Agents.
        Make sure you find any interesting and relevant information given
        the current year is 2024.
    """,
    expected_output="""
        A list with 10 bullet points of the most relevant information about AI Agents
    """,
    agent=researcher
)

reporting_task = Task(
    description="""
        Review the context you got and expand each topic into a full section for a report.
        Make sure the report is detailed and contains any and all relevant information.
    """,
    expected_output="""
        A fully fledge reports with the mains topics, each with a full section of information.
        Formatted as markdown without '```'
    """,
    agent=reporting_analyst,
    output_file="report.md"
)

直接指定一个agent进行分配,或者让hierarchical CrewAI的过程根据角色、可用性等来决定。

任务输出

理解任务输出对于构建有效的AI工作流程至关重要。CrewAI通过TaskOutput类提供了一种结构化的方式来处理任务结果,该类支持多种输出格式,并且可以轻松在任务之间传递。

CrewAI框架中任务的输出被封装在TaskOutput类中。该类提供了一种结构化的方式来访问任务的结果,包括各种格式,如原始输出、JSON和Pydantic模型。

默认情况下,TaskOutput 只会包含 raw 输出。只有当原始的 Task 对象分别配置了 output_pydanticoutput_json 时,TaskOutput 才会包含 pydanticjson_dict 输出。

任务输出属性

属性参数类型描述
描述descriptionstr任务的描述。
摘要summaryOptional[str]任务的摘要,自动从描述的前10个单词生成。
原始rawstr任务的原始输出。这是输出的默认格式。
PydanticpydanticOptional[BaseModel]一个Pydantic模型对象,表示任务的结构化输出。
JSON 字典json_dictOptional[Dict[str, Any]]表示任务输出的 JSON 字典。
代理agentstr执行任务的代理。
输出格式output_formatOutputFormat任务输出的格式,选项包括RAW、JSON和Pydantic。默认为RAW。

任务方法和属性

方法/属性描述
json如果输出格式为JSON,则返回任务输出的JSON字符串表示。
to_dict将JSON和Pydantic输出转换为字典。
str返回任务输出的字符串表示,优先使用Pydantic,然后是JSON,最后是原始数据。

访问任务输出

一旦任务被执行,其输出可以通过Task对象的output属性访问。TaskOutput类提供了多种方式来交互和展示这个输出。

示例

Code
# Example task
task = Task(
    description='Find and summarize the latest AI news',
    expected_output='A bullet list summary of the top 5 most important AI news',
    agent=research_agent,
    tools=[search_tool]
)

# Execute the crew
crew = Crew(
    agents=[research_agent],
    tasks=[task],
    verbose=True
)

result = crew.kickoff()

# Accessing the task output
task_output = task.output

print(f"Task Description: {task_output.description}")
print(f"Task Summary: {task_output.summary}")
print(f"Raw Output: {task_output.raw}")
if task_output.json_dict:
    print(f"JSON Output: {json.dumps(task_output.json_dict, indent=2)}")
if task_output.pydantic:
    print(f"Pydantic Output: {task_output.pydantic}")

任务依赖关系和上下文

任务可以依赖于其他任务的输出,使用context属性。例如:

Code
research_task = Task(
    description="Research the latest developments in AI",
    expected_output="A list of recent AI developments",
    agent=researcher
)

analysis_task = Task(
    description="Analyze the research findings and identify key trends",
    expected_output="Analysis report of AI trends",
    agent=analyst,
    context=[research_task]  # This task will wait for research_task to complete
)

任务护栏

任务护栏提供了一种在任务输出传递给下一个任务之前进行验证和转换的方法。此功能有助于确保数据质量,并在代理的输出不符合特定标准时提供反馈。

使用任务防护栏

要为任务添加防护栏,请通过guardrail参数提供一个验证函数:

Code
from typing import Tuple, Union, Dict, Any

def validate_blog_content(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
    """Validate blog content meets requirements."""
    try:
        # Check word count
        word_count = len(result.split())
        if word_count > 200:
            return (False, {
                "error": "Blog content exceeds 200 words",
                "code": "WORD_COUNT_ERROR",
                "context": {"word_count": word_count}
            })

        # Additional validation logic here
        return (True, result.strip())
    except Exception as e:
        return (False, {
            "error": "Unexpected error during validation",
            "code": "SYSTEM_ERROR"
        })

blog_task = Task(
    description="Write a blog post about AI",
    expected_output="A blog post under 200 words",
    agent=blog_agent,
    guardrail=validate_blog_content  # Add the guardrail function
)

护栏功能要求

  1. 函数签名:

    • 必须接受一个参数(任务输出)
    • 应返回一个(bool, Any)的元组
    • 推荐使用类型提示,但可选
  2. 返回值:

    • 成功: 返回 (True, validated_result)
    • 失败: 返回 (False, error_details)

错误处理最佳实践

  1. 结构化错误响应:
Code
def validate_with_context(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
    try:
        # Main validation logic
        validated_data = perform_validation(result)
        return (True, validated_data)
    except ValidationError as e:
        return (False, {
            "error": str(e),
            "code": "VALIDATION_ERROR",
            "context": {"input": result}
        })
    except Exception as e:
        return (False, {
            "error": "Unexpected error",
            "code": "SYSTEM_ERROR"
        })
  1. 错误类别:

    • 使用特定的错误代码
    • 包含相关的上下文
    • 提供可操作的反馈
  2. 验证链:

Code
from typing import Any, Dict, List, Tuple, Union

def complex_validation(result: str) -> Tuple[bool, Union[str, Dict[str, Any]]]:
    """Chain multiple validation steps."""
    # Step 1: Basic validation
    if not result:
        return (False, {"error": "Empty result", "code": "EMPTY_INPUT"})

    # Step 2: Content validation
    try:
        validated = validate_content(result)
        if not validated:
            return (False, {"error": "Invalid content", "code": "CONTENT_ERROR"})

        # Step 3: Format validation
        formatted = format_output(validated)
        return (True, formatted)
    except Exception as e:
        return (False, {
            "error": str(e),
            "code": "VALIDATION_ERROR",
            "context": {"step": "content_validation"}
        })

处理护栏结果

当防护栏返回 (False, error) 时:

  1. 错误被发送回代理
  2. 代理尝试解决问题
  3. 该过程重复直到:
    • 防护栏返回 (True, result)
    • 达到最大重试次数

带有重试处理的示例:

Code
from typing import Optional, Tuple, Union

def validate_json_output(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
    """Validate and parse JSON output."""
    try:
        # Try to parse as JSON
        data = json.loads(result)
        return (True, data)
    except json.JSONDecodeError as e:
        return (False, {
            "error": "Invalid JSON format",
            "code": "JSON_ERROR",
            "context": {"line": e.lineno, "column": e.colno}
        })

task = Task(
    description="Generate a JSON report",
    expected_output="A valid JSON object",
    agent=analyst,
    guardrail=validate_json_output,
    max_retries=3  # Limit retry attempts
)

从任务中获取结构化一致的输出

同样重要的是要注意,crew的最终任务的输出成为实际crew本身的最终输出。

使用 output_pydantic

output_pydantic 属性允许您定义一个 Pydantic 模型,任务输出应遵循该模型。这确保了输出不仅结构化,而且根据 Pydantic 模型进行验证。

这里有一个示例,展示了如何使用 output_pydantic:

Code
import json

from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel


class Blog(BaseModel):
    title: str
    content: str


blog_agent = Agent(
    role="Blog Content Generator Agent",
    goal="Generate a blog title and content",
    backstory="""You are an expert content creator, skilled in crafting engaging and informative blog posts.""",
    verbose=False,
    allow_delegation=False,
    llm="gpt-4o",
)

task1 = Task(
    description="""Create a blog title and content on a given topic. Make sure the content is under 200 words.""",
    expected_output="A compelling blog title and well-written content.",
    agent=blog_agent,
    output_pydantic=Blog,
)

# Instantiate your crew with a sequential process
crew = Crew(
    agents=[blog_agent],
    tasks=[task1],
    verbose=True,
    process=Process.sequential,
)

result = crew.kickoff()

# Option 1: Accessing Properties Using Dictionary-Style Indexing
print("Accessing Properties - Option 1")
title = result["title"]
content = result["content"]
print("Title:", title)
print("Content:", content)

# Option 2: Accessing Properties Directly from the Pydantic Model
print("Accessing Properties - Option 2")
title = result.pydantic.title
content = result.pydantic.content
print("Title:", title)
print("Content:", content)

# Option 3: Accessing Properties Using the to_dict() Method
print("Accessing Properties - Option 3")
output_dict = result.to_dict()
title = output_dict["title"]
content = output_dict["content"]
print("Title:", title)
print("Content:", content)

# Option 4: Printing the Entire Blog Object
print("Accessing Properties - Option 5")
print("Blog:", result)

在这个例子中:

  • 定义了一个Pydantic模型Blog,包含title和content字段。
  • 任务 task1 使用 output_pydantic 属性来指定其输出应符合 Blog 模型。
  • 执行完crew后,您可以以多种方式访问结构化输出,如下所示。

访问输出的解释

  1. 字典式索引:您可以直接使用 result["field_name"] 访问字段。这是因为 CrewOutput 类实现了 getitem 方法。
  2. 直接从Pydantic模型访问:直接从result.pydantic对象访问属性。
  3. 使用 to_dict() 方法:将输出转换为字典并访问字段。
  4. 打印整个对象:只需打印结果对象即可查看结构化输出。

使用 output_json

output_json 属性允许您以 JSON 格式定义预期的输出。这确保了任务的输出是一个有效的 JSON 结构,可以在您的应用程序中轻松解析和使用。

这里有一个示例,展示了如何使用 output_json

Code
import json

from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel


# Define the Pydantic model for the blog
class Blog(BaseModel):
    title: str
    content: str


# Define the agent
blog_agent = Agent(
    role="Blog Content Generator Agent",
    goal="Generate a blog title and content",
    backstory="""You are an expert content creator, skilled in crafting engaging and informative blog posts.""",
    verbose=False,
    allow_delegation=False,
    llm="gpt-4o",
)

# Define the task with output_json set to the Blog model
task1 = Task(
    description="""Create a blog title and content on a given topic. Make sure the content is under 200 words.""",
    expected_output="A JSON object with 'title' and 'content' fields.",
    agent=blog_agent,
    output_json=Blog,
)

# Instantiate the crew with a sequential process
crew = Crew(
    agents=[blog_agent],
    tasks=[task1],
    verbose=True,
    process=Process.sequential,
)

# Kickoff the crew to execute the task
result = crew.kickoff()

# Option 1: Accessing Properties Using Dictionary-Style Indexing
print("Accessing Properties - Option 1")
title = result["title"]
content = result["content"]
print("Title:", title)
print("Content:", content)

# Option 2: Printing the Entire Blog Object
print("Accessing Properties - Option 2")
print("Blog:", result)

在这个例子中:

  • 定义了一个Pydantic模型Blog,包含title和content字段,用于指定JSON输出的结构。
  • 任务 task1 使用 output_json 属性来指示它期望符合 Blog 模型的 JSON 输出。
  • 执行完crew后,您可以通过以下两种方式访问结构化的JSON输出。

访问输出的解释

  1. 使用字典式索引访问属性:您可以直接使用 result["field_name"] 访问字段。这是可能的,因为 CrewOutput 类实现了 getitem 方法,允许您像处理字典一样处理输出。在此选项中,我们从结果中检索标题和内容。
  2. 打印整个博客对象:通过打印结果,您将获得CrewOutput对象的字符串表示。由于str方法被实现为返回JSON输出,这将显示整个输出作为表示博客对象的格式化字符串。

通过使用output_pydantic或output_json,您可以确保任务以一致且结构化的格式生成输出,从而更容易在应用程序内或跨多个任务处理和利用数据。

将工具与任务集成

利用来自CrewAI ToolkitLangChain Tools的工具来增强任务性能和代理交互。

使用工具创建任务

Code
import os
os.environ["OPENAI_API_KEY"] = "Your Key"
os.environ["SERPER_API_KEY"] = "Your Key" # serper.dev API key

from crewai import Agent, Task, Crew
from crewai_tools import SerperDevTool

research_agent = Agent(
  role='Researcher',
  goal='Find and summarize the latest AI news',
  backstory="""You're a researcher at a large company.
  You're responsible for analyzing data and providing insights
  to the business.""",
  verbose=True
)

# to perform a semantic search for a specified query from a text's content across the internet
search_tool = SerperDevTool()

task = Task(
  description='Find and summarize the latest AI news',
  expected_output='A bullet list summary of the top 5 most important AI news',
  agent=research_agent,
  tools=[search_tool]
)

crew = Crew(
    agents=[research_agent],
    tasks=[task],
    verbose=True
)

result = crew.kickoff()
print(result)

这展示了如何使用特定工具的任务可以覆盖代理的默认设置,以实现定制化的任务执行。

引用其他任务

在CrewAI中,一个任务的输出会自动传递到下一个任务,但你可以特别定义哪些任务的输出(包括多个)应该用作另一个任务的上下文。

当您有一个任务依赖于另一个任务的输出,而该任务不会立即执行时,这非常有用。这是通过任务的context属性实现的:

Code
# ...

research_ai_task = Task(
    description="Research the latest developments in AI",
    expected_output="A list of recent AI developments",
    async_execution=True,
    agent=research_agent,
    tools=[search_tool]
)

research_ops_task = Task(
    description="Research the latest developments in AI Ops",
    expected_output="A list of recent AI Ops developments",
    async_execution=True,
    agent=research_agent,
    tools=[search_tool]
)

write_blog_task = Task(
    description="Write a full blog post about the importance of AI and its latest news",
    expected_output="Full blog post that is 4 paragraphs long",
    agent=writer_agent,
    context=[research_ai_task, research_ops_task]
)

#...

异步执行

您可以定义一个异步执行的任务。这意味着团队不会等待它完成后再继续下一个任务。这对于需要很长时间才能完成的任务,或者对于执行下一个任务不是至关重要的任务非常有用。

然后你可以使用context属性来定义一个未来的任务,该任务应等待异步任务的输出完成。

Code
#...

list_ideas = Task(
    description="List of 5 interesting ideas to explore for an article about AI.",
    expected_output="Bullet point list of 5 ideas for an article.",
    agent=researcher,
    async_execution=True # Will be executed asynchronously
)

list_important_history = Task(
    description="Research the history of AI and give me the 5 most important events.",
    expected_output="Bullet point list of 5 important events.",
    agent=researcher,
    async_execution=True # Will be executed asynchronously
)

write_article = Task(
    description="Write an article about AI, its history, and interesting ideas.",
    expected_output="A 4 paragraph article about AI.",
    agent=writer,
    context=[list_ideas, list_important_history] # Will wait for the output of the two tasks to be completed
)

#...

回调机制

回调函数在任务完成后执行,允许根据任务的结果触发操作或通知。

Code
# ...

def callback_function(output: TaskOutput):
    # Do something after the task is completed
    # Example: Send an email to the manager
    print(f"""
        Task completed!
        Task: {output.description}
        Output: {output.raw}
    """)

research_task = Task(
    description='Find and summarize the latest AI news',
    expected_output='A bullet list summary of the top 5 most important AI news',
    agent=research_agent,
    tools=[search_tool],
    callback=callback_function
)

#...

访问特定任务输出

一旦一个团队完成运行,你可以通过使用任务对象的output属性来访问特定任务的输出:

Code
# ...
task1 = Task(
    description='Find and summarize the latest AI news',
    expected_output='A bullet list summary of the top 5 most important AI news',
    agent=research_agent,
    tools=[search_tool]
)

#...

crew = Crew(
    agents=[research_agent],
    tasks=[task1, task2, task3],
    verbose=True
)

result = crew.kickoff()

# Returns a TaskOutput object with the description and results of the task
print(f"""
    Task completed!
    Task: {task1.output.description}
    Output: {task1.output.raw}
""")

工具覆盖机制

在任务中指定工具可以实现代理能力的动态适应,强调CrewAI的灵活性。

错误处理和验证机制

在创建和执行任务时,某些验证机制已到位,以确保任务属性的健壮性和可靠性。这些包括但不限于:

  • 确保每个任务只设置一个输出类型,以保持清晰的输出预期。
  • 防止手动分配id属性,以维护唯一标识符系统的完整性。

这些验证有助于在crewAI框架内保持任务执行的一致性和可靠性。

任务护栏

任务防护栏提供了一种强大的方式,在任务输出传递给下一个任务之前进行验证、转换或过滤。防护栏是在下一个任务开始之前执行的可选函数,允许您确保任务输出满足特定要求或格式。

基本用法

Code
from typing import Tuple, Union
from crewai import Task

def validate_json_output(result: str) -> Tuple[bool, Union[dict, str]]:
    """Validate that the output is valid JSON."""
    try:
        json_data = json.loads(result)
        return (True, json_data)
    except json.JSONDecodeError:
        return (False, "Output must be valid JSON")

task = Task(
    description="Generate JSON data",
    expected_output="Valid JSON object",
    guardrail=validate_json_output
)

护栏如何工作

  1. 可选属性: 护栏是任务级别的可选属性,允许您仅在需要的地方添加验证。
  2. 执行时机: 防护栏函数在下一个任务开始之前执行,确保任务之间的数据流有效。
  3. 返回格式: Guardrails 必须返回一个元组 (success, data):
    • 如果 successTruedata 是验证/转换后的结果
    • 如果 successFalsedata 是错误信息
  4. 结果路由:
    • 成功时(True),结果会自动传递给下一个任务
    • 失败时(False),错误会发送回代理以生成新的答案

常见用例

数据格式验证

Code
def validate_email_format(result: str) -> Tuple[bool, Union[str, str]]:
    """Ensure the output contains a valid email address."""
    import re
    email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    if re.match(email_pattern, result.strip()):
        return (True, result.strip())
    return (False, "Output must be a valid email address")

内容过滤

Code
def filter_sensitive_info(result: str) -> Tuple[bool, Union[str, str]]:
    """Remove or validate sensitive information."""
    sensitive_patterns = ['SSN:', 'password:', 'secret:']
    for pattern in sensitive_patterns:
        if pattern.lower() in result.lower():
            return (False, f"Output contains sensitive information ({pattern})")
    return (True, result)

数据转换

Code
def normalize_phone_number(result: str) -> Tuple[bool, Union[str, str]]:
    """Ensure phone numbers are in a consistent format."""
    import re
    digits = re.sub(r'\D', '', result)
    if len(digits) == 10:
        formatted = f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
        return (True, formatted)
    return (False, "Output must be a 10-digit phone number")

高级功能

链式多重验证

Code
def chain_validations(*validators):
    """Chain multiple validators together."""
    def combined_validator(result):
        for validator in validators:
            success, data = validator(result)
            if not success:
                return (False, data)
            result = data
        return (True, result)
    return combined_validator

# Usage
task = Task(
    description="Get user contact info",
    expected_output="Email and phone",
    guardrail=chain_validations(
        validate_email_format,
        filter_sensitive_info
    )
)

自定义重试逻辑

Code
task = Task(
    description="Generate data",
    expected_output="Valid data",
    guardrail=validate_data,
    max_retries=5  # Override default retry limit
)

保存文件时创建目录

您现在可以指定任务在将其输出保存到文件时是否应创建目录。这对于组织输出和确保文件路径结构正确特别有用。

Code
# ...

save_output_task = Task(
    description='Save the summarized AI news to a file',
    expected_output='File saved successfully',
    agent=research_agent,
    tools=[file_save_tool],
    output_file='outputs/ai_news_summary.txt',
    create_directory=True
)

#...

结论

任务是CrewAI中代理行为的驱动力。 通过正确定义任务及其结果,您为AI代理设定了有效工作的舞台,无论是独立工作还是作为协作单元。 为任务配备适当的工具,理解执行过程,并遵循稳健的验证实践,对于最大化CrewAI的潜力至关重要, 确保代理为其任务做好充分准备,并且任务按预期执行。

这个页面有帮助吗?