Let's take a comprehensive, detailed look at the "executive officer" of the AI search paradigm: the Executor Agent.
Executor Agent
In the AI search paradigm, the Executor Agent plays the role of the "execution team" or "doer." Its core mission is to take the blueprint (DAG) drawn up by the Planner and turn it, precisely and faithfully, into concrete, measurable data products that form a solid foundation for final answer generation.
Overview of Responsibilities
The Executor's core responsibilities can be summarized as: scheduling tasks, invoking tools, and evaluating results. It is not a simple instruction runner, but an intelligent execution engine that integrates parallel scheduling, fault tolerance, and quality control.
```python
# Sketch of the Executor Agent's core components
import asyncio
from typing import Dict, Any, List

class ExecutorAgent:
    """
    Executor agent: turns the planning blueprint (DAG) into concrete results.
    """
    def __init__(self, tools: Dict[str, Any]):
        # 1. Task scheduler: executes the DAG with maximum parallelism
        self.scheduler = DAG_Scheduler(self.execute_single_task)
        # 2. Robust tool invoker with built-in retry and fallback mechanisms
        self.tool_invoker = RobustToolInvoker(tools)
        # 3. Sufficiency checker: quality control for each subtask's results
        self.sufficiency_checker = SufficiencyChecker()

    async def execute_plan(self, dag: 'DAG') -> Dict[str, Any]:
        """
        Main entry point: receive and execute the whole DAG plan.
        """
        print("--- [Executor] Plan received. Starting execution. ---")
        # The scheduler handles parallelism and dependency ordering
        final_results = await self.scheduler.run(dag)
        print("--- [Executor] All tasks completed. Returning results. ---")
        return final_results

    async def execute_single_task(self, task: 'Task') -> Any:
        """
        Per-task logic, including the quality-control loop.
        """
        # ... (detailed below) ...
        pass
```
Responsibility 1: Task Scheduling & Execution
Principle and Practice
The Executor's first responsibility is to efficiently execute a complex task graph (DAG) with dependencies. Its core principle is to maximize parallelism: run all mutually independent tasks concurrently to shorten total execution time.
To achieve this, the Executor uses an event-driven asynchronous scheduling model. The scheduler continuously tracks completed tasks; the moment a task finishes, it checks whether all dependencies of each downstream task are now satisfied. Newly "ready" tasks are added to the execution queue immediately, rather than waiting for an entire level of the graph to finish before starting the next.
```python
import asyncio
from typing import Any, Dict

class DAG_Scheduler:
    """
    An async-I/O DAG scheduler for efficient parallel execution.
    """
    def __init__(self, task_executor_func):
        self.task_executor_func = task_executor_func

    async def run(self, dag: 'DAG') -> Dict[str, Any]:
        # One Future per task in the DAG, used to track its completion
        task_futures = {task_id: asyncio.Future() for task_id in dag.tasks}
        # Schedule every task up front; each wrapper blocks on its dependencies'
        # Futures, so a task starts the moment its prerequisites finish.
        # (Scheduling only the start nodes would leave downstream Futures
        # unresolved and hang the gather below.)
        for task_id in dag.tasks:
            self._schedule_task(task_id, dag, task_futures)
        # Wait until every task's Future has a result
        await asyncio.gather(*task_futures.values())
        # Extract the results from the Futures and return them
        return {task_id: future.result() for task_id, future in task_futures.items()}

    def _schedule_task(self, task_id: str, dag: 'DAG', futures: dict):
        """Schedule a task to run as soon as it becomes ready."""
        task = dag.tasks[task_id]
        async def task_wrapper():
            # Wait for all upstream dependency tasks to complete
            dependencies = dag.get_dependencies(task_id)
            dep_results_list = await asyncio.gather(*[futures[dep_id] for dep_id in dependencies])
            # Map dependency results onto this task's arguments
            dep_results_map = {dep_id: res for dep_id, res in zip(dependencies, dep_results_list)}
            task.prepare_args(dep_results_map)
            # Execute the task itself
            result = await self.task_executor_func(task)
            # Resolving this task's Future unblocks its downstream tasks
            futures[task_id].set_result(result)
        asyncio.create_task(task_wrapper())
```
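To make the event-driven behavior concrete, here is a minimal runnable sketch. `MiniTask`, `MiniDAG`, and `run_dag` are illustrative stand-ins, not part of the original design; the scheduling core is the same idea (one Future per task, schedule everything up front, each wrapper blocks on its dependencies' Futures).

```python
import asyncio

class MiniTask:
    """Illustrative task: 'executing' it just awaits an async function."""
    def __init__(self, tid, fn):
        self.id, self.fn, self.dep_results = tid, fn, {}
    def prepare_args(self, dep_results):
        self.dep_results = dep_results

class MiniDAG:
    """Illustrative DAG: a task map plus (src, dst) dependency edges."""
    def __init__(self, tasks, edges):
        self.tasks = {t.id: t for t in tasks}
        self.edges = edges
    def get_dependencies(self, task_id):
        return [src for src, dst in self.edges if dst == task_id]

async def run_dag(dag, executor):
    # One Future per task; scheduling every task up front means each wrapper
    # starts the instant its dependencies' Futures resolve.
    futures = {tid: asyncio.Future() for tid in dag.tasks}
    async def wrapper(tid):
        deps = dag.get_dependencies(tid)
        dep_results = await asyncio.gather(*[futures[d] for d in deps])
        task = dag.tasks[tid]
        task.prepare_args(dict(zip(deps, dep_results)))
        futures[tid].set_result(await executor(task))
    for tid in dag.tasks:
        asyncio.create_task(wrapper(tid))
    await asyncio.gather(*futures.values())
    return {tid: f.result() for tid, f in futures.items()}

async def main():
    async def two(_task): return 2
    async def three(_task): return 3
    async def add(task): return task.dep_results["a"] + task.dep_results["b"]
    dag = MiniDAG(
        [MiniTask("a", two), MiniTask("b", three), MiniTask("c", add)],
        edges=[("a", "c"), ("b", "c")],
    )
    # "a" and "b" run in parallel; "c" starts as soon as both finish
    return await run_dag(dag, lambda t: t.fn(t))

results = asyncio.run(main())
print(results)  # {'a': 2, 'b': 3, 'c': 5}
```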
Responsibility 2: Standardized & Robust Tool Invocation
Principle and Practice
The Executor interacts with a variety of external tool APIs whose behavior is unpredictable. The hard design problem is therefore how to handle tool unreliability.
The solution is a robust tool-invocation layer with two built-in fault-tolerance mechanisms:
- Automatic retry: for transient faults such as network jitter or temporary API unavailability, retry automatically with exponential backoff.
- Fallback: when the primary tool fails repeatedly, switch automatically to a backup tool using the Toolkit information provided by the Planner.
```python
from typing import Any, Dict, List
from tenacity import retry, stop_after_attempt, wait_exponential

class RobustToolInvoker:
    def __init__(self, tool_registry: Dict[str, Any]):
        self.tool_registry = tool_registry

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    async def invoke(self, tool_name: str, args: List) -> Any:
        """
        Automatic retry with exponential backoff, via the tenacity library.
        """
        tool = self.tool_registry.get(tool_name)
        if not tool:
            raise Exception(f"Tool '{tool_name}' not found.")
        print(f" - [Invoker] Calling tool '{tool_name}' with args: {args}")
        return await tool.call(*args)

    async def invoke_with_fallback(self, toolkit: 'Toolkit', args: List) -> Any:
        """
        Try the primary tool first; on failure, walk through the fallbacks.
        """
        try:
            return await self.invoke(toolkit.primary_tool_name, args)
        except Exception:
            print(f" - [Invoker] Primary tool '{toolkit.primary_tool_name}' failed. Trying fallbacks...")
            for fallback_name in toolkit.fallback_tool_names:
                try:
                    return await self.invoke(fallback_name, args)
                except Exception as fallback_e:
                    print(f" - [Invoker] Fallback '{fallback_name}' also failed: {fallback_e}")
                    continue
            # Only raise once every tool in the toolkit has failed
            raise Exception("All tools in the toolkit failed.")
```
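To illustrate the retry-plus-fallback behavior end to end without the tenacity dependency, here is a self-contained sketch with a hand-rolled exponential-backoff loop (equivalent in spirit to the `@retry` decorator above). `FlakyTool` and the tool names are illustrative assumptions.

```python
import asyncio

class FlakyTool:
    """Fails the first `fail_times` calls, then succeeds (simulated transient errors)."""
    def __init__(self, name, fail_times):
        self.name, self.remaining_failures = name, fail_times
    async def call(self, query):
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise RuntimeError(f"{self.name}: simulated transient failure")
        return f"{self.name} result for {query!r}"

async def invoke_with_backoff(tool, query, attempts=3, base_delay=0.01):
    # Hand-rolled exponential backoff: the delay doubles after each failure
    for attempt in range(attempts):
        try:
            return await tool.call(query)
        except Exception:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

async def invoke_with_fallback(tools, query):
    # Try each tool in order; raise only after every one has failed
    for tool in tools:
        try:
            return await invoke_with_backoff(tool, query)
        except Exception:
            continue
    raise RuntimeError("All tools in the toolkit failed.")

async def main():
    primary = FlakyTool("primary_search", fail_times=5)  # outlasts its 3 retries
    backup = FlakyTool("backup_search", fail_times=1)    # succeeds on its 2nd try
    return await invoke_with_fallback([primary, backup], "Emperor Wu of Han")

result = asyncio.run(main())
print(result)  # prints: backup_search result for 'Emperor Wu of Han'
```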
Responsibility 3: Result Evaluation & Internal Iteration
Principle and Practice
This is the key to the Executor's intelligence. It does not blindly call a tool once and submit whatever comes back. The hard design problem is how to judge objectively whether a tool call's result is "sufficient" to support the goal of the current subtask.
The solution is a Sufficiency Checker, typically implemented with a lightweight, fast LLM. The checker turns the fuzzy question of "quality" into a concrete yes/no question.
```python
from typing import List

class SufficiencyChecker:
    def __init__(self):
        self.checker_llm = LightweightLLM()  # a small, fast model used only for this judgment

    async def is_sufficient(self, subtask_description: str, collected_results: List[str]) -> bool:
        # Join outside the f-string: backslashes are not allowed inside
        # f-string expressions before Python 3.12
        joined_results = "\n- ".join(collected_results)
        prompt = f"""
# Subtask Goal:
{subtask_description}
# Collected Information:
- {joined_results}
# Question: Is the collected information sufficient to fully and confidently achieve the subtask goal? Answer only with "YES" or "NO".
"""
        response = await self.checker_llm.generate(prompt)
        return "YES" in response.upper()
```
```python
# Integrated into the Executor's per-task execution logic
async def execute_single_task(self, task: 'Task'):
    collected_info = []
    current_args = task.args
    for attempt in range(3):  # at most 3 internal iterations
        result = await self.tool_invoker.invoke(task.tool_name, current_args)
        collected_info.append(result)
        # Evaluate whether the accumulated results are sufficient
        if await self.sufficiency_checker.is_sufficient(task.description, collected_info):
            print(f" > [Executor] Result for '{task.id}' is sufficient after attempt {attempt + 1}.")
            return "\n".join(collected_info)  # return the consolidated result
        # If not, automatically refine the arguments for the next attempt
        print(f" > [Executor] Result for '{task.id}' insufficient. Refining arguments for next attempt...")
        current_args = await self.refine_arguments_for_retry(task, collected_info)
    raise Exception(f"Task '{task.id}' failed to gather sufficient info after multiple attempts.")
```
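The same quality loop can be exercised end to end with a mock checker standing in for the lightweight LLM. `MockChecker`, `mock_search`, and the "(refined)" query rule are all illustrative assumptions:

```python
import asyncio

class MockChecker:
    """Stands in for the lightweight LLM: sufficient once two snippets are collected."""
    async def is_sufficient(self, subtask_description, collected):
        return len(collected) >= 2

async def mock_search(query):
    # Stand-in for a real tool call
    await asyncio.sleep(0.01)
    return f"snippet for {query!r}"

async def execute_with_quality_loop(description, query, checker, max_attempts=3):
    collected = []
    for attempt in range(max_attempts):
        collected.append(await mock_search(query))
        if await checker.is_sufficient(description, collected):
            return "\n".join(collected)      # consolidated result
        query = query + " (refined)"         # stand-in for LLM-based argument refinement
    raise RuntimeError("insufficient after retries")

result = asyncio.run(execute_with_quality_loop(
    "Find Emperor Wu of Han's dates", "Emperor Wu dates", MockChecker()))
print(len(result.splitlines()))  # prints: 2
```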
How the Executor Collaborates with Other Agents
The Executor is the pivotal link between planning and generation, and its collaborations are clearly delineated:
| Collaborator | Mode of Collaboration | Key Interaction |
|---|---|---|
| Planner Agent | Receives instructions | The Executor takes the DAG plan generated by the Planner as its complete set of working instructions. The DAG is the only "contract" between the two. |
| Writer Agent | Delivers results | Once every task in the DAG has finished, the Executor hands the Writer a structured package (a dict or JSON) mapping each subtask ID to its execution result. |
| Master Agent | Escalates failures | When a failure exceeds its own fault tolerance (retry, fallback), the Executor halts and raises an exception report with detailed error information to the Master, requesting higher-level intervention. |
```python
# Example data structures for these interactions
# 1. Received from the Planner
DAG_Plan = {
    "vertices": [{"id": "task_1", "description": "...", "tool_binding": "...", "args": [...]}],
    "edges": [["task_1", "task_3"]]
}
# 2. Delivered to the Writer
ResultSet = {
    "task_1": "Emperor Wu of Han lived from 156 BC to 87 BC.",
    "task_2": "Julius Caesar lived from 100 BC to 44 BC.",
    "task_3": "Based on the dates, the age difference is approximately 56 years."
}
# 3. Reported to the Master
FailureReport = {
    "failed_task_id": "task_2",
    "error_type": "ToolExecutionError",
    "message": "All tools in the SearchToolkit failed.",
    "timestamp": "..."
}
```
Complete Example
Below is a complete, runnable ExecutorAgent example that ties the concepts above together.
```python
import asyncio
import json
from typing import Dict, Any, List

# --- Mock Classes for Demonstration ---
class Tool:
    def __init__(self, name): self.name = name
    async def call(self, *args):
        print(f" - [Tool Call] '{self.name}' called with: {args}")
        await asyncio.sleep(1)  # Simulate network I/O
        if "fail" in args: raise Exception("Simulated API failure")
        if self.name == "Web_Search_Tool":
            if "Emperor Wu" in args[0]: return "Result: 156 BC - 87 BC."
            if "Julius Caesar" in args[0]: return "Result: 100 BC - 44 BC."
        if self.name == "Calculator_Tool": return "Final calculation: 56 years."
        return "Mock Result"

class Task:
    def __init__(self, id, description, tool_binding, args):
        self.id, self.description, self.tool, self.args = id, description, tool_binding, args

class DAG:
    # A simplified DAG representation for demonstration
    def __init__(self, data):
        self.tasks = {v['id']: Task(**v) for v in data['vertices']}
        self.edges = data['edges']
        self.dependencies = {v['id']: [] for v in data['vertices']}
        self.successors = {v['id']: [] for v in data['vertices']}
        for src, dest in self.edges:
            self.dependencies[dest].append(src)
            self.successors[src].append(dest)
    def get_start_nodes(self):
        return [id for id, deps in self.dependencies.items() if not deps]

class ExecutorAgent:
    def __init__(self, tools: List[Tool]):
        self.tool_map = {tool.name: tool for tool in tools}

    async def execute_plan(self, dag: DAG) -> Dict[str, Any]:
        print("--- [Executor] Plan received. Starting DAG execution. ---")
        results = {}
        # Using a simple event-based async model for demonstration
        task_events = {task_id: asyncio.Event() for task_id in dag.tasks}
        tasks_to_run = [
            asyncio.create_task(self._task_runner(task_id, dag, results, task_events))
            for task_id in dag.tasks
        ]
        await asyncio.gather(*tasks_to_run)
        print("--- [Executor] All tasks completed. ---")
        return results

    async def _task_runner(self, task_id: str, dag: DAG, results: dict, events: dict):
        # Wait for all dependencies to be met
        dep_ids = dag.dependencies[task_id]
        if dep_ids:
            await asyncio.gather(*[events[dep_id].wait() for dep_id in dep_ids])
        task = dag.tasks[task_id]
        # Prepare arguments from dependency results
        prepared_args = [results.get(dep_id, '') for dep_id in dep_ids] if dep_ids else task.args
        print(f" > [Executor] Task '{task.id}' is ready. Executing...")
        # A real implementation would have the internal iteration loop here
        tool = self.tool_map.get(task.tool)
        results[task_id] = await tool.call(*prepared_args)
        # Mark task as complete
        events[task_id].set()

# --- Simulated run ---
async def main():
    # Tools available to the Executor
    mock_tools = [Tool("Web_Search_Tool"), Tool("Calculator_Tool")]
    # The DAG plan received from the Planner
    dag_plan_data = {
        "vertices": [
            {'id': 'task_1', 'description': 'Search for dates of Emperor Wu', 'tool_binding': 'Web_Search_Tool', 'args': ['Emperor Wu of Han dates']},
            {'id': 'task_2', 'description': 'Search for dates of Caesar', 'tool_binding': 'Web_Search_Tool', 'args': ['Julius Caesar dates']},
            {'id': 'task_3', 'description': 'Calculate age difference', 'tool_binding': 'Calculator_Tool', 'args': ["{{task_1.ret}}", "{{task_2.ret}}"]}
        ],
        "edges": [["task_1", "task_3"], ["task_2", "task_3"]]
    }
    dag_plan = DAG(dag_plan_data)
    executor = ExecutorAgent(mock_tools)
    try:
        final_results = await executor.execute_plan(dag_plan)
        print("\n--- Final Results package from Executor (to be sent to Writer) ---")
        print(json.dumps(final_results, indent=2))
    except Exception as e:
        print(f"\n--- Execution failed: {e} ---")

if __name__ == "__main__":
    asyncio.run(main())
```
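The `{{task_1.ret}}` placeholders in the plan above imply a substitution step before a downstream tool is invoked. Here is a minimal sketch of that resolution, assuming `{{task_id.ret}}` is the plan's only templating form (the regex and helper name are illustrative, not a prescribed API):

```python
import re

# Matches placeholders of the assumed form {{task_id.ret}}
PLACEHOLDER = re.compile(r"\{\{(\w+)\.ret\}\}")

def resolve_args(args, results):
    """Replace every {{task_id.ret}} placeholder with the matching upstream result."""
    resolved = []
    for arg in args:
        if isinstance(arg, str):
            arg = PLACEHOLDER.sub(lambda m: str(results[m.group(1)]), arg)
        resolved.append(arg)
    return resolved

upstream = {"task_1": "156 BC - 87 BC", "task_2": "100 BC - 44 BC"}
resolved = resolve_args(["{{task_1.ret}}", "{{task_2.ret}}"], upstream)
print(resolved)  # prints: ['156 BC - 87 BC', '100 BC - 44 BC']
```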
Summary
The Executor is what ensures the plan lands with high quality, efficiency, and reliability. It is not merely a passive instruction executor but an active guardian of quality and handler of errors. With its built-in "micro-loop" intelligence (result evaluation and iteration) and strong fault tolerance (retry and fallback), the Executor robustly turns the Planner's high-level plan into a structured data foundation the Writer can trust. It is the sturdy bridge between "theory" and "practice."
How the Writer Agent works is a story for the next installment.
Hi everyone, I'm Kafka, founder and Chief Service Officer of 自在哪吒. Let's evolve together.