Reward Models #
Overview #
A reward model (RM) is the core component of RLHF that turns human preferences into a learnable reward signal. A good reward model accurately predicts how strongly humans will prefer a given model output.
What a Reward Model Does #
text
┌─────────────────────────────────────────────────────────────┐
│                    Role of the Reward Model                  │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Input: prompt + model response                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Prompt: "What is machine learning?"                  │    │
│  │ Response: "Machine learning is a branch of AI..."    │    │
│  └─────────────────────────────────────────────────────┘    │
│                          │                                   │
│                          ▼                                   │
│  Reward model processing:                                    │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Encoder → hidden states → value head → scalar reward │    │
│  └─────────────────────────────────────────────────────┘    │
│                          │                                   │
│                          ▼                                   │
│  Output: reward score                                        │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Reward Score: 0.85                                   │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
│  Uses:                                                       │
│  ├── Provide the reward signal for PPO training              │
│  ├── Evaluate output quality                                 │
│  └── Filter high-quality responses (best-of-n; see below)    │
│                                                              │
└─────────────────────────────────────────────────────────────┘
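The third use above, filtering, is essentially best-of-n sampling: score several candidate responses to the same prompt with the reward model and keep the highest-scoring one. Below is a minimal sketch assuming a trained reward model with the `(input_ids, attention_mask) → scalar reward` interface defined in the next section and a matching tokenizer; the function name and 512-token limit are illustrative.
python
import torch

def pick_best_response(prompt, candidates, reward_model, tokenizer, device="cpu"):
    # Score each candidate response and return the one with the highest reward.
    # `candidates` is a list of response strings for the same prompt.
    reward_model.eval()
    scores = []
    with torch.no_grad():
        for response in candidates:
            enc = tokenizer(
                prompt + "\n" + response,
                return_tensors="pt",
                truncation=True,
                max_length=512
            ).to(device)
            reward = reward_model(enc["input_ids"], enc["attention_mask"])
            scores.append(reward.item())
    best_idx = max(range(len(candidates)), key=lambda i: scores[i])
    return candidates[best_idx], scores[best_idx]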
Reward Model Architecture #
Basic Architecture #
python
import torch
import torch.nn as nn
from transformers import AutoModel, AutoConfig

class RewardModel(nn.Module):
    def __init__(
        self,
        model_name_or_path,
        hidden_size=None,
        dropout=0.1
    ):
        super().__init__()
        self.config = AutoConfig.from_pretrained(model_name_or_path)
        self.base_model = AutoModel.from_pretrained(model_name_or_path)
        hidden_size = hidden_size or self.config.hidden_size
        # Value head: maps the hidden state of the last token to a scalar reward
        self.value_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size // 2, 1)
        )

    def forward(self, input_ids, attention_mask=None):
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        last_hidden_states = outputs.last_hidden_state
        # Position of the last non-padding token in each sequence (assumes right padding)
        sequence_lengths = attention_mask.sum(dim=1) - 1
        batch_size = input_ids.shape[0]
        last_token_indices = torch.arange(
            batch_size, device=input_ids.device
        )
        # Gather the hidden state at the last real token of each sequence
        last_token_hidden = last_hidden_states[
            last_token_indices, sequence_lengths
        ]
        rewards = self.value_head(last_token_hidden)
        return rewards.squeeze(-1)
Multi-Layer Value Head #
python
class MultiLayerValueHead(nn.Module):
    def __init__(self, hidden_size, num_layers=3):
        super().__init__()
        layers = []
        current_size = hidden_size
        for _ in range(num_layers - 1):
            next_size = current_size // 2
            layers.extend([
                nn.Linear(current_size, next_size),
                nn.LayerNorm(next_size),
                nn.ReLU(),
                nn.Dropout(0.1)
            ])
            current_size = next_size
        layers.append(nn.Linear(current_size, 1))
        self.value_head = nn.Sequential(*layers)

    def forward(self, hidden_states):
        return self.value_head(hidden_states)
Shared-Encoder Architecture #
python
class SharedEncoderRewardModel(nn.Module):
    def __init__(self, base_model_name):
        super().__init__()
        self.base_model = AutoModel.from_pretrained(base_model_name)
        hidden_size = self.base_model.config.hidden_size
        # Two heads share one encoder: reward for RM scoring, critic for PPO value estimates
        self.reward_head = nn.Linear(hidden_size, 1)
        self.critic_head = nn.Linear(hidden_size, 1)

    def forward(self, input_ids, attention_mask=None, head="reward"):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        # Simplified: uses the hidden state at the final position
        last_hidden = outputs.last_hidden_state[:, -1, :]
        if head == "reward":
            return self.reward_head(last_hidden).squeeze(-1)
        elif head == "critic":
            return self.critic_head(last_hidden).squeeze(-1)
        raise ValueError(f"unknown head: {head}")
Reward Model Training #
The Bradley-Terry Loss #
text
Bradley-Terry model:
────────────────────────
Given a prompt x and two responses y_w (winner) and y_l (loser):
P(y_w > y_l | x) = sigmoid(r(x, y_w) - r(x, y_l))

Loss function:
────────────────────────
L = -log σ(r(x, y_w) - r(x, y_l))

Goal: maximize the log-probability that the chosen response receives the higher reward
Training Code #
python
import torch
import torch.nn.functional as F

def compute_rm_loss(reward_model, batch):
    input_ids_chosen = batch["input_ids_chosen"]
    attention_mask_chosen = batch["attention_mask_chosen"]
    input_ids_rejected = batch["input_ids_rejected"]
    attention_mask_rejected = batch["attention_mask_rejected"]

    rewards_chosen = reward_model(
        input_ids_chosen,
        attention_mask_chosen
    )
    rewards_rejected = reward_model(
        input_ids_rejected,
        attention_mask_rejected
    )

    # Bradley-Terry pairwise loss: -log sigmoid(r_chosen - r_rejected)
    loss = -F.logsigmoid(rewards_chosen - rewards_rejected).mean()
    accuracy = (rewards_chosen > rewards_rejected).float().mean()
    return loss, accuracy


class RewardModelTrainer:
    def __init__(self, model, train_dataloader, val_dataloader,
                 learning_rate=1e-5, weight_decay=0.01):
        self.model = model
        self.train_dataloader = train_dataloader
        self.val_dataloader = val_dataloader
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=learning_rate,
            weight_decay=weight_decay
        )
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer,
            T_max=len(train_dataloader) * 3  # assumes 3 training epochs
        )

    def train_epoch(self):
        self.model.train()
        total_loss = 0
        total_accuracy = 0
        for batch in self.train_dataloader:
            loss, accuracy = compute_rm_loss(self.model, batch)
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            self.optimizer.step()
            self.scheduler.step()
            total_loss += loss.item()
            total_accuracy += accuracy.item()
        return total_loss / len(self.train_dataloader), \
               total_accuracy / len(self.train_dataloader)

    def validate(self):
        self.model.eval()
        total_loss = 0
        total_accuracy = 0
        with torch.no_grad():
            for batch in self.val_dataloader:
                loss, accuracy = compute_rm_loss(self.model, batch)
                total_loss += loss.item()
                total_accuracy += accuracy.item()
        return total_loss / len(self.val_dataloader), \
               total_accuracy / len(self.val_dataloader)
Multi-Response Ranking Loss #
python
def compute_ranking_loss(reward_model, batch, margin=0.1):
    input_ids_list = batch["input_ids_list"]
    attention_mask_list = batch["attention_mask_list"]
    rankings = batch["rankings"]  # (batch, num_responses); lower rank value = better response

    rewards_list = []
    for input_ids, attention_mask in zip(input_ids_list, attention_mask_list):
        reward = reward_model(input_ids, attention_mask)
        rewards_list.append(reward)
    rewards = torch.stack(rewards_list, dim=1)  # (batch, num_responses)

    loss = 0
    num_pairs = 0
    # Compare every response pair; the higher-ranked one should score at least `margin` higher
    for i in range(len(rankings[0])):
        for j in range(i + 1, len(rankings[0])):
            higher_rank_mask = rankings[:, i] < rankings[:, j]
            if higher_rank_mask.sum() > 0:
                margin_loss = F.margin_ranking_loss(
                    rewards[higher_rank_mask, i],
                    rewards[higher_rank_mask, j],
                    torch.ones_like(rewards[higher_rank_mask, i]),
                    margin=margin
                )
                loss += margin_loss
                num_pairs += 1
    return loss / max(num_pairs, 1)
Data Processing #
Building the Dataset #
python
from torch.utils.data import Dataset
from transformers import AutoTokenizer

class PreferenceDataset(Dataset):
    def __init__(self, data_path, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.data = self._load_data(data_path)

    def _load_data(self, data_path):
        import json
        # Expects JSONL: one {"prompt": ..., "chosen": ..., "rejected": ...} object per line
        with open(data_path, 'r') as f:
            return [json.loads(line) for line in f]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        prompt = item["prompt"]
        chosen = item["chosen"]
        rejected = item["rejected"]

        chosen_text = prompt + "\n" + chosen
        rejected_text = prompt + "\n" + rejected

        chosen_enc = self.tokenizer(
            chosen_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        rejected_enc = self.tokenizer(
            rejected_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        return {
            "input_ids_chosen": chosen_enc["input_ids"].squeeze(0),
            "attention_mask_chosen": chosen_enc["attention_mask"].squeeze(0),
            "input_ids_rejected": rejected_enc["input_ids"].squeeze(0),
            "attention_mask_rejected": rejected_enc["attention_mask"].squeeze(0),
        }
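A minimal sketch of wiring this dataset into the trainer defined earlier. The `gpt2` checkpoint, file names, batch size, and epoch count are placeholders; GPT-2 has no pad token by default, so one is assigned here.
python
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")   # placeholder checkpoint
tokenizer.pad_token = tokenizer.eos_token           # GPT-2 defines no pad token by default

train_dataset = PreferenceDataset("train_prefs.jsonl", tokenizer)  # placeholder paths
val_dataset = PreferenceDataset("val_prefs.jsonl", tokenizer)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

model = RewardModel("gpt2")
trainer = RewardModelTrainer(model, train_loader, val_loader, learning_rate=1e-5)

for epoch in range(3):
    train_loss, train_acc = trainer.train_epoch()
    val_loss, val_acc = trainer.validate()
    print(f"epoch {epoch}: train_acc={train_acc:.3f} val_acc={val_acc:.3f}")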
Data Augmentation #
python
class PreferenceDataAugmenter:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    def augment(self, prompt, chosen, rejected):
        augmented_samples = []
        # Always keep the original sample
        augmented_samples.append({
            "prompt": prompt,
            "chosen": chosen,
            "rejected": rejected
        })
        # Add a copy with a paraphrased prompt when a paraphrase is available
        aug_prompt = self._paraphrase_prompt(prompt)
        if aug_prompt != prompt:
            augmented_samples.append({
                "prompt": aug_prompt,
                "chosen": chosen,
                "rejected": rejected
            })
        return augmented_samples

    def _paraphrase_prompt(self, text):
        # Simple rule-based substitutions for common Chinese prompt phrasings
        paraphrases = {
            "请解释": "请说明",
            "什么是": "解释一下",
            "如何": "怎样",
        }
        for old, new in paraphrases.items():
            if old in text:
                return text.replace(old, new, 1)
        return text
Data Filtering #
python
class DataFilter:
    def __init__(self, min_length=10, max_length=2048,
                 min_score_diff=0.1):
        self.min_length = min_length
        self.max_length = max_length
        # min_score_diff is reserved for score-based filtering; this example filters on length only
        self.min_score_diff = min_score_diff

    def filter(self, samples):
        filtered = []
        for sample in samples:
            if not self._is_valid(sample):
                continue
            filtered.append(sample)
        return filtered

    def _is_valid(self, sample):
        prompt = sample["prompt"]
        chosen = sample["chosen"]
        rejected = sample["rejected"]
        # Drop responses that are too short
        if len(chosen) < self.min_length:
            return False
        if len(rejected) < self.min_length:
            return False
        # Drop pairs that exceed the maximum sequence length
        if len(prompt) + len(chosen) > self.max_length:
            return False
        if len(prompt) + len(rejected) > self.max_length:
            return False
        # Drop pairs with extreme length differences to reduce length bias
        if abs(len(chosen) - len(rejected)) > 500:
            return False
        return True
Reward Model Evaluation #
Evaluation Metrics #
text
┌─────────────────────────────────────────────────────────────┐
│                Reward Model Evaluation Metrics               │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Accuracy:                                                   │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Fraction of preference pairs predicted correctly     │    │
│  │ Accuracy = (correct predictions) / (total pairs)     │    │
│  │ Target: > 70%                                        │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
│  Reward-gap distribution:                                    │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Distribution of r_chosen - r_rejected                │    │
│  │ Expect: mean > 0, moderate variance                  │    │
│  │ Too large: possible overfitting                      │    │
│  │ Too small: insufficient discrimination               │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
│  Spearman correlation:                                       │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Correlation between rewards and human scores         │    │
│  │ Target: > 0.6                                        │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
└─────────────────────────────────────────────────────────────┘
Evaluation Code #
python
import torch
import numpy as np
from scipy.stats import spearmanr

class RewardModelEvaluator:
    def __init__(self, reward_model, tokenizer):
        self.model = reward_model
        self.tokenizer = tokenizer

    def evaluate_accuracy(self, test_dataset):
        # test_dataset should yield batched preference pairs (e.g. a DataLoader over PreferenceDataset)
        self.model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in test_dataset:
                rewards_chosen = self.model(
                    batch["input_ids_chosen"],
                    batch["attention_mask_chosen"]
                )
                rewards_rejected = self.model(
                    batch["input_ids_rejected"],
                    batch["attention_mask_rejected"]
                )
                correct += (rewards_chosen > rewards_rejected).sum().item()
                total += len(rewards_chosen)
        return correct / total

    def evaluate_correlation(self, samples_with_scores):
        # Spearman correlation between predicted rewards and human scores
        self.model.eval()
        predicted_rewards = []
        human_scores = []
        with torch.no_grad():
            for sample in samples_with_scores:
                enc = self.tokenizer(
                    sample["text"],
                    return_tensors="pt",
                    truncation=True,
                    max_length=512
                )
                reward = self.model(enc["input_ids"], enc["attention_mask"])
                predicted_rewards.append(reward.item())
                human_scores.append(sample["human_score"])
        correlation, p_value = spearmanr(predicted_rewards, human_scores)
        return correlation, p_value

    def analyze_reward_distribution(self, test_dataset):
        # Distribution of r_chosen - r_rejected over the test set
        self.model.eval()
        reward_diffs = []
        with torch.no_grad():
            for batch in test_dataset:
                rewards_chosen = self.model(
                    batch["input_ids_chosen"],
                    batch["attention_mask_chosen"]
                )
                rewards_rejected = self.model(
                    batch["input_ids_rejected"],
                    batch["attention_mask_rejected"]
                )
                reward_diffs.extend(
                    (rewards_chosen - rewards_rejected).tolist()
                )
        return {
            "mean": np.mean(reward_diffs),
            "std": np.std(reward_diffs),
            "min": np.min(reward_diffs),
            "max": np.max(reward_diffs),
            "median": np.median(reward_diffs)
        }
Reward Model Optimization Techniques #
Preventing Overfitting #
python
class RegularizedRewardModel(nn.Module):
    def __init__(self, base_model_name, dropout=0.2):
        super().__init__()
        self.base_model = AutoModel.from_pretrained(base_model_name)
        hidden_size = self.base_model.config.hidden_size
        self.dropout = nn.Dropout(dropout)
        self.value_head = nn.Linear(hidden_size, 1)
        self.label_smoothing = 0.1

    def forward(self, input_ids, attention_mask=None):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        hidden = self.dropout(outputs.last_hidden_state[:, -1, :])
        return self.value_head(hidden).squeeze(-1)

    def compute_loss_with_smoothing(self, rewards_chosen, rewards_rejected):
        # Label smoothing pulls the target probability away from 1.0,
        # discouraging the model from producing extreme reward gaps
        probs = torch.sigmoid(rewards_chosen - rewards_rejected)
        probs = probs * (1 - self.label_smoothing) + 0.5 * self.label_smoothing
        return -torch.log(probs).mean()
Reward Scaling #
python
class ScaledRewardModel(nn.Module):
    def __init__(self, base_model_name, scale_init=1.0):
        super().__init__()
        self.base_model = AutoModel.from_pretrained(base_model_name)
        hidden_size = self.base_model.config.hidden_size
        self.value_head = nn.Linear(hidden_size, 1)
        self.reward_scale = nn.Parameter(torch.tensor(scale_init))
        self.reward_bias = nn.Parameter(torch.tensor(0.0))

    def forward(self, input_ids, attention_mask=None):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        raw_reward = self.value_head(
            outputs.last_hidden_state[:, -1, :]
        ).squeeze(-1)
        return self.reward_scale * raw_reward + self.reward_bias
Ensemble Reward Models #
python
class EnsembleRewardModel(nn.Module):
    def __init__(self, model_names, weights=None):
        super().__init__()
        self.models = nn.ModuleList([
            RewardModel(name) for name in model_names
        ])
        # Default to uniform weighting across ensemble members
        self.weights = weights or [1.0 / len(model_names)] * len(model_names)

    def forward(self, input_ids, attention_mask=None):
        rewards = []
        for model in self.models:
            reward = model(input_ids, attention_mask)
            rewards.append(reward)
        stacked_rewards = torch.stack(rewards, dim=0)  # (num_models, batch)
        weights_tensor = torch.tensor(
            self.weights,
            device=stacked_rewards.device
        ).view(-1, 1)
        # Weighted average of per-model rewards
        return (stacked_rewards * weights_tensor).sum(dim=0)
Common Problems and Solutions #
Reward Hacking #
text
Problem:
────────────────────────
The policy learns to game the reward model, producing outputs that score highly but are actually low quality.

Causes:
────────────────────────
├── Exploitable flaws in the reward model
├── Distribution bias in the training data
├── Poor generalization of the reward model
└── Over-optimization against the reward

Mitigations (a KL-penalty sketch follows this list):
────────────────────────
├── Apply a KL constraint to the policy
├── Increase the diversity of training data
├── Ensemble several reward models
├── Periodically refresh the reward model
└── Manually audit high-reward outputs
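Of these, the KL constraint is the most common safeguard: the reward passed to PPO is penalized by the policy's divergence from a frozen reference model, so the policy cannot drift arbitrarily far just to please the reward model. A minimal sketch, assuming you already have per-token log-probabilities from both models; the coefficient 0.1 is illustrative.
python
import torch

def kl_penalized_rewards(rm_reward, policy_logprobs, ref_logprobs, kl_coef=0.1):
    # rm_reward:       (batch,)          sequence-level reward-model score
    # policy_logprobs: (batch, seq_len)  log-probs of sampled tokens under the policy
    # ref_logprobs:    (batch, seq_len)  log-probs of the same tokens under the frozen reference
    kl = policy_logprobs - ref_logprobs  # per-token KL estimate
    rewards = -kl_coef * kl              # penalize divergence at every position
    rewards[:, -1] += rm_reward          # add the RM score at the final token
    return rewards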
Reward Model Bias #
text
Problem:
────────────────────────
The reward model shows systematic biases toward certain kinds of responses.

Common biases:
────────────────────────
├── Length bias: prefers longer responses
├── Style bias: prefers a particular writing style
├── Content bias: prefers certain topics
└── Format bias: prefers specific formats

Mitigations (a length-bias check is sketched after this list):
────────────────────────
├── Balance the training data distribution
├── Control response length
├── Diversify annotators
├── Debiasing during training
└── Post-hoc calibration
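Length bias is the easiest of these to diagnose: if reward correlates strongly with response length on held-out data, the model is likely rewarding verbosity rather than quality. A small diagnostic sketch follows; the 0.4 threshold is an illustrative rule of thumb, not an established cutoff.
python
from scipy.stats import spearmanr

def check_length_bias(rewards, responses, threshold=0.4):
    # rewards:   list of scalar rewards from the reward model
    # responses: list of the corresponding response strings
    lengths = [len(r) for r in responses]
    corr, p_value = spearmanr(rewards, lengths)
    return {
        "length_reward_correlation": corr,
        "p_value": p_value,
        "possible_length_bias": bool(corr > threshold),
    }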
Next Steps #
Now that you know how to train a reward model, continue to the PPO algorithm to learn how the reward model is used to optimize the language model!
Last updated: 2026-04-05