自定义训练 #

GradientTape 概述 #

GradientTape 是 TensorFlow 提供的自动微分工具,用于记录计算过程并计算梯度。

基本用法 #

python
import tensorflow as tf

# A tf.Variable is tracked by the tape automatically.
x = tf.Variable(3.0)

with tf.GradientTape() as tape:
    y = x ** 2

# dy/dx = 2x -> 6.0
grad = tape.gradient(y, x)
print(f"dy/dx = {grad.numpy()}")

# Multi-variable gradients
w = tf.Variable(tf.random.normal([3, 2]))
b = tf.Variable(tf.zeros([2]))

with tf.GradientTape() as tape:
    x = tf.ones([1, 3])
    y = tf.matmul(x, w) + b
    loss = tf.reduce_mean(y ** 2)

# Passing a list of sources returns a matching list of gradients.
grads = tape.gradient(loss, [w, b])
print(f"w 梯度形状: {grads[0].shape}")
print(f"b 梯度形状: {grads[1].shape}")

GradientTape 配置 #

持久化 Tape #

python
import tensorflow as tf

x = tf.Variable(3.0)

# persistent=True allows gradient() to be called more than once; a
# default tape releases its resources after the first gradient() call.
with tf.GradientTape(persistent=True) as tape:
    y = x ** 2
    z = x ** 3

print(f"dy/dx = {tape.gradient(y, x).numpy()}")  # 2x -> 6.0
print(f"dz/dx = {tape.gradient(z, x).numpy()}")  # 3x^2 -> 27.0

# Persistent tapes are not freed automatically; delete explicitly.
del tape

监控非变量 #

python
import tensorflow as tf

# Constants are not tracked automatically (only tf.Variable is).
x = tf.constant(3.0)

with tf.GradientTape() as tape:
    # watch() opts the constant tensor into gradient tracking.
    tape.watch(x)
    y = x ** 2

grad = tape.gradient(y, x)
print(f"dy/dx = {grad.numpy()}")

高阶梯度 #

python
import tensorflow as tf

x = tf.Variable(3.0)

# Nested tapes give higher-order derivatives: tape2 records y, while
# tape1 records the computation of dy/dx itself.
with tf.GradientTape() as tape1:
    with tf.GradientTape() as tape2:
        y = x ** 3
    # Must run inside tape1's context so tape1 can differentiate it.
    dy_dx = tape2.gradient(y, x)
d2y_dx2 = tape1.gradient(dy_dx, x)

print(f"一阶导数: {dy_dx.numpy()}")  # 3x^2 -> 27.0
print(f"二阶导数: {d2y_dx2.numpy()}")  # 6x -> 18.0

自定义训练循环 #

基本训练循环 #

python
import tensorflow as tf
import numpy as np

# Small classifier head over 784-dim inputs (e.g. flattened 28x28 images).
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])

optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
# from_logits=True because the model's last layer applies no softmax.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Synthetic data stands in for a real dataset in this example.
x_train = np.random.random((1000, 784)).astype(np.float32)
y_train = np.random.randint(10, size=(1000,))

batch_size = 32
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(1000).batch(batch_size)

epochs = 5
for epoch in range(epochs):
    epoch_loss = 0.0
    num_batches = 0
    
    for x_batch, y_batch in train_dataset:
        # Record the forward pass so the tape can backpropagate through it.
        with tf.GradientTape() as tape:
            logits = model(x_batch, training=True)
            loss = loss_fn(y_batch, logits)
        
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        
        # .numpy() converts the scalar loss tensor to a Python float.
        epoch_loss += loss.numpy()
        num_batches += 1
    
    print(f"Epoch {epoch + 1}: Loss = {epoch_loss / num_batches:.4f}")

使用 tf.function 加速 #

python
import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])

optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()

# tf.function traces the step into a graph on first call, removing
# per-op Python overhead on subsequent calls.
@tf.function
def train_step(x, y):
    """Run one optimization step and update the accuracy metric.

    Args:
        x: batch of inputs, shape (batch, 784).
        y: batch of integer class labels.

    Returns:
        The scalar loss tensor for this batch.
    """
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = loss_fn(y, logits)
    
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    
    train_acc_metric.update_state(y, logits)
    return loss

# NOTE(review): x_train / y_train come from the previous snippet; this
# block is not runnable standalone -- confirm when extracting.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(1000).batch(32)

for epoch in range(5):
    for x_batch, y_batch in train_dataset:
        loss = train_step(x_batch, y_batch)
    
    # NOTE(review): result() returns a scalar tensor; some TF versions
    # reject format specs like ':.4f' on tensors -- float(train_acc) is safer.
    train_acc = train_acc_metric.result()
    print(f"Epoch {epoch + 1}: Accuracy = {train_acc:.4f}")
    # Clear accumulated metric state between epochs.
    # NOTE(review): reset_states() is the TF 2.x name; newer Keras uses
    # reset_state() -- confirm against the installed version.
    train_acc_metric.reset_states()

带验证的训练循环 #

python
import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])

optimizer = tf.keras.optimizers.Adam()
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Separate metric objects so train/val accuracies accumulate independently.
train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()

@tf.function
def train_step(x, y):
    """One optimization step; also updates the training accuracy metric."""
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = loss_fn(y, logits)
    
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_acc_metric.update_state(y, logits)
    return loss

@tf.function
def val_step(x, y):
    """Forward pass only (training=False); updates the validation metric."""
    logits = model(x, training=False)
    val_acc_metric.update_state(y, logits)

# NOTE(review): x_train/y_train and x_val/y_val are assumed defined by
# earlier snippets; this block is not runnable standalone.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(1000).batch(32)

val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(32)

for epoch in range(10):
    for x_batch, y_batch in train_dataset:
        train_step(x_batch, y_batch)
    
    # Validation runs after each training epoch, without weight updates.
    for x_batch, y_batch in val_dataset:
        val_step(x_batch, y_batch)
    
    train_acc = train_acc_metric.result()
    val_acc = val_acc_metric.result()
    
    print(f"Epoch {epoch + 1}: Train Acc = {train_acc:.4f}, Val Acc = {val_acc:.4f}")
    
    # Reset metric state between epochs.
    # NOTE(review): reset_states() is the TF 2.x name; newer Keras uses
    # reset_state() -- confirm against the installed version.
    train_acc_metric.reset_states()
    val_acc_metric.reset_states()

梯度处理 #

梯度裁剪 #

python
import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])

optimizer = tf.keras.optimizers.Adam()

@tf.function
def train_step(x, y):
    """One training step with global-norm gradient clipping.

    Args:
        x: batch of inputs, shape (batch, 784).
        y: batch of integer class labels.

    Returns:
        The scalar mean loss for the batch.
    """
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True)
        # The functional loss returns a per-example vector; reduce to a scalar.
        loss = tf.reduce_mean(loss)
    
    gradients = tape.gradient(loss, model.trainable_variables)
    
    # Clip by global norm: rescales all gradients jointly so their combined
    # norm is at most 1.0 (the discarded second value is the pre-clip norm).
    gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
    
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

梯度累积 #

python
import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])

optimizer = tf.keras.optimizers.Adam()
# Number of batches whose gradients are summed before one optimizer step.
accum_steps = 4

@tf.function
def train_step(x, y, accum_gradients):
    """Compute this batch's gradients and add them to the running sums.

    Args:
        x: batch of inputs, shape (batch, 784).
        y: batch of integer class labels.
        accum_gradients: list of running gradient sums, or a list of None
            at the start of each accumulation cycle.

    Returns:
        Tuple of (scaled loss, updated accumulation list).
    """
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True)
        # Dividing by accum_steps makes the summed gradients equal the
        # average gradient over the whole accumulation cycle.
        loss = tf.reduce_mean(loss) / accum_steps
    
    gradients = tape.gradient(loss, model.trainable_variables)
    
    # NOTE(review): this None check runs at trace time; alternating between
    # a None-list and a tensor-list argument makes tf.function keep two
    # separate traces. Accumulating into tf.Variable slots would avoid
    # that -- confirm retracing behavior before production use.
    if accum_gradients[0] is None:
        accum_gradients = gradients
    else:
        accum_gradients = [accum_g + g for accum_g, g in zip(accum_gradients, gradients)]
    
    return loss, accum_gradients

# NOTE(review): train_dataset is assumed defined by an earlier snippet.
accum_gradients = [None] * len(model.trainable_variables)
step = 0

for x_batch, y_batch in train_dataset:
    loss, accum_gradients = train_step(x_batch, y_batch, accum_gradients)
    step += 1
    
    if step % accum_steps == 0:
        # Apply the summed gradients once per cycle, then reset the sums.
        optimizer.apply_gradients(zip(accum_gradients, model.trainable_variables))
        accum_gradients = [None] * len(model.trainable_variables)

学习率调度 #

python
import tensorflow as tf

# Exponential decay: lr = initial_lr * decay_rate ** (step / decay_steps)
initial_lr = 0.001
decay_steps = 1000
decay_rate = 0.9

lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_lr,
    decay_steps=decay_steps,
    decay_rate=decay_rate
)

# Passing the schedule (instead of a float) makes the optimizer look up
# the current learning rate at every apply_gradients call.
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# NOTE(review): model, loss_fn and train_dataset are assumed defined by
# earlier snippets; this block is not runnable standalone.
step = 0
for epoch in range(10):
    for x_batch, y_batch in train_dataset:
        with tf.GradientTape() as tape:
            logits = model(x_batch, training=True)
            loss = loss_fn(y_batch, logits)
        
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        
        step += 1
        if step % 100 == 0:
            # Calling the schedule returns a scalar tensor; convert to a
            # Python float before formatting -- EagerTensor rejects format
            # specs like ':.6f' in f-strings (TypeError at runtime).
            current_lr = float(lr_schedule(step))
            print(f"Step {step}: LR = {current_lr:.6f}")

自定义 Model.train_step #

python
import tensorflow as tf

class CustomModel(tf.keras.Sequential):
    """Sequential model with a hand-written training step.

    Subclasses ``tf.keras.Sequential`` rather than ``tf.keras.Model`` so
    the class can be constructed from a list of layers (as done below);
    ``tf.keras.Model.__init__`` does not accept a layer list, so the
    original base class made ``CustomModel([...])`` fail.
    """

    def train_step(self, data):
        """Run one optimization step; called by ``Model.fit`` per batch.

        Args:
            data: an ``(x, y)`` tuple produced by the input pipeline.

        Returns:
            Dict mapping metric names to their current results, which
            ``fit`` displays in the progress bar.
        """
        x, y = data
        
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            # compiled_loss applies the loss passed to compile() and adds
            # any layer regularization losses.
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        
        self.compiled_metrics.update_state(y, y_pred)
        
        return {m.name: m.result() for m in self.metrics}

# NOTE(review): passing a layer list to the constructor only works if
# CustomModel subclasses tf.keras.Sequential; tf.keras.Model.__init__
# does not accept a list of layers -- confirm the base class above.
model = CustomModel([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10)
])

model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

# fit() now routes each batch through the custom train_step defined above.
# NOTE(review): x_train / y_train come from an earlier snippet.
model.fit(x_train, y_train, epochs=10, batch_size=32)

下一步 #

现在你已经掌握了自定义训练,接下来学习 GPU 加速,了解如何利用 GPU 加速模型训练!

最后更新:2026-04-04