PyTorch 循环神经网络 #
RNN 简介 #
循环神经网络(Recurrent Neural Network, RNN)是一类专门处理序列数据的神经网络,能够捕捉序列中的时序依赖关系。
text
┌─────────────────────────────────────────────────────────────┐
│ RNN 核心思想 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 序列数据的特点: │
│ - 文本:单词序列 │
│ - 语音:音频序列 │
│ - 视频:帧序列 │
│ - 时间序列:股票价格 │
│ │
│ RNN 的特点: │
│ - 隐藏状态传递时序信息 │
│ - 参数在时间步之间共享 │
│ - 可以处理变长序列 │
│ │
│ RNN 结构: │
│ │
│ x₁ ──► h₁ ──► y₁ │
│ │ │
│ ▼ │
│ x₂ ──► h₂ ──► y₂ │
│ │ │
│ ▼ │
│ x₃ ──► h₃ ──► y₃ │
│ │
└─────────────────────────────────────────────────────────────┘
基本 RNN #
RNN 单元 #
python
import torch
import torch.nn as nn

# Single-layer unidirectional RNN over batches of 5-step, 10-dim sequences.
rnn = nn.RNN(10, 20,
             num_layers=1,
             nonlinearity='tanh',
             bias=True,
             batch_first=True,
             dropout=0,
             bidirectional=False)

# (batch=32, seq_len=5, features=10)
x = torch.randn(32, 5, 10)
# output: hidden state at every step; h_n: final hidden state per layer.
output, h_n = rnn(x)
print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {h_n.shape}")
手动实现 RNN #
python
import torch
import torch.nn as nn
class RNNCell(nn.Module):
    """A single vanilla-RNN step: h_t = tanh(W · [x_t, h_{t-1}] + b)."""

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # One linear map over the concatenated input and previous hidden state.
        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
        self.tanh = nn.Tanh()

    def forward(self, x, h_prev):
        """Advance the hidden state by one time step."""
        stacked = torch.cat([x, h_prev], dim=1)
        return self.tanh(self.i2h(stacked))
class ManualRNN(nn.Module):
    """Unrolls RNNCell over time, keeping every per-step hidden state."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.rnn_cell = RNNCell(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (all hidden states, last hidden)."""
        batch_size, seq_len, _ = x.shape
        # Start from an all-zero state on the same device as the input.
        h = torch.zeros(batch_size, self.hidden_size, device=x.device)
        states = []
        for step in range(seq_len):
            h = self.rnn_cell(x[:, step, :], h)
            states.append(h)
        return torch.stack(states, dim=1), h
# Smoke test: batch of 32 sequences, 5 steps of 10 features each.
x = torch.randn(32, 5, 10)
model = ManualRNN(10, 20, 5)
output, h_n = model(x)
print(f"输出形状: {output.shape}")
LSTM #
LSTM 原理 #
text
┌─────────────────────────────────────────────────────────────┐
│ LSTM 结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ LSTM 缓解 RNN 的梯度消失问题,引入三个门: │
│ │
│ 遗忘门(Forget Gate): │
│ f_t = σ(W_f · [h_{t-1}, x_t] + b_f) │
│ 决定丢弃哪些信息 │
│ │
│ 输入门(Input Gate): │
│ i_t = σ(W_i · [h_{t-1}, x_t] + b_i) │
│ C̃_t = tanh(W_C · [h_{t-1}, x_t] + b_C) │
│ 决定存储哪些新信息 │
│ │
│ 单元状态更新: │
│ C_t = f_t * C_{t-1} + i_t * C̃_t │
│ │
│ 输出门(Output Gate): │
│ o_t = σ(W_o · [h_{t-1}, x_t] + b_o) │
│ h_t = o_t * tanh(C_t) │
│ │
└─────────────────────────────────────────────────────────────┘
PyTorch LSTM #
python
import torch
import torch.nn as nn

# Two stacked LSTM layers with inter-layer dropout of 0.5.
lstm = nn.LSTM(10, 20,
               num_layers=2,
               bias=True,
               batch_first=True,
               dropout=0.5,
               bidirectional=False)

x = torch.randn(32, 5, 10)
# Explicit initial states, shaped (num_layers, batch, hidden_size).
h0 = torch.zeros(2, 32, 20)
c0 = torch.zeros(2, 32, 20)
output, (h_n, c_n) = lstm(x, (h0, c0))
print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {h_n.shape}")
print(f"单元状态形状: {c_n.shape}")
LSTM 分类模型 #
python
import torch
import torch.nn as nn
class LSTMClassifier(nn.Module):
    """Text classifier: embedding -> bidirectional LSTM -> linear head.

    The classification feature is the concatenation of the top layer's
    forward and backward final hidden states.
    """

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # nn.LSTM only applies dropout BETWEEN layers, so disable it for a
        # single-layer network (avoids a PyTorch warning).
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True,
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """x: (batch, seq_len) token ids -> (batch, num_classes) logits."""
        embedded = self.dropout(self.embedding(x))
        _, (hidden, _) = self.lstm(embedded)
        # hidden[-2] / hidden[-1]: forward / backward states of the top layer.
        final = torch.cat([hidden[-2], hidden[-1]], dim=1)
        return self.fc(self.dropout(final))
model = LSTMClassifier(
    vocab_size=10000, embed_dim=128, hidden_size=256,
    num_layers=2, num_classes=2,
)
# Batch of 32 token-id sequences, 50 tokens each.
x = torch.randint(0, 10000, (32, 50))
output = model(x)
print(f"输出形状: {output.shape}")
GRU #
GRU 原理 #
text
┌─────────────────────────────────────────────────────────────┐
│ GRU 结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ GRU 是 LSTM 的简化版本,只有两个门: │
│ │
│ 重置门(Reset Gate): │
│ r_t = σ(W_r · [h_{t-1}, x_t]) │
│ │
│ 更新门(Update Gate): │
│ z_t = σ(W_z · [h_{t-1}, x_t]) │
│ │
│ 候选隐藏状态: │
│ h̃_t = tanh(W · [r_t * h_{t-1}, x_t]) │
│ │
│ 隐藏状态更新: │
│ h_t = (1 - z_t) * h_{t-1} + z_t * h̃_t │
│ │
│ GRU vs LSTM: │
│ - GRU 参数更少,训练更快 │
│ - LSTM 表达能力更强 │
│ - 数据量大时 LSTM 可能更好 │
│ │
└─────────────────────────────────────────────────────────────┘
PyTorch GRU #
python
import torch
import torch.nn as nn

# Two stacked GRU layers; GRU carries no cell state, only a hidden state.
gru = nn.GRU(10, 20,
             num_layers=2,
             bias=True,
             batch_first=True,
             dropout=0.5,
             bidirectional=False)

x = torch.randn(32, 5, 10)
# Initial hidden state, shaped (num_layers, batch, hidden_size).
h0 = torch.zeros(2, 32, 20)
output, h_n = gru(x, h0)
print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {h_n.shape}")
GRU 序列标注 #
python
import torch
import torch.nn as nn
class GRUTagger(nn.Module):
    """Per-token tagger: embedding -> bidirectional GRU -> linear over tags."""

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers, num_tags):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.gru = nn.GRU(
            embed_dim,
            hidden_size,
            num_layers,
            batch_first=True,
            bidirectional=True,
        )
        self.fc = nn.Linear(hidden_size * 2, num_tags)

    def forward(self, x):
        """x: (batch, seq_len) ids -> (batch, seq_len, num_tags) tag logits."""
        # Project every per-step (forward ‖ backward) state onto the tag space.
        features, _ = self.gru(self.embedding(x))
        return self.fc(features)
model = GRUTagger(
    vocab_size=10000, embed_dim=128, hidden_size=256,
    num_layers=2, num_tags=10,
)
# One tag prediction per token: output is (batch, seq_len, num_tags).
x = torch.randint(0, 10000, (32, 50))
output = model(x)
print(f"输出形状: {output.shape}")
双向 RNN #
python
import torch
import torch.nn as nn
class BiLSTM(nn.Module):
    """Sequence classifier built on a bidirectional LSTM.

    The classification feature concatenates the true final state of each
    direction of the top layer: hidden[-2] (forward, after reading the whole
    sequence) and hidden[-1] (backward, after reading the whole sequence).
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            bidirectional=True,
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, num_classes) logits."""
        lstm_out, (hidden, _) = self.lstm(x)
        # Fix: the original used lstm_out[:, -1, :], but at the LAST timestep
        # the backward half of that vector is the backward RNN's state after
        # reading only ONE token.  Using the per-direction final states from
        # `hidden` gives both directions a full pass over the sequence
        # (matches LSTMClassifier elsewhere in this file).
        final = torch.cat((hidden[-2], hidden[-1]), dim=1)
        return self.fc(final)
# Two-layer bidirectional LSTM mapping 10-dim steps to 5 classes.
x = torch.randn(32, 5, 10)
model = BiLSTM(10, 20, 2, 5)
output = model(x)
print(f"双向 LSTM 输出: {output.shape}")
序列到序列模型 #
Encoder-Decoder #
python
import torch
import torch.nn as nn
class Encoder(nn.Module):
    """Embeds a source token sequence and encodes it with an LSTM."""

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        """x: (batch, src_len) ids -> (per-step outputs, final hidden, final cell)."""
        outputs, (hidden, cell) = self.lstm(self.embedding(x))
        return outputs, hidden, cell
class Decoder(nn.Module):
    """Predicts target-vocabulary logits from previous tokens and LSTM state."""

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden, cell):
        """Run decoding steps conditioned on the carried (hidden, cell) state."""
        step_out, (hidden, cell) = self.lstm(self.embedding(x), (hidden, cell))
        return self.fc(step_out), hidden, cell
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper with scheduled teacher forcing.

    forward(src, trg, teacher_forcing_ratio):
        src: (batch, src_len) source token ids.
        trg: (batch, trg_len) target ids; trg[:, 0] is the <sos> token.
        teacher_forcing_ratio: per-step probability of feeding the
            ground-truth token instead of the model's own prediction.
    Returns (batch, trg_len, trg_vocab) logits; position 0 stays zero
    because no prediction is made for the <sos> token.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size, trg_len = trg.shape
        trg_vocab_size = self.decoder.fc.out_features
        # Allocate directly on the target device instead of zeros().to(...).
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=self.device)
        _, hidden, cell = self.encoder(src)
        # First decoder input is the <sos> column.  Renamed from `input`
        # to avoid shadowing the builtin.
        dec_input = trg[:, 0].unsqueeze(1)
        for t in range(1, trg_len):
            step_logits, hidden, cell = self.decoder(dec_input, hidden, cell)
            outputs[:, t] = step_logits.squeeze(1)
            # .item() turns the 1-element tensor comparison into a plain bool.
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = step_logits.argmax(2)
            dec_input = trg[:, t].unsqueeze(1) if teacher_force else top1
        return outputs
device = torch.device("cpu")
# Source and target share a 10k vocabulary in this toy setup.
encoder = Encoder(vocab_size=10000, embed_dim=256, hidden_size=512, num_layers=2)
decoder = Decoder(vocab_size=10000, embed_dim=256, hidden_size=512, num_layers=2)
model = Seq2Seq(encoder, decoder, device)
src = torch.randint(0, 10000, (32, 20))  # source: 20 tokens
trg = torch.randint(0, 10000, (32, 15))  # target: 15 tokens
output = model(src, trg)
print(f"Seq2Seq 输出: {output.shape}")
注意力机制 #
基本注意力 #
python
import torch
import torch.nn as nn
import torch.nn.functional as F
class Attention(nn.Module):
    """Additive (Bahdanau-style) attention over encoder outputs."""

    def __init__(self, hidden_size):
        super().__init__()
        self.attn = nn.Linear(hidden_size * 2, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """hidden: (batch, hidden); encoder_outputs: (batch, src_len, hidden).

        Returns (batch, src_len) attention weights summing to 1 per row.
        """
        src_len = encoder_outputs.size(1)
        # Broadcast the decoder state across every source position.
        query = hidden.unsqueeze(1).expand(-1, src_len, -1)
        energy = torch.tanh(self.attn(torch.cat([query, encoder_outputs], dim=2)))
        scores = self.v(energy).squeeze(2)
        return F.softmax(scores, dim=1)
class AttnDecoder(nn.Module):
    """Decoder that attends over encoder outputs at every step."""

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # LSTM input is the token embedding concatenated with the context vector.
        self.lstm = nn.LSTM(embed_dim + hidden_size, hidden_size, num_layers, batch_first=True)
        self.attention = Attention(hidden_size)
        self.fc = nn.Linear(hidden_size * 2, vocab_size)

    def forward(self, x, hidden, cell, encoder_outputs):
        """x: (batch, 1) token ids -> (logits, hidden, cell, attn weights)."""
        embedded = self.embedding(x)
        # Score encoder positions against the top-layer decoder state.
        weights = self.attention(hidden[-1], encoder_outputs).unsqueeze(1)
        # Context = attention-weighted sum of encoder outputs: (batch, 1, hidden).
        context = torch.bmm(weights, encoder_outputs)
        output, (hidden, cell) = self.lstm(torch.cat([embedded, context], dim=2), (hidden, cell))
        combined = torch.cat([output.squeeze(1), context.squeeze(1)], dim=1)
        return self.fc(combined), hidden, cell, weights
实用技巧 #
打包序列 #
python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
class PackedLSTM(nn.Module):
    """Classifier that packs padded sequences so the LSTM skips padding.

    Packing makes the LSTM stop at each sequence's true length, so
    hidden[-1] is the state at the last REAL token of every sequence,
    not at the padded end.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x, lengths):
        """x: (batch, max_len, input_size); lengths: true length per sequence."""
        # pack_padded_sequence requires lengths on CPU as int64.
        lengths = torch.as_tensor(lengths, dtype=torch.long)
        packed = pack_padded_sequence(x, lengths.cpu(), batch_first=True,
                                      enforce_sorted=False)
        _, (hidden, _) = self.lstm(packed)
        # Fix: the original also called pad_packed_sequence here, but its
        # result was never used — dead work.  Unpad only when the per-step
        # outputs are actually consumed.
        return self.fc(hidden[-1])
model = PackedLSTM(10, 20, 2, 5)
# Four padded sequences whose true lengths differ.
lengths = [10, 8, 6, 4]
x = torch.randn(4, 10, 10)
output = model(x, lengths)
print(f"打包 LSTM 输出: {output.shape}")
梯度裁剪 #
python
import torch
import torch.nn as nn
def train_with_clip(model, dataloader, criterion, optimizer, device, max_grad_norm=1.0):
    """Run one training epoch with gradient-norm clipping.

    Clipping the global gradient norm to `max_grad_norm` guards against the
    exploding gradients common in recurrent networks.
    """
    model.train()
    for inputs, targets in dataloader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()
        # Rescale gradients in place before the optimizer consumes them.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
完整示例:文本分类 #
python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
class TextDataset(Dataset):
    """Whitespace-tokenized texts mapped to fixed-length id tensors."""

    def __init__(self, texts, labels, vocab, max_length=100):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab          # token -> id; must contain '<unk>' and '<pad>'
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return (ids, label); ids is truncated/right-padded to max_length."""
        tokens = self.texts[idx].lower().split()[:self.max_length]
        unk = self.vocab['<unk>']
        ids = [self.vocab.get(tok, unk) for tok in tokens]
        # Right-pad short sequences with the '<pad>' id.
        pad_count = self.max_length - len(ids)
        if pad_count > 0:
            ids.extend([self.vocab['<pad>']] * pad_count)
        return torch.tensor(ids), torch.tensor(self.labels[idx])
class TextClassifier(nn.Module):
    """Embedding -> bidirectional LSTM -> linear classification head.

    Args:
        vocab_size, embed_dim: embedding table size (id 0 is padding).
        hidden_size, num_layers: LSTM configuration.
        num_classes: number of output logits per example.
        dropout: applied to embeddings, between LSTM layers, and before fc.
    """

    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(
            embed_dim, hidden_size, num_layers,
            batch_first=True, bidirectional=True,
            # Fix: nn.LSTM applies dropout only BETWEEN layers and warns when
            # num_layers == 1; guard it like LSTMClassifier in this file.
            dropout=dropout if num_layers > 1 else 0,
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """x: (batch, seq_len) token ids -> (batch, num_classes) logits."""
        embedded = self.dropout(self.embedding(x))
        lstm_out, (hidden, _) = self.lstm(embedded)
        # Concatenate the top layer's forward and backward final states.
        hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)
        return self.fc(self.dropout(hidden))
# Tiny toy corpus just to exercise the full training loop end to end.
vocab = {'<pad>': 0, '<unk>': 1, 'hello': 2, 'world': 3}
texts = ['hello world', 'world hello', 'hello hello world']
labels = [0, 1, 0]

dataset = TextDataset(texts, labels, vocab, max_length=10)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TextClassifier(
    vocab_size=len(vocab),
    embed_dim=32,
    hidden_size=64,
    num_layers=2,
    num_classes=2,
).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()
    total_loss = 0
    for batch_texts, batch_labels in dataloader:
        batch_texts = batch_texts.to(device)
        batch_labels = batch_labels.to(device)
        optimizer.zero_grad()
        loss = criterion(model(batch_texts), batch_labels)
        loss.backward()
        # Clip the global gradient norm to stabilise RNN training.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")
下一步 #
现在你已经掌握了 PyTorch 循环神经网络的核心概念,接下来学习 迁移学习,了解如何利用预训练模型!
最后更新:2026-03-29