Crew 详解 #
Crew 概述 #
在 CrewAI 框架中,Crew 代表一组协作的 Agent,共同完成一系列任务。每个 Crew 定义了:
- 任务执行策略
- Agent 协作方式
- 整体工作流程
Crew 属性 #
核心属性 #
| 属性 | 类型 | 描述 |
|---|---|---|
| agents | List[Agent] | 团队中的 Agent 列表 |
| tasks | List[Task] | 需要完成的任务列表 |
| process | Process | 执行流程(sequential/hierarchical) |
执行属性 #
| 属性 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| verbose | bool/str | False | 日志详细程度 |
| memory | bool | False | 是否启用记忆功能 |
| cache | bool | True | 是否启用缓存 |
| max_rpm | Optional[int] | None | 每分钟最大请求数 |
管理属性 #
| 属性 | 类型 | 描述 |
|---|---|---|
| manager_llm | Union[str, LLM] | 管理者使用的 LLM(层级模式必需) |
| manager_agent | Agent | 自定义管理者 Agent |
| function_calling_llm | Any | 工具调用的 LLM |
高级属性 #
| 属性 | 类型 | 描述 |
|---|---|---|
| step_callback | Callable | 每个 Agent 步骤后的回调 |
| task_callback | Callable | 每个任务完成后的回调 |
| output_log_file | str/bool | 日志输出文件 |
| share_crew | bool | 是否共享执行数据 |
| planning | bool | 是否启用规划功能 |
| knowledge_sources | List | 知识源列表 |
| stream | bool | 是否启用流式输出 |
创建 Crew #
方式一:直接代码定义 #
python
from crewai import Agent, Task, Crew, Process
researcher = Agent(
role="研究分析师",
goal="研究 AI 趋势",
backstory="经验丰富的分析师"
)
writer = Agent(
role="技术作家",
goal="撰写技术文章",
backstory="专业的技术作家"
)
research_task = Task(
description="研究 AI 趋势",
expected_output="研究报告",
agent=researcher
)
write_task = Task(
description="撰写文章",
expected_output="博客文章",
agent=writer
)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential,
verbose=True
)
方式二:使用装饰器(推荐) #
python
from crewai import Agent, Crew, Task, Process
from crewai.project import CrewBase, agent, task, crew
from typing import List
@CrewBase
class ContentCrew:
"""内容创作团队"""
agents: List[Agent]
tasks: List[Task]
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True
)
@agent
def writer(self) -> Agent:
return Agent(
config=self.agents_config['writer'],
verbose=True
)
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task']
)
@task
def write_task(self) -> Task:
return Task(
config=self.tasks_config['write_task']
)
@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
verbose=True
)
执行流程 #
Sequential(顺序执行) #
text
┌─────────────────────────────────────────────────────────────┐
│ Sequential 流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Task 1 ──> Task 2 ──> Task 3 ──> Task 4 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ Agent 1 Agent 2 Agent 3 Agent 4 │
│ │
│ 特点: │
│ - 任务按顺序执行 │
│ - 前一个任务的输出可作为后一个任务的输入 │
│ - 简单直观,易于理解 │
│ │
└─────────────────────────────────────────────────────────────┘
python
from crewai import Crew, Process
crew = Crew(
agents=[researcher, analyst, writer, editor],
tasks=[research_task, analysis_task, write_task, edit_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff()
Hierarchical(层级管理) #
text
┌─────────────────────────────────────────────────────────────┐
│ Hierarchical 流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Manager Agent │
│ (协调和分配) │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ Agent 1 Agent 2 Agent 3 │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ ▼ │
│ 结果汇总 │
│ │
│ 特点: │
│ - Manager Agent 协调所有任务 │
│ - 动态分配任务给合适的 Agent │
│ - 验证和汇总结果 │
│ │
└─────────────────────────────────────────────────────────────┘
python
from crewai import Crew, Process
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, write_task],
process=Process.hierarchical,
manager_llm="gpt-4o",
verbose=True
)
result = crew.kickoff()
自定义 Manager Agent #
python
from crewai import Agent, Crew, Process
manager = Agent(
role="项目经理",
goal="协调团队高效完成任务",
backstory="经验丰富的项目经理,擅长资源分配和进度管理",
allow_delegation=True
)
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, write_task],
process=Process.hierarchical,
manager_agent=manager,
verbose=True
)
Crew 执行 #
kickoff 方法 #
python
# 基本执行
result = crew.kickoff()
# 带输入参数
result = crew.kickoff(inputs={'topic': 'AI'})
# 异步执行
result = await crew.kickoff_async()
# 原生异步
result = await crew.akickoff(inputs={'topic': 'AI'})
kickoff_for_each #
python
# 批量执行
inputs_array = [
{'topic': 'AI in Healthcare'},
{'topic': 'AI in Finance'},
{'topic': 'AI in Education'}
]
results = crew.kickoff_for_each(inputs=inputs_array)
for result in results:
print(result.raw)
# 异步批量执行
async_results = await crew.akickoff_for_each(inputs=inputs_array)
流式输出 #
python
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
streaming = crew.kickoff(inputs={'topic': 'AI'})
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
Crew 输出 #
输出属性 #
python
result = crew.kickoff()
# 原始输出
print(result.raw)
# JSON 输出
if result.json_dict:
print(result.json_dict)
# Pydantic 输出
if result.pydantic:
print(result.pydantic)
# 任务输出列表
for task_output in result.tasks_output:
print(f"Task: {task_output.description}")
print(f"Output: {task_output.raw}")
# Token 使用情况
print(result.token_usage)
输出方法 #
python
# JSON 字符串
json_str = result.json
# 字典格式
output_dict = result.to_dict()
# 字符串表示
str_output = str(result)
Crew 记忆 #
启用记忆 #
python
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
memory=True,
verbose=True
)
记忆类型 #
text
┌─────────────────────────────────────────────────────────────┐
│ Crew 记忆类型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 短期记忆(Short-term Memory): │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ - 当前会话的上下文 │ │
│ │ - 最近交互的信息 │ │
│ │ - 临时存储 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 长期记忆(Long-term Memory): │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ - 持久化的知识 │ │
│ │ - 跨会话的信息 │ │
│ │ - 向量数据库存储 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 实体记忆(Entity Memory): │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ - 特定实体的信息 │ │
│ │ - 关系和属性 │ │
│ │ - 结构化存储 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
配置 Embedder #
python
crew = Crew(
agents=[agent],
tasks=[task],
memory=True,
embedder={
"provider": "openai",
"config": {
"model": "text-embedding-3-small"
}
}
)
Crew 规划 #
启用规划 #
python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
planning=True,
planning_llm="gpt-4o",
verbose=True
)
规划流程 #
text
┌─────────────────────────────────────────────────────────────┐
│ Crew 规划流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 分析任务 │
│ └─> AgentPlanner 分析所有任务 │
│ │
│ 2. 生成计划 │
│ └─> 为每个任务创建执行计划 │
│ │
│ 3. 添加计划 │
│ └─> 计划添加到任务描述中 │
│ │
│ 4. 执行任务 │
│ └─> Agent 按计划执行 │
│ │
└─────────────────────────────────────────────────────────────┘
Crew 回调 #
步骤回调 #
python
def step_callback(step_output):
print(f"Agent: {step_output.agent_role}")
print(f"Action: {step_output.action}")
print(f"Output: {step_output.output}")
crew = Crew(
agents=[agent],
tasks=[task],
step_callback=step_callback
)
任务回调 #
python
def task_callback(task_output):
print(f"Task completed: {task_output.description}")
save_to_database(task_output.raw)
crew = Crew(
agents=[agent],
tasks=[task],
task_callback=task_callback
)
生命周期钩子 #
python
from crewai.project import CrewBase, before_kickoff, after_kickoff
@CrewBase
class MyCrew:
@before_kickoff
def prepare_inputs(self, inputs):
inputs['timestamp'] = datetime.now().isoformat()
return inputs
@after_kickoff
def process_output(self, output):
output.raw = post_process(output.raw)
return output
@crew
def crew(self) -> Crew:
return Crew(agents=self.agents, tasks=self.tasks)
Crew 日志 #
保存日志 #
python
# 保存为文本文件
crew = Crew(
agents=[agent],
tasks=[task],
output_log_file=True
)
# 保存为指定文件
crew = Crew(
agents=[agent],
tasks=[task],
output_log_file="logs/execution.log"
)
# 保存为 JSON 格式
crew = Crew(
agents=[agent],
tasks=[task],
output_log_file="logs/execution.json"
)
Crew 使用指标 #
python
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
# 访问使用指标
print(crew.usage_metrics)
# 输出示例
{
'total_tokens': 15000,
'prompt_tokens': 10000,
'completion_tokens': 5000,
'successful_requests': 10
}
Crew 最佳实践 #
1. 团队结构设计 #
python
# 好的设计:清晰的职责分工
content_crew = Crew(
agents=[
researcher, # 研究
analyst, # 分析
writer, # 写作
editor # 审核
],
tasks=[
research_task,
analysis_task,
write_task,
edit_task
],
process=Process.sequential
)
2. 流程选择 #
python
# 简单线性任务:使用 Sequential
simple_crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
process=Process.sequential
)
# 复杂协作任务:使用 Hierarchical
complex_crew = Crew(
agents=[agent1, agent2, agent3],
tasks=[task1, task2, task3],
process=Process.hierarchical,
manager_llm="gpt-4o"
)
3. 性能优化 #
python
# 启用缓存
crew = Crew(
agents=[agent],
tasks=[task],
cache=True
)
# 限制请求速率
crew = Crew(
agents=[agent],
tasks=[task],
max_rpm=30
)
# 异步执行独立任务
crew = Crew(
agents=[agent1, agent2],
tasks=[async_task1, async_task2, summary_task]
)
4. 错误处理 #
python
try:
result = crew.kickoff()
except Exception as e:
print(f"执行失败: {e}")
log_error(e)
常见问题 #
Manager LLM 未设置 #
python
# 错误:层级模式需要 Manager LLM
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.hierarchical
)
# 正确:设置 Manager LLM
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.hierarchical,
manager_llm="gpt-4o"
)
任务执行顺序 #
python
# 确保任务顺序正确
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential
)
内存问题 #
python
# 对于大型任务,启用上下文管理
crew = Crew(
agents=[agent],
tasks=[task],
memory=True
)
下一步 #
现在你已经深入了解了 Crew,接下来学习 工具详解,掌握如何扩展 Agent 的能力!
最后更新:2026-04-04