Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/zh-cn/guide/migrate/fault_tolerance.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

容错是指定期保存参数和模型等可跟踪对象的状态的机制。这样,您便能够在训练期间出现程序/机器故障时恢复它们。

本指南首先演示了如何通过使用 tf.estimator.RunConfig 指定指标保存以在 TensorFlow 1 中使用 tf.estimator.Estimator 向训练添加容错。随后,您将学习如何通过以下两种方式在 Tensorflow 2 中实现容错训练:

  • 如果您使用 Keras Model.fit API,则可以将 tf.keras.callbacks.BackupAndRestore 回调传递给它。

  • 如果您使用自定义训练循环(使用 tf.GradientTape),则可以使用 tf.train.Checkpointtf.train.CheckpointManager API 任意保存检查点。

这两种方式都会备份和恢复检查点文件中的训练状态。

安装

安装 tf-nightly,因为使用 tf.keras.callbacks.BackupAndRestore 中的 save_freq 参数设置特定步骤保存检查点的频率是从 TensorFlow 2.10 引入的:

!pip install tf-nightly
import tensorflow.compat.v1 as tf1 import tensorflow as tf import numpy as np import tempfile import time
mnist = tf.keras.datasets.mnist (x_train, y_train),(x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0

TensorFlow 1:使用 tf.estimator.RunConfig 保存检查点

在 TensorFlow 1 中,可以配置 tf.estimator,随后通过配置 tf.estimator.RunConfig 在每一步保存检查点。

在此示例中,首先编写一个在第五个检查点期间人为抛出错误的钩子:

class InterruptHook(tf1.train.SessionRunHook): # A hook for artificially interrupting training. def begin(self): self._step = -1 def before_run(self, run_context): self._step += 1 def after_run(self, run_context, run_values): if self._step == 5: raise RuntimeError('Interruption')

接下来,配置 tf.estimator.Estimator 以保存每个检查点并使用 MNIST 数据集:

feature_columns = [tf1.feature_column.numeric_column("x", shape=[28, 28])] config = tf1.estimator.RunConfig(save_summary_steps=1, save_checkpoints_steps=1) path = tempfile.mkdtemp() classifier = tf1.estimator.DNNClassifier( feature_columns=feature_columns, hidden_units=[256, 32], optimizer=tf1.train.AdamOptimizer(0.001), n_classes=10, dropout=0.2, model_dir=path, config = config ) train_input_fn = tf1.estimator.inputs.numpy_input_fn( x={"x": x_train}, y=y_train.astype(np.int32), num_epochs=10, batch_size=50, shuffle=True, )

开始训练模型。您之前定义的钩子将引发人为异常。

try: classifier.train(input_fn=train_input_fn, hooks=[InterruptHook()], max_steps=10) except Exception as e: print(f'{type(e).__name__}:{e}')

使用最后保存的检查点重新构建 tf.estimator.Estimator 并继续训练:

classifier = tf1.estimator.DNNClassifier( feature_columns=feature_columns, hidden_units=[256, 32], optimizer=tf1.train.AdamOptimizer(0.001), n_classes=10, dropout=0.2, model_dir=path, config = config ) classifier.train(input_fn=train_input_fn, max_steps = 10)

TensorFlow 2:使用回调和 Model.fit 备份和恢复

在 TensorFlow 2 中,如果使用 Keras Model.fit API 进行训练,则可以提供 tf.keras.callbacks.BackupAndRestore 回调来添加容错功能。

为了帮助演示这一点,首先定义一个 Keras Callback 类,该类会在第四个周期检查点期间人为抛出错误:

class InterruptAtEpoch(tf.keras.callbacks.Callback): # A callback for artificially interrupting training. def __init__(self, interrupting_epoch=3): self.interrupting_epoch = interrupting_epoch def on_epoch_end(self, epoch, log=None): if epoch == self.interrupting_epoch: raise RuntimeError('Interruption')

然后,定义并实例化一个简单的 Keras 模型,定义损失函数,调用 Model.compile 并设置一个 tf.keras.callbacks.BackupAndRestore 回调,它会将检查点保存在周期边界的临时目录中:

def create_model(): return tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(512, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10) ]) loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) model = create_model() model.compile(optimizer='adam', loss=loss, metrics=['accuracy']) log_dir = tempfile.mkdtemp() backup_restore_callback = tf.keras.callbacks.BackupAndRestore( backup_dir = log_dir)

开始使用 Model.fit 训练模型。在训练期间,由于上面实例化的 tf.keras.callbacks.BackupAndRestore 将保存检查点,而 InterruptAtEpoch 类将引发人为异常来模拟第四个周期后的失败。

try: model.fit(x=x_train, y=y_train, epochs=10, steps_per_epoch=100, validation_data=(x_test, y_test), callbacks=[backup_restore_callback, InterruptAtEpoch()]) except Exception as e: print(f'{type(e).__name__}:{e}')

接下来,实例化 Keras 模型,调用 Model.compile,并从之前保存的检查点继续使用 Model.fit 训练模型:

model = create_model() model.compile(optimizer='adam', loss=loss, metrics=['accuracy'], steps_per_execution=10) model.fit(x=x_train, y=y_train, epochs=10, steps_per_epoch=100, validation_data=(x_test, y_test), callbacks=[backup_restore_callback])

定义另一个 Callback 类,该类会在第 140 步期间人为抛出错误:

class InterruptAtStep(tf.keras.callbacks.Callback): # A callback for artificially interrupting training. def __init__(self, interrupting_step=140): self.total_step_count = 0 self.interrupting_step = interrupting_step def on_batch_begin(self, batch, logs=None): self.total_step_count += 1 def on_batch_end(self, batch, logs=None): if self.total_step_count == self.interrupting_step: print("\nInterrupting at step count", self.total_step_count) raise RuntimeError('Interruption')

注:本部分使用了仅在 Tensorflow 2.10 发布后才能在 tf-nightly 中可用的功能。

要确保检查点每 30 个步骤保存一次,请将 BackupAndRestore 回调中的 save_freq 设置为 30InterruptAtStep 将引发一个人为的异常来模拟周期 1 和步骤 40 的失败(总步数为 140)。最后会在周期 1 和步骤 20 保存检查点。

log_dir_2 = tempfile.mkdtemp() backup_restore_callback = tf.keras.callbacks.BackupAndRestore( backup_dir = log_dir_2, save_freq=30 ) model = create_model() model.compile(optimizer='adam', loss=loss, metrics=['accuracy']) try: model.fit(x=x_train, y=y_train, epochs=10, steps_per_epoch=100, validation_data=(x_test, y_test), callbacks=[backup_restore_callback, InterruptAtStep()]) except Exception as e: print(f'{type(e).__name__}:{e}')

接下来,实例化 Keras 模型,调用 Model.compile,并从之前保存的检查点继续使用 Model.fit 训练模型。请注意,训练从周期 2 和步骤 21 开始。

model = create_model() model.compile(optimizer='adam', loss=loss, metrics=['accuracy'], steps_per_execution=10) model.fit(x=x_train, y=y_train, epochs=10, steps_per_epoch=100, validation_data=(x_test, y_test), callbacks=[backup_restore_callback])

TensorFlow 2:使用自定义训练循环编写手动检查点

如果您在 TensorFlow 2 中使用自定义训练循环,则可以使用 tf.train.Checkpointtf.train.CheckpointManager API 实现容错机制。

此示例演示了如何执行以下操作:

  • 使用 tf.train.Checkpoint 对象手动创建一个检查点,其中要保存的可跟踪对象设置为特性。

  • 使用 tf.train.CheckpointManager 管理多个检查点。

首先,定义和实例化 Keras 模型、优化器和损失函数。然后,创建一个 Checkpoint 来管理两个具有可跟踪状态的对象(模型和优化器),以及一个 CheckpointManager 来记录多个检查点并将它们保存在临时目录中。

model = create_model() optimizer = tf.keras.optimizers.SGD(learning_rate=0.001) loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) log_dir = tempfile.mkdtemp() epochs = 5 steps_per_epoch = 5 checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer) checkpoint_manager = tf.train.CheckpointManager( checkpoint, log_dir, max_to_keep=2)

现在,实现一个自定义训练循环,在第一个周期之后,每次新周期开始时都会加载最后一个检查点:

for epoch in range(epochs): if epoch > 0: tf.train.load_checkpoint(save_path) print(f"\nStart of epoch {epoch}") for step in range(steps_per_epoch): with tf.GradientTape() as tape: logits = model(x_train, training=True) loss_value = loss_fn(y_train, logits) grads = tape.gradient(loss_value, model.trainable_weights) optimizer.apply_gradients(zip(grads, model.trainable_weights)) save_path = checkpoint_manager.save() print(f"Checkpoint saved to {save_path}") print(f"Training loss at step {step}: {loss_value}")

后续步骤

要详细了解 TensorFlow 2 中的容错和检查点,请查看以下文档:

  • tf.keras.callbacks.BackupAndRestore 回调 API 文档。

  • tf.train.Checkpointtf.train.CheckpointManager API 文档。

  • 训练检查点指南,包括编写检查点部分。

此外,您可能还会发现下列与分布式训练相关的材料十分有用: