Reward Models #

Reward Model Overview #

The reward model (RM) is a core component of RLHF: it turns human preferences into a learnable reward signal. A good reward model accurately predicts how strongly humans will prefer a given model output.

The Role of the Reward Model #

text
┌──────────────────────────────────────────────────────────────┐
│                 The Role of the Reward Model                 │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  Input: prompt + model response                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Prompt: "What is machine learning?"                 │   │
│  │  Response: "Machine learning is a branch of AI..."   │   │
│  └──────────────────────────────────────────────────────┘   │
│                          │                                   │
│                          ▼                                   │
│  Reward model processing:                                    │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Encoder → hidden states → value head → scalar reward│   │
│  └──────────────────────────────────────────────────────┘   │
│                          │                                   │
│                          ▼                                   │
│  Output: reward score                                        │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Reward Score: 0.85                                  │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  Uses:                                                       │
│  ├── Provide the reward signal for PPO training              │
│  ├── Evaluate the quality of model outputs                   │
│  └── Filter high-quality responses                           │
│                                                              │
└──────────────────────────────────────────────────────────────┘
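
As a concrete picture of the last use listed above (filtering high-quality responses), here is a minimal best-of-n selection sketch: each candidate response is scored by a trained reward model and the highest-scoring one is kept. It assumes a reward_model and tokenizer that follow the interfaces defined later on this page; the function name is illustrative, not from the original text.

python
import torch

def pick_best_response(reward_model, tokenizer, prompt, candidates, max_length=512):
    """Score each (prompt, candidate) pair and return the highest-reward candidate."""
    reward_model.eval()
    scores = []
    with torch.no_grad():
        for response in candidates:
            enc = tokenizer(
                prompt + "\n" + response,
                max_length=max_length,
                truncation=True,
                return_tensors="pt"
            )
            # The reward model maps (prompt + response) tokens to a scalar score
            score = reward_model(enc["input_ids"], enc["attention_mask"])
            scores.append(score.item())
    best_idx = max(range(len(candidates)), key=lambda i: scores[i])
    return candidates[best_idx], scores[best_idx]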

Reward Model Architecture #

Basic Architecture #

python
import torch
import torch.nn as nn
from transformers import AutoModel, AutoConfig

class RewardModel(nn.Module):
    def __init__(
        self, 
        model_name_or_path,
        hidden_size=None,
        dropout=0.1
    ):
        super().__init__()
        
        self.config = AutoConfig.from_pretrained(model_name_or_path)
        self.base_model = AutoModel.from_pretrained(model_name_or_path)
        
        hidden_size = hidden_size or self.config.hidden_size
        
        self.value_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size // 2, 1)
        )
    
    def forward(self, input_ids, attention_mask=None):
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)
        
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # Pool the hidden state of the last non-padding token of each sequence
        # (assumes right padding, as produced by the dataset below)
        last_hidden_states = outputs.last_hidden_state
        sequence_lengths = attention_mask.sum(dim=1) - 1
        
        batch_size = input_ids.shape[0]
        batch_indices = torch.arange(batch_size, device=input_ids.device)
        last_token_hidden = last_hidden_states[batch_indices, sequence_lengths]
        
        # Map the pooled hidden state to one scalar reward per sequence
        rewards = self.value_head(last_token_hidden)
        return rewards.squeeze(-1)

Multi-Layer Value Head #

python
class MultiLayerValueHead(nn.Module):
    def __init__(self, hidden_size, num_layers=3):
        super().__init__()
        
        layers = []
        current_size = hidden_size
        for i in range(num_layers - 1):
            next_size = current_size // 2
            layers.extend([
                nn.Linear(current_size, next_size),
                nn.LayerNorm(next_size),
                nn.ReLU(),
                nn.Dropout(0.1)
            ])
            current_size = next_size
        
        layers.append(nn.Linear(current_size, 1))
        self.value_head = nn.Sequential(*layers)
    
    def forward(self, hidden_states):
        return self.value_head(hidden_states)

Shared-Encoder Architecture #

python
class SharedEncoderRewardModel(nn.Module):
    def __init__(self, base_model_name):
        super().__init__()
        self.base_model = AutoModel.from_pretrained(base_model_name)
        hidden_size = self.base_model.config.hidden_size
        
        self.reward_head = nn.Linear(hidden_size, 1)
        self.critic_head = nn.Linear(hidden_size, 1)
    
    def forward(self, input_ids, attention_mask=None, head="reward"):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # Last-position pooling; assumes left padding or unpadded inputs,
        # otherwise the final position may be a padding token
        last_hidden = outputs.last_hidden_state[:, -1, :]
        
        if head == "reward":
            return self.reward_head(last_hidden).squeeze(-1)
        elif head == "critic":
            return self.critic_head(last_hidden).squeeze(-1)
        else:
            raise ValueError(f"Unknown head: {head}")

Training the Reward Model #

Bradley-Terry Loss Function #

text
Bradley-Terry model:
────────────────────────
Given a prompt x and two responses y_w (winner) and y_l (loser):

P(y_w > y_l | x) = σ(r(x, y_w) - r(x, y_l))

Loss function:
────────────────────────
L = -log σ(r(x, y_w) - r(x, y_l))

Goal: maximize the log-probability that the chosen response receives the higher reward

Training Code #

python
import torch
import torch.nn.functional as F

def compute_rm_loss(reward_model, batch):
    # Assumes the batch tensors already live on the same device as the model
    input_ids_chosen = batch["input_ids_chosen"]
    attention_mask_chosen = batch["attention_mask_chosen"]
    input_ids_rejected = batch["input_ids_rejected"]
    attention_mask_rejected = batch["attention_mask_rejected"]
    
    rewards_chosen = reward_model(
        input_ids_chosen, 
        attention_mask_chosen
    )
    rewards_rejected = reward_model(
        input_ids_rejected, 
        attention_mask_rejected
    )
    
    # Bradley-Terry pairwise loss: -log σ(r_chosen - r_rejected)
    loss = -F.logsigmoid(rewards_chosen - rewards_rejected).mean()
    
    accuracy = (rewards_chosen > rewards_rejected).float().mean()
    
    return loss, accuracy

class RewardModelTrainer:
    def __init__(self, model, train_dataloader, val_dataloader, 
                 learning_rate=1e-5, weight_decay=0.01):
        self.model = model
        self.train_dataloader = train_dataloader
        self.val_dataloader = val_dataloader
        
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=learning_rate,
            weight_decay=weight_decay
        )
        
        # Cosine schedule over the whole run (the * 3 assumes 3 training epochs)
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer,
            T_max=len(train_dataloader) * 3
        )
    
    def train_epoch(self):
        self.model.train()
        total_loss = 0
        total_accuracy = 0
        
        for batch in self.train_dataloader:
            loss, accuracy = compute_rm_loss(self.model, batch)
            
            self.optimizer.zero_grad()
            loss.backward()
            
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            
            self.optimizer.step()
            self.scheduler.step()
            
            total_loss += loss.item()
            total_accuracy += accuracy.item()
        
        return total_loss / len(self.train_dataloader), \
               total_accuracy / len(self.train_dataloader)
    
    def validate(self):
        self.model.eval()
        total_loss = 0
        total_accuracy = 0
        
        with torch.no_grad():
            for batch in self.val_dataloader:
                loss, accuracy = compute_rm_loss(self.model, batch)
                total_loss += loss.item()
                total_accuracy += accuracy.item()
        
        return total_loss / len(self.val_dataloader), \
               total_accuracy / len(self.val_dataloader)

Multi-Response Ranking Loss #

python
def compute_ranking_loss(reward_model, batch, margin=0.1):
    input_ids_list = batch["input_ids_list"]
    attention_mask_list = batch["attention_mask_list"]
    rankings = batch["rankings"]
    
    rewards_list = []
    for input_ids, attention_mask in zip(input_ids_list, attention_mask_list):
        reward = reward_model(input_ids, attention_mask)
        rewards_list.append(reward)
    
    rewards = torch.stack(rewards_list, dim=1)
    
    # Pairwise margin ranking loss over every response pair; a smaller value
    # in `rankings` means a better (higher-ranked) response
    loss = 0.0
    num_pairs = 0
    
    num_responses = rewards.shape[1]
    for i in range(num_responses):
        for j in range(i + 1, num_responses):
            higher_rank_mask = rankings[:, i] < rankings[:, j]
            if higher_rank_mask.sum() > 0:
                margin_loss = F.margin_ranking_loss(
                    rewards[higher_rank_mask, i],
                    rewards[higher_rank_mask, j],
                    torch.ones_like(rewards[higher_rank_mask, i]),
                    margin=margin
                )
                loss += margin_loss
                num_pairs += 1
    
    return loss / max(num_pairs, 1)

Data Processing #

Dataset Construction #

python
from torch.utils.data import Dataset
from transformers import AutoTokenizer

class PreferenceDataset(Dataset):
    def __init__(self, data_path, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.data = self._load_data(data_path)
    
    def _load_data(self, data_path):
        import json
        with open(data_path, 'r') as f:
            return [json.loads(line) for line in f]
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        prompt = item["prompt"]
        chosen = item["chosen"]
        rejected = item["rejected"]
        
        chosen_text = prompt + "\n" + chosen
        rejected_text = prompt + "\n" + rejected
        
        chosen_enc = self.tokenizer(
            chosen_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        
        rejected_enc = self.tokenizer(
            rejected_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        
        return {
            "input_ids_chosen": chosen_enc["input_ids"].squeeze(0),
            "attention_mask_chosen": chosen_enc["attention_mask"].squeeze(0),
            "input_ids_rejected": rejected_enc["input_ids"].squeeze(0),
            "attention_mask_rejected": rejected_enc["attention_mask"].squeeze(0),
        }

Data Augmentation #

python
class PreferenceDataAugmenter:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
    
    def augment(self, prompt, chosen, rejected):
        augmented_samples = []
        
        augmented_samples.append({
            "prompt": prompt,
            "chosen": chosen,
            "rejected": rejected
        })
        
        aug_prompt = self._paraphrase_prompt(prompt)
        if aug_prompt != prompt:
            augmented_samples.append({
                "prompt": aug_prompt,
                "chosen": chosen,
                "rejected": rejected
            })
        
        return augmented_samples
    
    def _paraphrase_prompt(self, text):
        # Toy paraphrase table for common Chinese prompt prefixes,
        # e.g. "请解释" ("please explain") → "请说明" ("please describe")
        paraphrases = {
            "请解释": "请说明",
            "什么是": "解释一下",
            "如何": "怎样",
        }
        for old, new in paraphrases.items():
            if old in text:
                return text.replace(old, new, 1)
        return text

Data Filtering #

python
class DataFilter:
    def __init__(self, min_length=10, max_length=2048, 
                 min_score_diff=0.1):
        self.min_length = min_length
        self.max_length = max_length
        self.min_score_diff = min_score_diff
    
    def filter(self, samples):
        filtered = []
        for sample in samples:
            if not self._is_valid(sample):
                continue
            filtered.append(sample)
        return filtered
    
    def _is_valid(self, sample):
        prompt = sample["prompt"]
        chosen = sample["chosen"]
        rejected = sample["rejected"]
        
        if len(chosen) < self.min_length:
            return False
        if len(rejected) < self.min_length:
            return False
        if len(prompt) + len(chosen) > self.max_length:
            return False
        if len(prompt) + len(rejected) > self.max_length:
            return False
        
        # Drop pairs with an extreme length gap to reduce length bias
        if abs(len(chosen) - len(rejected)) > 500:
            return False
        
        return True

Reward Model Evaluation #

Evaluation Metrics #

text
┌──────────────────────────────────────────────────────────────┐
│               Reward Model Evaluation Metrics                │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  Accuracy:                                                   │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Fraction of preference pairs predicted correctly    │   │
│  │  Accuracy = correct predictions / total predictions  │   │
│  │  Target: > 70%                                       │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  Reward margin distribution:                                 │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Distribution of r_chosen - r_rejected               │   │
│  │  Expected: mean > 0, moderate variance               │   │
│  │  Too large: possible overfitting                     │   │
│  │  Too small: insufficient discrimination              │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  Spearman correlation:                                       │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Correlation between rewards and human ratings       │   │
│  │  Target: > 0.6                                       │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Evaluation Code #

python
import torch
import numpy as np
from scipy.stats import spearmanr

class RewardModelEvaluator:
    def __init__(self, reward_model, tokenizer):
        self.model = reward_model
        self.tokenizer = tokenizer
    
    def evaluate_accuracy(self, test_dataset):
        self.model.eval()
        correct = 0
        total = 0
        
        with torch.no_grad():
            for batch in test_dataset:
                rewards_chosen = self.model(
                    batch["input_ids_chosen"],
                    batch["attention_mask_chosen"]
                )
                rewards_rejected = self.model(
                    batch["input_ids_rejected"],
                    batch["attention_mask_rejected"]
                )
                
                correct += (rewards_chosen > rewards_rejected).sum().item()
                total += len(rewards_chosen)
        
        return correct / total
    
    def evaluate_correlation(self, samples_with_scores):
        self.model.eval()
        predicted_rewards = []
        human_scores = []
        
        with torch.no_grad():
            for sample in samples_with_scores:
                enc = self.tokenizer(
                    sample["text"],
                    return_tensors="pt",
                    truncation=True,
                    max_length=512
                )
                reward = self.model(enc["input_ids"], enc["attention_mask"])
                predicted_rewards.append(reward.item())
                human_scores.append(sample["human_score"])
        
        correlation, p_value = spearmanr(predicted_rewards, human_scores)
        return correlation, p_value
    
    def analyze_reward_distribution(self, test_dataset):
        self.model.eval()
        reward_diffs = []
        
        with torch.no_grad():
            for batch in test_dataset:
                rewards_chosen = self.model(
                    batch["input_ids_chosen"],
                    batch["attention_mask_chosen"]
                )
                rewards_rejected = self.model(
                    batch["input_ids_rejected"],
                    batch["attention_mask_rejected"]
                )
                reward_diffs.extend(
                    (rewards_chosen - rewards_rejected).tolist()
                )
        
        return {
            "mean": np.mean(reward_diffs),
            "std": np.std(reward_diffs),
            "min": np.min(reward_diffs),
            "max": np.max(reward_diffs),
            "median": np.median(reward_diffs)
        }

Reward Model Optimization Techniques #

Preventing Overfitting #

python
class RegularizedRewardModel(nn.Module):
    def __init__(self, base_model_name, dropout=0.2):
        super().__init__()
        self.base_model = AutoModel.from_pretrained(base_model_name)
        hidden_size = self.base_model.config.hidden_size
        
        self.dropout = nn.Dropout(dropout)
        self.value_head = nn.Linear(hidden_size, 1)
        
        self.label_smoothing = 0.1
    
    def forward(self, input_ids, attention_mask=None):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # Last-position pooling; assumes left padding or unpadded inputs
        hidden = self.dropout(outputs.last_hidden_state[:, -1, :])
        return self.value_head(hidden).squeeze(-1)
    
    def compute_loss_with_smoothing(self, rewards_chosen, rewards_rejected):
        # Soften the pairwise target toward 0.5 so the model is not pushed
        # to produce arbitrarily large reward margins
        probs = torch.sigmoid(rewards_chosen - rewards_rejected)
        probs = probs * (1 - self.label_smoothing) + 0.5 * self.label_smoothing
        return -torch.log(probs).mean()

Reward Scaling #

python
class ScaledRewardModel(nn.Module):
    def __init__(self, base_model_name, scale_init=1.0):
        super().__init__()
        self.base_model = AutoModel.from_pretrained(base_model_name)
        hidden_size = self.base_model.config.hidden_size
        
        self.value_head = nn.Linear(hidden_size, 1)
        self.reward_scale = nn.Parameter(torch.tensor(scale_init))
        self.reward_bias = nn.Parameter(torch.tensor(0.0))
    
    def forward(self, input_ids, attention_mask=None):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # Last-position pooling; assumes left padding or unpadded inputs
        raw_reward = self.value_head(
            outputs.last_hidden_state[:, -1, :]
        ).squeeze(-1)
        
        return self.reward_scale * raw_reward + self.reward_bias

Ensemble Reward Models #

python
class EnsembleRewardModel(nn.Module):
    def __init__(self, model_names, weights=None):
        super().__init__()
        self.models = nn.ModuleList([
            RewardModel(name) for name in model_names
        ])
        self.weights = weights or [1.0 / len(model_names)] * len(model_names)
    
    def forward(self, input_ids, attention_mask=None):
        rewards = []
        for model in self.models:
            reward = model(input_ids, attention_mask)
            rewards.append(reward)
        
        stacked_rewards = torch.stack(rewards, dim=0)
        weights_tensor = torch.tensor(
            self.weights, 
            device=stacked_rewards.device
        ).view(-1, 1)
        
        return (stacked_rewards * weights_tensor).sum(dim=0)

Common Problems and Solutions #

Reward Hacking #

text
Problem:
────────────────────────
The policy learns to exploit the reward model, producing outputs that score highly but are of poor actual quality.

Causes:
────────────────────────
├── Exploitable flaws in the reward model
├── Distributional bias in the training data
├── Insufficient generalization of the reward model
└── Over-optimization

Mitigations:
────────────────────────
├── Apply a KL constraint (see the sketch below)
├── Increase the diversity of the training data
├── Ensemble multiple reward models
├── Periodically refresh the reward model
└── Manually audit high-reward outputs
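
A minimal sketch of the KL constraint mentioned above, as it is typically applied during PPO-style fine-tuning: the reward model score is combined with a penalty on how far the current policy has drifted from a frozen reference model, so that chasing high reward alone cannot pull the policy into regions where the reward model is unreliable. The function name and the kl_coef value are illustrative assumptions, not part of the original text.

python
import torch

def kl_penalized_reward(rm_score, policy_logprobs, ref_logprobs, kl_coef=0.1):
    """Combine a scalar reward-model score with a per-sequence KL penalty.

    rm_score:        (batch,) rewards from the reward model
    policy_logprobs: (batch, seq_len) log-probs of the generated tokens under the policy
    ref_logprobs:    (batch, seq_len) log-probs of the same tokens under the frozen reference
    """
    # Monte-Carlo estimate of KL(policy || reference), summed over generated tokens
    kl = (policy_logprobs - ref_logprobs).sum(dim=-1)
    # Subtract the scaled KL so high RM scores are traded off against drift
    return rm_score - kl_coef * kl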

Reward Model Bias #

text
Problem:
────────────────────────
The reward model shows systematic biases toward certain kinds of responses.

Common biases:
────────────────────────
├── Length bias: preferring longer responses
├── Style bias: preferring a particular writing style
├── Content bias: preferring certain topics
└── Format bias: preferring specific formats

Mitigations:
────────────────────────
├── Balance the training data distribution
├── Control response length
├── Diversify the annotator pool
├── Debiasing during training
└── Post-hoc correction (see the length-debias sketch below)
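
One way to make the length bias concrete, and to apply a simple post-hoc correction, is to regress reward on response length and subtract the fitted linear component. This is an illustrative diagnostic under the assumption that you already have per-response rewards and token lengths; it is not a method prescribed above.

python
import numpy as np

def length_debias(rewards, lengths):
    """Remove the linear component of reward explained by response length.

    rewards: array of reward-model scores
    lengths: array of response lengths (e.g. token counts)
    Returns (debiased_rewards, slope); a large positive slope signals length bias.
    """
    rewards = np.asarray(rewards, dtype=np.float64)
    lengths = np.asarray(lengths, dtype=np.float64)
    # Least-squares fit of reward ≈ slope * length + intercept
    slope, intercept = np.polyfit(lengths, rewards, deg=1)
    debiased = rewards - slope * lengths
    return debiased, slope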

Next Steps #

Now that you know how to train a reward model, continue to the PPO algorithm to learn how the reward model is used to optimize the language model!

Last updated: 2026-04-05