PyTorch GPU 加速 #
GPU 加速原理 #
GPU(图形处理器)具有大规模并行计算能力,非常适合深度学习中的矩阵运算。
text
┌─────────────────────────────────────────────────────────────┐
│ CPU vs GPU 对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ CPU: │
│ - 少量强大核心(4-64核) │
│ - 高主频,低延迟 │
│ - 适合串行任务 │
│ - 通用计算 │
│ │
│ GPU: │
│ - 大量简单核心(数千核) │
│ - 低主频,高吞吐量 │
│ - 适合并行任务 │
│ - 专用计算(图形、深度学习) │
│ │
│ 深度学习: │
│ - 大量矩阵乘法 │
│ - 可以高度并行化 │
│ - GPU 加速 10-100 倍 │
│ │
└─────────────────────────────────────────────────────────────┘
CUDA 基础 #
检查 CUDA 可用性 #
python
import torch

# Report whether CUDA is usable and, if so, basic information about GPU 0.
print(f"CUDA 可用: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA 版本: {torch.version.cuda}")
    print(f"GPU 数量: {torch.cuda.device_count()}")
    print(f"当前 GPU: {torch.cuda.current_device()}")
    print(f"GPU 名称: {torch.cuda.get_device_name(0)}")
    # total_memory is in bytes; convert to GiB for readability
    print(f"GPU 显存: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")
设备管理 #
python
import torch

# Standard pattern: pick CUDA when present, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")

# Device objects can name a specific GPU; constructing one is cheap and
# does not touch the hardware.
device = torch.device("cuda:0")
device = torch.device("cuda:1")

# Querying or switching the current device requires actual CUDA hardware.
if torch.cuda.is_available():
    print(f"当前设备: {torch.cuda.current_device()}")
    if torch.cuda.device_count() > 1:
        # Only switch to GPU 1 when a second GPU actually exists.
        torch.cuda.set_device(1)
    for i in range(torch.cuda.device_count()):
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
张量设备转移 #
基本转移 #
python
import torch

# Tensors are created on the CPU by default.
x = torch.randn(3, 3)
print(f"CPU 张量设备: {x.device}")

if torch.cuda.is_available():
    # .to("cuda") copies the tensor to the GPU (the CPU tensor is unchanged).
    x_gpu = x.to("cuda")
    print(f"GPU 张量设备: {x_gpu.device}")
    x_cpu = x_gpu.to("cpu")
    print(f"CPU 张量设备: {x_cpu.device}")

    # Shorthand equivalents of .to("cuda") / .to("cpu").
    x_gpu = x.cuda()
    x_cpu = x_gpu.cpu()

    # Creating the tensor directly on the GPU avoids an extra host->device copy.
    x = torch.randn(3, 3, device="cuda")
    print(f"直接创建 GPU 张量: {x.device}")
模型设备转移 #
python
import torch
import torch.nn as nn


class SimpleNet(nn.Module):
    """Minimal 2-layer MLP (784 -> 256 -> 10) to demonstrate device placement."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 256)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# .to(device) moves all parameters and buffers in place and returns the model.
model = SimpleNet().to(device)

# Inputs must live on the same device as the model's parameters.
x = torch.randn(32, 784).to(device)
output = model(x)
print(f"输出设备: {output.device}")

# Wrap with DataParallel only when several GPUs are actually present.
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(SimpleNet()).to(device)
设备一致性 #
python
import torch

# Elementwise ops require all operands on the same device; mixing a GPU
# tensor with a CPU tensor raises a RuntimeError. Demonstrate only when
# CUDA hardware is available.
if torch.cuda.is_available():
    x = torch.randn(3, 3, device="cuda")
    y = torch.randn(3, 3)  # CPU tensor — deliberately mismatched
    try:
        z = x + y
    except RuntimeError as e:
        print(f"错误: {e}")
    # Fix: move the CPU operand to the GPU first.
    y = y.to("cuda")
    z = x + y
    print(f"成功计算: {z.device}")
GPU 内存管理 #
内存监控 #
python
import torch


def print_memory_usage():
    """Print currently allocated vs. cached (reserved) CUDA memory in MB."""
    allocated = torch.cuda.memory_allocated() / 1024**2
    reserved = torch.cuda.memory_reserved() / 1024**2
    print(f"已分配: {allocated:.2f} MB, 缓存: {reserved:.2f} MB")


if torch.cuda.is_available():
    x = torch.randn(1000, 1000, device="cuda")
    # memory_allocated: bytes held by live tensors
    print(f"已分配内存: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")
    # memory_reserved: bytes held by the caching allocator (>= allocated)
    print(f"缓存内存: {torch.cuda.memory_reserved() / 1024**2:.2f} MB")
    # peak allocation since the process (or last reset) started
    print(f"最大分配内存: {torch.cuda.max_memory_allocated() / 1024**2:.2f} MB")
    print_memory_usage()
内存清理 #
python
import torch
import gc

if torch.cuda.is_available():
    x = torch.randn(10000, 10000, device="cuda")
    print("分配后:")
    print(f" 已分配: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")

    del x          # drop the Python reference so the tensor can be freed
    gc.collect()   # force collection in case of lingering reference cycles
    # Return cached blocks to the CUDA driver so other processes can use them.
    # (Not needed for PyTorch itself — the caching allocator reuses them.)
    torch.cuda.empty_cache()

    print("清理后:")
    print(f" 已分配: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")
内存优化技巧 #
python
import torch
import torch.nn as nn


def inference(model, x):
    """Tip 1: run inference under no_grad — activations needed only for
    backward() are never stored, substantially cutting memory use."""
    with torch.no_grad():
        return model(x)


def train_epoch(model, dataloader, optimizer, criterion, device="cuda"):
    """Tip 2: zero_grad(set_to_none=True) frees gradient tensors between
    steps instead of filling them with zeros, saving memory and a kernel."""
    for x, y in dataloader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad(set_to_none=True)
        output = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()


# Tip 3: half precision (float16) halves tensor memory.
if torch.cuda.is_available():
    x = torch.randn(32, 3, 224, 224, device="cuda")
    x_fp16 = torch.randn(1000, 1000, device="cuda", dtype=torch.float16)
多 GPU 训练 #
DataParallel #
python
import torch
import torch.nn as nn


class SimpleNet(nn.Module):
    """Minimal 2-layer MLP used to demonstrate nn.DataParallel."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 256)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SimpleNet()
# DataParallel splits each input batch across GPUs and gathers the outputs
# on GPU 0. (DistributedDataParallel is preferred for serious workloads.)
if torch.cuda.device_count() > 1:
    print(f"使用 {torch.cuda.device_count()} 个 GPU")
    model = nn.DataParallel(model)
model = model.to(device)

x = torch.randn(64, 784).to(device)
output = model(x)
print(f"输出形状: {output.shape}")
DistributedDataParallel #
python
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
def setup(rank, world_size):
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
class SimpleNet(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(784, 256)
self.fc2 = nn.Linear(256, 10)
def forward(self, x):
x = torch.relu(self.fc1(x))
return self.fc2(x)
def train(rank, world_size):
setup(rank, world_size)
model = SimpleNet().to(rank)
ddp_model = DDP(model, device_ids=[rank])
optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=64, sampler=train_sampler)
for epoch in range(10):
train_sampler.set_epoch(epoch)
for batch in dataloader:
optimizer.zero_grad()
output = ddp_model(batch)
loss = criterion(output, target)
loss.backward()
optimizer.step()
cleanup()
if __name__ == "__main__":
world_size = torch.cuda.device_count()
torch.multiprocessing.spawn(train, args=(world_size,), nprocs=world_size)
DDP 启动脚本 #
bash
torchrun --nproc_per_node=4 train.py
python
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def main():
    """Entry point for `torchrun` — it launches one process per GPU for us.

    NOTE(review): SimpleNet, dataloader, optimizer, epochs and
    train_one_epoch are placeholders the reader must define.
    """
    # torchrun provides the rendezvous env vars; no rank/world_size args needed.
    dist.init_process_group("nccl")
    # LOCAL_RANK identifies this process's GPU on the current node.
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    model = SimpleNet().to(local_rank)
    model = DDP(model, device_ids=[local_rank])

    for epoch in range(epochs):
        train_one_epoch(model, dataloader, optimizer, local_rank)

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
混合精度训练 #
自动混合精度 #
python
import torch
import torch.nn as nn
from torch.cuda.amp import autocast, GradScaler

# Fall back to CPU when CUDA is absent; AMP is then simply disabled and the
# loop runs in plain fp32.
_use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if _use_cuda else "cpu")

model = nn.Linear(1000, 10).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# GradScaler multiplies the loss before backward() so small fp16 gradients
# do not underflow to zero; it unscales them before the optimizer step.
scaler = GradScaler(enabled=_use_cuda)


def train_amp(dataloader, epochs=10):
    """Standard AMP training loop; `dataloader` yields (x, y) batches."""
    for epoch in range(epochs):
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            # Ops inside autocast run in fp16 where that is numerically safe.
            with autocast(enabled=_use_cuda):
                output = model(x)
                loss = criterion(output, y)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
手动混合精度 #
python
import torch
import torch.nn as nn

# Manual fp16: convert the whole model and its inputs to half precision.
# Unlike autocast, every op then runs in fp16 — faster, but numerically
# riskier (e.g. reductions and losses can overflow/underflow).
if torch.cuda.is_available():
    model = nn.Linear(1000, 10).cuda()
    model = model.half()  # parameters become float16

    x = torch.randn(32, 1000, dtype=torch.float16, device="cuda")
    output = model(x)
    print(f"输出类型: {output.dtype}")

    # Convert back to fp32 before numerically sensitive steps (loss, metrics).
    output = output.float()
混合精度最佳实践 #
python
import torch
import torch.nn as nn
from torch.cuda.amp import autocast, GradScaler


class Model(nn.Module):
    """Small conv net: conv -> batchnorm -> global average pool -> linear."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, 3)
        # BatchNorm is kept in fp32 by autocast for numerical stability.
        self.bn1 = nn.BatchNorm2d(64)
        self.fc = nn.Linear(64, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = x.mean(dim=[2, 3])  # global average pooling over H and W
        return self.fc(x)


# AMP is enabled only when CUDA exists; on CPU the same code runs in fp32.
_use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if _use_cuda else "cpu")

model = Model().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
scaler = GradScaler(enabled=_use_cuda)


def train_step(x, y):
    """One AMP training step with gradient clipping; returns the loss value."""
    optimizer.zero_grad()
    with autocast(enabled=_use_cuda):
        output = model(x)
        loss = nn.CrossEntropyLoss()(output, y)
    scaler.scale(loss).backward()
    # Unscale first so the clipping threshold applies to the TRUE gradients,
    # not the scaled ones.
    scaler.unscale_(optimizer)
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    scaler.step(optimizer)
    scaler.update()
    return loss.item()
性能优化 #
数据加载优化 #
python
from torch.utils.data import DataLoader


def make_loader(dataset, batch_size=64, num_workers=4):
    """Build a DataLoader configured to keep the GPU fed.

    - num_workers: load/augment batches in parallel worker processes
    - pin_memory: page-locked host buffers enable asynchronous GPU copies
    - prefetch_factor: batches each worker keeps ready ahead of time
      (only valid when num_workers > 0)
    """
    kwargs = {"batch_size": batch_size, "shuffle": True, "pin_memory": True}
    if num_workers > 0:
        kwargs["num_workers"] = num_workers
        kwargs["prefetch_factor"] = 2
    return DataLoader(dataset, **kwargs)


def batch_to_device(x, y, device="cuda"):
    """Copy one batch to `device`.

    non_blocking=True overlaps the host-to-device copy with computation —
    effective only when the source tensors are in pinned memory.
    """
    return x.to(device, non_blocking=True), y.to(device, non_blocking=True)
CUDA Graphs #
python
import torch

# CUDA Graphs capture a fixed sequence of GPU work once and replay it with
# near-zero CPU launch overhead. Every tensor involved must be "static":
# allocated once before capture and refreshed in place via copy_().
# NOTE(review): MyModel and criterion are placeholders defined elsewhere.
if torch.cuda.is_available():
    model = MyModel().cuda()
    optimizer = torch.optim.Adam(model.parameters())

    # Static buffers — the captured graph always reads/writes these.
    static_input = torch.randn(32, 10, device="cuda")
    static_target = torch.randn(32, 10, device="cuda")

    # Warmup on a side stream so lazy init / autotuning is not captured.
    s = torch.cuda.Stream()
    s.wait_stream(torch.cuda.current_stream())
    with torch.cuda.stream(s):
        for _ in range(3):
            optimizer.zero_grad(set_to_none=True)
            output = model(static_input)
            loss = criterion(output, static_target)
            loss.backward()
            optimizer.step()
    torch.cuda.current_stream().wait_stream(s)

    # Capture. Reuse static_input — allocating a fresh input tensor inside
    # the capture would detach the graph from the buffer we copy into later.
    g = torch.cuda.CUDAGraph()
    optimizer.zero_grad(set_to_none=True)
    with torch.cuda.graph(g):
        static_output = model(static_input)
        static_loss = criterion(static_output, static_target)
        static_loss.backward()
        optimizer.step()

    # To run one step: copy fresh data into the static buffer, then replay.
    real_input = torch.randn(32, 10, device="cuda")
    static_input.copy_(real_input)
    g.replay()
编译优化 #
python
import torch
# torch.compile (PyTorch 2.x) JIT-compiles the model into optimized kernels.
# NOTE(review): MyModel is assumed to be defined earlier in the tutorial.
model = MyModel().cuda()
# Default mode: balanced compilation time vs. runtime speedup.
model = torch.compile(model)
# The first call triggers compilation (slow); later calls run the compiled code.
output = model(input)
# "reduce-overhead" targets small batches by cutting per-call launch overhead.
model = torch.compile(model, mode="reduce-overhead")
# "max-autotune" spends more compile time searching for the fastest kernels.
model = torch.compile(model, mode="max-autotune")
性能分析 #
使用 Profiler #
python
import torch
from torch.profiler import profile, record_function, ProfilerActivity

# NOTE(review): MyModel is a placeholder defined elsewhere in the tutorial.
model = MyModel().cuda()
inputs = torch.randn(32, 10, device="cuda")

# Profile both CPU-side op dispatch and CUDA kernel execution.
with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
             record_shapes=True,     # record input shapes per op
             profile_memory=True) as prof:  # track allocator activity
    # record_function labels a custom range in the trace.
    with record_function("model_inference"):
        model(inputs)

# Top 10 ops by total CUDA time.
print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))
# Trace viewable in chrome://tracing or Perfetto.
prof.export_chrome_trace("trace.json")
内存分析 #
python
import torch
from torch.profiler import profile, ProfilerActivity

# CPU memory profiling: attribute allocations to the ops that made them.
# NOTE(review): model and inputs are placeholders defined elsewhere.
with profile(activities=[ProfilerActivity.CPU],
             profile_memory=True,    # track tensor allocations and frees
             record_shapes=True) as prof:
    model(inputs)

# Top 10 ops by memory they themselves allocated on the CPU.
print(prof.key_averages().table(sort_by="self_cpu_memory_usage", row_limit=10))
常见问题解决 #
GPU 内存不足 #
python
import torch

# Strategies for "CUDA out of memory" errors.
# NOTE(review): model and data are placeholders defined elsewhere.
if torch.cuda.is_available():
    # 1. Return cached blocks to the driver (helps other processes; PyTorch
    #    itself reuses its cache automatically).
    torch.cuda.empty_cache()

    # 2. Reduce the batch size.
    batch_size = 16

    # 3. Half precision halves memory for parameters and activations.
    x = torch.randn(1000, 1000, device="cuda", dtype=torch.float16)

    # 4. Skip autograd bookkeeping during inference.
    with torch.no_grad():
        output = model(x)

    # 5. Process data in chunks, freeing intermediates eagerly.
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        output = model(batch)
        del output
        torch.cuda.empty_cache()
设备不匹配 #
python
import torch


def ensure_same_device(*tensors):
    """Move every tensor onto the first tensor's device.

    Tensors already on that device are returned unchanged (no copy).
    Returns a list in the same order as the arguments.
    """
    device = tensors[0].device
    return [t.to(device) if t.device != device else t for t in tensors]


# Tensors on different GPUs cannot be mixed in a single op; align them first.
if torch.cuda.device_count() > 1:
    x = torch.randn(3, 3, device="cuda:0")
    y = torch.randn(3, 3, device="cuda:1")
    y = y.to(x.device)  # or: x, y = ensure_same_device(x, y)
    z = x + y
下一步 #
现在你已经掌握了 PyTorch GPU 加速的核心概念,接下来学习 模型保存与加载,了解如何持久化你的模型!
最后更新:2026-03-29