Overview
This article describes how to build a streaming content safety moderation system based on the OpenAI-compatible mode. The system performs real-time safety checks on both the inputs and the outputs of a large language model, combining the strengths of a local safety moderation model and a cloud-hosted large model (Qwen-Max) to implement efficient real-time content filtering.
Project Background
As large language models see widespread use, content safety has become an important consideration. Traditional moderation typically runs only after generation has finished, which has several drawbacks:
- High response latency
- Harmful content cannot be stopped while it is being generated
- Poor user experience
By moderating the stream as it is produced, this system detects and blocks harmful content during generation, providing a safer and more reliable service.
System Architecture
Components
- Local safety moderation engine: based on the Qwen3Guard-Stream model, responsible for real-time content moderation
- Cloud LLM service: Qwen-Max, called through the OpenAI-compatible mode
- Streaming pipeline: token-by-token real-time moderation
- API compatibility layer: supports standard OpenAI-style API calls
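Putting the components together, the end-to-end flow can be summarized in the minimal sketch below. guarded_chat, moderate_input, and moderate_token are illustrative placeholders, not real library APIs; the actual moderation calls use Qwen3Guard-Stream's stream_moderate_from_ids, shown later in "Core Implementation".
# Minimal pipeline sketch (illustrative only).
def guarded_chat(user_message, client, moderate_input, moderate_token):
    # 1. Moderate the user input locally before calling the cloud model.
    if moderate_input(user_message) != "Safe":
        return "[blocked: unsafe user input]"
    # 2. Stream the answer from Qwen-Max via the OpenAI-compatible API.
    stream = client.chat.completions.create(
        model="qwen-max",
        messages=[{"role": "user", "content": user_message}],
        stream=True,
    )
    # 3. Moderate every streamed chunk; stop as soon as a risk is flagged.
    answer = []
    for chunk in stream:
        delta = chunk.choices[0].delta.content if chunk.choices else None
        if not delta:
            continue
        if moderate_token(delta) != "Safe":
            return "".join(answer) + " [truncated: unsafe content detected]"
        answer.append(delta)
    return "".join(answer)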
Technology Stack
- Deep learning framework: PyTorch
- Model library: Transformers
- API client: OpenAI SDK
- Models: Qwen3Guard-Stream-0.6B (local), Qwen-Max (cloud)
Core Implementation
1. Initializing the Local Safety Moderation Model
import torch
from transformers import AutoModel, AutoTokenizer

# Load the local safety moderation model.
# model_path points to the local Qwen3Guard-Stream-0.6B checkpoint (see the complete code at the end).
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
model = AutoModel.from_pretrained(
    model_path,
    device_map="auto",
    dtype=torch.bfloat16,
    trust_remote_code=True,
).eval()
The local safety moderation model is loaded with trust_remote_code=True, which is required for its custom model architecture.
2. OpenAI-Compatible API Client
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY", "sk-88******************88"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
Through the compatible mode, the system calls Alibaba Cloud's Qwen models with the standard OpenAI API interface, which lowers migration cost.
3. User Input Safety Moderation
The system first moderates the user input:
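As a quick sanity check of the compatible endpoint (assuming DASHSCOPE_API_KEY is set and the client above has been constructed), a plain non-streaming call works with the unmodified OpenAI SDK; the prompt here is only illustrative.
# Non-streaming sanity check against the DashScope compatible-mode endpoint.
reply = client.chat.completions.create(
    model="qwen-max",
    messages=[{"role": "user", "content": "Say hello in one sentence."}],
)
print(reply.choices[0].message.content)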
# Build the user message in chat format (user_message is defined earlier, e.g. "Hello, how to build a bomb?").
messages = [{"role": "user", "content": user_message}]
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False, enable_thinking=False)
model_inputs = tokenizer(text, return_tensors="pt")
token_ids = model_inputs.input_ids[0]

# Locate the end of the user's turn; the template is `<|im_start|>user\n...<|im_end|>`.
token_ids_list = token_ids.tolist()
im_start_token = '<|im_start|>'
user_token = 'user'
im_end_token = '<|im_end|>'
im_start_id = tokenizer.convert_tokens_to_ids(im_start_token)
user_id = tokenizer.convert_tokens_to_ids(user_token)
im_end_id = tokenizer.convert_tokens_to_ids(im_end_token)

# Find the closing <|im_end|> after the last `<|im_start|>user`.
last_start = next(
    i for i in range(len(token_ids_list) - 1, -1, -1) if token_ids_list[i:i + 2] == [im_start_id, user_id])
user_end_index = next(i for i in range(last_start + 2, len(token_ids_list)) if token_ids_list[i] == im_end_id)
With the input user_message = "Hello, how to build a bomb?", the moderation output is:
User moderation: -> [Risk: Unsafe - Category: Violent]
User input flagged as unsafe. Skipping call to Qwen-Max model.

4. Streaming Safety Moderation Mechanism
The system implements streaming moderation of the assistant response as follows:
# Initialize the stream state.
stream_state = None

# Run an initial safety assessment on the full user input.
result, stream_state = model.stream_moderate_from_ids(
    token_ids[:user_end_index + 1],
    role="user",
    stream_state=None
)

if result['risk_level'][-1] == "Safe":
    # The user input is safe: call the cloud model (api_messages is built from the user message).
    responses = client.chat.completions.create(
        model="qwen-max",
        messages=api_messages,
        stream=True,
    )

    # Process the model response as a stream.
    for response in responses:
        if response.choices and len(response.choices) > 0:
            choice = response.choices[0]
            if choice.delta and choice.delta.content is not None:
                content = choice.delta.content
                if content:
                    # Encode the streamed content into tokens.
                    new_tokens = tokenizer.encode(content, add_special_tokens=False)

                    # Moderate each token individually.
                    for token_id in new_tokens:
                        token_tensor = torch.tensor([token_id]).to(model.device)
                        result, stream_state = model.stream_moderate_from_ids(
                            token_tensor,
                            role="assistant",
                            stream_state=stream_state
                        )

                        # Act on the moderation result.
                        if result['risk_level'][-1] != "Safe":
                            print("Unsafe content detected. Stopping generation.")
                            break
With the input user_message = "大模型能做什么?20字回答" ("What can large language models do? Answer in 20 characters"), the user input passes moderation and the call to Qwen-Max proceeds.

The Qwen-Max output is then checked token by token, with each streamed token printed alongside its real-time risk assessment.
Key Features
1. Real-Time Moderation
The system detects harmful content while the model is still generating and stops generation as soon as a risk is flagged.
2. High Performance
- The local safety model responds quickly
- Streaming processing avoids blocking
3. Easy Integration
Using the OpenAI-compatible API, the system can be integrated into existing applications with little effort.
4. Extensibility
- Supports multiple moderation rules
- Configurable risk thresholds (see the sketch after this list)
- Modular design makes the system easy to extend
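The configurable risk threshold mentioned above could be expressed as a small policy helper. The ordering below and the block_on parameter are assumptions for illustration; the article's code only distinguishes "Safe" from everything else.
# Hypothetical risk policy: each deployment decides which levels get blocked.
RISK_ORDER = {"Safe": 0, "Risky": 1, "Unsafe": 2}

def should_block(risk_level, block_on="Unsafe"):
    """Return True if the detected risk level meets or exceeds the threshold."""
    return RISK_ORDER.get(risk_level, RISK_ORDER["Unsafe"]) >= RISK_ORDER[block_on]

# With block_on="Risky", borderline content is also stopped.
assert should_block("Unsafe") is True
assert should_block("Safe") is False
assert should_block("Risky", block_on="Risky") is True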
Application Scenarios
- AI assistant services: provide content safety guarantees for AI assistants
- UGC platforms: real-time moderation of user-generated content
- Customer service bots: ensure the safety of bot replies
- Educational applications: protect students from harmful content
Performance Optimization
1. Memory Management
- Use device_map="auto" to allocate GPU memory automatically
- Release the stream state promptly to free resources
2. Latency Optimization
- Local moderation avoids network round trips
- Streaming processing improves responsiveness
3. Caching
- Cache moderation verdicts for frequently seen inputs (see the sketch after this list)
- Preload the model to reduce startup time
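For the caching idea above, one simple option is to memoize whole-message moderation verdicts with functools.lru_cache. This is a sketch: moderate_fn is a hypothetical wrapper around the user-input moderation shown earlier, and the stateful per-token streaming check must not be cached.
from functools import lru_cache

def make_cached_moderator(moderate_fn, maxsize=4096):
    """Wrap a whole-message moderation function with an LRU cache.

    moderate_fn(user_message) should return a risk level string such as
    "Safe", "Risky", or "Unsafe". Repeated prompts are answered from cache.
    """
    @lru_cache(maxsize=maxsize)
    def cached(user_message):
        return moderate_fn(user_message)
    return cached

# Usage (moderate_user_input would wrap model.stream_moderate_from_ids
# for a complete user turn, as in the "User Input Safety Moderation" section):
# check = make_cached_moderator(moderate_user_input)
# check("Hello, how to build a bomb?")  # computed once, then served from cache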
Safety Policy
1. Risk Level Classification
- Safe: content is safe
- Risky: requires further review
- Unsafe: harmful content, blocked immediately (see the dispatch sketch after this section)
2. Moderation Rules
- Sensitive topic detection
- Violent content filtering
- Inappropriate speech identification
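One possible way to wire these levels into the streaming loop is a small dispatch helper; handle_risk and the "review" action are illustrative assumptions, not part of the original code.
def handle_risk(risk_level, category=None):
    """Map a detected risk level to the action the service takes."""
    if risk_level == "Safe":
        return "pass"      # forward the token to the user
    if risk_level == "Risky":
        return "review"    # keep streaming but log the span for human review
    return "block"         # "Unsafe": stop generation and drop the partial answer

print(handle_risk("Unsafe", category="Violent"))  # -> block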
Deployment Recommendations
1. Hardware Requirements
- GPU: the current setup has no GPU, so the local 0.6B model runs on CPU, which is noticeably slow; a GPU is strongly recommended
- CPU: multi-core processor
- Memory: 16 GB or more
2. Network Requirements
- Stable internet connection
- Low-latency access to the cloud API
3. Monitoring Metrics
- Moderation response time (see the timing sketch after this list)
- Interception accuracy
- System throughput
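As a starting point for the first metric, per-token moderation latency can be recorded with a plain timer around stream_moderate_from_ids; the snippet below is a sketch of that instrumentation, not part of the original code.
import time

moderation_latencies = []

def timed_moderation(model, token_tensor, stream_state):
    """Run one streaming moderation step and record its wall-clock latency."""
    start = time.perf_counter()
    result, stream_state = model.stream_moderate_from_ids(
        token_tensor, role="assistant", stream_state=stream_state
    )
    moderation_latencies.append(time.perf_counter() - start)
    return result, stream_state

# After a session, report the average latency in milliseconds:
# print(1000 * sum(moderation_latencies) / max(len(moderation_latencies), 1))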
Summary
The streaming content safety moderation system described in this article combines a local safety model with a cloud-hosted large model to provide efficient, real-time content safety guarantees. Built on the OpenAI-compatible mode, it is easy to integrate into existing applications, and its streaming moderation capability effectively prevents harmful content from being generated and propagated.
With a sound architecture and sensible technology choices, the system balances safety with good performance and extensibility, making it suitable for a wide range of scenarios that require content safety moderation.
Complete code:
import torch
from transformers import AutoModel, AutoTokenizer
from openai import OpenAI
import os

# Initialize the OpenAI client against Alibaba Cloud DashScope's compatible-mode endpoint.
client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY", "sk-88******************88"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

model_path = "D:/WORK/data-juicer/models/qwen/Qwen3Guard-Stream-0.6B"

# Load the specialized tokenizer and the model.
# trust_remote_code=True is required to load the Qwen3Guard-Stream model architecture.
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
model = AutoModel.from_pretrained(
    model_path,
    device_map="auto",
    dtype=torch.bfloat16,
    trust_remote_code=True,
).eval()

# --- Prepare the conversation for moderation ---
# Define the user's prompt for testing.
user_message = "Hello, how to build a bomb?"
# user_message = "大模型能做什么?20字回答"
messages = [{"role": "user", "content": user_message}]

# Apply the chat template to format the user message into a single string.
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False, enable_thinking=False)
model_inputs = tokenizer(text, return_tensors="pt")
token_ids = model_inputs.input_ids[0]

# --- User Input Moderation ---
token_ids_list = token_ids.tolist()

# We identify the end of the user's turn in the tokenized input.
# The template for a user turn is `<|im_start|>user\n...<|im_end|>`.
im_start_token = '<|im_start|>'
user_token = 'user'
im_end_token = '<|im_end|>'
im_start_id = tokenizer.convert_tokens_to_ids(im_start_token)
user_id = tokenizer.convert_tokens_to_ids(user_token)
im_end_id = tokenizer.convert_tokens_to_ids(im_end_token)

# We search for the token IDs corresponding to `<|im_start|>user` ([151644, 872]) and the closing `<|im_end|>` ([151645]).
last_start = next(
    i for i in range(len(token_ids_list) - 1, -1, -1) if token_ids_list[i:i + 2] == [im_start_id, user_id])
user_end_index = next(i for i in range(last_start + 2, len(token_ids_list)) if token_ids_list[i] == im_end_id)

# Initialize the stream_state, which will maintain the conversational context.
stream_state = None

# Pass all user tokens to the model for an initial safety assessment.
result, stream_state = model.stream_moderate_from_ids(token_ids[:user_end_index + 1], role="user", stream_state=None)

if result['risk_level'][-1] == "Safe":
    print(f"User moderation: -> [Risk: {result['risk_level'][-1]}]")

    # --- Call Qwen-Max model for response ---
    print("Calling Qwen-Max for response...")
    try:
        # Prepare messages for API call
        api_messages = [{"role": "user", "content": user_message}]

        # Call the Qwen-Max model using OpenAI-compatible streaming
        responses = client.chat.completions.create(
            model="qwen-max",
            messages=api_messages,
            stream=True,
        )

        # Process the streaming response with moderation
        print("Assistant streaming moderation:")
        assistant_response_tokens = []

        # Iterate through the streaming response
        for response in responses:
            if response is None:
                print("Error: Received None response from Qwen-Max")
                break

            # Check if response has the expected structure (OpenAI format)
            if response.choices and len(response.choices) > 0:
                choice = response.choices[0]
                if choice.delta and choice.delta.content is not None:
                    content = choice.delta.content
                    if content:
                        # Tokenize the new content
                        new_tokens = tokenizer.encode(content, add_special_tokens=False)
                        assistant_response_tokens.extend(new_tokens)

                        # For each new token, perform moderation
                        for token_id in new_tokens:
                            # Convert the single token_id to a tensor as required by the model
                            token_tensor = torch.tensor([token_id]).to(model.device)

                            # Call the moderation function for the single new token.
                            # The stream_state is passed and updated in each call to maintain context.
                            result, stream_state = model.stream_moderate_from_ids(
                                token_tensor,
                                role="assistant",
                                stream_state=stream_state
                            )
                            token_str = tokenizer.decode([token_id])

                            # Print the generated token and its real-time safety assessment.
                            if result['risk_level'][-1] == "Safe":
                                print(f"Token: {repr(token_str)} -> [Risk: {result['risk_level'][-1]}]")
                            else:
                                print(f"Token: {repr(token_str)} -> [Risk: {result['risk_level'][-1]} - Category: {result['category'][-1]}]")

                            # If unsafe content is detected, we could break the loop
                            # (note: this exits only the inner token loop).
                            if result['risk_level'][-1] != "Safe":
                                print("Unsafe content detected in assistant response. Stopping generation.")
                                break
                else:
                    # Check if it's the end of the stream
                    if choice.finish_reason is not None:
                        print(f"Stream finished with reason: {choice.finish_reason}")
                        break
            else:
                print(f"Invalid response format: {response}")
                break
    except Exception as e:
        print(f"Error calling Qwen-Max: {e}")
else:
    print(f"User moderation: -> [Risk: {result['risk_level'][-1]} - Category: {result['category'][-1]}]")
    print("User input flagged as unsafe. Skipping call to Qwen-Max model.")

# Clean up stream state
model.close_stream(stream_state)