一、引言:为什么 大模型 需要“被教育”?
大模型就像个天才但缺乏社会经验的书呆子。它读完了整个互联网的文本,能接住你任何一句话的下文,但问题在于——它只知道“文字接龙”的规则,不知道“人 类 喜欢什么”。当你问“帮我写一封辞职信”时,它可能给你一篇激情辱骂老板的泄愤文,因为你从网上扒下来的训练数据里,这种“真性情”的范文太多了。但你想要的,是一封得体、专业、保持体面的正式函件。
这个差距就是对齐问题:我们教 模型 学会了“怎么说”,但没教会它“说什么才是好的”。
InstructGPT的三阶段训练方案正是来补课的——它通过SFT、RM和PPO三步,把一个大模型从一个只会模仿的“书呆子”变成一个懂得审时度势、会说人话的“好助理”。本文将详细拆解这三步的每一处细节,并提供完整的可运行代码,让你亲手体验“驯服”一个语言模型的完整过程。
二、项目背景:电商评论生成场景
为了让整篇文章有一个具体的落脚点,我们先设定一个实战场景:电商平台需要为无人问津的“长尾商品”自动生成正向评论,以吸引买家下单。
我们的最终目标,是训练一个能够编写正向评论的 大语言模型 。具体来说:
-
首先,我们有一个中文预训练模型——gpt2-chinese-cluecorpussmall。
-
这个模型虽然能写中文,但写出来的可能是正向评论,也可能是负向评论,完全不受控。
-
我们需要用SFT先让它学会“写电商评论”;再用RM训练出一个可以判断“评论好坏”的评分模型;最后用PPO把SFT模型向“写正向评论”的方向微调。
三步走完,模型输出正向评论的概率就会大幅提升。
下面,我们就一步步动手实现。
三、第一步:SFT(有监督微调)——教会模型“如何说话”
1. SFT到底在干什么?
SFT是所有后续步骤的“地基”。它的原理非常简单:用人工标注的“高质量对话样本”对预训练模型进行一轮有监督的微调,让模型初步学会“按照指令完成任务”的能力。
在InstructGPT论文中,SFT用了大约13,000条人工标注的(Prompt,高质量回答)对进行训练。在本项目里,我们直接提供一个电商评论 数据集 ,让gpt2学习如何撰写“商品评论”。
2. 核心要点
-
训练时长
:通常只训练1个epoch即可,更多轮次反而容易过拟合和灾难性遗忘。
-
目标函数
:最大化模型生成回答的对数似然,也就是让模型在给定prompt的条件下,生成的回答越来越接近标注的高质量回答。
-
数据准备
:我们的数据集里既有正面评论也有负面评论,所以SFT模型只是学会了“怎么写评论”,还没有学会“只写好评论”——这是PPO要完成的任务。
3. 完整代码实现及逐行注释
"""
SFT阶段:对预训练的GPT2模型进行有监督微调,让模型学会写电商评论。
本代码完整实现了从加载数据到保存模型的全过程。
"""
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, DataCollatorForLanguageModeling
from torch.utils.data import DataLoader
from datasets import load_dataset
# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# ==================== 1. 加载数据集 ====================
dataset = load_dataset("csv", data_files="online_shopping_10_cats.csv")
ds_train = dataset["train"]
# 过滤掉过长或过短的评论
# GPT2的最大上下文长度为1024个token,评论长度必须在这个范围内
ds_train = ds_train.filter(
lambda x: x["review"] is not None
and len(x["review"]) > 20
and len(x["review"]) < 1024
)
print(f"数据集过滤后共有 {len(ds_train)} 条评论")
# ==================== 2. 加载分词器和模型 ====================
model_path = "./gpt2-chinese-cluecorpussmall" # 中文预训练GPT2模型路径
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(model_path)
# 重要:将eos_token(结束符)设置为pad_token(填充符)
# 这是因为本模型在训练时需要进行批次填充(padding),而pad_token原本不存在
tokenizer.pad_token = tokenizer.eos_token
model.to(device)
# ==================== 3. 分词处理 ====================
def tokenize(batch):
"""
对批次中的评论进行分词处理
输入的batch包含文本字段,输出的是tokenized的结果
"""
return tokenizer(batch["review"])
map_kwargs = {
"batched": True, # 批量处理
"batch_size": 512, # 每批次处理512条数据
"remove_columns": ["cat", "label", "review"] # 移除原始字段
}
tokenized_dataset_train = ds_train.map(tokenize, **map_kwargs)
# 设置数据集格式为torch张量,便于后续dataloader使用
tokenized_dataset_train.set_format(type="torch")
# ==================== 4. 准备数据加载器 ====================
# DataCollatorForLanguageModeling: 专业的数据整理器
# mlm=False 表示使用因果语言模型(自回归)的数据格式
# 它会自动为模型准备好input_ids(输入token)、attention_mask(注意力掩码)和labels(标签)
data_collator = DataCollatorForLanguageModeling(
tokenizer,
mlm=False # 不启用掩码语言建模,使用因果语言建模
)
dataloader_params = {
"batch_size": 2, # 批次大小,可根据GPU显存调整
"collate_fn": data_collator
}
train_dataloader = DataLoader(tokenized_dataset_train, **dataloader_params)
# ==================== 5. 设置优化器 ====================
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5) # 学习率一般采用5e-5左右
# ==================== 6. 训练循环 ====================
# 只训练1个epoch,防止过拟合
num_epochs = 1
model.train()
for epoch in range(num_epochs):
total_loss = 0.0
for step, batch in enumerate(train_dataloader):
# 将批次数据移到GPU
batch = {k: v.to(device) for k, v in batch.items()}
# 前向传播:模型根据input_ids预测下一个token
outputs = model(**batch)
loss = outputs.loss # 交叉熵损失
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
# 每100个batch打印一次损失
if step % 100 == 0:
print(f"Step: {step}, Loss: {loss.item():.4f}")
print(f"Epoch {epoch+1} completed. Average Loss: {total_loss/len(train_dataloader):.4f}")
# ==================== 7. 保存模型 ====================
model.save_pretrained("./gpt2-sft")
tokenizer.save_pretrained("./gpt2-sft")
print("SFT模型已保存到 ./gpt2-sft")
# ==================== 8. 简单测试微调后的模型 ====================
from transformers import pipeline, set_seed
generator = pipeline("text-generation", model="./gpt2-sft", device=0 if torch.cuda.is_available() else -1)
set_seed(42)
print("\n测试模型生成效果(提示词:这本书真是):")
results = generator("这本书真是", max_length=30, num_return_sequences=2)
for i, result in enumerate(results):
print(f"{i+1}: {result['generated_text']}")
代码要点解读:
-
DataCollatorForLanguageModeling的作用
:它负责把input_ids、attention_mask和labels整合成模型训练需要的格式,标签就是input_ids本身(因为因果语言模型的任务是“根据前文预测下一个token”)。
-
设备选择
:如果有GPU就自动使用,否则CPU也能运行(但会很慢)。
-
只训练1个epoch
:这是SFT的最佳实践,因为预训练模型已经很强,再多轮次可能导致灾难性遗忘。
四、第二步:RM(奖励模型训练)——教会模型“判断好坏的标准”
1. 奖励模型是什么?
在SFT之后,模型会“写评论”了,但它不会判断自己写出来的评论是“好”还是“坏”。我们需要一个机制来告诉它:这篇评论是正向的还是负向的?写得好看不好看?
奖励模型就是解决这个问题:它的输入是一段文本,输出是一个评分(通常是0到1之间的小数),评分越高,代表这段文本越符合人类的偏好。
在InstructGPT的原始论文中,他们使用包含33,000条训练数据的偏好对比数据集来训练奖励模型。具体做法是在SFT模型上移除倒数第二层的输出,替换成一个线性层(reward_head),最终输出一个标量分数。
2. 核心要点
-
数据结构
:一条训练数据 = Prompt + 多个模型的回复(4-9个)+ 人类对回复的排序。
-
损失函数
:成对排序损失的表达式为 -log σ(R(x, y_w) - R(x, y_l)),其中y_w是更好的回复,y_l是更差的回复,σ是sigmoid函数。
-
重要细节
:使用text末尾添加一个reward_token(即eos_token)作为“评分桩”,reward_head对这个位置输出的值作为整条文本的最终评分。
-
训练轮数
:遵循InstructGPT论文,也只训练1个epoch。
3. 完整代码实现及逐行注释
"""
RM阶段:训练奖励模型,使其能够为给定的评论输出一个评判分数。
本代码完整实现了自定义RewardModel结构、数据预处理和训练流程。
"""
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, DataCollatorWithPadding
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
from sklearn.metrics import confusion_matrix
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# ==================== 1. 加载模型和分词器 ====================
model_path = "./gpt2-chinese-cluecorpussmall"
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token # 同样将pad_token设置为eos_token
# reward_token就是eos_token
REWARD_TOKEN_ID = tokenizer.eos_token_id
# ==================== 2. 加载和过滤数据集 ====================
# 这里使用的是带有标签的电商评论数据集(标签0表示负向,1表示正向)
dataset = load_dataset("csv", data_files="online_shopping_10_cats.csv")
ds_train = dataset["train"]
# 过滤无效评论
ds_train = ds_train.filter(
lambda x: x["review"] is not None
and len(x["review"]) > 20
and len(x["review"]) < 1024
)
# ==================== 3. 数据预处理:添加reward_token ====================
def tokenize(batch):
"""
对每条评论进行分词,并在末尾添加一个reward_token(即eos_token)
同时记录reward_token的位置索引,方便后续提取其对应的评分。
"""
# 对评论文本进行分词
outputs = tokenizer(batch["review"])
# outputs包含 input_ids 和 attention_mask
# 每条数据初始化一个score,用来存放真实标签(0或1)
outputs["score"] = [0] * len(outputs["input_ids"])
outputs["score_index"] = [0] * len(outputs["input_ids"])
for i in range(len(outputs["input_ids"])):
# 在每条文本的末尾添加reward_token(eos_token)
outputs["input_ids"][i].append(REWARD_TOKEN_ID)
outputs["attention_mask"][i].append(1) # 确保mask为1,表示该token参与attention计算
# 用数据集自带的label作为真实评分
outputs["score"][i] = float(batch["label"][i])
# score_index记录了reward_token的位置索引(=最后一个位置的索引)
outputs["score_index"][i] = len(outputs["input_ids"][i]) - 1
return outputs
map_kwargs = {
"batched": True, # 批量处理
"batch_size": 512,
"remove_columns": ["cat", "label", "review"]
}
tokenized_dataset_train = ds_train.map(tokenize, **map_kwargs)
tokenized_dataset_train.set_format(type="torch")
print(f"数据预处理完成,共 {len(tokenized_dataset_train)} 条数据")
# ==================== 4. 定义奖励模型 ====================
class RewardModel(nn.Module):
"""
奖励模型结构:
- 底层是一个GPT2预训练模型
- 顶层是一个线性层(reward_head),用于输出评分
"""
def __init__(self, model_name):
super().__init__()
# 加载预训练模型(因果语言模型)
self.llm = AutoModelForCausalLM.from_pretrained(model_name)
# reward_head:将隐藏层输出映射为1个标量分数
# self.llm.config.hidden_size是GPT2的隐藏层维度(通常是768)
self.reward_head = nn.Linear(self.llm.config.hidden_size, 1)
def forward(self, input_ids, attention_mask):
"""
前向传播:
input_ids: 输入token序列(已经包含了reward_token)
attention_mask: 注意力掩码
返回值:对每个token的预测分数(shape: batch_size x seq_len)
"""
# 通过GPT2获取所有token的隐藏层
transformer_outputs = self.llm.forward(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True # 要求返回隐藏层,我们需要最后一层
)
# 提取最后一层隐藏层
last_hidden_state = transformer_outputs.hidden_states[-1] # (B, T, H)
# 通过reward_head线性层得到每个token位置的评分
reward = self.reward_head(last_hidden_state).squeeze(-1) # (B, T)
# sigmoid将评分映射到(0, 1)范围内
return torch.sigmoid(reward)
# 实例化模型
model = RewardModel(model_path).to(device)
# ==================== 5. 数据加载器 ====================
data_collator = DataCollatorWithPadding(tokenizer) # 自动填充到批次内最大长度
dataloader_params = {
"batch_size": 16, # 根据GPU显存调整
"shuffle": True,
"collate_fn": data_collator
}
train_dataloader = DataLoader(tokenized_dataset_train, **dataloader_params)
# ==================== 6. 优化器与损失函数 ====================
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
criterion = nn.BCELoss() # 二分类交叉熵损失(标签为0/1)
# ==================== 7. 训练循环 ====================
num_epochs = 1 # 遵循InstructGPT论文设置
model.train()
for epoch in range(num_epochs):
total_loss = 0.0
for step, batch in enumerate(train_dataloader):
# 将数据移到设备(使用inputs['score']和inputs['score_index']处理)
batch = {k: v.to(device) for k, v in batch.items()}
# 前向传播
model_inputs = {
"input_ids": batch["input_ids"],
"attention_mask": batch["attention_mask"]
}
# scores: 模型对每一个token的预测分数
all_token_scores = model(**model_inputs) # (B, T)
# 提取reward_token位置的预测分数
batch_indices = torch.arange(all_token_scores.shape[0]) # [0,1,2,...,batch-1]
reward_scores = all_token_scores[batch_indices, batch["score_index"]]
# 计算损失
target = batch["score"] # 真实标签(0或1)
loss = criterion(reward_scores, target)
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
if step % 100 == 0:
print(f"Step: {step}, Loss: {loss.item():.4f}")
print(f"Epoch {epoch+1} completed. Average Loss: {total_loss/len(train_dataloader):.4f}")
# 保存模型权重
torch.save(model.state_dict(), "reward_model.pt")
print("奖励模型已保存到 reward_model.pt")
# ==================== 8. 评估模型 ====================
# 在训练集上评估模型预测评论是正向还是负向的能力
model.eval()
all_predictions = []
all_labels = []
with torch.no_grad():
for step, batch in enumerate(train_dataloader):
batch = {k: v.to(device) for k, v in batch.items()}
model_inputs = {
"input_ids": batch["input_ids"],
"attention_mask": batch["attention_mask"]
}
all_token_scores = model(**model_inputs) # (B, T)
batch_indices = torch.arange(all_token_scores.shape[0])
reward_scores = all_token_scores[batch_indices, batch["score_index"]]
target = batch["score"]
# 将预测分数 > 0.5 视为预测为正向(类别为1)
predictions = (reward_scores > 0.5).int()
all_predictions.extend(predictions.cpu().numpy())
all_labels.extend(target.cpu().numpy())
# 计算混淆矩阵
print("\n混淆矩阵(行:真实标签,列:预测标签)")
print("[[TN FP]")
print(" [FN TP]]")
print(confusion_matrix(all_labels, all_predictions))
代码要点解读:
-
reward_head的设计
:它把GPT2最后一层隐藏层映射成1个标量分数,sigmoid把评分压缩到(0, 1)区间。
-
score_index的作用
:由于每个批次内的文本长度不一样,reward_token必然出现在不同长度序列的不同位置。score_index记录了每条数据reward_token的精确索引,模型通过batch_indices + score_index两个索引来取出正确位置的评分。
-
BCELoss的意义
:这是二元交叉熵损失,适用于二分类问题。标签0表示负向评论,1表示正向评论,模型被训练成“对正向评论输出接近1的分数,对负向评论输出接近0的分数”。
五、第三步:PPO 强化学习 微调——用“奖励信号”教会模型写正向评论
1. PPO在干什么?
SFT学会了“怎么写评论”,奖励模型学会了“什么评论是好的”,现在我们要让SFT模型按照奖励模型的标准去改进自己的输出:写出高分评论。
这一步就是PPO。PPO的全称是Proximal Policy Optimization(近端策略优化),它是InstructGPT中最核心、最复杂的强化学习模块。
2. PPO的核心思想
2.1 驯兽师训练海豚的比喻
你可以把PPO理解成“驯兽师训练海豚完成高难度动作”:
-
海豚
→ 语言模型(Actor),它的任务是生成回答。
-
驯兽师
→ 奖励模型(Reward Model),给生成结果打分。
-
鱼
→ 奖励分,高分给鱼,低分不给。
-
场边预测员
→ 价值模型(Critic Network),预测当前状态下的预期总奖励。
-
海豚的本能
→ 参考模型(Reference Model),防止海豚为了吃鱼而做出奇怪动作。
2.2 为什么需要KL散度惩罚(海豚的本能)
如果完全不限制模型,它会拼命去“刷分”——利用奖励模型可能存在的漏洞,生成一些乱七八糟的文本(比如重复同一句话几千遍、乱码等),但奖励模型可能误判为高分。
为了防止这种情况,PPO 损失函数 里有个KL散度惩罚项:当模型的输出概率偏离原始SFT模型太远时,就会受到惩罚。
2.3 奖励分配策略:加在最后一个token上,但GAE帮它分摊
在RLHF中,奖励模型只对整条完整的回复打分,而且这个分数只添加在最后一个token(即eos_token)的即时奖励上。那么,前面的token要怎么知道自己做得好不好呢?
答案是PPO使用广义优势估计(GAE) 来解决这个问题:通过价值网络(Critic)评估每一步状态的价值,GAE算法将最后的稀疏奖励“反推”到前面的所有动作上,评价每一步对最终得分贡献了多少。
GAE的公式逻辑是:每一步的优势 = 即时的实际回报 + 未来预期回报 - 当前状态预期回报,然后用γ和λ参数对多步优势进行指数加权平均,兼顾了小步数(低方差、高偏差)和多步数(高方差、低偏差)的优势。
2.4 PPO的裁剪机制
PPO最重要的创新是引入了裁剪因子ε(通常设为0.2)。更新策略时,如果新策略与旧策略的概率比超出[1-ε, 1+ε]区间,就把它“裁”掉。这样可以防止单步更新过大,确保训练稳定。
3. 完整代码实现及逐行注释
3.1 准备提示词数据集
"""
准备PPO训练用的提示词数据集。
我们从电商评论中截取前2-8个token作为提示词,补全内容由模型自动生成。
这样模型可以根据不同的开头尝试各种续写方式。
"""
import random
from datasets import load_dataset
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("./gpt2-sft")
tokenizer.pad_token = tokenizer.eos_token
dataset = load_dataset("csv", data_files="online_shopping_10_cats.csv")
ds_train = dataset["train"]
ds_train = ds_train.filter(
lambda x: x["review"] is not None
and len(x["review"]) > 20
and len(x["review"]) < 1024
)
input_min_token_length = 2
input_max_token_length = 8
input_token_length_range = list(range(input_min_token_length, input_max_token_length))
def tokenize(sample):
"""
从评论文本中随机截取前input_size个token作为提示词。
截取完毕后,删掉原始评论,只保留提示词部分。
"""
input_size = random.choice(input_token_length_range)
# 取review的前input_size个token作为提示词
sample["input_ids"] = tokenizer.encode(sample["review"])[:input_size]
sample["attention_mask"] = [1] * len(sample["input_ids"])
sample["query"] = tokenizer.decode(sample["input_ids"])
return sample
map_kwargs = {
"batched": False, # 逐条处理
"remove_columns": ["cat", "review", "label"]
}
tokenized_dataset_train = ds_train.map(tokenize, **map_kwargs)
tokenized_dataset_train.set_format(type="torch")
3.2 定义ActorCritic模型
"""
定义ActorCritic模型:演员(Actor)负责生成文本,评论家(Critic)负责评估每个状态的价值。
在InstructGPT的实现中,Actor和Critic共享底层Transformer的权重。
"""
import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM
class ActorCriticModel(nn.Module):
"""
演员-评论家模型结构:
- 底层是一个GPT2(经过SFT的模型)
- 顶层是一个线性层v_head,用来输出每个token的“状态价值”V(s_t)
"""
def __init__(self, model_path):
super().__init__()
# 加载已经SFT过的GPT2模型(演员网络)
self.llm = AutoModelForCausalLM.from_pretrained(model_path)
# 价值头(评论家网络):将隐藏层映射为1个标量
self.v_head = nn.Linear(self.llm.config.hidden_size, 1)
def forward(self, input_ids, attention_mask):
"""
返回:
logits: 下一个token的概率分布
value: 每个时间步的状态价值(用于估计预期回报)
"""
transformer_outputs = self.llm.forward(
input_ids,
attention_mask=attention_mask,
output_hidden_states=True
)
logits = transformer_outputs.logits # (B, T, vocab_size)
last_hidden_state = transformer_outputs.hidden_states[-1] # (B, T, H)
value = self.v_head(last_hidden_state).squeeze(-1) # (B, T)
return logits, value
def generate(self, *args, **kwargs):
"""调用底层模型的生成方法"""
return self.llm.generate(*args, **kwargs)
3.3 核心奖励计算函数
"""
计算每个token的即时奖励R_t。
这是InstructGPT中最关键的部分:
- 中间token:只在KL惩罚上有奖励(负值,即“不要偏离原模型”)
- 最后一个token:KL惩罚 + 奖励模型的打分
"""
import torch.nn.functional as F
def compute_rewards(input_data, query_tensors, response_tensors, score_tensors, beta=0.2):
"""
input_data: 包含 input_ids 和 attention_mask(提示词+补全)
query_tensors: 提示词张量列表
response_tensors: 补全张量列表
score_tensors: 奖励模型的最终打分
beta: KL惩罚系数(控制模型对“偏离原模型”的敏感度)
返回值:
old_logprobs: 当前策略下实际生成token的对数概率
rewards: 每个时间步的即时奖励R_t
values: 每个时间步的价值预测V(s_t)
masks: 有效位置的掩码(只保留补全部分的有效token)
"""
# 计算当前策略模型和参考模型的logits和values
with torch.no_grad(): # 这里只做前向计算,不更新梯度
logits, values = model(**input_data) # π_θ的输出
ref_logits, _ = ref_model(**input_data) # π_ref的输出
# 计算每个位置的对数概率(log_softmax)
log_probs = F.log_softmax(logits[:, :-1, :], dim=-1)
ref_log_probs = F.log_softmax(ref_logits[:, :-1, :], dim=-1)
# 实际生成的token序列(去除第一个token,因为模型是next-token预测)
labels = input_data["input_ids"][:, 1:] # (B, T-1)
# 提取实际生成的token的对数概率
log_probs_actions = torch.gather(log_probs, 2, labels.unsqueeze(-1)).squeeze(-1)
ref_log_probs_actions = torch.gather(ref_log_probs, 2, labels.unsqueeze(-1)).squeeze(-1)
# KL散度 = log(π_θ/π_ref) = logπ_θ - logπ_ref
kl_divergence = log_probs_actions - ref_log_probs_actions
# 初始化奖励:只有KL惩罚部分(负值)
rewards = -beta * kl_divergence
# 创建掩码,标记哪些位置是属于补全部分的有效token
attention_mask = input_data["attention_mask"]
masks = torch.zeros_like(attention_mask[:, 1:]) #去掉第一个token
masks[:, :] = attention_mask[:, 1:]
# 根据提示词和补全的位置调整掩码
for j in range(len(query_tensors)):
start = len(query_tensors[j]) - 1 # 补全开始的位置
end = start + len(response_tensors[j]) # 补全结束的位置
masks[j, :start] = 0 # 提示词部分mask=0
masks[j, end:] = 0 # padding部分mask=0
# 奖励模型的得分加在最后一个token的奖励上
rewards[j, end - 1] += score_tensors[j]
# 应用掩码,只保留有效位置
rewards = rewards * masks
values = values[:, :-1] * masks # value也要去掉最后一个token
return log_probs_actions, rewards, values, masks
3.4 GAE优势估计函数
"""
广义优势估计(GAE):计算每个时间步的优势值和GAE目标。
优势值A_t = 动作a_t在该状态s_t下的表现好于平均水平的程度。
这是PPO算法的核心评估信号。
"""
def masked_mean(values, mask):
"""计算带掩码的平均值"""
return (values * mask).sum() / mask.sum()
def masked_whiten(values, mask):
"""
数据白化(标准化):调整数据的均值不变,但方差变为1。
这样可以提高训练稳定性。
"""
mean, var = masked_mean(values, mask), masked_mean((values - masked_mean(values, mask))**2, mask)
whitened = (values - mean) * torch.sqrt(var + 1e-8)
whitened += mean
return whitened
def compute_advantage(rewards, values, masks):
"""
rewards: 每个时间步的即时奖励R_t
values: 每个时间步的价值预测V(s_t)
masks: 有效位置的掩码
返回:
advantages: 优势值A_t^{GAE}
gae_targets: GAE目标值 = A_t^{GAE} + V(s_t)
"""
gamma, lam = 1.0, 0.95 # 折扣因子和GAE的平衡参数
seq_length = rewards.shape[-1]
last_gae = 0.0
advantages_reversed = []
# 逆序计算优势值
for t in reversed(range(seq_length)):
next_values = values[:, t + 1] if t < seq_length - 1 else 0.0
# TD误差:δ_t = R_t + γ·V(s_{t+1}) - V(s_t)
delta = rewards[:, t] + gamma * next_values - values[:, t]
# GAE递归公式:A_t = δ_t + γ·λ·A_{t+1}
last_gae = delta + gamma * lam * last_gae
advantages_reversed.append(last_gae)
advantages = torch.stack(advantages_reversed[:seq_length], dim=1)
advantages = masked_whiten(advantages, masks) # 白化处理
gae_targets = advantages + values # GAE目标
return advantages, gae_targets
3.5 PPO损失函数
"""
PPO的损失函数设计:
- actor loss:通过裁剪的替代目标(clipped surrogate objective)更新策略
- critic loss:MSE损失,让价值网络预测GAE目标
"""
def compute_loss(old_log_probs, new_log_probs, values, masks, advantages, gae_targets, clip_epsilon=0.2):
"""
old_log_probs: 旧策略下生成token的对数概率
new_log_probs: 当前策略下生成token的对数概率(正在训练的模型)
values: 当前模型预测的价值V(s_t)
masks: 有效位置的掩码
advantages: 优势值A_t^{GAE}
gae_targets: GAE目标值 = A_t^{GAE} + V(s_t)
clip_epsilon: 裁剪因子ε
"""
# 计算新旧策略的概率比 r_t(θ) = π_θ(a_t|s_t) / π_{old}(a_t|s_t)
ratio = torch.exp(new_log_probs - old_log_probs)
# Actor损失=PGLoss: -min(clip(ratio) * A, ratio * A)
pg_loss1 = -ratio * advantages
pg_loss2 = -torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon) * advantages
pg_loss = masked_mean(torch.max(pg_loss1, pg_loss2), masks)
# Critic损失:MSE损失,让价值网络更准确
v_loss = masked_mean((values - gae_targets) ** 2, masks)
# 总损失 = Actor损失 + 0.1 × Critic损失
loss = pg_loss + 0.1 * v_loss
return loss
3.6 PPO更新函数
"""
使用收集到的轨迹数据对策略进行多轮更新。
"""
def ppo_update(input_data, old_log_probs, masks, advantages, gae_targets):
"""
使用32条轨迹,每次批量处理4条(mini_batch_size),循环更新ppo_epochs次。
每个epoch后,模型(演员网络)得到一个微小的改进。
"""
batch_size = input_data["input_ids"].shape[0] # 通常为32
mini_batch_size = 4
for epoch in range(ppo_epochs):
indices = list(range(batch_size))
random.shuffle(indices) # 随机打乱有助于提升泛化
for start in range(0, batch_size, mini_batch_size):
mini_batch_indices = indices[start:start + mini_batch_size]
# 提取mini batch数据
mb_inputs = {
"input_ids": input_data["input_ids"][mini_batch_indices],
"attention_mask": input_data["attention_mask"][mini_batch_indices]
}
# 前向传播:获取当前模型的对数概率和value
mb_logits, mb_values = model(**mb_inputs)
mb_logits = F.log_softmax(mb_logits[:, :-1, :], dim=-1)
mb_labels = mb_inputs["input_ids"][:, 1:]
mb_log_probs = torch.gather(mb_logits, 2, mb_labels.unsqueeze(-1)).squeeze(-1)
# 计算损失
loss = compute_loss(
old_log_probs[mini_batch_indices],
mb_log_probs,
mb_values[:, :-1],
masks[mini_batch_indices],
advantages[mini_batch_indices],
gae_targets[mini_batch_indices]
)
# 更新模型参数
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"PPO update epoch {epoch+1}, loss: {loss.item():.4f}")
print("PPO update completed.")
return
3.7 主训练循环
"""
PPO主训练循环:
1. 从dataloader中取一批提示词
2. 用当前策略模型生成补全内容
3. 用奖励模型给整句打分
4. 计算每个token的即时奖励、价值和掩码
5. 用GAE计算优势值和目标值
6. 用PPO算法更新策略模型
"""
# 初始化参数
learning_rate = 1e-5
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
ppo_epochs = 4 # 每条轨迹重复使用的次数
num_epochs = 1
batch_size = 32
output_min_length = 32 # 补全的最小token数
output_max_length = 64 # 补全的最大token数
# 生成参数设置
generation_kwargs = {
"max_new_tokens": 32,
"do_sample": True, # 启用采样
"top_p": 0.95, # 核采样
}
# 主训练循环
for epoch in range(num_epochs):
for batch_idx, batch in enumerate(train_dataloader):
print(f"\n=== 开始处理第 {batch_idx+1} 批提示词 ===")
# 提取当前批次的提示词
query_tensors = batch["input_ids"]
query_attention_masks = batch["attention_mask"]
response_tensors = [] # 存储补全的张量
query_response_tensors = [] # 存储提示词+补全的张量
score_tensors = [] # 存储奖励模型打分
# 对每个提示词独立进行生成
for i, query in enumerate(query_tensors):
query = query.to(device)
query_attention_mask = query_attention_masks[i].to(device)
# 随机设定本次补全的长度
new_tokens = random.choice(list(range(output_min_length, output_max_length)))
generation_kwargs["max_new_tokens"] = new_tokens
# 用当前模型生成补全内容
with torch.no_grad():
query_response = model.generate(
input_ids=query.unsqueeze(0),
attention_mask=query_attention_mask.unsqueeze(0),
**generation_kwargs
).squeeze(0)
# 记录补全内容和完整内容
response_len = len(query_response) - len(query)
response_tensors.append(query_response[-response_len:])
query_response_tensors.append(query_response)
# 用奖励模型打分
with torch.no_grad():
query_response_score = torch.cat([
query_response,
torch.tensor([REWARD_TOKEN_ID]).to(device)
])
attention_mask_score = torch.ones_like(query_response_score, dtype=torch.long)
score = reward_model(
query_response_score.unsqueeze(0),
attention_mask_score.unsqueeze(0)
).squeeze(0)[-1]
# 将打分从(0,1)缩放到(-1,1)范围,使学习更有效
score = 2 * (score - 0.5)
score_tensors.append(score)
# 准备输入数据(提示词+补全)
input_data = data_collator([
{"input_ids": ids, "attention_mask": torch.ones_like(ids)}
for ids in query_response_tensors
])
# 计算即时奖励和价值
old_log_probs, rewards, values, masks = compute_rewards(
input_data, query_tensors, response_tensors, score_tensors
)
# 计算优势和GAE目标
advantages, gae_targets = compute_advantage(rewards, values, masks)
# 执行PPO更新
ppo_update(input_data, old_log_probs, masks, advantages, gae_targets)
# 可选:保存检查点
if batch_idx % 10 == 0:
torch.save(model.state_dict(), f"ppo_checkpoint_{batch_idx}.pt")
print(f"检查点已保存: ppo_checkpoint_{batch_idx}.pt")
代码要点解读:
-
KL惩罚系数β
:通常设置为0.2左右,它决定了“允许模型偏离原始SFT模型的程度”。
-
GAE的γ和λ参数
:γ=1.0表示不折扣未来奖励(因为大语言模型没有“终止”的概念),λ=0.95平衡了偏差与方差。
-
裁剪参数ε=0.2
:这是PPO论文中的推荐值,限制策略步长在20%以内。
-
优势的白化处理
:对优势函数进行标准化可以使训练过程更加稳定和快速收敛。
六、模型评估与效果对比
训练完成后,我们需要对比PPO微调前后的模型表现。
"""
评估函数:比较PPO微调前的模型(SFT模型)和微调后的模型
让两个模型以相同的提示词生成文本,用奖励模型打分,对比平均分。
"""
def validate(model_to_evaluate, generation_kwargs, reward_model):
scores = []
count = 0
for batch_idx, batch in enumerate(train_dataloader):
if count >= 2:
break
count += 1
query_tensors = batch["input_ids"]
query_attention_masks = batch["attention_mask"]
for i, query in enumerate(query_tensors):
query = query.to(device)
query_attention_mask = query_attention_masks[i].to(device)
# 生成补全
new_tokens = random.choice(list(range(32, 65)))
generation_kwargs["max_new_tokens"] = new_tokens
with torch.no_grad():
query_response = model_to_evaluate.generate(
input_ids=query.unsqueeze(0),
attention_mask=query_attention_mask.unsqueeze(0),
**generation_kwargs
).squeeze(0)
# 奖励模型打分(需要添加reward_token)
with torch.no_grad():
query_response_score = torch.cat([
query_response,
torch.tensor([REWARD_TOKEN_ID]).to(device)
])
attention_mask_score = torch.ones_like(query_response_score, dtype=torch.long)
score = reward_model(
query_response_score.unsqueeze(0),
attention_mask_score.unsqueeze(0)
).squeeze(0)[-1]
score = 2 * (score - 0.5)
scores.append(score.item())
print(f"平均分数: {sum(scores) / len(scores):.4f}")
return sum(scores) / len(scores)
print("微调前的模型效果:")
before_score = validate(ref_model, generation_kwargs, reward_model)
print("微调后的模型效果:")
after_score = validate(model, generation_kwargs, reward_model)
print(f"\n分数提升: {after_score - before_score:.4f}")
通过对比,你会发现PPO微调后的模型生成的正向评论平均分数显著高于微调前的SFT模型——这就是强化学习的力量。
七、总结
本文完整呈现了从数据准备到RLHF微调应用的全过程。我们来回顾这条“驯服”之路的里程碑:
-
SFT阶段
:教会模型“怎么写评论”。用人工标注的高质量数据(13,000条左右)训练16轮,让预训练模型学会模仿人类写回答的方式。
-
RM阶段
:训练奖励模型(33,000条对比数据),让模型学会“判断评论好坏”。奖励模型是PPO阶段优化信号的来源。
-
PPO阶段
:用强化学习微调SFT模型,让它学会“只写正向评论”。这是InstructGPT三阶段中最复杂、效果最惊艳的一步,核心包括:
-
KL散度惩罚项
:确保模型不偏离原始SFT版本过远,防止“奖励黑客”行为。
-
稀疏奖励+GAE
:奖励模型的分只加在最后一个token上,用GAE把奖励“分摊”到每一步,解决信用分配问题。
-
裁剪机制ε=0.2
:限制每次策略更新的步长,保证训练稳定。
-
KL散度惩罚项
最终总结:整个流程如果你跟着完整走下来,会发现它其实就是一个“先模仿、再评判、最后在反馈中不断精进”的学习阶梯——SFT让模型学会说话,RM教模型分辨优劣,PPO引导模型主动趋利避害,三者环环相扣。当这三个步骤共同努力,一个原本只知道“词接词”的预训练模型,就这样被一步步打造成了能正向输出、懂得体察人类偏好的有用助手。