AI应用开发LangGraph改造-宝妈AI视频平台之Spring AI Alibaba Graph

Spring AI Alibaba Graph + DDD 改造实战:把串行 Skill 链升级为可恢复、可回放、可治理的 Agent 工作流

一、写在前面:这次改造到底解决了什么问题

在很多 AI 应用的早期阶段,工程实现都会先走一条“能跑就行”的路径:
把几个 Skill(RAG 检索、内容生成、合规检查、计费结算、发布)按顺序串起来,形成一个线性的执行链。

这种方式在 MVP 阶段非常高效,但当业务进入“多场景、多策略、高并发、强可观测”的阶段后,线性链路很快触顶:

  • 分支决策复杂:无召回、高风险、超预算都需要分支,线性代码膨胀明显;
  • 失败恢复困难:某个节点失败只能重跑整条链路,成本高且不可控;
  • 观测粒度粗:只能知道“成功/失败”,很难定位到底是哪个节点慢、哪个节点贵;
  • 演进成本高:新增一个分支策略,往往牵一发动全身;
  • 运维排障困难:没有流程回放与事件轨迹,线上问题靠日志“猜”。

这篇文章总结的是一套已经落地的改造方案:

基于 Spring AI Alibaba Graph 思路 + DDD 分层,把原串行 Skill 链升级为状态驱动的 Graph 工作流系统

目标不是“炫技”,而是非常工程化的四件事:

  1. 可治理:路由策略、预算策略、场景策略可配置;
  2. 可恢复:关键节点 checkpoint,支持 resume;
  3. 可观测:节点指标 + 事件流 + 回放接口;
  4. 可灰度:特性开关 + 场景白名单 + 可回滚。

二、改造目标与边界

2.1 业务目标

  • 支撑母婴内容生产场景下的稳定 AI 生成链路;
  • 处理典型异常分支:
    • 无召回 -> 通用生成;
    • 高风险 -> 安全改写(MVP);
    • 超预算 -> 短答/拒答;
  • 在可控成本前提下,提高吞吐与可维护性。

2.2 技术目标

  • 统一 Graph 执行入口,替换串行适配器;

    好处是流程编排、分支决策、重试恢复都在同一套机制里,避免多套执行逻辑并存。

  • 引入运行态状态机模型;

    AgentGraphState 表示一次执行在“当前节点、累计 token、节点输出、状态”等维度的实时状态。
    这让流程从“代码顺序驱动”变成“状态驱动”,天然支持分支、恢复和回放。

  • 建立 checkpoint/metrics/events 三类运行数据

      • heckpoint`:关键节点快照,用于失败后断点恢复;
      • metrics:每节点耗时、token、重试、成功失败,用于性能和成本治理;
      • events:节点执行与分支事件轨迹,用于问题定位和过程解释。
        三者合起来,就是“可恢复 + 可观测 + 可解释”。
  • 提供 replay/resume 接口用于运维与排障。

2.3 改造边界(MVP)

  • 先不追求全自动策略学习;
  • 先实现 SafeRewrite,HumanReview 预留扩展点;
  • 先覆盖主链路 + 关键分支 + 恢复能力;
  • 与既有系统并存,支持按场景灰度。

2.4 先引入概念:LangChain 是什么

如果把大模型应用比作“后端系统”,LangChain 更像一个“应用开发框架层”:

  • 把模型调用、Prompt 模板、记忆、工具调用、检索等能力做统一抽象;
  • 提供“链(Chain)”思想,把多个步骤串联成一个执行单元;
  • 降低不同模型厂商接入成本,便于快速迭代原型。

一句话理解:
LangChain 解决的是“如何更快组织 LLM 应用能力”。

2.5 再引入概念:LangGraph 是什么

LangGraph 可以理解为 LangChain 在复杂流程场景下的增强:
当你的应用不再是线性步骤,而是“有状态 + 有分支 + 可恢复 + 可回放”的流程时,Graph 思想更合适。

LangGraph 的核心机制包括:

  • 状态对象贯穿整个流程;
  • 节点执行与路由决策分离;
  • 支持条件分支与循环;
  • 支持 checkpoint 与中断恢复;
  • 更适合 Agent 系统、长流程编排、多工具协同。

一句话理解:
LangGraph 解决的是“复杂 Agent 流程如何工程化落地”。

2.6 为什么在 Java 体系选 Spring AI Alibaba Graph

在宝妈 AI 视频平台 Java 版本里,技术栈以 Spring Boot + DDD 为主,最终采用 Spring AI Alibaba Graph 路线,原因是:

  • 与 Spring 生态天然融合(配置、依赖注入、AOP、事务);
  • 更容易与现有模块(RAG、发布、计费、合规)整合;
  • 对团队而言学习和维护成本更低;
  • 既能借鉴 LangChain/LangGraph 思想,又符合 Java 工程落地习惯。

可用下面这个映射去理解三者关系:

  • LangChain:能力编排思想(Prompt/Tool/Retrieval 组织方式)
  • LangGraph:状态流转思想(节点、路由、恢复、回放)
  • Spring AI Alibaba Graph:Java/Spring 体系下的工程实现路径

三、整体架构:从“线性调用器”到“状态驱动编排器”

3.1 架构分层(DDD)

本次拆分遵循 DDD 的“用例编排 / 领域规则 / 基础设施适配”原则:

  • Domain(领域层)

    • 状态模型:AgentGraphState
    • 策略服务:GraphRoutePolicyGraphBudgetPolicyGraphScenarioPolicy
    • 查询策略:GraphEventQueryPolicy
    • 端口:GraphExecutorPortGraphRuntimeGatewayGraphRunEventHook
  • Application(应用层)

    • 用例入口:GraphRunApplicationService
    • 流程编排:GraphFlowOrchestrator
    • 节点协议:GraphNodeHandler + 各具体节点处理器
    • 回放能力:GraphReplayApplicationService
  • Infrastructure(基础设施层)

    • 网关适配:GraphRuntimeGatewayAdapter
    • 持久化:Mapper + DO(checkpoint、node_metric、event)
    • 事件钩子:日志钩子 + 持久化钩子
    • Web 触发:AgentGraphReplayController

3.2 宝妈 AI 视频平台(Java 版)模块功能总览

按“模块职责 + 输入输出 + 边界”来讲。

3.2.1 bootstrap(启动与系统装配)

  • 职责:应用启动、配置装配、拦截器注册、Mapper 扫描;
  • 关键点:统一读取 application.yml 中 Graph 开关、预算、场景白名单配置;
  • 边界:不写业务流程,只做系统集成与基础设施入口。

3.2.2 common(通用能力)

  • 职责:通用异常、响应包装、Prompt 规则内核、基础工具;
  • 关键点BaomaPromptKernelGlobalPromptRules 承担 Prompt Guard 基础能力;
  • 边界:与具体业务场景解耦,供各业务模块复用。

3.2.3 module-ai(AI 通用域)

  • 职责:承载 AI 运行记录与核心 AI 应用服务;
  • 关键点agent_run 主记录落在这里,Graph 改造后扩展 graph 元字段;
  • 边界:偏“AI 领域通用能力”,不承接具体图编排细节。

3.2.4 module-rag(检索增强)

  • 职责:向量检索、召回、压缩、证据组织;
  • 关键点:对接 Milvus/Embedding;在 Graph 中对应 RagRetrieveRagCompress 相关能力;
  • 边界:只负责“找什么、怎么压缩”,不负责最终生成策略。

3.2.5 module-publish(内容生产与技能体系)

  • 职责:维护 Skill 注册与执行能力,承接内容生成链路;
  • 关键点:原串行 AiSkillExecutorAdapter 从这里起步,后被 Graph 适配层接管流转;
  • 边界:保留技能实现,不直接主导流程编排。

3.2.6 module-token-billing(成本治理)

  • 职责:token 计量、扣费、预算相关能力;
  • 关键点:Graph 的 TokenSettle 节点在流程末端统一结算;
  • 边界:只做计费逻辑,不干预生成内容语义。

3.2.7 module-agent-graph(本次改造核心模块)

  • 职责:把原线性 Skill 链升级为可编排、可恢复、可回放的 Graph 工作流;
  • 关键点
    • Port:GraphExecutorPortGraphRuntimeGateway
    • Application:GraphRunApplicationServiceGraphFlowOrchestrator
    • Node:GraphNodeHandler + 各节点实现
    • Policy:route/budget/scenario/event-query
    • Replay:GraphReplayApplicationService + AgentGraphReplayController
  • 边界:负责编排与状态推进,不侵入底层 Skill 的具体实现细节。

3.2.8 其他业务模块(user/member/points/video 等)

  • 职责:用户域、会员域、积分域、视频域等业务上下文;
  • 关键点:通过 command/context 为 Graph 提供业务输入;
  • 边界:保持领域边界,不把流程编排逻辑带回本域。

3.3 从 LangGraph 思想到 Spring AI Alibaba Graph 实现的映射

可以用下面这张映射表解释“概念如何落地”:

  • State(状态) -> AgentGraphState
  • Node(节点) -> GraphNodeHandler 体系
  • Edge/Router(边) -> GraphRoutePolicy + GraphScenarioPolicy
  • Checkpoint(断点) -> agent_graph_checkpoint + resume
  • Event/Trace(事件追踪) -> agent_graph_event + replay API
  • Tool Invocation(工具调用) -> GraphRuntimeGateway.executeSkill(...)

3.4 运行流程图(主链路 + 分支)

image.png

这张图的重点不在“节点名字”,而在三层能力:

  • 主流程稳定推进;
  • 条件分支可插拔;
  • 最终都收敛到 TokenSettle -> Return/SSE,保持出口一致性。

四、核心状态模型:AgentGraphState 为什么是这次改造的中心

AgentGraphState 是整个图执行过程的“单一事实来源(Single Source of Truth)”,典型字段如下:

  • runId:一次执行链路唯一标识(可直接作为 traceId);
  • userId:归属用户;
  • conversationId:多轮会话关联键;
  • scenario:场景标识(publish、chat、xhs 等);
  • currentNode:当前执行节点;
  • nodeOutputs:节点输出快照集合(流程上下文);
  • tokenUsage:累计 token 消耗;
  • status:执行状态(RUNNING/SUCCESS/FAILED);
  • error:失败错误信息;
  • checkpointId:最近 checkpoint 标识;
  • retryCount:当前节点重试次数。

为什么不用“散落变量”而要统一状态对象

  1. 恢复执行:checkpoint 需要完整状态序列化;
  2. 节点解耦:节点只关心上下文,不关心调用栈细节;
  3. 可回放:回放时必须有一致快照与事件锚点;
  4. 可测试:状态驱动比线程局部变量更好写测试。

五、数据模型与持久化:运行轨迹如何落盘

5.1 agent_run 扩展字段

在既有运行表上增加 Graph 相关字段(统一入口视角):

  • graph_name:图名称(如 baoma-agent-graph
  • graph_version:图版本(灰度/回滚依据)
  • checkpoint_id:最后一次 checkpoint
  • last_node:最近执行节点
  • replayable_snapshot:可回放快照(压缩/裁剪后)

5.2 新增表:checkpoint / metric / event

  1. agent_graph_checkpoint(断点快照,供 resume 反序列化后继续跑)

    • id:自增主键;列表/关联用。
    • checkpoint_id:业务侧断点唯一 ID(UUID),与 /checkpoints/{checkpointId}/resume 入参一致,agent_run.checkpoint_id 也会回写最近一次。
    • run_id:本次 Agent 图运行的 trace/run 标识,与 agent_run.trace_id 对齐。
    • node_name:写入 checkpoint 时所处的图节点(如 RAG_RETRIEVEGENERATE);续跑时由此推算「下一节点」。
    • snapshot_json:可反序列化的完整快照(通常含 GraphRunCommand + AgentGraphState JSON),恢复即以此为起点。
    • create_time:落库时间(审计与排序)。
  2. agent_graph_node_metric(节点级 SLI/SLO:耗时、token、成败)

    • id:自增主键。
    • run_id:归属运行 ID。
    • node_name:被执行的节点枚举名(与 GraphNodeName 一致)。
    • duration_ms:该节点从进入执行到成功或耗尽重试的 wall time(毫秒),含重试退避;成功或最终失败时各落库一条指标。
    • input_tokens / output_tokens:输入/输出 token 估计或计量(当前实现里常与累计用量同一口径落库,便于后续拆维度)。
    • hit_count:扩展计数位(如缓存命中、召回条数等,缺省 0)。
    • retry_count:该节点失败前的已重试次数(与编排退避策略对应)。
    • status:该次节点执行结果代码,如 SUCCESS / FAILED
    • error_message:失败时错误摘要(成功一般为空)。
    • create_time:指标写入时间。
  3. agent_graph_event(过程事件流,供回放 API 与排障)

    • id:自增主键;分页游标常按 id 升/降序。
    • run_id:归属运行 ID。
    • node_name:事件关联节点(运行级事件可为空)。
    • event_type:事件类型,如 run-startnode-successcheckpoint-savedrun-failed 等,供筛选用。
    • message:人类可读短描述。
    • attributes_json:结构化附加字段(tokenUsage、error、checkpointId 等)的 JSON。
    • event_time:业务事件发生时间(排序、时间窗查询)。
    • create_time:落库时间(可能与 event_time 略有先后)。

5.3 这三类表分别解决什么问题

  • checkpoint:解决“断点恢复”;
  • metric:解决“性能与成本治理”;
  • event:解决“过程可解释与回放排障”。

六、编排与节点设计:把“流程控制”从“能力执行”中解耦

6.1 Orchestrator 的职责边界

GraphFlowOrchestrator 只负责:

  • 节点执行顺序;
  • 分支判断;
  • checkpoint 时机;
  • 重试与退避;
  • 事件发布与指标记录。

不负责具体业务能力实现(例如真正的 RAG 或合规判定),这些通过 GraphRuntimeGateway 下沉到基础设施适配层。

6.2 NodeHandler 策略化

每个节点一个 GraphNodeHandler,例如:

  • InputGuardNodeHandler
  • RagRetrieveNodeHandler
  • GenerateNodeHandler
  • ComplianceCheckNodeHandler
  • TokenSettleNodeHandler
  • ReturnSseNodeHandler

优势:

  • 新节点扩展只需注册 handler,不需要改大 switch
  • 节点可单测;
  • 节点失败/重试行为统一由 Orchestrator 托管。

6.3 Support 拆分为领域服务

为降低“万能工具类”聚合度,拆出了:

  • GraphContentAssemblyService:终态内容拼装、截断;
  • GraphTokenEstimator:token 估算;
  • GraphEventQueryPolicy:事件查询契约归一化(limit/order/cursor)。

这一步是典型 DDD 改造:
把“规则”抽到领域服务,把“调用”留在应用编排。


七、可靠性设计:重试、checkpoint、resume 是怎么配合的

7.1 节点级重试

  • 每节点有 maxAttempts
  • 指数退避(initialBackoffMs -> maxBackoffMs);
  • 重试与失败写入 metric + event;
  • 最终失败时更新运行状态并抛出。

7.2 checkpoint 策略

关键节点后持久化:

  • RAG_RETRIEVE
  • GENERATE
  • COMPLIANCE_CHECK
  • TOKEN_SETTLE

每次 checkpoint 都会同步更新 agent_run.checkpoint_id/last_node,保证恢复入口最新。

7.3 resume 流程

  1. 根据 checkpointId 找到最新快照;
  2. 反序列化得到 state + command
  3. 计算下一节点;
  4. 从下一节点继续执行;
  5. 收敛结果并更新 run。

这使得“偶发外部故障”不再导致“全链路重跑”。


八、观测与回放:从“日志猜错”到“轨迹排障”

8.1 事件体系

典型事件:

  • run-start
  • node-success
  • node-retry
  • node-failed
  • checkpoint-saved
  • branch-safe-rewrite
  • branch-general-generate
  • branch-short-answer
  • run-success
  • run-failed

8.2 回放接口(增强后)

当前回放接口不仅返回 run/checkpoints/metrics,还支持事件筛选与分页。

接口 1:获取回放详情

GET /api/v1/agent/graph/runs/{runId}/replay

支持参数:

  • eventType(可选)
  • nodeName(可选)
  • fromTime(可选,ISO 时间)
  • toTime(可选)
  • cursor(可选,游标)
  • order(可选,asc|desc,默认 desc
  • limit(可选,默认 200,范围 1~1000)

请求示例

curl -G "http://localhost:8080/api/v1/agent/graph/runs/9f6a/replay" \
  --data-urlencode "eventType=node-failed" \
  --data-urlencode "order=desc" \
  --data-urlencode "limit=50"

响应示例(精简)

{
  "code": 0,
  "msg": "ok",
  "data": {
    "run": {
      "traceId": "9f6a",
      "status": "FAILED",
      "graphName": "baoma-agent-graph",
      "graphVersion": "v1",
      "lastNode": "GENERATE"
    },
    "checkpoints": [],
    "metrics": [],
    "events": [],
    "eventsPage": {
      "nextCursor": 10231,
      "hasMore": true,
      "returnedCount": 50,
      "order": "desc",
      "limit": 50
    }
  }
}

8.3 查询契约治理(关键细节)

GraphEventQueryPolicy 会统一归一化参数,避免不同层重复判断导致行为不一致:

  • 非法 order 自动回落 desc
  • limit 自动钳制到 [1, 1000]
  • 无效 cursor(<=0)自动忽略。

这类“契约收敛”虽然看似小优化,但对长期稳定性非常重要。


九、接口示例:恢复执行(Resume)

接口 2:从 checkpoint 恢复

POST /api/v1/agent/graph/checkpoints/{checkpointId}/resume

请求示例

curl -X POST "http://localhost:8080/api/v1/agent/graph/checkpoints/cp_abc123/resume"

响应示例(精简)

{
  "code": 0,
  "msg": "ok",
  "data": {
    "state": {
      "runId": "9f6a",
      "status": "SUCCESS",
      "currentNode": "RETURN_SSE",
      "tokenUsage": 932
    },
    "snapshots": {
      "prompt": "请生成母婴脚本",
      "ragCompressed": "...",
      "generate": {
        "wechatContent": "..."
      }
    }
  }
}

十、配置与灰度:如何做到“可上线、可回滚”

建议配置维度:

  • baoma.ai.graph.enabled:总开关;
  • baoma.ai.graph.scenario-whitelist:场景白名单;
  • maxAttempts/initialBackoffMs/maxBackoffMs:重试参数;
  • defaultTokenBudget + scenarioTokenBudget:预算策略;
  • allowSafeRewriteScenarios / allowGeneralFallbackScenarios / allowShortAnswerScenarios:分支许可。

灰度顺序建议:

  1. 小流量场景开启 Graph;
  2. 先开主链路,后开分支;
  3. 观察 node metric 与 event;
  4. 达标后扩大场景;
  5. 留好一键回退到串行链路的能力。

十一、测试策略:别只测 happy path

最少覆盖四类测试:

  1. 策略测试GraphRoutePolicy/GraphBudgetPolicy/GraphEventQueryPolicy
  2. 编排测试:主链路 + 三大分支(无召回/高风险/超预算)
  3. 恢复测试:checkpoint -> resume -> 收敛成功
  4. 契约测试:回放接口参数边界(order/limit/cursor/time window)

目前已覆盖:

  • 主链路与分支集成测试;
  • resume 流程测试;
  • 查询策略归一化测试;
  • 事件分页与排序能力落地。

十二、踩坑与复盘

12.1 典型坑

  • 把所有逻辑堆进 Orchestrator,导致“巨型服务”;
  • 事件查询参数在控制器、应用层、网关层分别处理,行为不一致;
  • 只记录失败日志,不记录分支决策事件,排障成本高;
  • 没有 checkpoint 粒度设计,恢复要么太频繁要么不够用。

12.2 复盘结论

真正好用的 Graph 架构,不是“会画图”:

  • 是规则有边界(策略服务);
  • 是状态可恢复(checkpoint);
  • 是过程可解释(event + replay);
  • 是上线可灰度(feature flag + scenario whitelist)。

结语

这次改造最有价值的,不是“用了 Graph”这件事本身,而是建立了一条长期可演进的工程主线:
状态驱动 + 策略下沉 + 可恢复 + 可观测 + 可灰度
当业务继续增长时,你不需要重写系统,只需要在既有边界内扩节点、扩策略、扩观测,这就是 DDD 改造真正带来的复利。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容