2024–2025 年 AI Agent 从实验室走向生产,但很多团队很快发现:把所有任务塞给一个 LLM Agent,系统会在规模化时崩溃。本文面向 AI 工程师与架构师,系统讲解多 Agent 协作的六大编排模式(含完整代码)、LangGraph/CrewAI/AutoGen 选型矩阵、MCP+A2A 双协议层、PostgresSaver 生产工程与 MAST 可观测性;内含 Google Bake-Off 6 倍提速数据、AdaptOrch 12-23% 拓扑增益、故障分布统计、五步 Runbook 与选型决策树。
引言:为什么单个 Agent 不够用了
2024 年至 2025 年,AI Agent 的概念从实验室走向了生产。但很多团队很快发现:把所有任务塞给一个 LLM Agent,系统会在规模化时崩溃。问题不在模型本身,而在架构——单 Agent 在上下文窗口、专业能力、并发执行与容错方面存在结构性瓶颈。多 Agent 协作架构正是为了解决上述问题而生。
根据 MLflow 2026 年的报告,Google 内部的 Agent Bake-Off 实验显示,采用分布式多 Agent 架构后,处理时间从 1 小时降至 10 分钟,提升幅度超过 6 倍。而 AdaptOrch(2026 年学术论文)进一步证明:在多 Agent 系统中,编排拓扑的选择对系统性能的影响比底层模型的选择更大,在 SWE-bench 等基准测试中,正确的拓扑选择可以带来 12-23% 的性能提升。
核心痛点:单 Agent 的四个结构性瓶颈
- 上下文窗口瓶颈。 复杂任务的中间结果会把上下文塞满,导致后续推理质量骤降;检索、分析、生成、审核全部挤在同一窗口内,有效推理空间被严重压缩。
- 专业能力稀释。 一个 Agent 既要做信息检索、又要写代码、又要做决策审核,样样都做但样样不精;无法为每个子任务选用最优模型或专用工具链。
- 串行执行低效。 所有子任务顺序执行,总耗时是每步耗时之和,无法并发;独立子任务(如多源研究、多维度风险评估)白白浪费等待时间。
- 单点故障风险(SPOF)。 一旦这个 Agent 出问题——模型超时、工具调用失败或幻觉级联——整个流程全部停摆,缺乏局部降级与重试隔离能力。
一、多 Agent 协作系统核心概念
1.1 基本定义
多 Agent 协作系统(Multi-Agent System,MAS)是指由多个独立的 AI Agent 组成的系统,这些 Agent 通过明确的通信协议和编排机制协作完成单个 Agent 无法高效完成的复杂任务。
每个 Agent 通常具备以下特征:
| 特征 | 描述 |
| 角色专一 | 只负责一个明确定义的子任务(检索、推理、生成、验证等) |
| 工具访问 | 拥有完成自身任务所需的特定工具集 |
| 状态隔离 | 维护自己的上下文和内存,不污染其他 Agent |
| 可替换性 | 可以独立升级、替换,不影响整体系统 |
1.2 三种控制拓扑
集中式(Centralized) 分散式(Decentralized) 层级式(Hierarchical)
[Orchestrator] A ←→ B ←→ C [Top Orchestrator]
/ | \ ↕ ↕ / \
[A] [B] [C] D ←→ E ←→ F [Team-1 Lead] [Team-2 Lead]
/ \ / \
优点: 可审计、可控 优点: 高弹性、低延迟 [a] [b] [c] [d]
缺点: 单点瓶颈 缺点: 调试难、非确定性
优点: 两者平衡
二、六大编排设计模式详解
这六种模式覆盖了生产中 95% 以上的多 Agent 系统场景。
2.1 模式一:顺序流水线(Sequential Pipeline)
核心思路:Agent A 的输出直接作为 Agent B 的输入,严格线性执行。
[用户输入] → [信息检索Agent] → [分析Agent] → [撰写Agent] → [审核Agent] → [输出]
适用场景:步骤间有严格依赖关系;流程固定、不需要动态路由;典型案例包括文章创作流水线、代码审查流程。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class PipelineState(TypedDict):
query: str
retrieved_docs: str
analysis: str
final_report: str
def retrieval_agent(state: PipelineState):
docs = search_knowledge_base(state["query"])
return {"retrieved_docs": docs}
def analysis_agent(state: PipelineState):
result = llm.invoke(f"分析以下内容:{state['retrieved_docs']}")
return {"analysis": result.content}
def writer_agent(state: PipelineState):
report = llm.invoke(f"根据分析撰写报告:{state['analysis']}")
return {"final_report": report.content}
builder = StateGraph(PipelineState)
builder.add_node("retriever", retrieval_agent)
builder.add_node("analyzer", analysis_agent)
builder.add_node("writer", writer_agent)
builder.add_edge(START, "retriever")
builder.add_edge("retriever", "analyzer")
builder.add_edge("analyzer", "writer")
builder.add_edge("writer", END)
pipeline = builder.compile()
| 优点 | 缺点 |
| 实现简单,易于调试 | 总耗时 = 各步耗时之和 |
| 行为可预测 | 单步失败整体阻塞 |
| 适合合规审计 | 无法处理动态分支需求 |
2.2 模式二:并行扇出/扇入(Parallel Fan-out / Fan-in)
核心思路:多个 Agent 同时处理独立的子任务,最后由汇聚节点合并结果。总耗时 = max(T1, T2, ..., Tn) 而非 T1 + T2 + ... + Tn。
┌──→ [研究Agent-A] ──┐
[Supervisor] ──────├──→ [研究Agent-B] ──┼──→ [Synthesizer] → [输出]
└──→ [研究Agent-C] ──┘
from langgraph.types import Send
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class ResearchState(TypedDict):
query: str
research_results: Annotated[list, operator.add]
final_synthesis: str
def supervisor(state: ResearchState):
subtasks = [
{"query": state["query"], "source": "academic"},
{"query": state["query"], "source": "industry"},
{"query": state["query"], "source": "news"},
]
return [Send("research_worker", task) for task in subtasks]
def research_worker(state: dict):
result = search_by_source(state["query"], state["source"])
return {"research_results": [result]}
def synthesizer(state: ResearchState):
combined = "\n".join(state["research_results"])
synthesis = llm.invoke(f"综合以下研究结果:{combined}")
return {"final_synthesis": synthesis.content}
builder = StateGraph(ResearchState)
builder.add_node("research_worker", research_worker)
builder.add_node("synthesizer", synthesizer)
builder.add_conditional_edges(START, supervisor, ["research_worker"])
builder.add_edge("research_worker", "synthesizer")
builder.add_edge("synthesizer", END)
graph = builder.compile()
关键技术点:LangGraph 的 Send API 返回 Send 对象列表,子图会真正并发执行。配合 Annotated[list, operator.add] Reducer,并行分支结果自动聚合,无需手写锁或同步逻辑。
2.3 模式三:层级主管-工人(Hierarchical Supervisor-Worker)
核心思路:主管 Agent 负责意图识别、任务拆解和路由决策,将子任务分配给专业 Worker Agent,最后汇总结果。
[用户请求]
↓
[Supervisor Agent] ← 任务规划 + 路由决策
/ | \
[代码Agent] [搜索Agent] [数据Agent]
\ | /
[Synthesizer Agent]
↓
[最终输出]
双层路由优化(关键字快速通道 + LLM 精确路由):
KEYWORD_ROUTING = {
"代码": "code_agent",
"code": "code_agent",
"搜索": "search_agent",
"查询": "search_agent",
"数据": "data_agent",
}
def supervisor_with_fast_path(state):
query = state["query"].lower()
# 第一层:关键字快速通道(无需 LLM 调用,响应 <1ms)
for keyword, agent_name in KEYWORD_ROUTING.items():
if keyword in query:
return {"next": agent_name}
# 第二层:LLM 精确路由(处理复杂/模糊意图)
routing_prompt = f"""
用户请求:{state['query']}
可用Agent:code_agent, search_agent, data_agent
请返回最合适的Agent名称,只返回名称,不含其他内容。
"""
decision = llm.invoke(routing_prompt)
return {"next": decision.content.strip()}
2.4 模式四:群体协作(Swarm / Network)
核心思路:Agent 之间点对点直接传递任务,没有中央协调者,依靠终止规则(轮数、共识、超时)停止协作。适合代码审查、方案评估等多轮协商场景;非确定性高,生产环境慎用。
import autogen
reviewer_1 = autogen.AssistantAgent(
name="SecurityReviewer",
system_message="你是一位安全专家,专注于代码中的安全漏洞。"
)
reviewer_2 = autogen.AssistantAgent(
name="PerformanceReviewer",
system_message="你是一位性能专家,专注于代码的效率和资源使用。"
)
human_proxy = autogen.UserProxyAgent(
name="CodeAuthor",
human_input_mode="NEVER",
max_consecutive_auto_reply=2,
is_termination_msg=lambda x: "APPROVED" in x.get("content", "")
)
groupchat = autogen.GroupChat(
agents=[human_proxy, reviewer_1, reviewer_2],
messages=[],
max_round=6 # 硬性终止防止无限循环
)
manager = autogen.GroupChatManager(groupchat=groupchat)
2.5 模式五:黑板架构(Blackboard)
核心思路:所有 Agent 共享一个结构化工作空间(黑板),Agent 在满足自身前提条件时主动读写黑板,无需显式调度。适合长时间异步任务(小时级甚至天级)与异构服务协作。
┌─────────────────────────────┐
│ 黑板(共享状态) │
│ task_status: "research_done" │
│ research_data: {...} │
│ analysis_result: null │
└─────┬──────────────────┬──────┘
↑ 写入 ↓ 读取(条件满足时)
[研究Agent] [分析Agent]
(完成后写入) (检测到research_done后执行)
2.6 模式六:混合模式(Hybrid)
核心思路:在同一系统中组合使用多种模式,通常是「主管模式 + 流水线」的组合,以平衡控制性与效率。
[用户请求]
↓
[Intent Agent](路由器)
├──→ [简单查询] → 直接回答(无需多Agent)
└──→ [复杂报告生成]
↓
[Supervisor](层级主管)
/ \
[并行研究扇出] [质量保障流水线]
↙ ↓ ↘ ↓
[A] [B] [C] [审核] → [人工审核] → [发布]
↘ ↓ ↙
[Synthesizer]
三、LangGraph vs CrewAI vs AutoGen 对比
| 维度 | LangGraph | CrewAI | AutoGen(微软) |
| 架构范式 | 状态机图 | 角色制团队 | 对话式多Agent |
| 编程语言 | Python / JS/TS | Python | Python / .NET |
| 学习曲线 | 较陡 | 平缓 | 中等 |
| 状态管理 | 原生支持 | 需自实现 | 有限支持 |
| Human-in-the-Loop | 原生支持 | 需自实现 | 支持 |
| 可观测性 | LangSmith(商业) | 有限 | Azure Monitor |
| 生产就绪度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| 快速原型 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Azure 集成 | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
| 适合场景 | 复杂有状态工作流 | 角色制内容流水线 | 对话式协作 |
选型建议
- 选 LangGraph:需要生产级可靠性(合规、金融、医疗)、复杂状态管理与持久化、Human-in-the-Loop 精细控制、条件分支和循环的精确表达。
- 选 CrewAI:快速验证 Idea(1-2 天出原型)、团队成员可用「角色」直觉理解 Agent、内容生成与研究报告等角色制场景。
- 选 AutoGen:处于微软/Azure 技术栈、需要 Agent 之间多轮辩论和迭代推理、做研究或快速实验不同对话模式。
四、MCP + A2A 双协议层
2026 年,多 Agent 系统的通信协议已标准化为两层互补架构,两者均已纳入 Linux Foundation Agentic AI Foundation 管理。
┌──────────────────────────────────────────────────────┐
│ 多Agent系统 │
│ Agent-1 ←──── A2A 协议 ────→ Agent-2 │
│ │ │ │
│ MCP协议 MCP协议 │
│ ↓ ↓ │
│ [工具/数据库/API] [工具/数据库/API] │
└──────────────────────────────────────────────────────┘
MCP(垂直):Agent ↔ 工具/外部系统
A2A(水平):Agent ↔ Agent
4.1 MCP(Model Context Protocol)
由 Anthropic 主导、Linux Foundation 管理的工具接入标准协议,统一 Agent 访问外部工具、数据库、API 的接口。
from mcp.server import Server
from mcp.types import Tool, TextContent
app = Server("data-agent-mcp")
@app.list_tools()
async def list_tools():
return [
Tool(
name="query_customer_db",
description="查询客户数据库,支持按ID、姓名、邮箱检索",
inputSchema={
"type": "object",
"properties": {
"field": {"type": "string", "enum": ["id", "name", "email"]},
"value": {"type": "string"}
},
"required": ["field", "value"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "query_customer_db":
result = db.query(arguments["field"], arguments["value"])
return [TextContent(type="text", text=str(result))]
4.2 A2A(Agent-to-Agent Protocol)
由 Google 发起,2025 年 4 月开源,2026 年初发布 v1.0,已有 Atlassian、Salesforce、SAP 等 50+ 合作伙伴。标准化 Agent 之间的任务委托、能力发现、状态同步。
// /.well-known/agent.json
{
"name": "ResearchAgent",
"version": "1.0",
"description": "专业信息检索与摘要Agent",
"url": "https://research-agent.internal/a2a",
"capabilities": { "streaming": true, "async": true },
"skills": [
{
"id": "web_research",
"name": "网络信息检索",
"description": "从互联网检索并摘要最新信息",
"tags": ["research", "summarization", "web"]
}
]
}
import httpx
async def discover_and_delegate(agent_url: str, task: str):
card_response = await httpx.get(f"{agent_url}/.well-known/agent.json")
agent_card = card_response.json()
available_skills = [s["id"] for s in agent_card["skills"]]
if "web_research" not in available_skills:
raise ValueError(f"Agent {agent_card['name']} 不支持 web_research 技能")
payload = {
"jsonrpc": "2.0",
"method": "message/send",
"id": "task-001",
"params": {
"message": {
"role": "user",
"parts": [{"type": "text", "text": task}]
}
}
}
response = await httpx.post(agent_card["url"], json=payload)
return response.json()
五、生产级工程实践
5.1 状态持久化与断点续传(PostgresSaver)
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string("postgresql://user:pass@localhost/agentdb") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user-session-12345"}}
# 即使进程重启,也可以从上次状态恢复
result = graph.invoke({"query": "分析Q2财报"}, config)
5.2 Human-in-the-Loop(interrupt HITL)
from langgraph.types import interrupt
def high_risk_action_agent(state):
proposed_action = plan_action(state)
human_decision = interrupt({
"proposed_action": proposed_action,
"risk_level": "HIGH",
"message": "此操作将修改生产数据库,请确认是否执行"
})
if human_decision["approved"]:
return execute_action(proposed_action)
else:
return {"status": "cancelled", "reason": human_decision.get("reason")}
5.3 熔断器与重试机制(CircuitBreaker)
import time
from functools import wraps
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "CLOSED"
self.last_failure_time = None
def __call__(self, func):
@wraps(func)
async def wrapper(*args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise Exception("Circuit breaker OPEN - Agent 暂时不可用")
try:
result = await func(*args, **kwargs)
if self.state == "HALF_OPEN":
self.state = "CLOSED"
self.failure_count = 0
return result
except Exception:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
raise
return wrapper
@CircuitBreaker(failure_threshold=3, recovery_timeout=30)
async def call_external_agent(task):
return await agent_client.send(task)
5.4 Token 预算控制(TokenBudgetManager)
class TokenBudgetManager:
def __init__(self, total_budget: int = 100_000):
self.total_budget = total_budget
self.used_tokens = 0
self.agent_usage = {}
def check_budget(self, agent_name: str, estimated_tokens: int) -> bool:
remaining = self.total_budget - self.used_tokens
if estimated_tokens > remaining:
raise BudgetExceededException(
f"Agent {agent_name} 请求 {estimated_tokens} tokens,"
f"但剩余预算仅 {remaining} tokens"
)
return True
def record_usage(self, agent_name: str, actual_tokens: int):
self.used_tokens += actual_tokens
self.agent_usage[agent_name] = self.agent_usage.get(agent_name, 0) + actual_tokens
六、可观测性:让黑盒变透明
根据 MAST 研究团队对 1642 个执行追踪的分析,多 Agent 系统的故障分布如下:
| 故障类型 | 占比 | 说明 |
| 系统设计问题 | 41.77% | 步骤重复、工具选择错误、上下文溢出、缺少终止条件 |
| Agent间不对齐 | 36.94% | 交接时上下文丢失、一个Agent的幻觉成为下一个的「事实」 |
| 任务验证失败 | 21.30% | 过早终止、不完整验证 |
更令人担忧的是:57% 的组织已有 Agent 在生产环境运行,但仅 8% 完成了 LLM 可观测性的实施。大量错误以 HTTP 200 返回,监控面板显示绿色,但系统实际上输出的是错误结果。
6.1 OpenTelemetry 分布式追踪
from opentelemetry import trace
import uuid
tracer = trace.get_tracer("multi-agent-system")
def traced_agent_call(agent_name: str, task: dict, correlation_id: str = None):
if not correlation_id:
correlation_id = str(uuid.uuid4())
with tracer.start_as_current_span(f"agent.{agent_name}") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("correlation.id", correlation_id)
span.set_attribute("task.type", task.get("type", "unknown"))
try:
result = agent_registry[agent_name].run(task)
span.set_attribute("agent.tokens_used", result.get("tokens", 0))
span.set_attribute("agent.status", "success")
return result
except Exception as e:
span.set_attribute("agent.status", "error")
span.set_attribute("error.message", str(e))
raise
6.2 关键监控指标
MONITORING_METRICS = {
"task_success_rate": "端到端任务完成率(目标:>85%)",
"e2e_latency_p95": "P95端到端延迟(目标:<30s)",
"total_cost_per_task": "每次任务平均Token成本",
"agent_error_rate": "各Agent错误率(目标:<5%)",
"agent_retry_count": "重试次数(高重试 = 需要调查)",
"tool_call_budget_usage": "工具调用次数/预算比",
"output_quality_score": "输出质量评分",
"goal_alignment_score": "目标一致性评分",
"hallucination_rate": "幻觉检测率",
}
6.3 LLM-as-a-Judge 自动评估
def evaluate_agent_output(original_task: str, agent_output: str) -> dict:
evaluation_prompt = f"""
你是一位严格的质量评审专家。请评估以下AI Agent的输出质量。
原始任务:{original_task}
Agent输出:{agent_output}
请从以下维度评分(1-5分):
1. 任务完成度 2. 准确性 3. 相关性 4. 是否存在幻觉
请以JSON格式返回:
{{"completeness": x, "accuracy": x, "relevance": x,
"hallucination_detected": true/false, "comments": "..."}}
"""
evaluation = llm.invoke(evaluation_prompt)
return json.loads(evaluation.content)
七、常见踩坑与防坑指南
❌ 陷阱一:上下文污染(Context Pollution)
现象:Agent A 产生幻觉,错误结果被传给 Agent B、C,整个系统输出基于错误前提,而所有 HTTP 状态码都是 200。
def validate_agent_output(output: dict, schema: dict) -> bool:
jsonschema.validate(output, schema)
if output.get("confidence_score", 1.0) < 0.7:
raise LowConfidenceError(f"Agent 输出置信度过低: {output['confidence_score']}")
required_fields = schema.get("required", [])
missing = [f for f in required_fields if not output.get(f)]
if missing:
raise MissingFieldsError(f"输出缺少必填字段: {missing}")
return True
❌ 陷阱二:无限循环与代价失控
现象:Agent 进入重试循环或反复调用工具,Token 费用在几分钟内暴涨至预期的百倍。
MAX_ITERATIONS = 10
MAX_TOOL_CALLS_PER_AGENT = 20
MAX_TOTAL_TOKENS = 50_000
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["high_cost_tool"]
)
❌ 陷阱三:过度工程化
现象:为了使用多 Agent 而使用多 Agent,把简单的两步 LLM 链拆成 8 个 Agent,调试难度指数级上升。
先从顺序流水线开始。只有在有具体证据(并发需求、超过上下文限制、专业能力需要独立升级)时,才增加 Agent 数量。生产系统的最佳 Agent 数量通常是 3-8 个。
❌ 陷阱四:Demo 到生产的鸿沟(ProductionGuardrails)
现象:内部 Demo 效果很好,上线后面对真实用户的边缘输入就频繁失败。
class ProductionGuardrails:
def __init__(self):
self.input_validators = []
self.output_validators = []
def validate_input(self, user_input: str) -> str:
if len(user_input) > 10000:
raise InputTooLongError("输入超过10000字符限制")
injection_patterns = ["ignore previous instructions", "forget everything"]
for pattern in injection_patterns:
if pattern.lower() in user_input.lower():
raise PromptInjectionError("检测到潜在的提示注入攻击")
return user_input.strip()
def validate_output(self, output: str) -> str:
output = self.pii_filter.redact(output)
if self.content_classifier.is_harmful(output):
raise HarmfulContentError("输出包含有害内容")
return output
八、选型决策树
你的任务是否有明确的线性依赖步骤?
├─ 是 → 子任务是否可以并发执行?
│ ├─ 否 → 【顺序流水线】
│ └─ 是 → 【并行扇出 + 顺序流水线 混合】
│
└─ 否 → 是否有一个 Agent 具有决策权威?
├─ 是 → 规模是否足够大需要子团队?
│ ├─ 否 → 【Supervisor-Worker 层级模式】
│ └─ 是 → 【层级式(Supervisors of Supervisors)】
│
└─ 否 → 任务是否是长时间异步的?
├─ 是 → 【黑板架构】
└─ 否 → Agent 数量是否 ≤ 5?
├─ 是 → 【Swarm(注意设置终止条件)】
└─ 否 → 【考虑重新拆分为层级模式】
九、总结与 2026 趋势展望
核心要点回顾
- 编排拓扑 > 模型选择:AdaptOrch 研究证明,在多 Agent 系统中,如何组织 Agent 的协作方式比选择什么底层模型影响更大。
- 从简单开始:先用顺序流水线验证核心价值,有具体需求时再引入并发和层级结构。
- MCP + A2A 是未来标准:这两个协议已成为行业共识,值得在新项目中直接采用。
- 可观测性不是可选项:57% 的组织有 Agent 在生产运行,但仅 8% 完成了可观测性实施——这个差距正是事故发生的温床。
- 生产 Agent 数量 3-8 个最佳:超过这个数量,协调开销往往超过收益,应考虑层级化。
2026 年值得关注的趋势
- 联邦编排(Federated Orchestration):多团队维护各自的子编排器,共享学习到的路由策略
- 多模态多 Agent:视觉、音频 Agent 与文本 Agent 的混合协作正在成熟
- 自适应拓扑选择:系统根据任务特征动态选择最优编排模式(AdaptOrch 方向)
- EU AI Act 合规:欧盟法规要求完整的决策审计链,Agent 系统的可审计性成为强制要求
五步 Runbook:多 Agent 系统生产落地
步骤 1 — 评估任务拓扑
用选型决策树判断任务是否有线性依赖、可否并发、是否需要决策权威或长时间异步;选定顺序流水线、并行扇出、层级主管、黑板或混合模式。
步骤 2 — 选定框架并搭建骨架
按可靠性需求在 LangGraph(生产级状态机)、CrewAI(快速角色制原型)、AutoGen(对话式协作)中选型;实现 Supervisor 路由与 Worker 节点,关键字快速通道优先。
步骤 3 — 接入 MCP 与 A2A 协议层
用 MCP Server 暴露工具能力(数据库、API、文件系统);用 A2A Agent Card 实现跨 Agent 任务委托与能力发现,Orchestrator 通过 JSON-RPC 2.0 发送任务。
步骤 4 — 加固生产工程
配置 PostgresSaver 断点续传、interrupt() HITL 人工审核、CircuitBreaker 熔断与 TokenBudgetManager 预算控制;设置 MAX_ITERATIONS 与 MAX_TOOL_CALLS 硬性上限。
步骤 5 — 部署可观测性与护栏
接入 OpenTelemetry 分布式追踪(correlation_id 贯穿调用链)、核心监控指标(task_success_rate、e2e_latency_p95、hallucination_rate)与 LLM-as-Judge 质量评估;启用 ProductionGuardrails 输入输出校验。
可引用技术要点(2026)
- 性能增益: Google Agent Bake-Off 显示分布式多 Agent 架构将处理时间从 1 小时降至 10 分钟(6 倍提速);AdaptOrch 证明正确编排拓扑可带来 12-23% 基准测试提升,影响大于底层模型选择。
- 故障分布(MAST,1642 条追踪): 系统设计问题 41.77%、Agent 间不对齐 36.94%、任务验证失败 21.30%;57% 组织已有生产 Agent,仅 8% 完成 LLM 可观测性实施。
- 协议标准: MCP(垂直,Agent↔工具)与 A2A(水平,Agent↔Agent)均已纳入 Linux Foundation AAIF;A2A v1.0 已有 Atlassian、Salesforce、SAP 等 50+ 合作伙伴。
- 生产参数: 推荐 Agent 数量 3-8 个;Token 预算默认 100,000/任务;熔断阈值 failure_threshold=3-5;P95 端到端延迟目标 <30s,任务成功率目标 >85%。
结语与配套资源
多 Agent 协作不是「把一个大模型拆成多个小模型」这么简单——编排拓扑的选择往往比模型选择更重要。从顺序流水线起步,按需引入并行扇出与层级主管;用 MCP+A2A 构建标准化通信层;用 PostgresSaver、熔断器与 Token 预算守住生产底线;用 OpenTelemetry 与 LLM-as-Judge 让黑盒变透明。
配套资源:
在普通 Linux VPS 或 Docker 容器中跑多 Agent 编排可以完成验证,但缺乏原生 macOS 环境、Apple 工具链与 launchd 进程守护,Cursor/Claude Desktop 的 STDIO 子进程在笔记本合盖即断;Docker 增加抽象层与排障复杂度,多 Agent 长时运行任务的 PostgresSaver 与 OpenTelemetry Collector 在资源受限 VPS 上容易成为瓶颈。若你需要多 Agent 编排器、MCP Server 与 IDE Agent 长期同机 7×24 常驻、原生 macOS 与 M4 算力支撑并发 Worker,租赁 VPSMAC 的 Mac 云节点通常是更省心、更适合 AI 自动化生产环境的选择——编排拓扑写一次,模型与 Worker 随意换,节点始终在线。