自定义训练 #
GradientTape 概述 #
GradientTape 是 TensorFlow 提供的自动微分工具,用于记录计算过程并计算梯度。
基本用法 #
python
import tensorflow as tf

# Scalar case: operations on a tf.Variable are recorded automatically,
# so the tape can replay them backwards to get dy/dx.
x = tf.Variable(3.0)
with tf.GradientTape() as tape:
    y = x ** 2
grad = tape.gradient(y, x)
print(f"dy/dx = {grad.numpy()}")

# Multi-variable case: pass a list of sources to get one gradient per variable.
w = tf.Variable(tf.random.normal([3, 2]))
b = tf.Variable(tf.zeros([2]))
with tf.GradientTape() as tape:
    x = tf.ones([1, 3])
    y = tf.matmul(x, w) + b
    loss = tf.reduce_mean(y ** 2)
grads = tape.gradient(loss, [w, b])
print(f"w 梯度形状: {grads[0].shape}")
print(f"b 梯度形状: {grads[1].shape}")
GradientTape 配置 #
持久化 Tape #
python
import tensorflow as tf

# A non-persistent tape releases its resources after the first gradient()
# call. persistent=True allows multiple queries; delete the tape explicitly
# when finished, because it keeps the recorded computation alive.
x = tf.Variable(3.0)
with tf.GradientTape(persistent=True) as tape:
    y = x ** 2
    z = x ** 3
print(f"dy/dx = {tape.gradient(y, x).numpy()}")
print(f"dz/dx = {tape.gradient(z, x).numpy()}")
del tape
监控非变量 #
python
import tensorflow as tf

# Only trainable tf.Variable objects are tracked by default; a tf.constant
# must be opted in with tape.watch() before it is used.
x = tf.constant(3.0)
with tf.GradientTape() as tape:
    tape.watch(x)
    y = x ** 2
grad = tape.gradient(y, x)
print(f"dy/dx = {grad.numpy()}")
高阶梯度 #
python
import tensorflow as tf

# Second-order derivative via nested tapes. The inner tape (tape2) records
# y = x**3 and yields dy/dx; the OUTER tape (tape1) must also record the
# computation of dy/dx, so tape2.gradient(y, x) is called inside tape1's
# context — if it were called outside, tape1.gradient(dy_dx, x) would be None.
x = tf.Variable(3.0)
with tf.GradientTape() as tape1:
    with tf.GradientTape() as tape2:
        y = x ** 3
    dy_dx = tape2.gradient(y, x)
d2y_dx2 = tape1.gradient(dy_dx, x)
print(f"一阶导数: {dy_dx.numpy()}")
print(f"二阶导数: {d2y_dx2.numpy()}")
自定义训练循环 #
基本训练循环 #
python
import tensorflow as tf
import numpy as np

# A minimal hand-written training loop: the forward pass and loss live
# inside the tape; the gradient computation and optimizer update run
# outside it.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Synthetic data standing in for a real dataset.
x_train = np.random.random((1000, 784)).astype(np.float32)
y_train = np.random.randint(10, size=(1000,))

batch_size = 32
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(1000).batch(batch_size)

epochs = 5
for epoch in range(epochs):
    epoch_loss = 0.0
    num_batches = 0
    for x_batch, y_batch in train_dataset:
        with tf.GradientTape() as tape:
            logits = model(x_batch, training=True)
            loss = loss_fn(y_batch, logits)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        epoch_loss += loss.numpy()
        num_batches += 1
    print(f"Epoch {epoch + 1}: Loss = {epoch_loss / num_batches:.4f}")
使用 tf.function 加速 #
python
import tensorflow as tf
import numpy as np

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])
optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()

# FIX: the original snippet referenced x_train / y_train without defining
# them; generate synthetic data so the example runs stand-alone.
x_train = np.random.random((1000, 784)).astype(np.float32)
y_train = np.random.randint(10, size=(1000,))

@tf.function
def train_step(x, y):
    """One optimization step, compiled into a graph by tf.function.

    Updates the model weights and the running accuracy metric; returns
    the batch loss.
    """
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = loss_fn(y, logits)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_acc_metric.update_state(y, logits)
    return loss

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(1000).batch(32)

for epoch in range(5):
    for x_batch, y_batch in train_dataset:
        loss = train_step(x_batch, y_batch)
    # float(...) keeps the f-string format spec portable across TF versions,
    # since metric.result() returns a tensor.
    train_acc = float(train_acc_metric.result())
    print(f"Epoch {epoch + 1}: Accuracy = {train_acc:.4f}")
    # Reset so each epoch's accuracy is computed fresh.
    train_acc_metric.reset_states()
带验证的训练循环 #
python
import tensorflow as tf
import numpy as np

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])
optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()

# FIX: x_train / y_train / x_val / y_val were referenced but never defined
# in the original snippet; create synthetic splits so it runs stand-alone.
x_train = np.random.random((1000, 784)).astype(np.float32)
y_train = np.random.randint(10, size=(1000,))
x_val = np.random.random((200, 784)).astype(np.float32)
y_val = np.random.randint(10, size=(200,))

@tf.function
def train_step(x, y):
    """Forward + backward pass; updates weights and the training metric."""
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = loss_fn(y, logits)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_acc_metric.update_state(y, logits)
    return loss

@tf.function
def val_step(x, y):
    """Inference-only pass (training=False); updates the validation metric."""
    logits = model(x, training=False)
    val_acc_metric.update_state(y, logits)

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(1000).batch(32)
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(32)

for epoch in range(10):
    for x_batch, y_batch in train_dataset:
        train_step(x_batch, y_batch)
    for x_batch, y_batch in val_dataset:
        val_step(x_batch, y_batch)
    # float(...) keeps the f-string format spec portable across TF versions.
    train_acc = float(train_acc_metric.result())
    val_acc = float(val_acc_metric.result())
    print(f"Epoch {epoch + 1}: Train Acc = {train_acc:.4f}, Val Acc = {val_acc:.4f}")
    # Reset so each epoch's metrics start from zero.
    train_acc_metric.reset_states()
    val_acc_metric.reset_states()
梯度处理 #
梯度裁剪 #
python
import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])
optimizer = tf.keras.optimizers.Adam()

@tf.function
def train_step(x, y):
    """One training step with gradient clipping by global norm."""
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = tf.reduce_mean(
            tf.keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True)
        )
    gradients = tape.gradient(loss, model.trainable_variables)
    # Rescale all gradients jointly so their global norm is at most 1.0,
    # which guards against exploding gradients.
    gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss
梯度累积 #
python
import tensorflow as tf
import numpy as np

# Gradient accumulation: sum gradients over `accum_steps` mini-batches and
# apply them once, emulating a larger effective batch size.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])
optimizer = tf.keras.optimizers.Adam()
accum_steps = 4

# FIX: the original snippet iterated over an undefined `train_dataset`;
# build one from synthetic data so the example runs stand-alone.
x_train = np.random.random((1000, 784)).astype(np.float32)
y_train = np.random.randint(10, size=(1000,))
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)

@tf.function
def train_step(x, y, accum_gradients):
    """Compute this batch's gradients and fold them into the running sum.

    The loss is divided by accum_steps so that the summed gradients equal
    the average over the accumulated batches.
    NOTE: tf.function keeps two traces of this function — one for the
    all-None list passed right after a reset, one for the tensor list —
    which is expected and bounded.
    """
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True)
        loss = tf.reduce_mean(loss) / accum_steps
    gradients = tape.gradient(loss, model.trainable_variables)
    if accum_gradients[0] is None:
        # First batch of an accumulation window: start the sum.
        accum_gradients = gradients
    else:
        accum_gradients = [accum_g + g for accum_g, g in zip(accum_gradients, gradients)]
    return loss, accum_gradients

accum_gradients = [None] * len(model.trainable_variables)
step = 0
for x_batch, y_batch in train_dataset:
    loss, accum_gradients = train_step(x_batch, y_batch, accum_gradients)
    step += 1
    if step % accum_steps == 0:
        # Apply the accumulated gradients, then reset for the next window.
        optimizer.apply_gradients(zip(accum_gradients, model.trainable_variables))
        accum_gradients = [None] * len(model.trainable_variables)
学习率调度 #
python
import tensorflow as tf
import numpy as np

# Exponential decay: lr = initial_lr * decay_rate ** (step / decay_steps).
initial_lr = 0.001
decay_steps = 1000
decay_rate = 0.9
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_lr,
    decay_steps=decay_steps,
    decay_rate=decay_rate
)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# FIX: the original snippet used `model`, `loss_fn` and `train_dataset`
# without defining them; declare them so the example runs stand-alone.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
x_train = np.random.random((1000, 784)).astype(np.float32)
y_train = np.random.randint(10, size=(1000,))
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)

step = 0
for epoch in range(10):
    for x_batch, y_batch in train_dataset:
        with tf.GradientTape() as tape:
            logits = model(x_batch, training=True)
            loss = loss_fn(y_batch, logits)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        step += 1
        if step % 100 == 0:
            # FIX: query the schedule directly and convert to a Python
            # float — `optimizer.learning_rate(step)` returns a tensor
            # whose ':.6f' formatting is not reliable across TF versions.
            print(f"Step {step}: LR = {float(lr_schedule(step)):.6f}")
自定义 Model.train_step #
python
import tensorflow as tf
import numpy as np

class CustomModel(tf.keras.Sequential):
    """Sequential model with a hand-written train_step used by Model.fit.

    FIX: the original subclassed tf.keras.Model, whose constructor does not
    accept a list of layers — `CustomModel([...])` below would fail.
    Subclassing tf.keras.Sequential keeps that constructor working while
    still letting us override train_step.
    """

    def train_step(self, data):
        """Run one optimization step for a single batch.

        Returns a dict mapping metric names to their current results,
        which fit() uses for progress reporting.
        """
        x, y = data
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            # Include layer regularization losses alongside the compiled loss.
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.compiled_metrics.update_state(y, y_pred)
        return {m.name: m.result() for m in self.metrics}

model = CustomModel([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

# FIX: x_train / y_train were undefined in the original snippet; create
# synthetic data so the example runs stand-alone.
x_train = np.random.random((1000, 784)).astype(np.float32)
y_train = np.random.randint(10, size=(1000,))
model.fit(x_train, y_train, epochs=10, batch_size=32)
下一步 #
现在你已经掌握了自定义训练,接下来学习 GPU 加速,了解如何利用 GPU 加速模型训练!
最后更新:2026-04-04