2025-06-27:学习

AI 搜索范式(3)

“运筹帷幄之中,决胜千里之外。智周万物而道济,变应千机自圆融。”

上回书说到,AI 搜索之所以效果更好,是因为技术范式发生了巨大飞跃,架构从“检索”演变到了“推理”。AI 搜索已经是一个具备规划、分解、协作、反思能力的智能化系统,能够真正像一个助理一样,为用户完成复杂的信息处理和问题解决任务。

本讲开始,具体说说论文里提到的这几个 Agent,以及给我们的启发,即便我们不做 AI 搜索,只要是 Agent,都会有些价值。

image-3.png

Master Agent

图中最左边的 Master Agent 是整个 AI 搜索系统的"大脑中枢"和"总指挥",它的设计是整个系统能否高效运转的关键。我们详细看下 Master Agent 的设计原理、协同机制和实现难点等。为清晰表达逻辑,用了一些伪代码,注意原论文不包括以下代码,此处仅用于示例。

职责概览

论文第 2 节中讲了 Master Agent 的四大核心职责:

"""
注:本文所有代码仅为示意,并非原论文的真实实现。

"""

class MasterAgent:
    """
    Master Agent 的四大核心职责:
    1. 查询复杂度分析 (Query Complexity Analysis)
    2. 动态组队决策 (Dynamic Team Configuration) 
    3. 执行监控 (Execution Monitoring)
    4. 反思与重规划 (Reflection & Re-planning)
    """
    
    def __init__(self):
        self.complexity_analyzer = ComplexityAnalyzer()
        self.team_configurator = TeamConfigurator()
        self.execution_monitor = ExecutionMonitor()
        self.reflection_engine = ReflectionEngine()

职责1: 查询复杂度分析

Master 首先需要准确判断用户查询的内在复杂度,这是整个系统选择处理策略的基础。

class ComplexityAnalyzer:
    """
    复杂度分析器 - Master的核心组件之一
    """
    
    def __init__(self):
        self.classifier_llm = DeepSeekR1Model()  # 强大的分类器 LLM
        self.complexity_features = ComplexityFeatureExtractor()
        
    def analyze_complexity(self, query: str) -> ComplexityLevel:
        """
        多维度复杂度分析
        """
        # 1. 静态特征提取
        static_features = self.extract_static_features(query)
        
        # 2. LLM 驱动的语义分析  
        semantic_analysis = self.semantic_complexity_analysis(query)
        
        # 3. 综合判断
        complexity_score = self.compute_complexity_score(
            static_features, semantic_analysis
        )
        
        return self.score_to_level(complexity_score)
    
    def extract_static_features(self, query: str) -> Dict:
        """
        提取静态复杂度特征
        """
        return {
            "query_length": len(query.split()),
            "question_words": self.count_question_words(query),
            "entity_count": self.count_named_entities(query),
            "comparison_indicators": self.detect_comparison_patterns(query),
            "calculation_requirements": self.detect_calculation_needs(query),
            "multi_step_indicators": self.detect_multi_step_patterns(query)
        }
    
    def semantic_complexity_analysis(self, query: str) -> Dict:
        """
        使用 LLM 进行语义复杂度分析
        """
        analysis_prompt = f"""
        分析以下查询的复杂度,考虑以下维度:
        1. 是否需要多步推理?
        2. 是否需要外部信息?
        3. 是否需要工具调用?
        4. 是否需要信息对比或计算?
        
        查询: "{query}"
        
        请返回 JSON 格式的分析结果。
        """
        
        response = self.classifier_llm.generate(analysis_prompt)
        return self.parse_semantic_analysis(response)
    
    def compute_complexity_score(self, static_features: Dict, semantic_analysis: Dict) -> float:
        """
        综合计算复杂度分数 (0-1 之间)
        """
        # 基于静态特征的分数
        static_score = (
            static_features["query_length"] * 0.1 +
            static_features["question_words"] * 0.2 +
            static_features["entity_count"] * 0.15 +
            static_features["comparison_indicators"] * 0.3 +
            static_features["calculation_requirements"] * 0.4 +
            static_features["multi_step_indicators"] * 0.5
        ) / 6
        
        # 基于语义分析的分数
        semantic_score = (
            semantic_analysis.get("multi_step_reasoning", 0) * 0.4 +
            semantic_analysis.get("external_info_needed", 0) * 0.3 +
            semantic_analysis.get("tool_usage_required", 0) * 0.3 +
            semantic_analysis.get("comparison_calculation", 0) * 0.5
        ) / 4
        
        # 加权平均
        final_score = 0.4 * static_score + 0.6 * semantic_score
        return min(final_score, 1.0)
    
    def score_to_level(self, score: float) -> ComplexityLevel:
        """
        分数转换为复杂度等级
        """
        if score <= 0.3:
            return ComplexityLevel.SIMPLE
        elif score <= 0.7:
            return ComplexityLevel.MODERATE  
        else:
            return ComplexityLevel.COMPLEX

职责2: 动态组队决策

Master 根据复杂度分析结果,动态决定启用哪些智能体和资源。简单查询可能只需要 Writer 或者 Executor 和 Writer 就可以。复杂的查询,还需要包括 Planner。论文中称为 Writer-Only ConfigurationExecutor-Inclusive ConfigurationPlanner-Enhanced Configuration

class TeamConfigurator:
    """
    团队配置器 - 实现论文中的三种动态配置
    """
    
    def configure_team(self, query: str, complexity: ComplexityLevel) -> TeamConfig:
        """
        根据复杂度选择合适的团队配置
        """
        if complexity == ComplexityLevel.SIMPLE:
            return self.writer_only_config(query)
        elif complexity == ComplexityLevel.MODERATE:
            return self.executor_inclusive_config(query)  
        else:
            return self.planner_enhanced_config(query)
    
    def writer_only_config(self, query: str) -> TeamConfig:
        """
        配置1: Writer-Only
        适用于: 简单事实查询,LLM内部知识足够
        """
        return TeamConfig(
            agents=["writer"],
            strategy="direct_answer",
            max_iterations=1,
            tools=[],
            estimated_cost=0.01,  # 最低成本
            estimated_latency=0.5  # 最快响应
        )
    
    def executor_inclusive_config(self, query: str) -> TeamConfig:
        """
        配置2: Executor-Inclusive  
        适用于: 需要外部信息但不需要复杂推理
        """
        relevant_tools = self.select_relevant_tools(query)
        
        return TeamConfig(
            agents=["executor", "writer"],
            strategy="single_step_retrieval",
            max_iterations=2,
            tools=relevant_tools,
            estimated_cost=0.05,
            estimated_latency=2.0
        )
    
    def planner_enhanced_config(self, query: str) -> TeamConfig:
        """
        配置3: Planner-Enhanced
        适用于: 复杂查询,需要多步推理和规划
        """
        tool_ecosystem = self.analyze_required_tool_ecosystem(query)
        
        return TeamConfig(
            agents=["planner", "executor", "writer"],
            strategy="multi_step_planning",
            max_iterations=5,
            tools=tool_ecosystem,
            estimated_cost=0.20,
            estimated_latency=10.0,
            parallel_execution=True  # 启用并行执行
        )
    
    def select_relevant_tools(self, query: str) -> List[Tool]:
        """
        为中等复杂度查询选择相关工具
        """
        # 使用COLT方法选择工具
        semantic_match = self.semantic_tool_matching(query)
        return semantic_match[:3]  # 限制工具数量,保持轻量
    
    def analyze_required_tool_ecosystem(self, query: str) -> List[Tool]:
        """
        为复杂查询分析所需的完整工具生态
        """
        # 使用COLT的协作维度分析
        tool_ecosystem = self.collaborative_tool_analysis(query)
        return tool_ecosystem

职责3: 执行监控

Master 需要实时监控整个执行过程,及时发现问题。

class ExecutionMonitor:
    """
    执行监控器 - Master 的实时监控系统
    """
    
    def __init__(self):
        self.execution_state = ExecutionState()
        self.failure_detector = FailureDetector()
        self.performance_tracker = PerformanceTracker()
    
    def monitor_execution(self, team_config: TeamConfig, 
                         execution_context: ExecutionContext) -> MonitorResult:
        """
        实时监控执行过程
        """
        monitor_result = MonitorResult()
        
        # 1. 状态跟踪
        current_state = self.track_execution_state(execution_context)
        monitor_result.current_state = current_state
        
        # 2. 失败检测
        failures = self.detect_failures(execution_context)
        monitor_result.failures = failures
        
        # 3. 性能监控
        performance = self.track_performance(execution_context)
        monitor_result.performance = performance
        
        # 4. 质量评估
        quality_issues = self.assess_intermediate_quality(execution_context)
        monitor_result.quality_issues = quality_issues
        
        return monitor_result
    
    def detect_failures(self, context: ExecutionContext) -> List[Failure]:
        """
        多维度失败检测
        """
        failures = []
        
        # 工具调用失败
        if context.tool_failures:
            failures.extend(self.analyze_tool_failures(context.tool_failures))
        
        # 超时检测
        if context.execution_time > context.max_allowed_time:
            failures.append(Failure(
                type="timeout",
                component="executor", 
                description="Execution exceeded time limit"
            ))
        
        # 质量异常检测
        if context.intermediate_results:
            quality_failures = self.detect_quality_failures(
                context.intermediate_results
            )
            failures.extend(quality_failures)
        
        return failures
    
    def assess_intermediate_quality(self, context: ExecutionContext) -> List[QualityIssue]:
        """
        中间结果质量评估
        """
        issues = []
        
        for result in context.intermediate_results:
            # 检查结果完整性
            if not self.is_result_complete(result):
                issues.append(QualityIssue(
                    type="incomplete_result",
                    severity="medium",
                    affected_task=result.task_id
                ))
            
            # 检查结果一致性
            if not self.is_result_consistent(result, context.previous_results):
                issues.append(QualityIssue(
                    type="inconsistent_result", 
                    severity="high",
                    affected_task=result.task_id
                ))
        
        return issues

职责4: 反思与重规划

这是 Master 最复杂也最关键的能力。

class ReflectionEngine:
    """
    反思引擎 - Master 的高级认知能力
    """
    
    def __init__(self):
        self.reflection_llm = DeepSeekR1Model()
        self.replanning_strategies = ReplanningStrategies()
    
    def reflect_and_replan(self, monitor_result: MonitorResult, original_query: str, current_plan: Optional[DAG]) -> ReflectionResult:
        """
        基于监控结果进行反思并重新规划
        """
        # 1. 失败分析
        failure_analysis = self.analyze_failures(monitor_result.failures)
        
        # 2. 反思决策
        reflection_decision = self.make_reflection_decision(
            failure_analysis, monitor_result
        )
        
        # 3. 执行相应策略
        if reflection_decision.action == "abort":
            return self.create_abort_result(reflection_decision.reason)
        elif reflection_decision.action == "retry":
            return self.create_retry_result(reflection_decision.retry_strategy)
        elif reflection_decision.action == "replan":
            return self.execute_replanning(
                original_query, current_plan, failure_analysis
            )
        else:
            return self.create_continue_result()
    
    def analyze_failures(self, failures: List[Failure]) -> FailureAnalysis:
        """
        深度失败分析
        """
        analysis_prompt = f"""
        分析以下执行失败情况,判断根本原因和可能的解决方案:
        
        失败列表:
        {json.dumps([f.to_dict() for f in failures], indent=2)}
        
        请分析:
        1. 根本原因是什么?
        2. 是否可以通过重试解决?
        3. 是否需要调整计划?
        4. 是否需要更换工具?
        
        返回JSON格式的分析结果。
        """
        
        response = self.reflection_llm.generate(analysis_prompt)
        return FailureAnalysis.from_json(response)
    
    def execute_replanning(self, original_query: str, current_plan: DAG,
failure_analysis: FailureAnalysis) -> ReflectionResult:
        """
        执行重规划
        """
        replanning_prompt = f"""
        原始查询: {original_query}
        
        当前计划:
        {current_plan.to_json()}
        
        失败分析:
        {failure_analysis.to_json()}
        
        请基于失败经验重新制定计划:
        1. 保留成功的部分
        2. 修复失败的部分  
        3. 考虑备选工具和策略
        4. 确保新计划更鲁棒
        
        返回新的DAG计划。
        """
        
        new_plan_response = self.reflection_llm.generate(replanning_prompt)
        new_plan = DAG.from_json(new_plan_response)
        
        return ReflectionResult(
            action="replan",
            new_plan=new_plan,
            reasoning=failure_analysis.recommended_changes
        )

Master 与其他 Agent 的协同机制

完整的协同流程实现

class MasterAgent:
    """
    Master Agent 的完整实现
    """
    
    def __init__(self):
        self.complexity_analyzer = ComplexityAnalyzer()
        self.team_configurator = TeamConfigurator()
        self.execution_monitor = ExecutionMonitor()
        self.reflection_engine = ReflectionEngine()
        
        # 下游智能体实例
        self.planner = PlannerAgent()
        self.executor = ExecutorAgent()  
        self.writer = WriterAgent()
    
    async def process_query(self, query: str) -> SearchResult:
        """
        Master 的主要处理流程
        """
        # 阶段1: 复杂度分析和团队配置
        complexity = self.complexity_analyzer.analyze_complexity(query)
        team_config = self.team_configurator.configure_team(query, complexity)
        
        # 阶段2: 执行监控循环
        max_attempts = 3
        attempt = 0
        
        while attempt < max_attempts:
            try:
                # 根据配置执行相应策略
                if team_config.strategy == "direct_answer":
                    result = await self.execute_writer_only(query, team_config)
                elif team_config.strategy == "single_step_retrieval":
                    result = await self.execute_executor_inclusive(query, team_config)  
                else:
                    result = await self.execute_planner_enhanced(query, team_config)
                
                # 成功则返回结果
                return result
                
            except ExecutionException as e:
                # 阶段3: 失败处理和反思
                monitor_result = MonitorResult(failures=[e.failure])
                reflection_result = self.reflection_engine.reflect_and_replan(
                    monitor_result, query, getattr(e, 'current_plan', None)
                )
                
                if reflection_result.action == "abort":
                    return SearchResult.create_error(reflection_result.reasoning)
                elif reflection_result.action == "replan":
                    # 更新团队配置并重试
                    team_config = self.update_team_config(
                        team_config, reflection_result
                    )
                    attempt += 1
                else:
                    attempt += 1
        
        # 超过最大尝试次数
        return SearchResult.create_error("Maximum retry attempts exceeded")
    
    async def execute_planner_enhanced(self, query: str, team_config: TeamConfig) -> SearchResult:
        """
        执行 Planner-Enhanced 配置的完整流程
        """
        # 1. Planner 生成计划
        plan = await self.planner.create_plan(query, team_config.tools)
        
        # 2. 创建执行上下文
        execution_context = ExecutionContext(
            query=query,
            plan=plan,
            team_config=team_config,
            start_time=time.time()
        )
        
        # 3. 监控执行循环
        while not execution_context.is_complete():
            # Executor 执行下一批任务
            batch_results = await self.executor.execute_batch(
                execution_context.get_ready_tasks()
            )
            
            # 更新执行上下文
            execution_context.update_results(batch_results)
            
            # 实时监控
            monitor_result = self.execution_monitor.monitor_execution(
                team_config, execution_context
            )
            
            # 检查是否需要干预
            if monitor_result.needs_intervention():
                reflection_result = self.reflection_engine.reflect_and_replan(
                    monitor_result, query, plan
                )
                
                if reflection_result.action == "replan":
                    # 局部重规划
                    plan = self.update_plan_with_reflection(
                        plan, reflection_result
                    )
                    execution_context.update_plan(plan)
        
        # 4. Writer 生成最终答案
        final_result = await self.writer.synthesize_answer(
            query, execution_context.get_all_results()
        )
        
        return final_result

总结

通过以上设计,Master Agent 实现了:

  1. 智能决策: 准确的复杂度分析和动态资源配置
  2. 实时监控: 高效的执行过程监控和问题检测
  3. 自适应调整: 快速的反思决策和重规划能力
  4. 协同指挥: 与 Planner、Executor、Writer 的无缝协作

预知 Planner Agent 如何工作,我们下回分解。


大家好,我是自在哪吒的创始人、首席服务官 Kafka。让我们一起进化吧。


原论文可点击“原文链接”

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容