2025-07-03:学习

我们来对 AI 搜索范式中的“执行官”—— Executor Agent 进行一次全面、详细的解析。

Executor Agent

Executor Agent 在 AI 搜索范式中扮演着“执行团队”或“实干家”的角色。它的核心任务是接收 Planner 制定的蓝图(DAG),并将其精准无误地转化为具体的、可量化的数据成果,为最终的答案生成提供坚实的基础。

职责概览

Executor 的核心职责可以概括为:调度任务、调用工具、评估结果。它并非一个简单的指令执行器,而是一个集成了并行调度、容错处理和质量控制的智能执行引擎。

# Executor Agent 的核心组件示意
import asyncio
from typing import Dict, Any, List

class ExecutorAgent:
    """
    Executor智能体,负责将规划蓝图(DAG)转化为实际结果。
    """
    def __init__(self, tools: Dict[str, Any]):
        # 1. 任务调度器,负责高效并行执行DAG
        self.scheduler = DAG_Scheduler(self.execute_single_task)
        
        # 2. 健壮的工具调用器,内置容错和备用机制
        self.tool_invoker = RobustToolInvoker(tools)
        
        # 3. 结果充分性检查器,确保子任务的完成质量
        self.sufficiency_checker = SufficiencyChecker()

    async def execute_plan(self, dag: 'DAG') -> Dict[str, Any]:
        """
        接收并执行整个 DAG 计划的主入口。
        """
        print("--- [Executor] Plan received. Starting execution. ---")
        # 调度器负责处理复杂的并行和依赖关系
        final_results = await self.scheduler.run(dag)
        print("--- [Executor] All tasks completed. Returning results. ---")
        return final_results

    async def execute_single_task(self, task: 'Task') -> Any:
        """
        执行单个任务的内部逻辑,包含质量控制循环。
        """
        # ... (详细逻辑见下文) ...
        pass

职责1:任务调度与执行 (Task Scheduling & Execution)

原理与实践

Executor 的首要职责是高效地执行一个复杂的、带有依赖关系的任务图(DAG)。它的核心原理是最大化并行度:同时执行所有互不依赖的任务,以缩短总执行时间。

为实现这一点,Executor 采用事件驱动的异步调度模型。调度器会持续跟踪已完成的任务,一旦某个任务完成,它会立刻检查其所有下游任务的依赖是否已全部满足。如果满足,新的“就绪”任务会被立即加入到执行队列中,而不是等待整个层级的任务都完成后再开始下一层。

import asyncio

class DAG_Scheduler:
    """
    一个基于异步 I/O 的 DAG 调度器,用于高效并行执行。
    """
    def __init__(self, task_executor_func):
        self.task_executor_func = task_executor_func

    async def run(self, dag: 'DAG') -> Dict[str, Any]:
        task_futures = {}
        # 为 DA G中的每个任务创建一个 Future,用于跟踪其完成状态
        for task_id in dag.tasks:
            task_futures[task_id] = asyncio.Future()

        # 调度所有没有依赖的起始任务
        for task_id in dag.get_start_nodes():
            self._schedule_task(task_id, dag, task_futures)

        # 等待所有任务的 Future 完成
        await asyncio.gather(*task_futures.values())
        
        # 从 Future 中提取结果并返回
        return {task_id: future.result() for task_id, future in task_futures.items()}

    def _schedule_task(self, task_id: str, dag: 'DAG', futures: dict):
        """调度一个任务,当它准备好时。"""
        task = dag.tasks[task_id]
        
        async def task_wrapper():
            # 等待所有前置依赖任务完成
            dependencies = dag.get_dependencies(task_id)
            dep_results_list = await asyncio.gather(*[futures[dep_id] for dep_id in dependencies])
            
            # 准备参数 (将依赖结果映射到任务参数)
            dep_results_map = {dep_id: res for dep_id, res in zip(dependencies, dep_results_list)}
            task.prepare_args(dep_results_map)
            
            # 执行任务
            result = await self.task_executor_func(task)
            
            # 设置当前任务的结果,这将解除下游任务的等待状态
            futures[task_id].set_result(result)

        asyncio.create_task(task_wrapper())

职责2:标准化与健壮的工具调用 (Standardized & Robust Tool Invocation)

原理与实践

Executor 需要与各种外部工具 API 交互,这些 API 的行为是不可预测的。因此,设计的难点在于如何处理工具的不可靠性

解决方案是构建一个健壮的工具调用层,内置两种核心容错机制:

  1. 自动重试 (Retry):对于网络抖动、API 临时不可用等瞬时故障,采用指数退避策略自动重试。
  2. 备用方案 (Fallback):当主工具连续失败时,利用 Planner 提供的工具包(Toolkit)信息,自动切换到备用工具。
from tenacity import retry, stop_after_attempt, wait_exponential

class RobustToolInvoker:
    def __init__(self, tool_registry: Dict[str, Any]):
        self.tool_registry = tool_registry

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    async def invoke(self, tool_name: str, args: List) -> Any:
        """
        使用 tenacity 库实现带指数退避的自动重试。
        """
        tool = self.tool_registry.get(tool_name)
        if not tool:
            raise Exception(f"Tool '{tool_name}' not found.")
        print(f"      - [Invoker] Calling tool '{tool_name}' with args: {args}")
        return await tool.call(*args)

    async def invoke_with_fallback(self, toolkit: 'Toolkit', args: List) -> Any:
        """
        当主工具失败后,尝试备用工具。
        """
        try:
            return await self.invoke(toolkit.primary_tool_name, args)
        except Exception as e:
            print(f"      - [Invoker] Primary tool '{toolkit.primary_tool_name}' failed. Trying fallbacks...")
            for fallback_name in toolkit.fallback_tool_names:
                try:
                    return await self.invoke(fallback_name, args)
                except Exception as fallback_e:
                    print(f"      - [Invoker] Fallback '{fallback_name}' also failed: {fallback_e}")
                    continue
            # 所有工具都失败了,才抛出最终异常
            raise Exception("All tools in the toolkit failed.")

职责3:结果评估与内部迭代 (Result Evaluation & Internal Iteration)

原理与实践

这是 Executor 智能性的关键。它并非盲目地调用一次工具就提交结果。设计的难点在于如何客观地判断一次工具调用的结果是否“充分”,足以支撑当前子任务的目标。

解决方案是引入一个结果充分性检查器(Sufficiency Checker),它通常由一个轻量、快速的 LLM 实现。这个检查器将模糊的“质量”问题,转化为一个具体的“是/否”问题。

class SufficiencyChecker:
    def __init__(self):
        self.checker_llm = LightweightLLM() # 一个小模型,用于快速判断

    async def is_sufficient(self, subtask_description: str, collected_results: List[str]) -> bool:
        prompt = f"""
        # Subtask Goal:
        {subtask_description}

        # Collected Information:
        - {"\n- ".join(collected_results)}

        # Question: Is the collected information sufficient to fully and confidently achieve the subtask goal? Answer only with "YES" or "NO".
        """
        response = await self.checker_llm.generate(prompt)
        return "YES" in response.upper()

# 在 Executor 的单个任务执行逻辑中集成
async def execute_single_task(self, task: 'Task'):
    collected_info = []
    current_args = task.args
    for attempt in range(3): # 最多尝试3次内部迭代
        result = await self.tool_invoker.invoke(task.tool_name, current_args)
        collected_info.append(result)

        # 评估结果是否充分
        if await self.sufficiency_checker.is_sufficient(task.description, collected_info):
            print(f"  > [Executor] Result for '{task.id}' is sufficient after attempt {attempt + 1}.")
            return "".join(collected_info) # 返回整合后的结果

        # 如果不充分,自动精炼参数以进行下一次尝试
        print(f"  > [Executor] Result for '{task.id}' insufficient. Refining arguments for next attempt...")
        current_args = await self.refine_arguments_for_retry(task, collected_info)

    raise Exception(f"Task '{task.id}' failed to gather sufficient info after multiple attempts.")

Executor 与其他 Agent 的协作机制

Executor 是连接规划与生成的关键枢纽,其协作关系清晰明确:

协同对象 协作方式 关键交互内容
Planner Agent 接收指令 Executor接收Planner生成的DAG计划作为其全部的工作指令和行动依据。DAG 是它们之间唯一的“契约”。
Writer Agent 交付成果 当DAG中的所有任务执行完毕,Executor会将包含所有子任务ID和其对应执行结果的结构化数据包(字典或JSON),完整地交付给Writer
Master Agent 异常上报 当遇到自身容错机制(重试、备用)无法解决的严重失败时,Executor 会停止执行,并向 Master 抛出一个包含详细错误信息的异常报告,请求更高层级的干预。
# 协同交互的数据结构示例
# 1. 从 Planner 接收
DAG_Plan = {
    "vertices": [{"id": "task_1", "description": "...", "tool_binding": "...", "args": [...]}],
    "edges": [["task_1", "task_3"]]
}

# 2. 交付给 Writer
ResultSet = {
    "task_1": "Emperor Wu of Han lived from 156 BC to 87 BC.",
    "task_2": "Julius Caesar lived from 100 BC to 44 BC.",
    "task_3": "Based on the dates, the age difference is approximately 56 years."
}

# 3. 上报给 Master
FailureReport = {
    "failed_task_id": "task_2",
    "error_type": "ToolExecutionError",
    "message": "All tools in the SearchToolkit failed.",
    "timestamp": "..."
}

完整示例代码演示

下面是一个集成了上述概念的、可运行的完整ExecutorAgent示例。

import asyncio
import json
from typing import Dict, Any, List

# --- Mock Classes for Demonstration ---
class Tool:
    def __init__(self, name): self.name = name
    async def call(self, *args):
        print(f"      - [Tool Call] '{self.name}' called with: {args}")
        await asyncio.sleep(1) # Simulate network I/O
        if "fail" in args: raise Exception("Simulated API failure")
        if self.name == "Web_Search_Tool":
            if "Emperor Wu" in args[0]: return "Result: 156 BC - 87 BC."
            if "Julius Caesar" in args[0]: return "Result: 100 BC - 44 BC."
        if self.name == "Calculator_Tool": return "Final calculation: 56 years."
        return "Mock Result"

class Task:
    def __init__(self, id, description, tool_binding, args):
        self.id, self.description, self.tool, self.args = id, description, tool_binding, args

class DAG:
    # A simplified DAG representation for demonstration
    def __init__(self, data): 
        self.tasks = {v['id']: Task(**v) for v in data['vertices']}
        self.edges = data['edges']
        self.dependencies = {v['id']: [] for v in data['vertices']}
        self.successors = {v['id']: [] for v in data['vertices']}
        for src, dest in self.edges:
            self.dependencies[dest].append(src)
            self.successors[src].append(dest)

    def get_start_nodes(self):
        return [id for id, deps in self.dependencies.items() if not deps]

class ExecutorAgent:
    def __init__(self, tools: List[Tool]):
        self.tool_map = {tool.name: tool for tool in tools}

    async def execute_plan(self, dag: DAG) -> Dict[str, Any]:
        print("--- [Executor] Plan received. Starting DAG execution. ---")
        results = {}
        # Using a simple event-based async model for demonstration
        task_events = {task_id: asyncio.Event() for task_id in dag.tasks}
        tasks_to_run = [
            asyncio.create_task(self._task_runner(task_id, dag, results, task_events))
            for task_id in dag.tasks
        ]
        await asyncio.gather(*tasks_to_run)
        print("--- [Executor] All tasks completed. ---")
        return results

    async def _task_runner(self, task_id: str, dag: DAG, results: dict, events: dict):
        # Wait for all dependencies to be met
        dep_ids = dag.dependencies[task_id]
        if dep_ids:
            await asyncio.gather(*[events[dep_id].wait() for dep_id in dep_ids])
        
        task = dag.tasks[task_id]
        
        # Prepare arguments from dependency results
        prepared_args = [results.get(dep_id, '') for dep_id in dep_ids] if dep_ids else task.args
        
        print(f"  > [Executor] Task '{task.id}' is ready. Executing...")
        # A real implementation would have the internal iteration loop here
        tool = self.tool_map.get(task.tool)
        results[task_id] = await tool.call(*prepared_args)
        
        # Mark task as complete
        events[task_id].set()

# --- 模拟执行 ---
async def main():
    # Tools available to the Executor
    mock_tools = [Tool("Web_Search_Tool"), Tool("Calculator_Tool")]
    
    # The DAG plan received from the Planner
    dag_plan_data = {
      "vertices": [
        {'id': 'task_1', 'description': 'Search for dates of Emperor Wu', 'tool_binding': 'Web_Search_Tool', 'args': ['Emperor Wu of Han dates']},
        {'id': 'task_2', 'description': 'Search for dates of Caesar', 'tool_binding': 'Web_Search_Tool', 'args': ['Julius Caesar dates']},
        {'id': 'task_3', 'description': 'Calculate age difference', 'tool_binding': 'Calculator_Tool', 'args': ["{{task_1.ret}}", "{{task_2.ret}}"]}
      ],
      "edges": [["task_1", "task_3"], ["task_2", "task_3"]]
    }
    dag_plan = DAG(dag_plan_data)
    
    executor = ExecutorAgent(mock_tools)
    
    try:
        final_results = await executor.execute_plan(dag_plan)
        print("\n--- Final Results package from Executor (to be sent to Writer) ---")
        print(json.dumps(final_results, indent=2))
    except Exception as e:
        print(f"\n--- Execution failed: {e} ---")

if __name__ == "__main__":
    asyncio.run(main())

总结

Executor 是确保规划得以高质量、高效率和高可靠性落地的核心。它不仅仅是一个被动的指令执行者,更是一个主动的质量保障者和错误处理者。通过内置的“微循环”智能(结果评估与迭代)和强大的容错机制(重试与备用),ExecutorPlanner 的宏观计划,稳健地转化为 Writer 可以信赖的、结构化的数据基础,是连接“理论”与“实践”的坚固桥梁。

预知 Writer Agent 如何工作,我们下回分解。


大家好,我是自在哪吒的创始人、首席服务官 Kafka。让我们一起进化吧。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • AI 搜索范式(4) “运筹如织千机密,擘理似裁万缕匀。智绘蓝图分经纬,巧梳脉络见本真。” 上回书说到,AI 搜索...
    kafkaliu阅读 1,090评论 0 0
  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 12,115评论 0 34
  • RDD RDD的全称是:Resilient Distributed Dataset (弹性分布式数据集) 五个关键...
    阿涛哥阅读 3,890评论 0 0
  • AI 搜索范式(3) “运筹帷幄之中,决胜千里之外。智周万物而道济,变应千机自圆融。” 上回书说到,AI 搜索之所...
    kafkaliu阅读 1,027评论 0 0
  • 501. MapReduce计算框架中的输入和输出的基本数据结构是键-值对。 502. Hadoop神奇的一部分在...
    yoku酱阅读 4,442评论 0 6

友情链接更多精彩内容