PyTorch 循环神经网络 #

RNN 简介 #

循环神经网络(Recurrent Neural Network, RNN)是一类专门处理序列数据的神经网络,能够捕捉序列中的时序依赖关系。

text
┌─────────────────────────────────────────────────────────────┐
│                    RNN 核心思想                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  序列数据的特点:                                            │
│  - 文本:单词序列                                           │
│  - 语音:音频序列                                           │
│  - 视频:帧序列                                             │
│  - 时间序列:股票价格                                       │
│                                                             │
│  RNN 的特点:                                                │
│  - 隐藏状态传递时序信息                                     │
│  - 参数在时间步之间共享                                     │
│  - 可以处理变长序列                                         │
│                                                             │
│  RNN 结构:                                                  │
│                                                             │
│  x₁ ──► h₁ ──► y₁                                          │
│          │                                                  │
│          ▼                                                  │
│  x₂ ──► h₂ ──► y₂                                          │
│          │                                                  │
│          ▼                                                  │
│  x₃ ──► h₃ ──► y₃                                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基本 RNN #

RNN 单元 #

python
import torch
import torch.nn as nn

# Dimensions used by this demo, named so the shapes below explain themselves.
batch_size, seq_len, input_size, hidden_size = 32, 5, 10, 20

# Single-layer, unidirectional tanh RNN; every constructor option is spelled
# out explicitly for reference.
rnn = nn.RNN(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=1,
    nonlinearity='tanh',
    bias=True,
    batch_first=True,
    dropout=0,
    bidirectional=False,
)

# batch_first=True, so the input is laid out as (batch, seq_len, input_size).
x = torch.randn(batch_size, seq_len, input_size)

# The module returns the per-step outputs and the final hidden state.
output, h_n = rnn(x)

print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {h_n.shape}")

手动实现 RNN #

python
import torch
import torch.nn as nn

class RNNCell(nn.Module):
    """One step of a tanh RNN: ``h = tanh(W · [x, h_prev] + b)``.

    A single linear layer (``i2h``) acts on the concatenation of the current
    input and the previous hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # The linear layer sees input and previous hidden state side by side.
        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
        self.tanh = nn.Tanh()

    def forward(self, x, h_prev):
        """Return the next hidden state for input ``x`` and state ``h_prev``.

        Both arguments are concatenated along dim 1, so they must agree on
        every other dimension.
        """
        joined = torch.cat((x, h_prev), dim=1)
        pre_activation = self.i2h(joined)
        return self.tanh(pre_activation)

class ManualRNN(nn.Module):
    """Unrolls an ``RNNCell`` over the time axis of a batch-first input.

    NOTE(review): ``fc`` (hidden_size -> output_size) is constructed but never
    applied in ``forward``; callers receive the raw hidden states.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.rnn_cell = RNNCell(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the cell step by step over a 3-D input.

        Args:
            x: tensor indexed as (batch, step, feature).

        Returns:
            ``(outputs, h)`` where ``outputs`` stacks the hidden state of
            every step along dim 1 and ``h`` is the last hidden state.
        """
        batch_size, _, _ = x.size()
        # The recurrence starts from an all-zero state on the input's device.
        state = torch.zeros(batch_size, self.hidden_size, device=x.device)

        per_step = []
        for x_t in x.unbind(dim=1):
            state = self.rnn_cell(x_t, state)
            per_step.append(state)

        return torch.stack(per_step, dim=1), state

# Smoke-test the hand-written RNN on a random batch: 32 sequences,
# 5 steps each, 10 features per step.
model = ManualRNN(input_size=10, hidden_size=20, output_size=5)
x = torch.randn(32, 5, 10)
output, h_n = model(x)
print(f"输出形状: {output.shape}")

LSTM #

LSTM 原理 #

text
┌─────────────────────────────────────────────────────────────┐
│                    LSTM 结构                                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  LSTM 缓解 RNN 的梯度消失问题,引入三个门:                   │
│                                                             │
│  遗忘门(Forget Gate):                                     │
│  f_t = σ(W_f · [h_{t-1}, x_t] + b_f)                       │
│  决定丢弃哪些信息                                           │
│                                                             │
│  输入门(Input Gate):                                      │
│  i_t = σ(W_i · [h_{t-1}, x_t] + b_i)                       │
│  C̃_t = tanh(W_C · [h_{t-1}, x_t] + b_C)                    │
│  决定存储哪些新信息                                         │
│                                                             │
│  单元状态更新:                                              │
│  C_t = f_t * C_{t-1} + i_t * C̃_t                          │
│                                                             │
│  输出门(Output Gate):                                     │
│  o_t = σ(W_o · [h_{t-1}, x_t] + b_o)                       │
│  h_t = o_t * tanh(C_t)                                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

PyTorch LSTM #

python
import torch
import torch.nn as nn

# Dimensions used by this demo; the initial states below must agree with them.
batch_size, seq_len, input_size, hidden_size, num_layers = 32, 5, 10, 20, 2

# Two stacked, unidirectional LSTM layers with dropout between the layers.
lstm = nn.LSTM(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bias=True,
    batch_first=True,
    dropout=0.5,
    bidirectional=False,
)

# batch_first=True: the input is (batch, seq_len, input_size).
x = torch.randn(batch_size, seq_len, input_size)
# Initial hidden and cell states are (num_layers, batch, hidden_size),
# regardless of batch_first.
h0 = torch.zeros(num_layers, batch_size, hidden_size)
c0 = torch.zeros_like(h0)

output, (h_n, c_n) = lstm(x, (h0, c0))

print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {h_n.shape}")
print(f"单元状态形状: {c_n.shape}")

LSTM 分类模型 #

python
import torch
import torch.nn as nn

class LSTMClassifier(nn.Module):
    """Sequence classifier: embedding -> bidirectional LSTM -> linear head.

    The head reads the final hidden states of the last layer's two directions,
    concatenated, so its input width is ``2 * hidden_size``.
    """

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()
        # nn.LSTM applies dropout only between stacked layers, so it is
        # switched off for a single-layer model.
        inter_layer_dropout = dropout if num_layers > 1 else 0

        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
            bidirectional=True,
        )
        self.fc = nn.Linear(2 * hidden_size, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map a batch of token-id sequences to class logits.

        Dropout is applied to the embeddings and again to the sequence
        summary before the linear head.
        """
        token_vectors = self.dropout(self.embedding(x))

        _, (final_hidden, _) = self.lstm(token_vectors)

        # The last two entries of the final hidden state belong to the top
        # layer: forward direction first, then backward.
        forward_state, backward_state = final_hidden[-2], final_hidden[-1]
        summary = torch.cat((forward_state, backward_state), dim=1)

        return self.fc(self.dropout(summary))

# One constant drives both the embedding table size and the random token ids.
vocab_size = 10000

model = LSTMClassifier(
    vocab_size=vocab_size,
    embed_dim=128,
    hidden_size=256,
    num_layers=2,
    num_classes=2,
)

# A batch of 32 token-id sequences, 50 tokens each.
x = torch.randint(0, vocab_size, (32, 50))
output = model(x)
print(f"输出形状: {output.shape}")

GRU #

GRU 原理 #

text
┌─────────────────────────────────────────────────────────────┐
│                    GRU 结构                                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  GRU 是 LSTM 的简化版本,只有两个门:                        │
│                                                             │
│  重置门(Reset Gate):                                      │
│  r_t = σ(W_r · [h_{t-1}, x_t])                             │
│                                                             │
│  更新门(Update Gate):                                     │
│  z_t = σ(W_z · [h_{t-1}, x_t])                             │
│                                                             │
│  候选隐藏状态:                                              │
│  h̃_t = tanh(W · [r_t * h_{t-1}, x_t])                     │
│                                                             │
│  隐藏状态更新:                                              │
│  h_t = (1 - z_t) * h_{t-1} + z_t * h̃_t                   │
│                                                             │
│  GRU vs LSTM:                                               │
│  - GRU 参数更少,训练更快                                   │
│  - LSTM 表达能力更强                                        │
│  - 数据量大时 LSTM 可能更好                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

PyTorch GRU #

python
import torch
import torch.nn as nn

# Dimensions used by this demo; the initial state below must agree with them.
batch_size, seq_len, input_size, hidden_size, num_layers = 32, 5, 10, 20, 2

# Two stacked, unidirectional GRU layers with dropout between the layers.
gru = nn.GRU(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bias=True,
    batch_first=True,
    dropout=0.5,
    bidirectional=False,
)

# batch_first=True: the input is (batch, seq_len, input_size).
x = torch.randn(batch_size, seq_len, input_size)
# A GRU carries only a hidden state (no cell state): (num_layers, batch, hidden_size).
h0 = torch.zeros(num_layers, batch_size, hidden_size)

output, h_n = gru(x, h0)

print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {h_n.shape}")

GRU 序列标注 #

python
import torch
import torch.nn as nn

class GRUTagger(nn.Module):
    """Per-token tagger: embedding -> bidirectional GRU -> linear head.

    The head is applied to the GRU output at every step, so one score vector
    of size ``num_tags`` is produced per input token.
    """

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers, num_tags):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.gru = nn.GRU(
            input_size=embed_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Bidirectional output concatenates both directions: 2 * hidden_size.
        self.fc = nn.Linear(2 * hidden_size, num_tags)

    def forward(self, x):
        """Return tag scores for every position of the token-id batch ``x``."""
        per_token_features, _ = self.gru(self.embedding(x))
        return self.fc(per_token_features)

# One constant drives both the embedding table size and the random token ids.
vocab_size = 10000

model = GRUTagger(
    vocab_size=vocab_size,
    embed_dim=128,
    hidden_size=256,
    num_layers=2,
    num_tags=10,
)

# A batch of 32 token-id sequences, 50 tokens each.
x = torch.randint(0, vocab_size, (32, 50))
output = model(x)
print(f"输出形状: {output.shape}")

双向 RNN #

python
import torch
import torch.nn as nn

class BiLSTM(nn.Module):
    """Bidirectional LSTM classifier over batch-first feature sequences.

    The classification head reads the final hidden state of each direction of
    the top layer, so both directions have consumed the whole sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            bidirectional=True
        )
        # Forward and backward summaries are concatenated: 2 * hidden_size.
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: input of shape (batch, seq_len, input_size).
        """
        _, (hidden, _) = self.lstm(x)

        # Do not use lstm_out[:, -1, :] here: at the last time step the
        # backward direction has processed only a single element. The final
        # states hidden[-2] (forward) and hidden[-1] (backward) of the top
        # layer each summarise the full sequence.
        summary = torch.cat((hidden[-2], hidden[-1]), dim=1)
        return self.fc(summary)

# Smoke-test on a random batch: 32 sequences, 5 steps each, 10 features per step.
model = BiLSTM(input_size=10, hidden_size=20, num_layers=2, num_classes=5)
x = torch.randn(32, 5, 10)
output = model(x)
print(f"双向 LSTM 输出: {output.shape}")

序列到序列模型 #

Encoder-Decoder #

python
import torch
import torch.nn as nn

class Encoder(nn.Module):
    """Embeds source token ids and encodes them with a batch-first LSTM."""

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Return ``(outputs, hidden, cell)`` for the token-id batch ``x``.

        ``outputs`` holds the LSTM output at every step; ``hidden`` and
        ``cell`` are the final LSTM states.
        """
        outputs, final_state = self.lstm(self.embedding(x))
        hidden, cell = final_state
        return outputs, hidden, cell

class Decoder(nn.Module):
    """Embeds target token ids, advances an LSTM, and projects to vocabulary scores."""

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden, cell):
        """Decode the token ids in ``x`` starting from the given LSTM state.

        Returns:
            ``(prediction, hidden, cell)``: vocabulary scores for every input
            position plus the updated LSTM state.
        """
        step_out, (hidden, cell) = self.lstm(self.embedding(x), (hidden, cell))
        return self.fc(step_out), hidden, cell

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes one step at a time with teacher forcing."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg`` step by step, conditioned on the encoded ``src``.

        Args:
            src: source batch, passed to the encoder unchanged.
            trg: 2-D target token ids (batch, trg_len); column 0 is fed to
                the decoder as its first input.
            teacher_forcing_ratio: probability, drawn once per step for the
                whole batch, of feeding the ground-truth token instead of
                the decoder's own arg-max prediction.

        Returns:
            Tensor (batch, trg_len, vocab) of decoder scores. Position 0 is
            never written and stays zero.
        """
        batch_size, trg_len = trg.shape
        vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(batch_size, trg_len, vocab_size).to(self.device)

        # Only the encoder's final state is used to initialise the decoder.
        _, hidden, cell = self.encoder(src)

        decoder_input = trg[:, 0].unsqueeze(1)

        for step in range(1, trg_len):
            scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[:, step] = scores.squeeze(1)

            use_ground_truth = torch.rand(1) < teacher_forcing_ratio
            if use_ground_truth:
                decoder_input = trg[:, step].unsqueeze(1)
            else:
                decoder_input = scores.argmax(2)

        return outputs

# Source and target share one vocabulary size in this demo.
vocab_size = 10000

encoder = Encoder(vocab_size=vocab_size, embed_dim=256, hidden_size=512, num_layers=2)
decoder = Decoder(vocab_size=vocab_size, embed_dim=256, hidden_size=512, num_layers=2)
device = torch.device("cpu")
model = Seq2Seq(encoder, decoder, device)

# 32 source sequences of 20 tokens and 32 target sequences of 15 tokens.
src = torch.randint(0, vocab_size, (32, 20))
trg = torch.randint(0, vocab_size, (32, 15))
output = model(src, trg)
print(f"Seq2Seq 输出: {output.shape}")

注意力机制 #

基本注意力 #

python
import torch
import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    """Additive attention: scores each encoder step against a decoder state.

    score = v · tanh(W · [hidden, encoder_output]), normalised with a softmax
    over the source positions.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.attn = nn.Linear(2 * hidden_size, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights of shape (batch, src_len).

        Args:
            hidden: decoder state, (batch, hidden_size).
            encoder_outputs: 3-D tensor (batch, src_len, hidden_size).
        """
        _, src_len, _ = encoder_outputs.shape

        # Broadcast the decoder state along the source axis so it can be
        # paired with every encoder output.
        query = hidden.unsqueeze(1).expand(-1, src_len, -1)
        paired = torch.cat((query, encoder_outputs), dim=2)

        energy = torch.tanh(self.attn(paired))
        scores = self.v(energy).squeeze(2)

        return F.softmax(scores, dim=1)

class AttnDecoder(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs.

    The attention context is used twice: concatenated with the token
    embedding as LSTM input, and concatenated with the LSTM output before
    the vocabulary projection.
    """

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # The LSTM consumes [embedding, context], hence embed_dim + hidden_size.
        self.lstm = nn.LSTM(
            input_size=embed_dim + hidden_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.attention = Attention(hidden_size)
        # The projection consumes [lstm output, context], hence 2 * hidden_size.
        self.fc = nn.Linear(2 * hidden_size, vocab_size)

    def forward(self, x, hidden, cell, encoder_outputs):
        """Decode one step.

        Args:
            x: token ids for the current step; the ``squeeze(1)`` below
                implies a single step, i.e. (batch, 1).
            hidden, cell: LSTM state; ``hidden[-1]`` is the attention query.
            encoder_outputs: tensor attended over, (batch, src_len, hidden_size).

        Returns:
            ``(prediction, hidden, cell, attn_weights)`` where
            ``attn_weights`` keeps the extra dim added for ``bmm``.
        """
        token_embedding = self.embedding(x)

        # Add a middle dim so bmm yields one context vector per batch item.
        attn_weights = self.attention(hidden[-1], encoder_outputs).unsqueeze(1)
        context = torch.bmm(attn_weights, encoder_outputs)

        step_input = torch.cat((token_embedding, context), dim=2)
        output, (hidden, cell) = self.lstm(step_input, (hidden, cell))

        features = torch.cat((output.squeeze(1), context.squeeze(1)), dim=1)

        return self.fc(features), hidden, cell, attn_weights

实用技巧 #

打包序列 #

python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class PackedLSTM(nn.Module):
    """LSTM classifier for padded batches that ignores the padding steps.

    The batch is packed before the LSTM so each sequence's final hidden state
    corresponds to its last real step rather than to padding.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x, lengths):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: padded input, (batch, max_len, input_size).
            lengths: true length of each sequence (list or tensor); need not
                be sorted because ``enforce_sorted=False`` is used.
        """
        lengths = torch.as_tensor(lengths, dtype=torch.long)

        # pack_padded_sequence expects the lengths on the CPU.
        packed = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)

        # Only the final hidden state is needed, so the packed per-step
        # output is not unpacked back into a padded tensor.
        _, (hidden, _) = self.lstm(packed)

        # hidden[-1] is the top layer's state at each sequence's last real step.
        return self.fc(hidden[-1])

model = PackedLSTM(input_size=10, hidden_size=20, num_layers=2, num_classes=5)

# Four sequences padded to 10 steps; `lengths` gives each one's true length.
x = torch.randn(4, 10, 10)
lengths = [10, 8, 6, 4]

output = model(x, lengths)
print(f"打包 LSTM 输出: {output.shape}")

梯度裁剪 #

python
import torch
import torch.nn as nn

def train_with_clip(model, dataloader, criterion, optimizer, device, max_grad_norm=1.0):
    """Run one training pass over ``dataloader`` with gradient-norm clipping.

    Args:
        model: module to train; switched to training mode first.
        dataloader: iterable yielding ``(inputs, targets)`` pairs.
        criterion: loss callable taking ``(outputs, targets)``.
        optimizer: optimizer over the model's parameters.
        device: device each batch is moved to.
        max_grad_norm: upper bound on the total gradient norm.

    Returns:
        None. The model and optimizer are updated in place.
    """
    model.train()

    for inputs, targets in dataloader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()

        # Clip after backward and before the step, so the update uses the
        # rescaled gradients.
        nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)

        optimizer.step()

完整示例:文本分类 #

python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

class TextDataset(Dataset):
    """Whitespace-tokenised text dataset producing fixed-length id tensors.

    Each text is lower-cased, split on whitespace, truncated to
    ``max_length`` tokens, mapped through ``vocab`` (unknown words become
    ``'<unk>'``) and right-padded with ``'<pad>'``.
    """

    def __init__(self, texts, labels, vocab, max_length=100):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` tensors for the sample at ``idx``."""
        words = self.texts[idx].lower().split()[:self.max_length]
        ids = [self.vocab.get(word, self.vocab['<unk>']) for word in words]

        # Right-pad short samples so every item has max_length entries.
        missing = self.max_length - len(ids)
        if missing > 0:
            ids.extend([self.vocab['<pad>']] * missing)

        label = self.labels[idx]
        return torch.tensor(ids), torch.tensor(label)

class TextClassifier(nn.Module):
    """Text classifier: padded embedding -> bidirectional LSTM -> linear head.

    Index 0 is treated as the padding token by the embedding layer
    (``padding_idx=0``).
    """

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(
            embed_dim, hidden_size, num_layers,
            batch_first=True, bidirectional=True,
            # nn.LSTM applies dropout only between stacked layers and warns
            # when it is non-zero with a single layer, so disable it there.
            dropout=dropout if num_layers > 1 else 0,
        )
        # Forward and backward final states are concatenated: 2 * hidden_size.
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map token ids of shape (batch, seq_len) to logits (batch, num_classes)."""
        embedded = self.dropout(self.embedding(x))
        _, (hidden, _) = self.lstm(embedded)
        # Top layer's final states: hidden[-2] is forward, hidden[-1] backward.
        hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)
        out = self.fc(self.dropout(hidden))
        return out

# Toy vocabulary and corpus; index 0 is the padding token, 1 the unknown token.
vocab = {'<pad>': 0, '<unk>': 1, 'hello': 2, 'world': 3}
texts = ['hello world', 'world hello', 'hello hello world']
labels = [0, 1, 0]

dataset = TextDataset(texts, labels, vocab, max_length=10)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TextClassifier(
    vocab_size=len(vocab),
    embed_dim=32,
    hidden_size=64,
    num_layers=2,
    num_classes=2
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()
    total_loss = 0.0
    # Distinct batch names keep the module-level `texts` / `labels` lists
    # from being rebound to tensors by the loop.
    for batch_texts, batch_labels in dataloader:
        batch_texts = batch_texts.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        outputs = model(batch_texts)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        # Clip the total gradient norm to 1.0 before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        total_loss += loss.item()

    # Report the mean loss per batch for this epoch.
    print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")

下一步 #

现在你已经掌握了 PyTorch 循环神经网络的核心概念,接下来请学习「迁移学习」,了解如何利用预训练模型!

最后更新:2026-03-29