Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/tutorials/distribute/parameter_server_training.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

ParameterServerStrategy でパラメータサーバーをトレーニングする

概要

パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。

パラメータサーバートレーニング クラスタは、ワーカーパラメータサーバーで構成されます。変数はパラメータサーバーで作成され、各ステップでワーカーにより読み取られ、更新されます。 デフォルトでは、ワーカーは相互に同期することなく、これらの変数を個別に読み取り、更新します。そのため、パラメータサーバースタイルのトレーニングは非同期トレーニングと呼ばれます。

TensorFlow 2 では、パラメータサーバートレーニングは tf.distribute.ParameterServerStrategy クラスによって行われます。このクラスは、数千のワーカーにスケールアップするクラスタにトレーニングステップを分散します (パラメータサーバーを伴う)。

サポートされているトレーニング方法

サポートされている主なトレーニング方法は 2 つあります。

ジョブとタスクのクラスタ

選択した API (Model.fit またはカスタムトレーニングループ) に関係なく、TensorFlow 2 の分散トレーニングには、複数の 'jobs' があり、各ジョブには 1 つ以上の 'task' がある場合があります。

パラメータサーバートレーニングを使用する場合は、次を推薦します。

  • 1 つの コーディネータジョブ (ジョブ名は chief)

  • 複数のワーカージョブ (ジョブ名は worker)

  • 複数のパラメータサーバージョブ (ジョブ名は ps)

コーディネータは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。ワーカーパラメータサーバーは、コーディネータからのリクエストをリッスンする tf.distribute.Server インスタンスを実行します。

Model.fit API を使用したパラメータサーバートレーニング

Model.fit API を使用したパラメータサーバートレーニングでは、コーディネータが tf.distribute.ParameterServerStrategy オブジェクトを使用する必要があります。Model.fit をストラテジーなしで使用する場合や他のストラテジーを使用する場合と同様に、ワークフローには、モデルの作成とコンパイル、コールバックの準備、および Model.fit の呼び出しが含まれます。

カスタムトレーニングループを使用したパラメータサーバートレーニング

カスタムトレーニングループでは、tf.distribute.coordinator.ClusterCoordinator クラスがコーディネータに使用される重要なコンポーネントです。

  • ClusterCoordinator クラスは、tf.distribute.ParameterServerStrategy オブジェクトと連携して動作する必要があります。

  • この tf.distribute.Strategy オブジェクトは、クラスタの情報を提供するために必要であり、tf.distribute.Strategy を使用したカスタムトレーニングで示されているように、トレーニングステップを定義するために使用されます。

  • ClusterCoordinator オブジェクトは、これらのトレーニング ステップの実行をリモートワーカーにディスパッチします。

ClusterCoordinator オブジェクトにより提供される最も重要な API は schedule です。

  • schedule API は tf.function をキューに入れ、future-like の RemoteValue をすぐに返します。

  • キューに入れられた関数は、バックグラウンドスレッドでリモートワーカーにディスパッチされ、その RemoteValue は非同期で埋められます。

  • schedule はワーカーの割り当てを必要としないため、渡された tf.function は使用可能な任意のワーカーで実行できます。

  • 関数が実行されたワーカーが完了前に利用できなくなった場合、別の利用可能なワーカーで再試行されます。

  • そのため、そして関数の実行がアトミックではないために、1 つの関数の呼び出しが複数回実行される場合があります。

ClusterCoordinator は、リモート関数のディスパッチに加えて、すべてのワーカーでデータセットを作成し、ワーカーが障害から回復したときにこれらのデータセットを再構築するのにも役立ちます。

チュートリアルのセットアップ

チュートリアルのセクションは Model.fit とカスタムトレーニングループ向けに分かれています。「X を使用したトレーニング」以外のセクションは、両方に適用されます。

!pip install portpicker
#@title import multiprocessing import os import random import portpicker import tensorflow as tf

クラスタのセットアップ

前述のように、パラメータサーバートレーニングクラスタには、トレーニングプログラムを実行するコーディネータタスク、1 つまたは複数のワーカー、TensorFlow サーバーを実行するパラメータサーバータスク (tf.distribute.Server) が必要です。場合によっては、サイドカー評価を実行する追加の評価タスクが必要です (以下のサイドカー評価セクションを参照してください)。これらを設定するための要件は次のとおりです。

  • コーディネータタスクは、エバリュエータを除く他のすべての TensorFlow サーバーのアドレスとポートを知る必要があります。

  • ワーカーとパラメータサーバーは、リッスンする必要があるポートを知る必要があります。通常、これらのタスクで TensorFlow サーバーを作成するときに、完全なクラスタの情報を渡します。

  • エバリュエータタスクは、トレーニングクラスタの設定を知る必要はありません。知っている場合でも、トレーニングクラスタへの接続を試みるべきではありません。

  • ワーカーとパラメータサーバーには、それぞれ "worker""ps" のタスクタイプが必要です。コーディネータは、タスクタイプとして従来の "chief" を使用する必要があります。

このチュートリアルでは、インプロセスのクラスタを作成し、パラメータサーバーのトレーニング全体を Colab で実行できるようにします。実際のクラスタの設定方法については、後のセクションで説明します。

インプロセス クラスタ

事前にいくつかの TensorFlow サーバーを作成することから始め、後でそれらに接続します。これは、チュートリアルのデモを目的としており、実際のトレーニングでは、サーバーは "worker" および "ps" マシンで起動されることに注意してください。

def create_in_process_cluster(num_workers, num_ps): """Creates and starts local servers and returns the cluster_resolver.""" worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)] ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)] cluster_dict = {} cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports] if num_ps > 0: cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports] cluster_spec = tf.train.ClusterSpec(cluster_dict) # Workers need some inter_ops threads to work properly. worker_config = tf.compat.v1.ConfigProto() if multiprocessing.cpu_count() < num_workers + 1: worker_config.inter_op_parallelism_threads = num_workers + 1 for i in range(num_workers): tf.distribute.Server( cluster_spec, job_name="worker", task_index=i, config=worker_config, protocol="grpc") for i in range(num_ps): tf.distribute.Server( cluster_spec, job_name="ps", task_index=i, protocol="grpc") cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver( cluster_spec, rpc_layer="grpc") return cluster_resolver # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" NUM_WORKERS = 3 NUM_PS = 2 cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

インプロセスクラスタのセットアップは、ユニットテストでよく使用されます (こちらを参照)。

ローカルテストのもう 1 つのオプションは、ローカルマシンでプロセスを起動することです。このアプローチの例については、Keras を使用したマルチワーカートレーニングを参照してください。

ParameterServerStrategy をインスタンス化する

トレーニング コードに入る前に、tf.distribute.ParameterServerStrategy オブジェクトをインスタンス化します。これは、Model.fit とカスタムトレーニングループのどちらを使用している場合でも必要であることに注意してください。variable_partitioner 引数については、変数シャーディングのセクションで説明します。

variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner)

トレーニングに GPU を使用するには、各ワーカーに表示される GPU を割り当てます。 ParameterServerStrategy は、各ワーカーで利用可能なすべての GPU を使用しますが、すべてのワーカーが同じ数の GPU を利用できる必要があるという制限があります。

変数のシャーディング

変数のシャーディングとは、変数をシャードと呼ばれる複数の小さな変数に分割することです。変数のシャーディングは、これらのシャードにアクセスする際のネットワーク負荷を分散するのに役立つ場合があります。また、1 台のマシンのメモリに収まらない非常に大きな埋め込みを使用する場合など、通常の変数の計算と格納を複数のパラメータサーバーに分散することもできます。

変数シャーディングを有効にするには、ParameterServerStrategy オブジェクトを構築する際に variable partitioner を渡します。variable_partitioner は、変数が作成されるたびに呼び出され、変数の各次元に沿ってシャードの数を返すことが期待されます。tf.distribute.experimental.partitioners.MinSizePartitioner など、すぐに使える variable_partitioner がいくつか提供されています。tf.distribute.experimental.partitioners.MinSizePartitioner のようなサイズベースのパーティショナーを使用して、モデルのトレーニング速度に悪影響を及ぼす可能性のある小さな変数のパーティショニングを避けることをお勧めします。

variable_partitioner が渡され、Strategy.scope のすぐ下に変数を作成すると、その変数は variables プロパティを持つコンテナタイプになり、シャードのリストへのアクセスを提供します。ほとんどの場合、このコンテナは、すべてのシャードを連結することによって自動的にテンソルに変換されるので、通常の変数として使用できます。一方、tf.nn.embedding_lookup などの一部の TensorFlow メソッドは、このコンテナタイプの効率的な実装を提供し、これらのメソッドでは自動連結が回避されます。

詳細については、tf.distribute.ParameterServerStrategy の API ドキュメントを参照してください。

Model.fit でトレーニングする

Keras は、Model.fit を介して使いやすいトレーニング API を提供します。これは、内部でトレーニングループを処理し、オーバーライド可能な柔軟な train_step や TensorBoard のチェックポイントの保存やサマリーの保存などの機能を提供するコールバックを備えています。Model.fit を使用すると、ストラテジーオブジェクトを簡単に交換するだけで、同じトレーニングコードを他のストラテジーで使用できます。

入力データ

tf.distribute.ParameterServerStrategy を使用する Keras Model.fit では、tf.data.Datasettf.distribute.DistributedDataset の形式の入力データを使えます。または、tf.keras.utils.experimental.DatasetCreatorDataset は使いやすい推奨されるオプションです。ただし、Dataset を使用してメモリの問題が発生した場合は、呼び出し可能な dataset_fn 引数を指定して DatasetCreator を使用する必要がある場合があります (詳細については、tf .keras.utils.experimental.DatasetCreator API ドキュメントを参照してください)。

データセットを tf.data.Dataset に変換する場合は、以下の例で示されているように、Dataset.shuffleDataset.repeat を使用する必要があります。

  • パラメータサーバートレーニングを使用する Keras Model.fit では、異なる方法でシャッフルされる場合を除いて、各ワーカーが同じデータセットを受け取ることを前提としています。したがって、Dataset.shuffle を呼び出すことで、データをより均等にイテレーションできます。

  • ワーカーは同期しないため、データセットの処理の終了時が異なる場合があります。Dataset.repeat を使用するとパラメータサーバートレーニングでエポックを簡単に定義できます。これは、引数なしで呼び出された場合にデータセットを無期限に繰り返し、Model.fit 呼び出しで steps_per_epoch 引数を指定します。

shufflerepeat の詳細については、tf.data ガイドの「トレーニング ワークフロー」セクションを参照してください。

global_batch_size = 64 x = tf.random.uniform((10, 10)) y = tf.random.uniform((10,)) dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat() dataset = dataset.batch(global_batch_size) dataset = dataset.prefetch(2)

代わりに tf.keras.utils.experimental.DatasetCreator でデータセットを作成すると、dataset_fn のコードは、各ワーカーマシンの入力デバイス (通常は CPU) で呼び出されます。

モデルの構築とコンパイル

まず、tf.keras.Model (デモ用の自明な tf.keras.models.Sequential モデル) を作成し、次に Model.compile を呼び出して、オプティマイザー、メトリックなどのコンポーネント、および steps_per_execution などのその他のパラメータを組み込みます。

with strategy.scope(): model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)]) model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)

コールバックとトレーニング

実際のトレーニングのために Keras Model.fit を呼び出す前に、次のような一般的なタスクに必要なコールバックを準備します。

  • tf.keras.callbacks.ModelCheckpoint: 各エポック後など、特定の頻度でモデルを保存します。

  • tf.keras.callbacks.BackupAndRestore: クラスタが使用できなくなった場合 (アボートやプリエンプションなど)、モデルとその時点のエポック番号をバックアップすることで耐障害性を提供します。その後、ジョブの失敗からの再開時にトレーニング状態を復元し、中断されたエポックの最初からトレーニングを続行できます。

  • tf.keras.callbacks.TensorBoard: サマリーファイルにモデル ログを定期的に書き込みます。これは、TensorBoard ツールで視覚化できます。

注意: パフォーマンスを維持するために、ParameterServerStrategy で使用する場合、カスタムコールバックでバッチレベルのコールバックをオーバーライドすることはできません。カスタムコールバックをエポックレベルの呼び出しに変更し、steps_per_epoch を適切な値に調整してください。また、steps_per_epoch は、ParameterServerStrategy と併用する場合、Model.fit に必須の引数です。

working_dir = "/tmp/my_working_dir" log_dir = os.path.join(working_dir, "log") ckpt_filepath = os.path.join(working_dir, "ckpt") backup_dir = os.path.join(working_dir, "backup") callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=log_dir), tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath), tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir), ] model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)

ClusterCoordinator で直接使用する (オプション)

Model.fit トレーニングを選択した場合でも、必要に応じて tf.distribute.coordinator.ClusterCoordinator オブジェクトをインスタンス化して、ワーカーで実行する他の関数をスケジュールできます。詳細と例については、カスタムトレーニングループを使ったトレーニングのセクションを参照してください。

カスタムトレーニングループを使ったトレーニング

tf.distribute.Strategy でカスタムトレーニングループを使用すると、トレーニングループを非常に柔軟に定義できます。上で (strategy として) 定義された ParameterServerStrategy を使用して、tf.distribute.coordinator.ClusterCoordinator を使用して、トレーニングステップの実行をリモートワーカーにディスパッチできます。

次に、他の tf.distribute.Strategy のトレーニングループで行ったように、モデルを作成し、データセットを定義し、ステップ関数を定義します。詳細については、tf.distribute.Strategy を使用したカスタムトレーニング チュートリアルを参照してください。

効率的にデータセットをプリフェッチするには、以下のリモートワーカーにトレーニングステップをディスパッチするセクションで説明されている、推奨される分散データセット作成 API を使用してください。また、ワーカーに割り当てられた GPU を最大限に活用するために、worker_fn 内で Strategy.run を呼び出してください。 残りのステップは、トレーニングで GPU を使用する場合でも使用しない場合でも同じです。

次の手順でこれらのコンポーネントを作成します。

データのセットアップ

まず、データセットを作成する関数を作成します。

Keras 前処理レイヤーまたは Tensorflow 変換レイヤーでデータを前処理する場合は、他の Keras レイヤーに対して行うようにこれらのレイヤーを dataset_fn の外、および、Strategy.scope の下に作成します。これは、dataset_fntf.function にラップされ、各ワーカーで実行されてデータパイプラインが生成されるためです。

上記の手順に従わずにレイヤーを作成すると、tf.function からコーディネータにリフトされる Tensorflow 状態が作成され、ワーカーでそれらにアクセスすると、コーディネータとワーカーの間で繰り返し RPC 呼び出しが発生し、速度が大幅に低下する可能性があります。

Strategy.scope の下にレイヤーを配置すると、代わりにすべてのワーカーにレイヤーが作成され、tf.data.Dataset.map を介して dataset_fn 内に変換を適用します。分散入力によるデータの前処理の詳細については、分散入力チュートリアルのデータの前処理を参照してください。

feature_vocab = [ "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman" ] label_vocab = ["yes", "no"] with strategy.scope(): feature_lookup_layer = tf.keras.layers.StringLookup( vocabulary=feature_vocab, mask_token=None) label_lookup_layer = tf.keras.layers.StringLookup( vocabulary=label_vocab, num_oov_indices=0, mask_token=None) raw_feature_input = tf.keras.layers.Input( shape=(3,), dtype=tf.string, name="feature") feature_id_input = feature_lookup_layer(raw_feature_input) feature_preprocess_stage = tf.keras.Model( {"features": raw_feature_input}, feature_id_input) raw_label_input = tf.keras.layers.Input( shape=(1,), dtype=tf.string, name="label") label_id_input = label_lookup_layer(raw_label_input) label_preprocess_stage = tf.keras.Model( {"label": raw_label_input}, label_id_input)

データセットでトイサンプルを生成します。

def feature_and_label_gen(num_examples=200): examples = {"features": [], "label": []} for _ in range(num_examples): features = random.sample(feature_vocab, 3) label = ["yes"] if "avenger" in features else ["no"] examples["features"].append(features) examples["label"].append(label) return examples examples = feature_and_label_gen()

次に、dataset_fn にラップされたトレーニングデータセットを作成します。

def dataset_fn(_): raw_dataset = tf.data.Dataset.from_tensor_slices(examples) train_dataset = raw_dataset.map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(200).batch(32).repeat() return train_dataset

モデルを構築する

次に、モデルとその他のオブジェクトを作成します。必ず Strategy.scope の下にすべての変数を作成します。

# These variables created under the `Strategy.scope` will be placed on parameter # servers in a round-robin fashion. with strategy.scope(): # Create the model. The input needs to be compatible with Keras processing layers. model_input = tf.keras.layers.Input( shape=(3,), dtype=tf.int64, name="model_input") emb_layer = tf.keras.layers.Embedding( input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384) emb_output = tf.reduce_mean(emb_layer(model_input), axis=1) dense_output = tf.keras.layers.Dense( units=1, activation="sigmoid", kernel_regularizer=tf.keras.regularizers.L2(1e-4), )(emb_output) model = tf.keras.Model({"features": model_input}, dense_output) optimizer = tf.keras.optimizers.legacy.RMSprop(learning_rate=0.1) accuracy = tf.keras.metrics.Accuracy()

FixedShardsPartitioner の使用により、すべての変数が 2 つのシャードに分割され、各シャードが異なるパラメータサーバーに割り当てられたことを確認します。

assert len(emb_layer.weights) == 2 assert emb_layer.weights[0].shape == (4, 16384) assert emb_layer.weights[1].shape == (4, 16384) print(emb_layer.weights[0].device) print(emb_layer.weights[1].device)

トレーニングステップを定義する

3 番目に、tf.function にラップされたトレーニングステップを作成します。

@tf.function def step_fn(iterator): def replica_fn(batch_data, labels): with tf.GradientTape() as tape: pred = model(batch_data, training=True) per_example_loss = tf.keras.losses.BinaryCrossentropy( reduction=tf.keras.losses.Reduction.NONE)(labels, pred) loss = tf.nn.compute_average_loss(per_example_loss) model_losses = model.losses if model_losses: loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses)) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) accuracy.update_state(labels, actual_pred) return loss batch_data, labels = next(iterator) losses = strategy.run(replica_fn, args=(batch_data, labels)) return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

上記のトレーニングステップ関数では、step_fn における Strategy.runStrategy.reduce の呼び出しでワーカーごとに複数の GPU をサポートできます。ワーカーに GPU が割り当てられている場合、Strategy.run は複数のレプリカ(GPU)でデータセットを分散します。これらの tf.nn.compute_average_loss() への同時呼び出しは、ワーカーの合計数に関係なく、1 つのワーカーのレプリカ(GPU)間で損失の平均を計算します。

リモートワーカーにトレーニングステップをディスパッチする

すべての計算が ParameterServerStrategy によって定義された後、tf.distribute.coordinator.ClusterCoordinator クラスを使用してリソースを作成し、トレーニングステップをリモートワーカーに分散します。

まず、ClusterCoordinator オブジェクトを作成し、ストラテジーオブジェクトを渡します。

coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)

次に、ClusterCoordinator.create_per_worker_dataset API を使用して、ワーカーごとのデータセットと反復子を作成します。これにより、データセットがすべてのワーカーに複製されます。以下の per_worker_dataset_fn では、dataset_fnstrategy.distribute_datasets_from_function にラップして、GPU へ効率的にプリフェッチを実行できるようにすることを推薦します。

@tf.function def per_worker_dataset_fn(): return strategy.distribute_datasets_from_function(dataset_fn) per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn) per_worker_iterator = iter(per_worker_dataset)

最後のステップは、Cluster Coordinator.schedule を使用して計算をリモートワーカーに分散することです。

  • schedule メソッドは tf.function をキューに入れ、future-like の RemoteValue をすぐに返します。キューに入れられた関数はバックグラウンドスレッドでリモートワーカーにディスパッチされ、RemoteValue は非同期で入力されます。

  • join メソッド (ClusterCoordinator.join) は、スケジュールされたすべての関数が実行されるまで待機するために使用します。

num_epochs = 4 steps_per_epoch = 5 for i in range(num_epochs): accuracy.reset_states() for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) # Wait at epoch boundaries. coordinator.join() print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

Remote Value の結果を取得する方法は次のとおりです。

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,)) print("Final loss is %f" % loss.fetch())

または、すべてのステップを起動して、完了するのを待っている間に何かを行うこともできます。

for _ in range(total_steps): coordinator.schedule(step_fn, args=(per_worker_iterator,)) while not coordinator.done(): time.sleep(10) # Do something like logging metrics or writing checkpoints.

この特定の例の完全なトレーニングとサービングのワークフローについては、このテストを参照してください。

データセット作成の詳細

上記のコードのデータセットは、ClusterCoordinator.create_per_worker_dataset API を使用して作成されます。ワーカーごとに 1 つのデータセットを作成し、コンテナオブジェクトを返します。その上で iter メソッドを呼び出して、ワーカーごとの反復子を作成できます。ワーカーごとの反復子には、ワーカーごとに 1 つの反復子が含まれ、特定のワーカーで関数が実行される前に、ClusterCoordinator.schedule メソッドに渡される関数の入力引数で、ワーカーの対応するスライスが置き換えられます。

ClusterCoordinator.schedule メソッドは、ワーカーが同等で、異なるワーカーのデータセットが同じであると想定しています (ただし、異なる方法でシャッフルされる可能性があります)。そのため、データセットから OutOfRangeError を受け取ることに依存せず、データセットを繰り返し、有限数のステップをスケジュールすることも推薦します。

もう 1 つの重要な注意点は、tf.data データセットは、タスク境界を越えた暗黙的なシリアル化と逆シリアル化をサポートしていないということです。そのため、ClusterCoordinator.create_per_worker_dataset に渡される関数内でデータセット全体を作成することが重要です。create_per_worker_dataset API は、tf.data.Dataset または tf.distribute.DistributedDataset を入力として直接受け取ることもできます。

評価

tf.distribute.ParameterServerStrategy トレーニングで評価を実行する 2 つの主な方法は、インライン評価とサイドカー評価です。以下に説明するように、それぞれに長所と短所があります。特にこだわりがない場合は、インライン評価方法を推薦します。Model.fit を使用しているユーザーの場合、Model.evaluate は内部でインライン(分散)評価を使用しています。

インライン評価

インライン評価では、コーディネータがトレーニングと評価を交互に行います。

インライン評価には、以下のようないくつかの利点があります。

  • 単一のタスクでは保持できない大規模な評価モデルと評価データセットをサポートできます。

  • 評価結果を使用して、次のエポックのトレーニングに関する決定を下すことができます (トレーニングを早期に停止するかどうかなど)。

インライン評価を実装するには、直接評価と分散評価の 2 つの方法があります。

  • 直接評価: 小規模なモデルと評価データセットの場合、コーディネータは、コーディネータ上の評価データセットを使用して、分散モデルで直接評価を実行できます。

eval_dataset = tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).batch(8) eval_accuracy = tf.keras.metrics.Accuracy() for batch_data, labels in eval_dataset: pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) print("Evaluation accuracy: %f" % eval_accuracy.result())
  • 分散評価: コーディネータで直接実行することが不可能な大規模なモデルまたはデータセットの場合、コーディネータタスクは、ClusterCoordinator.schedule/ClusterCoordinator.join メソッドを介して評価タスクをワーカーに分散できます。

with strategy.scope(): # Define the eval metric on parameter servers. eval_accuracy = tf.keras.metrics.Accuracy() @tf.function def eval_step(iterator): def replica_fn(batch_data, labels): pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) batch_data, labels = next(iterator) strategy.run(replica_fn, args=(batch_data, labels)) def eval_dataset_fn(): return tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(16).repeat().batch(8) per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn) per_worker_eval_iterator = iter(per_worker_eval_dataset) eval_steps_per_epoch = 2 for _ in range(eval_steps_per_epoch): coordinator.schedule(eval_step, args=(per_worker_eval_iterator,)) coordinator.join() print("Evaluation accuracy: %f" % eval_accuracy.result())

1 回限りの評価を有効にする

tf.distribute.coordinator.ClusterCoordinatorschedulejoin メソッドは、デフォルトで、評価保証または 1 回限りのセマンティクスをサポートしていません。言い換えると、上記の例では、データセット内のすべての評価例がちょうど 1 回実行される保証がない、評価されないものや数回評価されるものがある、ということです。

エポック間での評価の分散を軽減し、早期停止やハイパーパラメータのチューニングなどの方法で行われるモデルの選択を改善するには、1 回限りの評価が好ましい可能性があります。1 回限りの評価は、以下のように様々な方法で有効にできます。

  • Model.fit/.evaluate ワークフローを使用すると、Model.compile に引数を追加することで有効にできます。ドキュメントで pss_evaluation_shards 引数をご覧ください。

  • tf.data サービス API は、ParameterServerStrategy を使用する場合に 1 回限りの評価を提供できます(tf.data.experimental.service API ドキュメントの動的シャーディングセクションをご覧ください)。

  • サイドカー評価は単一のマシン上で実行されるため、デフォルトで 1 回限りの評価を提供します。ただし、多数のワーカーに分散される評価をじっこうするよりも大幅に低速な場合があります。

Model.compile を使用する最初のオプションは、ほとんどのユーザーに提案されるソリューションです。

1 回限りの評価には、以下のような制限があります。

  • 1 回限りの評価保証でカスタム分散評価ループを記述することはサポートされていません。このサポートが必要な場合は、GitHub 課題を提出してください。

  • Layer.add_metric API を使用するメトリクスの計算は、自動的に処理されません。これらを評価から除外するか、Metric オブジェクトに組み込むように作り直す必要があります。

サイドカー評価

サイドカー評価は、tf.distribute.ParameterServerStrategy トレーニングで評価ループを定義して実行する別の方法で、最新のチェックポイントでチェックポイントを繰り返し読み取り評価を実行する専用の評価タスクを作成します。(チェックポイントの詳細については、このガイドを参照してください)。コーディネータータスクとワーカータスクは評価に時間を費やさないため、反復回数が一定であれば、全体のトレーニング時間は他の評価方法を使用するよりも短くなります。ただし、評価をトリガーするには、追加のエバリュエータタスクと定期的なチェックポイントが必要です。

サイドカー評価の評価ループを作成するには、次の 2 つのオプションがあります。

  1. tf.keras.utils.SidecarEvaluator API を使用する。

  2. カスタム評価ループを作成する。

オプション 1 の詳細については、tf.keras.utils.SidecarEvaluator API ドキュメントを参照してください。

サイドカー評価は、単一のタスクでのみサポートされています。 これは、次のことを意味します。

  • 各サンプルが 1 回評価されることが保証されます。エバリュエータがプリエンプトまたは再起動された場合、最新のチェックポイントから評価ループを再起動し、再起動前に行われた部分的な評価の進行状況は破棄されます。

  • ただし、単一のタスクで評価を実行すると、完全な評価に時間がかかる可能性があります。

  • モデルのサイズが大きすぎてエバリュエータのメモリに収まらない場合、単一のサイドカー評価は適用されません。

もう 1 つの注意点は、tf.keras.utils.SidecarEvaluator の実装と以下のカスタム評価ループが、一部のチェックポイントをスキップする可能性があるということです。利用可能な最新のチェックポイントは、常に取得され、評価エポック中に複数のチェックポイントがトレーニングクラスタから生成されるからです。すべてのチェックポイントを評価するカスタム評価ループを作成できますが、このチュートリアルでは扱いません。一方、評価の実行にかかる時間よりもチェックポイントの生成頻度が低い場合は、アイドル状態になる可能性があります。

カスタム評価ループを使用すると、評価するチェックポイントを選択したり、評価とともに実行する追加のロジックを提供したりするなど、詳細を制御できます。以下は、カスタムサイドカー評価ループの例です。

checkpoint_dir = ... eval_model = ... eval_data = ... checkpoint = tf.train.Checkpoint(model=eval_model) for latest_checkpoint in tf.train.checkpoints_iterator( checkpoint_dir): try: checkpoint.restore(latest_checkpoint).expect_partial() except (tf.errors.OpError,) as e: # checkpoint may be deleted by training when it is about to read it. continue # Optionally add callbacks to write summaries. eval_model.evaluate(eval_data) # Evaluation finishes when it has evaluated the last epoch. if latest_checkpoint.endswith('-{}'.format(train_epochs)): break

現実世界のクラスタ

注意: このセクションは、このページのチュートリアルコードを実行するためには必要ありません。

実際の運用環境では、すべてのタスクをさまざまなマシンのさまざまなプロセスで実行します。各タスクでクラスタ情報を構成する最も簡単な方法は、"TF_CONFIG" 環境変数を設定し、tf.distribute.cluster_resolver.TFConfigClusterResolver を使用して "TF_CONFIG" を解析することです。

"TF_CONFIG" 環境変数の一般的な説明については、分散トレーニングガイドの「TF_CONFIG 環境変数の設定」を参照してください。

Kubernetes やその他の構成テンプレートを使用してトレーニングタスクを開始すると、これらのテンプレートにより “TF_CONFIG" が既に設定されている可能性があります。

"TF_CONFIG" 環境変数の設定

3 つのワーカーと 2 つのパラメータサーバーがあるとします。ワーカー 1 の "TF_CONFIG" は次のようになります。

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "worker": ["host1:port", "host2:port", "host3:port"], "ps": ["host4:port", "host5:port"], "chief": ["host6:port"] }, "task": {"type": "worker", "index": 1} })

エバリュエータの "TF_CONFIG" は次のとおりです。

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "evaluator": ["host7:port"] }, "task": {"type": "evaluator", "index": 0} })

上記のエバリュエータの "TF_CONFIG" 文字列の "cluster" の部分はオプションです

すべてのタスクで同じバイナリを使用する場合

単一のバイナリを使用してこれらすべてのタスクを実行する場合は、最初にプログラムをさまざまなロールに分岐させる必要があります。

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation else: # Run the coordinator.

次のコードは、TensorFlow サーバーを起動して待機します。これは、"worker" および "ps" ロールに役立ちます。

# Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, protocol=cluster_resolver.rpc_layer or "grpc", start=True) server.join()

タスクの障害の処理

ワーカーの障害

tf.distribute.coordinator.ClusterCoordinator カスタムトレーニングループと Model.fit アプローチの両方が、ワーカーの障害に対する組み込みのフォールトトレランスを提供します。ワーカーの復旧時に、ClusterCoordinator はワーカーでデータセットの再作成を呼び出します。

パラメータサーバーまたはコーディネータの障害

コーディネータがパラメータサーバーエラーを検出すると、すぐに UnavailableError または AbortedError が発生します。この場合、コーディネータを再起動できます。また、コーディネータ自体も利用できなくなる可能性があるので、トレーニングの進行状況を失わないようにするためのツールを使用することを推薦します。

  • Model.fit の場合、進行状況の保存と復元を自動的に処理する BackupAndRestore コールバックを使用する必要があります。例については、上記のコールバックとトレーニング セクションを参照してください。

  • カスタムトレーニングループの場合、モデル変数を定期的にチェックポイントし、チェックポイントがある場合は、トレーニングを開始する前にモデル変数を読み込む必要があります。オプティマイザがチェックポイントされている場合、トレーニングの進行状況は optimizer.iterations からおおよそ推測できます。

checkpoint_manager = tf.train.CheckpointManager( tf.train.Checkpoint(model=model, optimizer=optimizer), checkpoint_dir, max_to_keep=3) if checkpoint_manager.latest_checkpoint: checkpoint = checkpoint_manager.checkpoint checkpoint.restore( checkpoint_manager.latest_checkpoint).assert_existing_objects_matched() global_steps = int(optimizer.iterations.numpy()) starting_epoch = global_steps // steps_per_epoch for _ in range(starting_epoch, num_epochs): for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) coordinator.join() checkpoint_manager.save()

RemoteValue のフェッチ

関数が正常に実行された場合、RemoteValue のフェッチは確実に成功します。これは、現在、関数が実行された後、戻り値がすぐにコーディネータにコピーされるためです。コピー中にワーカーに障害が発生した場合、関数は別の使用可能なワーカーで再試行されます。したがって、パフォーマンスを最適化するには、戻り値なしで関数をスケジュールします。

エラーレポート

コーディネータは、パラメータサーバーからの UnavailableError などのエラーや、tf.debugging.check_numerics からの InvalidArgument などの他のアプリケーションエラーを確認すると、エラーを発生する前に、保留中およびキューに入れられたすべての関数をキャンセルします。対応する RemoteValue をフェッチすると、CancelledError が発生します。

エラーが発生した後、コーディネータは同じエラーまたはキャンセルされた関数からのエラーを発生しません。

パフォーマンスの改善

tf.distribute.ParameterServerStrategytf.distribute.coordinator.ClusterCoordinator でトレーニングするときにパフォーマンスの問題が発生することがあります。

一般的に、パラメータサーバーの負荷が不均衡であり、負荷の高い一部のパラメータサーバーが制限容量に達した場合に発生します。 また、複数の根本原因が存在する場合もあります。この問題を軽減する簡単な方法は次のとおりです。

  1. ParameterServerStrategy を構築するときに variable_partitioner を指定して、大規模なモデルの変数を分割します。

  2. 次のようにして、すべてのパラメータサーバーで必要なホットスポット変数を 1 つのステップで作成することは避けてください。

  3. オプティマイザで一定の学習率またはサブクラス tf.keras.optimizers.schedules.LearningRateSchedule を使用します。これは、デフォルトの動作では、学習率は特定のパラメータサーバーに配置される変数になり、各ステップで他のすべてのパラメータサーバーによって要求されるためです。

  4. tf.keras.optimizers.legacy.Optimizer を使用します(標準の tf.keras.optimizers.Optimizer では、ホットスポット変数になる可能性があります)。

  5. 大きな語彙は、Keras の前処理レイヤーに渡す前にシャッフルします。

もう 1 つのパフォーマンスの問題の原因は、コーディネータです。 schedule/join の実装は Python ベースであるため、スレッドのオーバーヘッドが発生する場合があります。また、コーディネータとワーカー間の待ち時間が長くなる可能性があります。このような場合は、次のようにします。

  • Model.fit では、Model.compile で提供される steps_per_execution 引数を 1 より大きい値に設定します。

  • カスタムトレーニングループでは、複数のステップを 1 つの tf.function にまとめることができます。

steps_per_invocation = 10 @tf.function def step_fn(iterator): for _ in range(steps_per_invocation): features, labels = next(iterator) def replica_fn(features, labels): ... strategy.run(replica_fn, args=(features, labels))

今後ライブラリがさらに最適化されるにつれて、ほとんどのユーザーはステップを手動でまとめる必要がなくなることでしょう。

また、上記のタスクの障害の処理セクションで説明したように、パフォーマンスを向上させるために、戻り値なしで関数をスケジュールすることもできます。

既知の制限

既知の制限のほとんどは、上記のセクションで既に説明されています。このセクションでは、概要を説明します。

ParameterServerStrategy 全般

  • os.environment["grpc_fail_fast"]="use_caller" は、フォールトトレランスを適切に機能させるために、コーディネータを含むすべてのタスクで必要です。

  • 同期パラメータサーバートレーニングはサポートされていません。

  • 通常、パフォーマンスを最適化するには、複数のステップを 1 つの関数にまとめる必要があります。

  • 分割された変数を含む tf.saved_model.load 経由での saved_model の読み込みはサポートされていません。注意: TensorFlow Serving を使用したこのような saved_model の読み込みは機能することが期待されています (詳細については、サービングのチュートリアルを参照してください)。

  • コーディネータタスクを再起動せずにパラメータサーバーの障害から回復することできません。

  • tf.keras.layers.IntegerLookuptf.keras.layers.StringLookuptf.keras.layers.TextVectorization、などの一部の Keras 前処理レイヤーで一般的に使用される tf.lookup.StaticHashTable は、Strategy.scope の下に配置する必要があります。そうしないと、リソースがコーディネータに配置され、ワーカーからコーディネータへのルックアップ RPC がパフォーマンスに影響を与えます。

Model.fit のみ

  • Model.fit には steps_per_epoch 引数が必要です。エポックで適切な間隔を提供する値を選択します。

  • ParameterServerStrategy は、パフォーマンス上の理由から、バッチレベルの呼び出しを持つカスタムコールバックをサポートしていません。これらの呼び出しを適切に選択された steps_per_epoch を持つエポックレベルの呼び出しに変換して、steps_per_epoch のステップ数ごとに呼び出されるようにする必要があります。バッチレベルの呼び出しはパフォーマンスが向上するように変更されているので、組み込みのコールバックは影響を受けません。ParameterServerStrategy のバッチレベルの呼び出しのサポートは計画されています。

  • 同じ理由で、他のストラテジーとは異なり、進捗バーと指標はエポック境界でのみログに記録されます。

  • run_eagerly は、サポートされていません。

カスタムトレーニングループのみ

  • ClusterCoordinator.schedule は一般にデータセットの評価保証をサポートしていませんが、評価保証は Model.fit/.evaluate を通じて可能です。1 回限りの評価を有効にするをご覧ください。

  • ClusterCoordinator.create_per_worker_dataset が callable と入力として使用される場合、渡された関数内でデータセット全体を作成する必要があります。

  • tf.data.Options は、ClusterCoordinator.create_per_worker_dataset により作成されたデータセットでは無視されます。