AI 搜索范式(3)
“运筹帷幄之中,决胜千里之外。智周万物而道济,变应千机自圆融。”
上回书说到,AI 搜索之所以效果更好,是因为技术范式发生了巨大飞跃,架构从“检索”演变到了“推理”。AI 搜索已经是一个具备规划、分解、协作、反思能力的智能化系统,能够真正像一个助理一样,为用户完成复杂的信息处理和问题解决任务。
本讲开始,具体说说论文里提到的这几个 Agent,以及它们给我们的启发:即便不做 AI 搜索,只要是在做 Agent,这些设计都有参考价值。
Master Agent
图中最左边的 Master Agent 是整个 AI 搜索系统的"大脑中枢"和"总指挥",它的设计是整个系统能否高效运转的关键。我们详细看下 Master Agent 的设计原理、协同机制和实现难点等。为清晰表达逻辑,用了一些伪代码,注意原论文不包括以下代码,此处仅用于示例。
职责概览
论文第 2 节中讲了 Master Agent 的四大核心职责:
"""
注:本文所有代码仅为示意,并非原论文的真实实现。
"""
class MasterAgent:
    """Skeleton of the Master Agent, holding one collaborator per responsibility.

    The four core responsibilities and the attribute that backs each one:

    1. Query complexity analysis  -> ``complexity_analyzer``
    2. Dynamic team configuration -> ``team_configurator``
    3. Execution monitoring       -> ``execution_monitor``
    4. Reflection and re-planning -> ``reflection_engine``
    """

    def __init__(self):
        # Collaborators are created in the same order as the list above.
        self.complexity_analyzer = ComplexityAnalyzer()
        self.team_configurator = TeamConfigurator()
        self.execution_monitor = ExecutionMonitor()
        self.reflection_engine = ReflectionEngine()
职责1: 查询复杂度分析
Master 首先需要准确判断用户查询的内在复杂度,这是整个系统选择处理策略的基础。
class ComplexityAnalyzer:
    """Score a query's complexity and bucket it into a ``ComplexityLevel``.

    The score blends lexical features of the query text with an LLM-produced
    semantic assessment. The resulting level is what the Master uses to pick
    a processing strategy.
    """

    # Relative importance of each lexical feature; keys match the dict
    # returned by ``extract_static_features``.
    _STATIC_WEIGHTS = {
        "query_length": 0.1,
        "question_words": 0.2,
        "entity_count": 0.15,
        "comparison_indicators": 0.3,
        "calculation_requirements": 0.4,
        "multi_step_indicators": 0.5,
    }

    # Relative importance of each field read from the semantic analysis.
    _SEMANTIC_WEIGHTS = {
        "multi_step_reasoning": 0.4,
        "external_info_needed": 0.3,
        "tool_usage_required": 0.3,
        "comparison_calculation": 0.5,
    }

    def __init__(self):
        self.classifier_llm = DeepSeekR1Model()  # powerful classifier LLM
        self.complexity_features = ComplexityFeatureExtractor()

    def analyze_complexity(self, query: str) -> ComplexityLevel:
        """Run the full analysis pipeline for ``query``.

        Steps: extract static features, run the LLM semantic analysis,
        combine both into a score, and convert the score to a level.
        """
        static_features = self.extract_static_features(query)
        semantic_analysis = self.semantic_complexity_analysis(query)
        complexity_score = self.compute_complexity_score(
            static_features, semantic_analysis
        )
        return self.score_to_level(complexity_score)

    def extract_static_features(self, query: str) -> Dict:
        """Collect the lexical complexity features of ``query`` into a dict."""
        return {
            # NOTE(review): whitespace splitting yields 1 for text written
            # without spaces (e.g. Chinese) -- confirm this is acceptable.
            "query_length": len(query.split()),
            "question_words": self.count_question_words(query),
            "entity_count": self.count_named_entities(query),
            "comparison_indicators": self.detect_comparison_patterns(query),
            "calculation_requirements": self.detect_calculation_needs(query),
            "multi_step_indicators": self.detect_multi_step_patterns(query),
        }

    def semantic_complexity_analysis(self, query: str) -> Dict:
        """Ask the classifier LLM to assess ``query`` and parse its reply.

        The prompt asks about multi-step reasoning, external information,
        tool calls, and comparison/calculation, and requests a JSON answer.
        """
        analysis_prompt = (
            "\n"
            "分析以下查询的复杂度,考虑以下维度:\n"
            "1. 是否需要多步推理?\n"
            "2. 是否需要外部信息?\n"
            "3. 是否需要工具调用?\n"
            "4. 是否需要信息对比或计算?\n"
            f'查询: "{query}"\n'
            "请返回 JSON 格式的分析结果。\n"
        )
        response = self.classifier_llm.generate(analysis_prompt)
        return self.parse_semantic_analysis(response)

    def compute_complexity_score(self, static_features: Dict, semantic_analysis: Dict) -> float:
        """Combine both analyses into a single score guaranteed to lie in [0, 1].

        Each sub-score is a true weighted average (weighted sum divided by
        the sum of the weights), clamped to [0, 1]. Dividing by the number of
        features instead, as a naive implementation would, caps all-binary
        inputs far below 1.0 and makes the upper thresholds unreachable.

        Args:
            static_features: Dict containing every key of the static weight
                table; a missing key raises ``KeyError``.
            semantic_analysis: Dict of semantic indicators; missing keys
                count as 0.

        Returns:
            ``0.4 * static + 0.6 * semantic``, clamped to [0, 1].
        """
        # NOTE(review): feature values are presumably meant to be normalised
        # to [0, 1]; raw counts simply saturate the sub-score at 1.0.
        static_score = self._weighted_average(
            (static_features[name], weight)
            for name, weight in self._STATIC_WEIGHTS.items()
        )
        semantic_score = self._weighted_average(
            (semantic_analysis.get(name, 0), weight)
            for name, weight in self._SEMANTIC_WEIGHTS.items()
        )
        final_score = 0.4 * static_score + 0.6 * semantic_score
        return max(0.0, min(final_score, 1.0))

    @staticmethod
    def _weighted_average(weighted_values) -> float:
        """Return the weighted mean of ``(value, weight)`` pairs, clamped to [0, 1]."""
        numerator = 0.0
        denominator = 0.0
        for value, weight in weighted_values:
            numerator += float(value) * weight
            denominator += weight
        if denominator == 0:
            return 0.0
        return max(0.0, min(numerator / denominator, 1.0))

    def score_to_level(self, score: float) -> ComplexityLevel:
        """Map a score to a level: <= 0.3 SIMPLE, <= 0.7 MODERATE, else COMPLEX."""
        if score <= 0.3:
            return ComplexityLevel.SIMPLE
        if score <= 0.7:
            return ComplexityLevel.MODERATE
        return ComplexityLevel.COMPLEX
职责2: 动态组队决策
Master 根据复杂度分析结果,动态决定启用哪些智能体和资源。简单查询可能只需要 Writer,或者 Executor 加 Writer 即可;复杂查询则还需要引入 Planner。论文中将这三种配置分别称为 Writer-Only Configuration、Executor-Inclusive Configuration 和 Planner-Enhanced Configuration。
class TeamConfigurator:
    """Choose one of three team configurations based on query complexity.

    The configurations are Writer-Only, Executor-Inclusive and
    Planner-Enhanced.
    """

    def configure_team(self, query: str, complexity: ComplexityLevel) -> TeamConfig:
        """Return the team configuration matching ``complexity``.

        SIMPLE maps to Writer-Only and MODERATE to Executor-Inclusive; every
        other value falls through to Planner-Enhanced.
        """
        if complexity == ComplexityLevel.SIMPLE:
            return self.writer_only_config(query)
        if complexity == ComplexityLevel.MODERATE:
            return self.executor_inclusive_config(query)
        return self.planner_enhanced_config(query)

    def writer_only_config(self, query: str) -> TeamConfig:
        """Configuration 1: Writer-Only.

        Intended for simple factual queries where the LLM's internal
        knowledge is sufficient: no tools, a single iteration.
        """
        return TeamConfig(
            agents=["writer"],
            strategy="direct_answer",
            max_iterations=1,
            tools=[],
            estimated_cost=0.01,  # lowest cost of the three configurations
            estimated_latency=0.5,  # fastest response of the three
        )

    def executor_inclusive_config(self, query: str) -> TeamConfig:
        """Configuration 2: Executor-Inclusive.

        Intended for queries that need external information but no complex
        reasoning; the tool list comes from ``select_relevant_tools``.
        """
        relevant_tools = self.select_relevant_tools(query)
        return TeamConfig(
            agents=["executor", "writer"],
            strategy="single_step_retrieval",
            max_iterations=2,
            tools=relevant_tools,
            estimated_cost=0.05,
            estimated_latency=2.0,
        )

    def planner_enhanced_config(self, query: str) -> TeamConfig:
        """Configuration 3: Planner-Enhanced.

        Intended for complex queries that need multi-step reasoning and
        planning; the tool list comes from
        ``analyze_required_tool_ecosystem``.
        """
        tool_ecosystem = self.analyze_required_tool_ecosystem(query)
        return TeamConfig(
            agents=["planner", "executor", "writer"],
            strategy="multi_step_planning",
            max_iterations=5,
            tools=tool_ecosystem,
            estimated_cost=0.20,
            estimated_latency=10.0,
            parallel_execution=True,  # enable parallel execution
        )

    def select_relevant_tools(self, query: str, max_tools: int = 3) -> List[Tool]:
        """Pick a small set of tools for a moderately complex query.

        Args:
            query: The user query to match tools against.
            max_tools: Upper bound on the number of tools returned; the
                default of 3 keeps this configuration lightweight.

        Returns:
            The first ``max_tools`` entries of the semantic matching result.

        Raises:
            ValueError: If ``max_tools`` is negative (a negative slice bound
                would silently drop items from the end instead of capping).
        """
        if max_tools < 0:
            raise ValueError("max_tools must be non-negative")
        # Per the original note, tool selection follows the COLT method.
        candidates = self.semantic_tool_matching(query)
        return candidates[:max_tools]

    def analyze_required_tool_ecosystem(self, query: str) -> List[Tool]:
        """Return the full tool set needed by a complex query.

        Delegates to ``collaborative_tool_analysis`` (per the original note,
        COLT's collaborative dimension) and returns its result unchanged.
        """
        return self.collaborative_tool_analysis(query)
职责3: 执行监控
Master 需要实时监控整个执行过程,及时发现问题。
class ExecutionMonitor:
    """Real-time monitor the Master uses to observe an ongoing execution.

    A monitoring pass gathers state, failures, performance data and
    intermediate-quality issues into one ``MonitorResult``.
    """

    def __init__(self):
        self.execution_state = ExecutionState()
        self.failure_detector = FailureDetector()
        self.performance_tracker = PerformanceTracker()

    def monitor_execution(self, team_config: TeamConfig,
                          execution_context: ExecutionContext) -> MonitorResult:
        """Run one monitoring pass over ``execution_context``.

        Args:
            team_config: Accepted for interface compatibility; not read here.
            execution_context: The live execution context to inspect.

        Returns:
            A ``MonitorResult`` with ``current_state``, ``failures``,
            ``performance`` and ``quality_issues`` populated, in that order.
        """
        monitor_result = MonitorResult()
        monitor_result.current_state = self.track_execution_state(execution_context)
        monitor_result.failures = self.detect_failures(execution_context)
        monitor_result.performance = self.track_performance(execution_context)
        monitor_result.quality_issues = self.assess_intermediate_quality(
            execution_context
        )
        return monitor_result

    def detect_failures(self, context: ExecutionContext) -> List[Failure]:
        """Collect failures from tool calls, timeouts and result quality.

        Returns:
            A list of ``Failure`` objects; empty when nothing went wrong.
        """
        failures = []

        # Tool-call failures reported by the context.
        if context.tool_failures:
            failures.extend(self.analyze_tool_failures(context.tool_failures))

        # Timeout: elapsed time is strictly above the allowed maximum.
        if context.execution_time > context.max_allowed_time:
            failures.append(Failure(
                type="timeout",
                component="executor",
                description="Execution exceeded time limit",
            ))

        # Quality anomalies, only checked when there are intermediate results.
        if context.intermediate_results:
            failures.extend(
                self.detect_quality_failures(context.intermediate_results)
            )

        return failures

    def assess_intermediate_quality(self, context: ExecutionContext) -> List[QualityIssue]:
        """Check every intermediate result for completeness and consistency.

        A missing (``None``) or empty ``intermediate_results`` yields an empty
        list, mirroring the truthiness guard used by ``detect_failures``.

        Returns:
            One ``QualityIssue`` per failed check; a single result can
            contribute both an incomplete and an inconsistent issue.
        """
        issues = []
        for result in context.intermediate_results or ():
            # Completeness check.
            if not self.is_result_complete(result):
                issues.append(QualityIssue(
                    type="incomplete_result",
                    severity="medium",
                    affected_task=result.task_id,
                ))
            # Consistency check against previously accepted results.
            if not self.is_result_consistent(result, context.previous_results):
                issues.append(QualityIssue(
                    type="inconsistent_result",
                    severity="high",
                    affected_task=result.task_id,
                ))
        return issues
职责4: 反思与重规划
这是 Master 最复杂也最关键的能力。
class ReflectionEngine:
    """Reflection component of the Master: analyse failures and decide what next.

    The possible outcomes are abort, retry, replan, or continue.
    """

    def __init__(self):
        self.reflection_llm = DeepSeekR1Model()
        self.replanning_strategies = ReplanningStrategies()

    def reflect_and_replan(self, monitor_result: MonitorResult, original_query: str, current_plan: Optional[DAG]) -> ReflectionResult:
        """Reflect on ``monitor_result`` and return the follow-up action.

        Args:
            monitor_result: Monitoring output; its ``failures`` are analysed.
            original_query: The user's original query.
            current_plan: The plan being executed, or ``None`` when no plan
                exists (e.g. the failing strategy never produced one).

        Returns:
            A ``ReflectionResult`` for abort, retry or replan; any other
            decision yields the "continue" result.
        """
        failure_analysis = self.analyze_failures(monitor_result.failures)
        reflection_decision = self.make_reflection_decision(
            failure_analysis, monitor_result
        )

        action = reflection_decision.action
        if action == "abort":
            return self.create_abort_result(reflection_decision.reason)
        if action == "retry":
            return self.create_retry_result(reflection_decision.retry_strategy)
        if action == "replan":
            return self.execute_replanning(
                original_query, current_plan, failure_analysis
            )
        return self.create_continue_result()

    def analyze_failures(self, failures: List[Failure]) -> FailureAnalysis:
        """Ask the reflection LLM for a root-cause analysis of ``failures``.

        The failures are serialised via their ``to_dict()`` into the prompt,
        and the LLM reply is parsed with ``FailureAnalysis.from_json``.
        """
        failures_json = json.dumps([f.to_dict() for f in failures], indent=2)
        analysis_prompt = (
            "\n"
            "分析以下执行失败情况,判断根本原因和可能的解决方案:\n"
            "失败列表:\n"
            f"{failures_json}\n"
            "请分析:\n"
            "1. 根本原因是什么?\n"
            "2. 是否可以通过重试解决?\n"
            "3. 是否需要调整计划?\n"
            "4. 是否需要更换工具?\n"
            "返回JSON格式的分析结果。\n"
        )
        response = self.reflection_llm.generate(analysis_prompt)
        return FailureAnalysis.from_json(response)

    def execute_replanning(self, original_query: str, current_plan: Optional[DAG],
                           failure_analysis: FailureAnalysis) -> ReflectionResult:
        """Ask the reflection LLM for a new plan and wrap it in a result.

        Args:
            original_query: The user's original query.
            current_plan: The plan that failed. ``None`` is accepted because
                ``reflect_and_replan`` forwards an optional plan; it is
                rendered as JSON ``null`` rather than raising
                ``AttributeError``.
            failure_analysis: Analysis whose JSON form goes into the prompt
                and whose ``recommended_changes`` become the reasoning.

        Returns:
            A ``ReflectionResult`` with action ``"replan"`` and the new plan
            parsed from the LLM reply via ``DAG.from_json``.
        """
        plan_json = "null" if current_plan is None else current_plan.to_json()
        replanning_prompt = (
            "\n"
            f"原始查询: {original_query}\n"
            "当前计划:\n"
            f"{plan_json}\n"
            "失败分析:\n"
            f"{failure_analysis.to_json()}\n"
            "请基于失败经验重新制定计划:\n"
            "1. 保留成功的部分\n"
            "2. 修复失败的部分\n"
            "3. 考虑备选工具和策略\n"
            "4. 确保新计划更鲁棒\n"
            "返回新的DAG计划。\n"
        )
        new_plan_response = self.reflection_llm.generate(replanning_prompt)
        new_plan = DAG.from_json(new_plan_response)
        return ReflectionResult(
            action="replan",
            new_plan=new_plan,
            reasoning=failure_analysis.recommended_changes,
        )
Master 与其他 Agent 的协同机制
完整的协同流程实现
class MasterAgent:
    """Master Agent wired to its Planner, Executor and Writer agents.

    It analyses the query, picks a team configuration, runs the matching
    strategy, and reflects on failures to retry, replan or abort.
    """

    def __init__(self):
        self.complexity_analyzer = ComplexityAnalyzer()
        self.team_configurator = TeamConfigurator()
        self.execution_monitor = ExecutionMonitor()
        self.reflection_engine = ReflectionEngine()
        # Downstream agent instances.
        self.planner = PlannerAgent()
        self.executor = ExecutorAgent()
        self.writer = WriterAgent()

    async def process_query(self, query: str, max_attempts: int = 3) -> SearchResult:
        """Handle ``query`` end to end, reflecting and retrying on failure.

        Args:
            query: The user query.
            max_attempts: Maximum number of execution attempts (default 3).

        Returns:
            The strategy's result on success, or an error ``SearchResult``
            when reflection decides to abort or all attempts are used up.
        """
        # Phase 1: complexity analysis and team configuration.
        complexity = self.complexity_analyzer.analyze_complexity(query)
        team_config = self.team_configurator.configure_team(query, complexity)

        # Phase 2: execute, with a bounded number of attempts.
        for _ in range(max_attempts):
            try:
                return await self._run_strategy(query, team_config)
            except ExecutionException as e:
                # Phase 3: failure handling and reflection.
                monitor_result = MonitorResult(failures=[e.failure])
                reflection_result = self.reflection_engine.reflect_and_replan(
                    monitor_result, query, getattr(e, 'current_plan', None)
                )
                if reflection_result.action == "abort":
                    return SearchResult.create_error(reflection_result.reasoning)
                if reflection_result.action == "replan":
                    # Adjust the team configuration before the next attempt.
                    team_config = self.update_team_config(
                        team_config, reflection_result
                    )
                # Any non-abort outcome consumes one attempt and loops again.

        return SearchResult.create_error("Maximum retry attempts exceeded")

    async def _run_strategy(self, query: str, team_config: TeamConfig) -> SearchResult:
        """Dispatch to the executor that matches ``team_config.strategy``."""
        strategy = team_config.strategy
        if strategy == "direct_answer":
            return await self.execute_writer_only(query, team_config)
        if strategy == "single_step_retrieval":
            return await self.execute_executor_inclusive(query, team_config)
        return await self.execute_planner_enhanced(query, team_config)

    async def execute_planner_enhanced(self, query: str, team_config: TeamConfig) -> SearchResult:
        """Run the full Planner-Enhanced flow: plan, execute, monitor, write.

        Returns:
            The Writer's synthesised answer, or an error ``SearchResult``
            when reflection decides to abort mid-execution.
        """
        # 1. The Planner produces the plan.
        plan = await self.planner.create_plan(query, team_config.tools)

        # 2. Build the execution context.
        execution_context = ExecutionContext(
            query=query,
            plan=plan,
            team_config=team_config,
            start_time=time.time(),
        )

        # 3. Monitored execution loop.
        # NOTE(review): team_config.max_iterations is not enforced here, so
        # termination relies on execution_context.is_complete() -- confirm.
        while not execution_context.is_complete():
            # The Executor runs the next batch of ready tasks.
            batch_results = await self.executor.execute_batch(
                execution_context.get_ready_tasks()
            )
            execution_context.update_results(batch_results)

            monitor_result = self.execution_monitor.monitor_execution(
                team_config, execution_context
            )
            if not monitor_result.needs_intervention():
                continue

            reflection_result = self.reflection_engine.reflect_and_replan(
                monitor_result, query, plan
            )
            if reflection_result.action == "abort":
                # Honour the abort decision instead of looping on; this
                # mirrors how process_query reports an abort.
                return SearchResult.create_error(reflection_result.reasoning)
            if reflection_result.action == "replan":
                # Partial re-planning.
                plan = self.update_plan_with_reflection(plan, reflection_result)
                execution_context.update_plan(plan)

        # 4. The Writer synthesises the final answer from all results.
        return await self.writer.synthesize_answer(
            query, execution_context.get_all_results()
        )
总结
通过以上设计,Master Agent 实现了:
- 智能决策: 准确的复杂度分析和动态资源配置
- 实时监控: 高效的执行过程监控和问题检测
- 自适应调整: 快速的反思决策和重规划能力
- 协同指挥: 与 Planner、Executor、Writer 的无缝协作
欲知 Planner Agent 如何工作,且听下回分解。
大家好,我是自在哪吒的创始人、首席服务官 Kafka。让我们一起进化吧。
原论文可点击“原文链接”