「AI 流式接口」的完整代码

一套可直接运行、贴合前端联调场景的 AI 流式接口完整代码,包含「FastAPI 后端流式接口 + Vue3 前端对接代码」,还标注了前端视角的关键细节(比如 SSE 格式、断连重连),你复制就能跑通 AI 聊天功能。

一、核心说明(前端视角)

AI 流式返回的本质是 SSE(Server-Sent Events)

  • 后端:逐字/逐段返回数据,格式必须是 data: 内容\n\n
  • 前端:用 EventSourcefetch + ReadableStream 接收,实现「边输边显」的聊天效果
  • 优势:比 WebSocket 更轻量,不用维护连接,适合 AI 对话场景

二、FastAPI 后端:AI 流式接口完整代码

1. 完整后端代码(main.py)

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio  # 异步睡眠模拟AI思考/返回
import json

# 初始化FastAPI
app = FastAPI(title="AI 流式接口示例", version="1.0")

# 配置跨域(前端必配,否则SSE会跨域报错)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 生产环境替换为前端域名
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 定义请求体模型(前端发送的聊天消息)
class ChatRequest(BaseModel):
    message: str  # 用户输入的消息
    model: str = "default"  # 可选:指定AI模型

# 模拟AI回复生成(真实场景替换为调用豆包/OpenAI API)
async def generate_ai_response(user_msg: str):
    """
    模拟AI逐字返回回复,真实场景替换为:
    1. 调用豆包API:https://www.doubao.com/open-api
    2. 调用OpenAI API:https://api.openai.com/v1/chat/completions
    """
    # 模拟AI思考(前端可显示「正在思考...」)
    await asyncio.sleep(1)
    
    # 示例回复(真实场景替换为API返回的流式内容)
    ai_reply = f"你问的是:{user_msg}。这是AI的流式回复,逐字返回给前端~"
    
    # 逐字返回(模拟流式输出)
    for char in ai_reply:
        # 封装SSE格式:必须是 data: 内容\n\n,前端才能解析
        # 建议把内容JSON序列化,方便前端解析(支持多字段)
        yield json.dumps({
            "type": "text",  # 类型:文本/结束/错误
            "content": char
        }) + "\n\n"
        await asyncio.sleep(0.05)  # 控制返回速度(模拟打字效果)
    
    # 发送结束标识(前端收到后关闭连接)
    yield json.dumps({
        "type": "end",
        "content": ""
    }) + "\n\n"

# AI流式聊天接口(核心)
@app.post("/api/ai/chat/stream")
async def ai_chat_stream(request: ChatRequest):
    """
    AI流式聊天接口
    前端请求方式:POST + SSE
    """
    if not request.message:
        raise HTTPException(status_code=400, detail="消息不能为空")
    
    # 返回流式响应,media_type必须是 text/event-stream
    return StreamingResponse(
        generate_ai_response(request.message),
        media_type="text/event-stream"
    )

# 测试接口(验证服务是否正常)
@app.get("/")
def root():
    return {"message": "AI流式接口已启动,访问 /api/ai/chat/stream 测试"}

2. 依赖安装(补充到 requirements.txt)

fastapi>=0.104.1
uvicorn>=0.24.0
pydantic>=2.0.0
aiohttp>=3.9.0  # 真实场景调用AI API需要的异步请求库

3. 启动后端

uvicorn main:app --reload --host 0.0.0.0 --port 8000

三、Vue3 前端:对接流式接口完整代码

1. 前端组件(ChatAI.vue)

<template>
  <div class="ai-chat">
    <!-- 聊天记录 -->
    <div class="chat-history">
      <div v-for="(msg, index) in chatHistory" :key="index" class="chat-item">
        <div class="user-msg" v-if="msg.role === 'user'">
          你:{{ msg.content }}
        </div>
        <div class="ai-msg" v-if="msg.role === 'ai'">
          AI:{{ msg.content }}
          <span v-if="msg.loading">🤔 正在思考...</span>
        </div>
      </div>
    </div>

    <!-- 输入框 -->
    <div class="chat-input">
      <input
        v-model="userInput"
        placeholder="输入你的问题..."
        @keyup.enter="sendMessage"
      />
      <button @click="sendMessage" :disabled="isLoading">发送</button>
    </div>
  </div>
</template>

<script setup>
import { ref, onUnmounted } from 'vue'

// 聊天记录
const chatHistory = ref([])
// 用户输入
const userInput = ref('')
// 是否加载中(防止重复发送)
const isLoading = ref(false)
// SSE连接对象(用于手动关闭)
let eventSource = null

/**
 * 发送消息并接收流式响应
 */
const sendMessage = async () => {
  const msg = userInput.value.trim()
  if (!msg || isLoading.value) return

  // 1. 清空输入框,添加用户消息到记录
  userInput.value = ''
  chatHistory.value.push({
    role: 'user',
    content: msg
  })

  // 2. 添加AI加载中的消息
  const aiMsgIndex = chatHistory.value.push({
    role: 'ai',
    content: '',
    loading: true
  }) - 1

  isLoading.value = true

  try {
    // 3. 调用流式接口(用fetch实现,比EventSource更灵活,支持POST)
    const response = await fetch('http://localhost:8000/api/ai/chat/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        message: msg,
        model: 'default'
      })
    })

    if (!response.ok) {
      throw new Error(`请求失败:${response.status}`)
    }

    // 4. 读取流式响应
    const reader = response.body.getReader()
    const decoder = new TextDecoder('utf-8')
    let buffer = ''

    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      // 解码二进制数据为文本
      buffer += decoder.decode(value, { stream: true })
      // 按SSE分隔符分割(\n\n)
      const messages = buffer.split('\n\n')
      buffer = messages.pop() || ''

      // 处理每一条SSE消息
      for (const msgChunk of messages) {
        if (!msgChunk.startsWith('data: ')) continue
        
        // 提取data内容并解析JSON
        const dataStr = msgChunk.slice(6)
        if (!dataStr) continue
        
        try {
          const data = JSON.parse(dataStr)
          
          // 文本内容:追加到AI消息
          if (data.type === 'text' && data.content) {
            chatHistory.value[aiMsgIndex].content += data.content
          }
          
          // 结束标识:停止加载
          if (data.type === 'end') {
            chatHistory.value[aiMsgIndex].loading = false
            isLoading.value = false
            break
          }
        } catch (e) {
          console.error('解析SSE消息失败:', e)
        }
      }
    }
  } catch (e) {
    // 异常处理
    chatHistory.value[aiMsgIndex].content = `请求失败:${e.message}`
    chatHistory.value[aiMsgIndex].loading = false
    isLoading.value = false
    console.error('流式接口调用失败:', e)
  }
}

// 组件卸载时关闭连接
onUnmounted(() => {
  if (eventSource) {
    eventSource.close()
  }
  isLoading.value = false
})
</script>

<style scoped>
.ai-chat {
  width: 600px;
  margin: 20px auto;
  border: 1px solid #eee;
  border-radius: 8px;
  padding: 10px;
}

.chat-history {
  height: 400px;
  overflow-y: auto;
  margin-bottom: 10px;
}

.chat-item {
  margin: 8px 0;
}

.user-msg {
  text-align: right;
  color: #333;
}

.ai-msg {
  text-align: left;
  color: #666;
}

.chat-input {
  display: flex;
  gap: 10px;
}

.chat-input input {
  flex: 1;
  padding: 8px;
  border: 1px solid #ddd;
  border-radius: 4px;
}

.chat-input button {
  padding: 8px 16px;
  background: #409eff;
  color: white;
  border: none;
  border-radius: 4px;
  cursor: pointer;
}

.chat-input button:disabled {
  background: #999;
  cursor: not-allowed;
}
</style>

四、真实场景改造:对接豆包/OpenAI API

上面是模拟AI回复,真实项目中替换 generate_ai_response 函数即可,以下是对接豆包API的示例(需要先申请API Key):

import aiohttp  # 异步请求库

# 替换为你的豆包API Key
DOUBAO_API_KEY = "your-doubao-api-key"

async def generate_ai_response(user_msg: str):
    """真实对接豆包API的流式响应"""
    # 豆包API地址
    url = "https://www.doubao.com/open-api/v1/chat/completions"
    
    # 请求参数
    payload = {
        "model": "doubao-pro",
        "messages": [{"role": "user", "content": user_msg}],
        "stream": True  # 开启流式返回
    }
    
    # 请求头
    headers = {
        "Authorization": f"Bearer {DOUBAO_API_KEY}",
        "Content-Type": "application/json"
    }
    
    try:
        # 异步请求豆包API
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, headers=headers) as response:
                if response.status != 200:
                    yield json.dumps({
                        "type": "text",
                        "content": f"API请求失败:{response.status}"
                    }) + "\n\n"
                    yield json.dumps({"type": "end"}) + "\n\n"
                    return
                
                # 逐行读取流式响应
                async for line in response.content:
                    if not line:
                        continue
                    
                    # 解析豆包API的流式返回(格式:data: { ... })
                    line_str = line.decode('utf-8').strip()
                    if line_str.startswith('data: '):
                        data_str = line_str[6:]
                        if data_str == '[DONE]':  # 结束标识
                            yield json.dumps({"type": "end"}) + "\n\n"
                            break
                        
                        try:
                            data = json.loads(data_str)
                            # 提取AI回复内容
                            content = data["choices"][0]["delta"].get("content", "")
                            if content:
                                yield json.dumps({
                                    "type": "text",
                                    "content": content
                                }) + "\n\n"
                        except Exception as e:
                            continue
    except Exception as e:
        yield json.dumps({
            "type": "text",
            "content": f"调用AI API失败:{str(e)}"
        }) + "\n\n"
        yield json.dumps({"type": "end"}) + "\n\n"

五、前端联调关键避坑点

  1. SSE格式必须严格:后端返回的每一行必须是 data: 内容\n\n,少一个换行都会导致前端解析失败;
  2. 跨域配置:FastAPI的CORS必须配全,尤其是 allow_credentials=True,否则前端无法接收SSE;
  3. 断连处理:前端要监听「结束标识」或连接关闭事件,避免AI回复完成后还显示「加载中」;
  4. 错误处理:网络中断、API报错时,前端要友好提示,不要让用户看到空白;
  5. 速度控制:后端通过 asyncio.sleep(0.05) 控制返回速度,模拟「打字机效果」,提升用户体验。

总结

  1. 核心逻辑:后端用 StreamingResponse 返回SSE格式数据,前端用 fetch + ReadableStream 逐段接收;
  2. 真实对接:替换 generate_ai_response 函数为真实AI API调用即可落地;
  3. 前端体验:重点处理「加载状态、断连、错误提示」,保证流式聊天的流畅性。

这套代码你可以直接整合到之前的 FastAPI + Vue3 全栈项目中,形成「前端 + Python + AI 流式接口」的完整作品集,写进简历里会是核心亮点。如果需要补充「AI 知识库 RAG 接口」或「接口鉴权」的代码,直接说就行。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容