Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/guide/migrate/fault_tolerance.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

フォールトトレランスとは、パラメータやモデルなどの追跡可能なオブジェクトの状態を定期的に保存するメカニズムを指します。これにより、トレーニング中にプログラム/マシンに障害が発生した場合に回復が可能になります。

このガイドでは、まず tf.estimator.RunConfig で指標の保存を指定することにより、TensorFlow 1 で tf.estimator.Estimator を使用してトレーニングにフォールトトレランスを追加する方法を示します。次に、Tensorflow 2 でのトレーニングにフォールトトレランスを実装する 2 つの方法を学習します。

  • Keras Model.fit API を使用する場合、tf.keras.callbacks.BackupAndRestore コールバックを渡すことができます。

  • カスタムトレーニングループ(tf.GradientTapeを使用)を使用する場合、tf.train.Checkpoint および tf.train.CheckpointManager API を使用してチェックポイントを任意に保存できます。

これらの方法は両方とも、チェックポイントファイルのトレーニング状態をバックアップおよび復元します。

セットアップ

tf.keras.callbacks.BackupAndRestoresave_freq 引数を使用した特定のステップでのチェックポイント保存の頻度が TensorFlow 2.10 から導入されたため、tf-nightly をインストールします。

!pip install tf-nightly
import tensorflow.compat.v1 as tf1 import tensorflow as tf import numpy as np import tempfile import time
mnist = tf.keras.datasets.mnist (x_train, y_train),(x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0

TensorFlow 1: tf.estimator.RunConfig でチェックポイントを保存する

TensorFlow 1 では、tf.estimator.RunConfig を構成して、各ステップでチェックポイントを保存するために tf.estimator を構成できます。

この例では、5 番目のチェックポイントで人為的にエラーをスローするフックを作成することから始めます。

class InterruptHook(tf1.train.SessionRunHook): # A hook for artificially interrupting training. def begin(self): self._step = -1 def before_run(self, run_context): self._step += 1 def after_run(self, run_context, run_values): if self._step == 5: raise RuntimeError('Interruption')

次に、すべてのチェックポイントを保存し、MNIST データセットを使用するように tf.estimator.Estimator を構成します。

feature_columns = [tf1.feature_column.numeric_column("x", shape=[28, 28])] config = tf1.estimator.RunConfig(save_summary_steps=1, save_checkpoints_steps=1) path = tempfile.mkdtemp() classifier = tf1.estimator.DNNClassifier( feature_columns=feature_columns, hidden_units=[256, 32], optimizer=tf1.train.AdamOptimizer(0.001), n_classes=10, dropout=0.2, model_dir=path, config = config ) train_input_fn = tf1.estimator.inputs.numpy_input_fn( x={"x": x_train}, y=y_train.astype(np.int32), num_epochs=10, batch_size=50, shuffle=True, )

モデルのトレーニングを開始します。前に定義したフックによって人為的な例外が発生します。

try: classifier.train(input_fn=train_input_fn, hooks=[InterruptHook()], max_steps=10) except Exception as e: print(f'{type(e).__name__}:{e}')

最後に保存されたチェックポイントを使用して tf.estimator.Estimator を再構築し、トレーニングを続行します。

classifier = tf1.estimator.DNNClassifier( feature_columns=feature_columns, hidden_units=[256, 32], optimizer=tf1.train.AdamOptimizer(0.001), n_classes=10, dropout=0.2, model_dir=path, config = config ) classifier.train(input_fn=train_input_fn, max_steps = 10)

TensorFlow 2: コールバックと Model.fit を使用したバックアップと復元

TensorFlow 2 では、トレーニングに Keras Model.fit API を使用する場合、tf.keras.callbacks.BackupAndRestore コールバックを提供してフォールトトレランス機能を追加できます。

これを実証するために、最初に 4 番目のエポックチェックポイントで人為的にエラーをスローする Keras Callback クラスを定義することから始めます。

class InterruptAtEpoch(tf.keras.callbacks.Callback): # A callback for artificially interrupting training. def __init__(self, interrupting_epoch=3): self.interrupting_epoch = interrupting_epoch def on_epoch_end(self, epoch, log=None): if epoch == self.interrupting_epoch: raise RuntimeError('Interruption')

次に、単純な Keras モデルを定義してインスタンス化し、損失関数を定義して Model.compile を呼び出し、エポックの境界で一時ディレクトリにチェックポイントを保存する tf.keras.callbacks.BackupAndRestore コールバックを設定します。

def create_model(): return tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(512, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10) ]) loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) model = create_model() model.compile(optimizer='adam', loss=loss, metrics=['accuracy']) log_dir = tempfile.mkdtemp() backup_restore_callback = tf.keras.callbacks.BackupAndRestore( backup_dir = log_dir)

Model.fit でモデルのトレーニングを開始します。トレーニング中、上記でインスタンス化された tf.keras.callbacks.BackupAndRestore のおかげでチェックポイントが保存されますが、InterruptAtEpoch クラスは 4 番目のエポックの後に失敗をシミュレートするために人為的な例外を発生させます。

try: model.fit(x=x_train, y=y_train, epochs=10, steps_per_epoch=100, validation_data=(x_test, y_test), callbacks=[backup_restore_callback, InterruptAtEpoch()]) except Exception as e: print(f'{type(e).__name__}:{e}')

次に、Keras モデルをインスタンス化し、Model.compile を呼び出し、以前に保存したチェックポイントから Model.fit を使用してモデルのトレーニングを続けます。

model = create_model() model.compile(optimizer='adam', loss=loss, metrics=['accuracy'], steps_per_execution=10) model.fit(x=x_train, y=y_train, epochs=10, steps_per_epoch=100, validation_data=(x_test, y_test), callbacks=[backup_restore_callback])

140 番目のステップで人為的にエラーをスローする別の Callback クラスを定義します。

class InterruptAtStep(tf.keras.callbacks.Callback): # A callback for artificially interrupting training. def __init__(self, interrupting_step=140): self.total_step_count = 0 self.interrupting_step = interrupting_step def on_batch_begin(self, batch, logs=None): self.total_step_count += 1 def on_batch_end(self, batch, logs=None): if self.total_step_count == self.interrupting_step: print("\nInterrupting at step count", self.total_step_count) raise RuntimeError('Interruption')

注意: このセクションでは、Tensorflow 2.10 がリリースされるまで tf-nightly でのみ利用可能な機能を使用します。

チェックポイントが 30 ステップごとに保存されるようにするには、BackupAndRestore コールバックの save_freq30 に設定します。 InterruptAtStep は、エポック 1 およびステップ 40(合計ステップ数 140)での失敗をシミュレートするために人為的な例外を発生させます。チェックポイントは、エポック 1 とステップ 20 で最後に保存されます。

log_dir_2 = tempfile.mkdtemp() backup_restore_callback = tf.keras.callbacks.BackupAndRestore( backup_dir = log_dir_2, save_freq=30 ) model = create_model() model.compile(optimizer='adam', loss=loss, metrics=['accuracy']) try: model.fit(x=x_train, y=y_train, epochs=10, steps_per_epoch=100, validation_data=(x_test, y_test), callbacks=[backup_restore_callback, InterruptAtStep()]) except Exception as e: print(f'{type(e).__name__}:{e}')

次に、Keras モデルをインスタンス化し、Model.compile を呼び出し、以前に保存したチェックポイントから Model.fit を使用してモデルのトレーニングを続けます。エポック 2 とステップ 21 からトレーニングが開始されることに注意してください。

model = create_model() model.compile(optimizer='adam', loss=loss, metrics=['accuracy'], steps_per_execution=10) model.fit(x=x_train, y=y_train, epochs=10, steps_per_epoch=100, validation_data=(x_test, y_test), callbacks=[backup_restore_callback])

TensorFlow 2: カスタムトレーニングループを使用して手動チェックポイントを作成する

TensorFlow 2 でカスタムトレーニングループを使用する場合、tf.train.Checkpoint および tf.train.CheckpointManager API を使用してフォールトトレランスメカニズムを実装できます。

この例は、次の方法を示しています。

  • tf.train.Checkpoint オブジェクトを使用してチェックポイントを手動で作成します。保存する追跡可能なオブジェクトが属性として設定されます。

  • 複数のチェックポイントを管理するには、tf.train.CheckpointManager を使用します。

Keras モデル、オプティマイザ、および損失関数を定義してインスタンス化することから始めます。次に、追跡可能な状態を持つ 2 つのオブジェクト(モデルとオプティマイザ)を管理する Checkpoint と、いくつかのチェックポイントを一時ディレクトリに記録して保持するための CheckpointManager を作成します。

model = create_model() optimizer = tf.keras.optimizers.SGD(learning_rate=0.001) loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) log_dir = tempfile.mkdtemp() epochs = 5 steps_per_epoch = 5 checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer) checkpoint_manager = tf.train.CheckpointManager( checkpoint, log_dir, max_to_keep=2)

ここで、新しいエポックが開始されるたびに最初のエポックの後に最後のチェックポイントが読み込まれるカスタムトレーニングループを実装します。

for epoch in range(epochs): if epoch > 0: tf.train.load_checkpoint(save_path) print(f"\nStart of epoch {epoch}") for step in range(steps_per_epoch): with tf.GradientTape() as tape: logits = model(x_train, training=True) loss_value = loss_fn(y_train, logits) grads = tape.gradient(loss_value, model.trainable_weights) optimizer.apply_gradients(zip(grads, model.trainable_weights)) save_path = checkpoint_manager.save() print(f"Checkpoint saved to {save_path}") print(f"Training loss at step {step}: {loss_value}")

Next steps

TensorFlow 2 のフォールトトレランスとチェックポイントの詳細については、次のドキュメントを参照してください。

  • tf.keras.callbacks.BackupAndRestore コールバック API ドキュメント。

  • tf.train.Checkpoint および tf.train.CheckpointManager API ドキュメント。

  • 書き込みチェックポイントセクションを含むトレーニングチェックポイントガイド。

分散トレーニングに関連する次の資料も役立つ場合があります。