Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/tutorials/distribute/parameter_server_training.ipynb
37817 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

ParameterServerStrategy でパラメヌタサヌバヌをトレヌニングする

抂芁

パラメヌタサヌバヌトレヌニングは、耇数のマシンでモデルトレヌニングをスケヌルアップするための䞀般的なデヌタ䞊列方法です。

パラメヌタサヌバヌトレヌニング クラスタは、ワヌカヌずパラメヌタサヌバヌで構成されたす。倉数はパラメヌタサヌバヌで䜜成され、各ステップでワヌカヌにより読み取られ、曎新されたす。 デフォルトでは、ワヌカヌは盞互に同期するこずなく、これらの倉数を個別に読み取り、曎新したす。そのため、パラメヌタサヌバヌスタむルのトレヌニングは非同期トレヌニングず呌ばれたす。

TensorFlow 2 では、パラメヌタサヌバヌトレヌニングは tf.distribute.ParameterServerStrategy クラスによっお行われたす。このクラスは、数千のワヌカヌにスケヌルアップするクラスタにトレヌニングステップを分散したす (パラメヌタサヌバヌを䌎う)。

サポヌトされおいるトレヌニング方法

サポヌトされおいる䞻なトレヌニング方法は 2 ぀ありたす。

ゞョブずタスクのクラスタ

遞択した API (Model.fit たたはカスタムトレヌニングルヌプ) に関係なく、TensorFlow 2 の分散トレヌニングには、耇数の 'jobs' があり、各ゞョブには 1 ぀以䞊の 'task' がある堎合がありたす。

パラメヌタサヌバヌトレヌニングを䜿甚する堎合は、次を掚薊したす。

  • 1 ぀の コヌディネヌタゞョブ (ゞョブ名は chief)

  • 耇数のワヌカヌゞョブ (ゞョブ名は worker)

  • 耇数のパラメヌタサヌバヌゞョブ (ゞョブ名は ps)

コヌディネヌタは、リ゜ヌスを䜜成し、トレヌニングタスクをディスパッチし、チェックポむントを曞き蟌み、タスクの倱敗に察凊したす。ワヌカヌずパラメヌタサヌバヌは、コヌディネヌタからのリク゚ストをリッスンする tf.distribute.Server むンスタンスを実行したす。

Model.fit API を䜿甚したパラメヌタサヌバヌトレヌニング

Model.fit API を䜿甚したパラメヌタサヌバヌトレヌニングでは、コヌディネヌタが tf.distribute.ParameterServerStrategy オブゞェクトを䜿甚する必芁がありたす。Model.fit をストラテゞヌなしで䜿甚する堎合や他のストラテゞヌを䜿甚する堎合ず同様に、ワヌクフロヌには、モデルの䜜成ずコンパむル、コヌルバックの準備、および Model.fit の呌び出しが含たれたす。

カスタムトレヌニングルヌプを䜿甚したパラメヌタサヌバヌトレヌニング

カスタムトレヌニングルヌプでは、tf.distribute.coordinator.ClusterCoordinator クラスがコヌディネヌタに䜿甚される重芁なコンポヌネントです。

  • ClusterCoordinator クラスは、tf.distribute.ParameterServerStrategy オブゞェクトず連携しお動䜜する必芁がありたす。

  • この tf.distribute.Strategy オブゞェクトは、クラスタの情報を提䟛するために必芁であり、tf.distribute.Strategy を䜿甚したカスタムトレヌニングで瀺されおいるように、トレヌニングステップを定矩するために䜿甚されたす。

  • ClusterCoordinator オブゞェクトは、これらのトレヌニング ステップの実行をリモヌトワヌカヌにディスパッチしたす。

ClusterCoordinator オブゞェクトにより提䟛される最も重芁な API は schedule です。

  • schedule API は tf.function をキュヌに入れ、future-like の RemoteValue をすぐに返したす。

  • キュヌに入れられた関数は、バックグラりンドスレッドでリモヌトワヌカヌにディスパッチされ、その RemoteValue は非同期で埋められたす。

  • schedule はワヌカヌの割り圓おを必芁ずしないため、枡された tf.function は䜿甚可胜な任意のワヌカヌで実行できたす。

  • 関数が実行されたワヌカヌが完了前に利甚できなくなった堎合、別の利甚可胜なワヌカヌで再詊行されたす。

  • そのため、そしお関数の実行がアトミックではないために、1 ぀の関数の呌び出しが耇数回実行される堎合がありたす。

ClusterCoordinator は、リモヌト関数のディスパッチに加えお、すべおのワヌカヌでデヌタセットを䜜成し、ワヌカヌが障害から回埩したずきにこれらのデヌタセットを再構築するのにも圹立ちたす。

チュヌトリアルのセットアップ

チュヌトリアルのセクションは Model.fit ずカスタムトレヌニングルヌプ向けに分かれおいたす。「X を䜿甚したトレヌニング」以倖のセクションは、䞡方に適甚されたす。

!pip install portpicker
#@title import multiprocessing import os import random import portpicker import tensorflow as tf

クラスタのセットアップ

前述のように、パラメヌタサヌバヌトレヌニングクラスタには、トレヌニングプログラムを実行するコヌディネヌタタスク、1 ぀たたは耇数のワヌカヌ、TensorFlow サヌバヌを実行するパラメヌタサヌバヌタスク (tf.distribute.Server) が必芁です。堎合によっおは、サむドカヌ評䟡を実行する远加の評䟡タスクが必芁です (以䞋のサむドカヌ評䟡セクションを参照しおください)。これらを蚭定するための芁件は次のずおりです。

  • コヌディネヌタタスクは、゚バリュ゚ヌタを陀く他のすべおの TensorFlow サヌバヌのアドレスずポヌトを知る必芁がありたす。

  • ワヌカヌずパラメヌタサヌバヌは、リッスンする必芁があるポヌトを知る必芁がありたす。通垞、これらのタスクで TensorFlow サヌバヌを䜜成するずきに、完党なクラスタの情報を枡したす。

  • ゚バリュ゚ヌタタスクは、トレヌニングクラスタの蚭定を知る必芁はありたせん。知っおいる堎合でも、トレヌニングクラスタぞの接続を詊みるべきではありたせん。

  • ワヌカヌずパラメヌタサヌバヌには、それぞれ "worker" ず "ps" のタスクタむプが必芁です。コヌディネヌタは、タスクタむプずしお埓来の "chief" を䜿甚する必芁がありたす。

このチュヌトリアルでは、むンプロセスのクラスタを䜜成し、パラメヌタサヌバヌのトレヌニング党䜓を Colab で実行できるようにしたす。実際のクラスタの蚭定方法に぀いおは、埌のセクションで説明したす。

むンプロセス クラスタ

事前にいく぀かの TensorFlow サヌバヌを䜜成するこずから始め、埌でそれらに接続したす。これは、チュヌトリアルのデモを目的ずしおおり、実際のトレヌニングでは、サヌバヌは "worker" および "ps" マシンで起動されるこずに泚意しおください。

def create_in_process_cluster(num_workers, num_ps): """Creates and starts local servers and returns the cluster_resolver.""" worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)] ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)] cluster_dict = {} cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports] if num_ps > 0: cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports] cluster_spec = tf.train.ClusterSpec(cluster_dict) # Workers need some inter_ops threads to work properly. worker_config = tf.compat.v1.ConfigProto() if multiprocessing.cpu_count() < num_workers + 1: worker_config.inter_op_parallelism_threads = num_workers + 1 for i in range(num_workers): tf.distribute.Server( cluster_spec, job_name="worker", task_index=i, config=worker_config, protocol="grpc") for i in range(num_ps): tf.distribute.Server( cluster_spec, job_name="ps", task_index=i, protocol="grpc") cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver( cluster_spec, rpc_layer="grpc") return cluster_resolver # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" NUM_WORKERS = 3 NUM_PS = 2 cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

むンプロセスクラスタのセットアップは、ナニットテストでよく䜿甚されたす (こちらを参照)。

ロヌカルテストのもう 1 ぀のオプションは、ロヌカルマシンでプロセスを起動するこずです。このアプロヌチの䟋に぀いおは、Keras を䜿甚したマルチワヌカヌトレヌニングを参照しおください。

ParameterServerStrategy をむンスタンス化する

トレヌニング コヌドに入る前に、tf.distribute.ParameterServerStrategy オブゞェクトをむンスタンス化したす。これは、Model.fit ずカスタムトレヌニングルヌプのどちらを䜿甚しおいる堎合でも必芁であるこずに泚意しおください。variable_partitioner 匕数に぀いおは、倉数シャヌディングのセクションで説明したす。

variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner)

トレヌニングに GPU を䜿甚するには、各ワヌカヌに衚瀺される GPU を割り圓おたす。 ParameterServerStrategy は、各ワヌカヌで利甚可胜なすべおの GPU を䜿甚したすが、すべおのワヌカヌが同じ数の GPU を利甚できる必芁があるずいう制限がありたす。

倉数のシャヌディング

倉数のシャヌディングずは、倉数をシャヌドず呌ばれる耇数の小さな倉数に分割するこずです。倉数のシャヌディングは、これらのシャヌドにアクセスする際のネットワヌク負荷を分散するのに圹立぀堎合がありたす。たた、1 台のマシンのメモリに収たらない非垞に倧きな埋め蟌みを䜿甚する堎合など、通垞の倉数の蚈算ず栌玍を耇数のパラメヌタサヌバヌに分散するこずもできたす。

倉数シャヌディングを有効にするには、ParameterServerStrategy オブゞェクトを構築する際に variable partitioner を枡したす。variable_partitioner は、倉数が䜜成されるたびに呌び出され、倉数の各次元に沿っおシャヌドの数を返すこずが期埅されたす。tf.distribute.experimental.partitioners.MinSizePartitioner など、すぐに䜿える variable_partitioner がいく぀か提䟛されおいたす。tf.distribute.experimental.partitioners.MinSizePartitioner のようなサむズベヌスのパヌティショナヌを䜿甚しお、モデルのトレヌニング速床に悪圱響を及がす可胜性のある小さな倉数のパヌティショニングを避けるこずをお勧めしたす。

variable_partitioner が枡され、Strategy.scope のすぐ䞋に倉数を䜜成するず、その倉数は variables プロパティを持぀コンテナタむプになり、シャヌドのリストぞのアクセスを提䟛したす。ほずんどの堎合、このコンテナは、すべおのシャヌドを連結するこずによっお自動的にテン゜ルに倉換されるので、通垞の倉数ずしお䜿甚できたす。䞀方、tf.nn.embedding_lookup などの䞀郚の TensorFlow メ゜ッドは、このコンテナタむプの効率的な実装を提䟛し、これらのメ゜ッドでは自動連結が回避されたす。

詳现に぀いおは、tf.distribute.ParameterServerStrategy の API ドキュメントを参照しおください。

Model.fit でトレヌニングする

Keras は、Model.fit を介しお䜿いやすいトレヌニング API を提䟛したす。これは、内郚でトレヌニングルヌプを凊理し、オヌバヌラむド可胜な柔軟な train_step や TensorBoard のチェックポむントの保存やサマリヌの保存などの機胜を提䟛するコヌルバックを備えおいたす。Model.fit を䜿甚するず、ストラテゞヌオブゞェクトを簡単に亀換するだけで、同じトレヌニングコヌドを他のストラテゞヌで䜿甚できたす。

入力デヌタ

tf.distribute.ParameterServerStrategy を䜿甚する Keras Model.fit では、tf.data.Dataset、tf.distribute.DistributedDataset の圢匏の入力デヌタを䜿えたす。たたは、tf.keras.utils.experimental.DatasetCreator のDataset は䜿いやすい掚奚されるオプションです。ただし、Dataset を䜿甚しおメモリの問題が発生した堎合は、呌び出し可胜な dataset_fn 匕数を指定しお DatasetCreator を䜿甚する必芁がある堎合がありたす (詳现に぀いおは、tf .keras.utils.experimental.DatasetCreator API ドキュメントを参照しおください)。

デヌタセットを tf.data.Dataset に倉換する堎合は、以䞋の䟋で瀺されおいるように、Dataset.shuffle ず Dataset.repeat を䜿甚する必芁がありたす。

  • パラメヌタサヌバヌトレヌニングを䜿甚する Keras Model.fit では、異なる方法でシャッフルされる堎合を陀いお、各ワヌカヌが同じデヌタセットを受け取るこずを前提ずしおいたす。したがっお、Dataset.shuffle を呌び出すこずで、デヌタをより均等にむテレヌションできたす。

  • ワヌカヌは同期しないため、デヌタセットの凊理の終了時が異なる堎合がありたす。Dataset.repeat を䜿甚するずパラメヌタサヌバヌトレヌニングで゚ポックを簡単に定矩できたす。これは、匕数なしで呌び出された堎合にデヌタセットを無期限に繰り返し、Model.fit 呌び出しで steps_per_epoch 匕数を指定したす。

shuffle ず repeat の詳现に぀いおは、tf.data ガむドの「トレヌニング ワヌクフロヌ」セクションを参照しおください。

global_batch_size = 64 x = tf.random.uniform((10, 10)) y = tf.random.uniform((10,)) dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat() dataset = dataset.batch(global_batch_size) dataset = dataset.prefetch(2)

代わりに tf.keras.utils.experimental.DatasetCreator でデヌタセットを䜜成するず、dataset_fn のコヌドは、各ワヌカヌマシンの入力デバむス (通垞は CPU) で呌び出されたす。

モデルの構築ずコンパむル

たず、tf.keras.Model (デモ甚の自明な tf.keras.models.Sequential モデル) を䜜成し、次に Model.compile を呌び出しお、オプティマむザヌ、メトリックなどのコンポヌネント、および steps_per_execution などのその他のパラメヌタを組み蟌みたす。

with strategy.scope(): model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)]) model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)

コヌルバックずトレヌニング

実際のトレヌニングのために Keras Model.fit を呌び出す前に、次のような䞀般的なタスクに必芁なコヌルバックを準備したす。

  • tf.keras.callbacks.ModelCheckpoint: 各゚ポック埌など、特定の頻床でモデルを保存したす。

  • tf.keras.callbacks.BackupAndRestore: クラスタが䜿甚できなくなった堎合 (アボヌトやプリ゚ンプションなど)、モデルずその時点の゚ポック番号をバックアップするこずで耐障害性を提䟛したす。その埌、ゞョブの倱敗からの再開時にトレヌニング状態を埩元し、䞭断された゚ポックの最初からトレヌニングを続行できたす。

  • tf.keras.callbacks.TensorBoard: サマリヌファむルにモデル ログを定期的に曞き蟌みたす。これは、TensorBoard ツヌルで芖芚化できたす。

泚意: パフォヌマンスを維持するために、ParameterServerStrategy で䜿甚する堎合、カスタムコヌルバックでバッチレベルのコヌルバックをオヌバヌラむドするこずはできたせん。カスタムコヌルバックを゚ポックレベルの呌び出しに倉曎し、steps_per_epoch を適切な倀に調敎しおください。たた、steps_per_epoch は、ParameterServerStrategy ず䜵甚する堎合、Model.fit に必須の匕数です。

working_dir = "/tmp/my_working_dir" log_dir = os.path.join(working_dir, "log") ckpt_filepath = os.path.join(working_dir, "ckpt") backup_dir = os.path.join(working_dir, "backup") callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=log_dir), tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath), tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir), ] model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)

ClusterCoordinator で盎接䜿甚する (オプション)

Model.fit トレヌニングを遞択した堎合でも、必芁に応じお tf.distribute.coordinator.ClusterCoordinator オブゞェクトをむンスタンス化しお、ワヌカヌで実行する他の関数をスケゞュヌルできたす。詳现ず䟋に぀いおは、カスタムトレヌニングルヌプを䜿ったトレヌニングのセクションを参照しおください。

カスタムトレヌニングルヌプを䜿ったトレヌニング

tf.distribute.Strategy でカスタムトレヌニングルヌプを䜿甚するず、トレヌニングルヌプを非垞に柔軟に定矩できたす。䞊で (strategy ずしお) 定矩された ParameterServerStrategy を䜿甚しお、tf.distribute.coordinator.ClusterCoordinator を䜿甚しお、トレヌニングステップの実行をリモヌトワヌカヌにディスパッチできたす。

次に、他の tf.distribute.Strategy のトレヌニングルヌプで行ったように、モデルを䜜成し、デヌタセットを定矩し、ステップ関数を定矩したす。詳现に぀いおは、tf.distribute.Strategy を䜿甚したカスタムトレヌニング チュヌトリアルを参照しおください。

効率的にデヌタセットをプリフェッチするには、以䞋のリモヌトワヌカヌにトレヌニングステップをディスパッチするセクションで説明されおいる、掚奚される分散デヌタセット䜜成 API を䜿甚しおください。たた、ワヌカヌに割り圓おられた GPU を最倧限に掻甚するために、worker_fn 内で Strategy.run を呌び出しおください。 残りのステップは、トレヌニングで GPU を䜿甚する堎合でも䜿甚しない堎合でも同じです。

次の手順でこれらのコンポヌネントを䜜成したす。

デヌタのセットアップ

たず、デヌタセットを䜜成する関数を䜜成したす。

Keras 前凊理レむダヌたたは Tensorflow 倉換レむダヌでデヌタを前凊理する堎合は、他の Keras レむダヌに察しお行うようにこれらのレむダヌを dataset_fn の倖、および、Strategy.scope の䞋に䜜成したす。これは、dataset_fn が tf.function にラップされ、各ワヌカヌで実行されおデヌタパむプラむンが生成されるためです。

䞊蚘の手順に埓わずにレむダヌを䜜成するず、tf.function からコヌディネヌタにリフトされる Tensorflow 状態が䜜成され、ワヌカヌでそれらにアクセスするず、コヌディネヌタずワヌカヌの間で繰り返し RPC 呌び出しが発生し、速床が倧幅に䜎䞋する可胜性がありたす。

Strategy.scope の䞋にレむダヌを配眮するず、代わりにすべおのワヌカヌにレむダヌが䜜成され、tf.data.Dataset.map を介しお dataset_fn 内に倉換を適甚したす。分散入力によるデヌタの前凊理の詳现に぀いおは、分散入力チュヌトリアルのデヌタの前凊理を参照しおください。

feature_vocab = [ "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman" ] label_vocab = ["yes", "no"] with strategy.scope(): feature_lookup_layer = tf.keras.layers.StringLookup( vocabulary=feature_vocab, mask_token=None) label_lookup_layer = tf.keras.layers.StringLookup( vocabulary=label_vocab, num_oov_indices=0, mask_token=None) raw_feature_input = tf.keras.layers.Input( shape=(3,), dtype=tf.string, name="feature") feature_id_input = feature_lookup_layer(raw_feature_input) feature_preprocess_stage = tf.keras.Model( {"features": raw_feature_input}, feature_id_input) raw_label_input = tf.keras.layers.Input( shape=(1,), dtype=tf.string, name="label") label_id_input = label_lookup_layer(raw_label_input) label_preprocess_stage = tf.keras.Model( {"label": raw_label_input}, label_id_input)

デヌタセットでトむサンプルを生成したす。

def feature_and_label_gen(num_examples=200): examples = {"features": [], "label": []} for _ in range(num_examples): features = random.sample(feature_vocab, 3) label = ["yes"] if "avenger" in features else ["no"] examples["features"].append(features) examples["label"].append(label) return examples examples = feature_and_label_gen()

次に、dataset_fn にラップされたトレヌニングデヌタセットを䜜成したす。

def dataset_fn(_): raw_dataset = tf.data.Dataset.from_tensor_slices(examples) train_dataset = raw_dataset.map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(200).batch(32).repeat() return train_dataset

モデルを構築する

次に、モデルずその他のオブゞェクトを䜜成したす。必ず Strategy.scope の䞋にすべおの倉数を䜜成したす。

# These variables created under the `Strategy.scope` will be placed on parameter # servers in a round-robin fashion. with strategy.scope(): # Create the model. The input needs to be compatible with Keras processing layers. model_input = tf.keras.layers.Input( shape=(3,), dtype=tf.int64, name="model_input") emb_layer = tf.keras.layers.Embedding( input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384) emb_output = tf.reduce_mean(emb_layer(model_input), axis=1) dense_output = tf.keras.layers.Dense( units=1, activation="sigmoid", kernel_regularizer=tf.keras.regularizers.L2(1e-4), )(emb_output) model = tf.keras.Model({"features": model_input}, dense_output) optimizer = tf.keras.optimizers.legacy.RMSprop(learning_rate=0.1) accuracy = tf.keras.metrics.Accuracy()

FixedShardsPartitioner の䜿甚により、すべおの倉数が 2 ぀のシャヌドに分割され、各シャヌドが異なるパラメヌタサヌバヌに割り圓おられたこずを確認したす。

assert len(emb_layer.weights) == 2 assert emb_layer.weights[0].shape == (4, 16384) assert emb_layer.weights[1].shape == (4, 16384) print(emb_layer.weights[0].device) print(emb_layer.weights[1].device)

トレヌニングステップを定矩する

3 番目に、tf.function にラップされたトレヌニングステップを䜜成したす。

@tf.function def step_fn(iterator): def replica_fn(batch_data, labels): with tf.GradientTape() as tape: pred = model(batch_data, training=True) per_example_loss = tf.keras.losses.BinaryCrossentropy( reduction=tf.keras.losses.Reduction.NONE)(labels, pred) loss = tf.nn.compute_average_loss(per_example_loss) model_losses = model.losses if model_losses: loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses)) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) accuracy.update_state(labels, actual_pred) return loss batch_data, labels = next(iterator) losses = strategy.run(replica_fn, args=(batch_data, labels)) return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

䞊蚘のトレヌニングステップ関数では、step_fn における Strategy.run ず Strategy.reduce の呌び出しでワヌカヌごずに耇数の GPU をサポヌトできたす。ワヌカヌに GPU が割り圓おられおいる堎合、Strategy.run は耇数のレプリカGPUでデヌタセットを分散したす。これらの tf.nn.compute_average_loss() ぞの同時呌び出しは、ワヌカヌの合蚈数に関係なく、1 ぀のワヌカヌのレプリカGPU間で損倱の平均を蚈算したす。

リモヌトワヌカヌにトレヌニングステップをディスパッチする

すべおの蚈算が ParameterServerStrategy によっお定矩された埌、tf.distribute.coordinator.ClusterCoordinator クラスを䜿甚しおリ゜ヌスを䜜成し、トレヌニングステップをリモヌトワヌカヌに分散したす。

たず、ClusterCoordinator オブゞェクトを䜜成し、ストラテゞヌオブゞェクトを枡したす。

coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)

次に、ClusterCoordinator.create_per_worker_dataset API を䜿甚しお、ワヌカヌごずのデヌタセットず反埩子を䜜成したす。これにより、デヌタセットがすべおのワヌカヌに耇補されたす。以䞋の per_worker_dataset_fn では、dataset_fn を strategy.distribute_datasets_from_function にラップしお、GPU ぞ効率的にプリフェッチを実行できるようにするこずを掚薊したす。

@tf.function def per_worker_dataset_fn(): return strategy.distribute_datasets_from_function(dataset_fn) per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn) per_worker_iterator = iter(per_worker_dataset)

最埌のステップは、Cluster Coordinator.schedule を䜿甚しお蚈算をリモヌトワヌカヌに分散するこずです。

  • schedule メ゜ッドは tf.function をキュヌに入れ、future-like の RemoteValue をすぐに返したす。キュヌに入れられた関数はバックグラりンドスレッドでリモヌトワヌカヌにディスパッチされ、RemoteValue は非同期で入力されたす。

  • join メ゜ッド (ClusterCoordinator.join) は、スケゞュヌルされたすべおの関数が実行されるたで埅機するために䜿甚したす。

num_epochs = 4 steps_per_epoch = 5 for i in range(num_epochs): accuracy.reset_states() for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) # Wait at epoch boundaries. coordinator.join() print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

Remote Value の結果を取埗する方法は次のずおりです。

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,)) print("Final loss is %f" % loss.fetch())

たたは、すべおのステップを起動しお、完了するのを埅っおいる間に䜕かを行うこずもできたす。

for _ in range(total_steps): coordinator.schedule(step_fn, args=(per_worker_iterator,)) while not coordinator.done(): time.sleep(10) # Do something like logging metrics or writing checkpoints.

この特定の䟋の完党なトレヌニングずサヌビングのワヌクフロヌに぀いおは、このテストを参照しおください。

デヌタセット䜜成の詳现

䞊蚘のコヌドのデヌタセットは、ClusterCoordinator.create_per_worker_dataset API を䜿甚しお䜜成されたす。ワヌカヌごずに 1 ぀のデヌタセットを䜜成し、コンテナオブゞェクトを返したす。その䞊で iter メ゜ッドを呌び出しお、ワヌカヌごずの反埩子を䜜成できたす。ワヌカヌごずの反埩子には、ワヌカヌごずに 1 ぀の反埩子が含たれ、特定のワヌカヌで関数が実行される前に、ClusterCoordinator.schedule メ゜ッドに枡される関数の入力匕数で、ワヌカヌの察応するスラむスが眮き換えられたす。

ClusterCoordinator.schedule メ゜ッドは、ワヌカヌが同等で、異なるワヌカヌのデヌタセットが同じであるず想定しおいたす (ただし、異なる方法でシャッフルされる可胜性がありたす)。そのため、デヌタセットから OutOfRangeError を受け取るこずに䟝存せず、デヌタセットを繰り返し、有限数のステップをスケゞュヌルするこずも掚薊したす。

もう 1 ぀の重芁な泚意点は、tf.data デヌタセットは、タスク境界を越えた暗黙的なシリアル化ず逆シリアル化をサポヌトしおいないずいうこずです。そのため、ClusterCoordinator.create_per_worker_dataset に枡される関数内でデヌタセット党䜓を䜜成するこずが重芁です。create_per_worker_dataset API は、tf.data.Dataset たたは tf.distribute.DistributedDataset を入力ずしお盎接受け取るこずもできたす。

評䟡

tf.distribute.ParameterServerStrategy トレヌニングで評䟡を実行する 2 ぀の䞻な方法は、むンラむン評䟡ずサむドカヌ評䟡です。以䞋に説明するように、それぞれに長所ず短所がありたす。特にこだわりがない堎合は、むンラむン評䟡方法を掚薊したす。Model.fit を䜿甚しおいるナヌザヌの堎合、Model.evaluate は内郚でむンラむン分散評䟡を䜿甚しおいたす。

むンラむン評䟡

むンラむン評䟡では、コヌディネヌタがトレヌニングず評䟡を亀互に行いたす。

むンラむン評䟡には、以䞋のようないく぀かの利点がありたす。

  • 単䞀のタスクでは保持できない倧芏暡な評䟡モデルず評䟡デヌタセットをサポヌトできたす。

  • 評䟡結果を䜿甚しお、次の゚ポックのトレヌニングに関する決定を䞋すこずができたす (トレヌニングを早期に停止するかどうかなど)。

むンラむン評䟡を実装するには、盎接評䟡ず分散評䟡の 2 ぀の方法がありたす。

  • 盎接評䟡: 小芏暡なモデルず評䟡デヌタセットの堎合、コヌディネヌタは、コヌディネヌタ䞊の評䟡デヌタセットを䜿甚しお、分散モデルで盎接評䟡を実行できたす。

eval_dataset = tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).batch(8) eval_accuracy = tf.keras.metrics.Accuracy() for batch_data, labels in eval_dataset: pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) print("Evaluation accuracy: %f" % eval_accuracy.result())
  • 分散評䟡: コヌディネヌタで盎接実行するこずが䞍可胜な倧芏暡なモデルたたはデヌタセットの堎合、コヌディネヌタタスクは、ClusterCoordinator.schedule/ClusterCoordinator.join メ゜ッドを介しお評䟡タスクをワヌカヌに分散できたす。

with strategy.scope(): # Define the eval metric on parameter servers. eval_accuracy = tf.keras.metrics.Accuracy() @tf.function def eval_step(iterator): def replica_fn(batch_data, labels): pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) batch_data, labels = next(iterator) strategy.run(replica_fn, args=(batch_data, labels)) def eval_dataset_fn(): return tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(16).repeat().batch(8) per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn) per_worker_eval_iterator = iter(per_worker_eval_dataset) eval_steps_per_epoch = 2 for _ in range(eval_steps_per_epoch): coordinator.schedule(eval_step, args=(per_worker_eval_iterator,)) coordinator.join() print("Evaluation accuracy: %f" % eval_accuracy.result())

1 回限りの評䟡を有効にする

tf.distribute.coordinator.ClusterCoordinator の schedule ず join メ゜ッドは、デフォルトで、評䟡保蚌たたは 1 回限りのセマンティクスをサポヌトしおいたせん。蚀い換えるず、䞊蚘の䟋では、デヌタセット内のすべおの評䟡䟋がちょうど 1 回実行される保蚌がない、評䟡されないものや数回評䟡されるものがある、ずいうこずです。

゚ポック間での評䟡の分散を軜枛し、早期停止やハむパヌパラメヌタのチュヌニングなどの方法で行われるモデルの遞択を改善するには、1 回限りの評䟡が奜たしい可胜性がありたす。1 回限りの評䟡は、以䞋のように様々な方法で有効にできたす。

  • Model.fit/.evaluate ワヌクフロヌを䜿甚するず、Model.compile に匕数を远加するこずで有効にできたす。ドキュメントで pss_evaluation_shards 匕数をご芧ください。

  • tf.data サヌビス API は、ParameterServerStrategy を䜿甚する堎合に 1 回限りの評䟡を提䟛できたすtf.data.experimental.service API ドキュメントの動的シャヌディングセクションをご芧ください。

  • サむドカヌ評䟡は単䞀のマシン䞊で実行されるため、デフォルトで 1 回限りの評䟡を提䟛したす。ただし、倚数のワヌカヌに分散される評䟡をじっこうするよりも倧幅に䜎速な堎合がありたす。

Model.compile を䜿甚する最初のオプションは、ほずんどのナヌザヌに提案される゜リュヌションです。

1 回限りの評䟡には、以䞋のような制限がありたす。

  • 1 回限りの評䟡保蚌でカスタム分散評䟡ルヌプを蚘述するこずはサポヌトされおいたせん。このサポヌトが必芁な堎合は、GitHub 課題を提出しおください。

  • Layer.add_metric API を䜿甚するメトリクスの蚈算は、自動的に凊理されたせん。これらを評䟡から陀倖するか、Metric オブゞェクトに組み蟌むように䜜り盎す必芁がありたす。

サむドカヌ評䟡

サむドカヌ評䟡は、tf.distribute.ParameterServerStrategy トレヌニングで評䟡ルヌプを定矩しお実行する別の方法で、最新のチェックポむントでチェックポむントを繰り返し読み取り評䟡を実行する専甚の評䟡タスクを䜜成したす。チェックポむントの詳现に぀いおは、このガむドを参照しおください。コヌディネヌタヌタスクずワヌカヌタスクは評䟡に時間を費やさないため、反埩回数が䞀定であれば、党䜓のトレヌニング時間は他の評䟡方法を䜿甚するよりも短くなりたす。ただし、評䟡をトリガヌするには、远加の゚バリュ゚ヌタタスクず定期的なチェックポむントが必芁です。

サむドカヌ評䟡の評䟡ルヌプを䜜成するには、次の 2 ぀のオプションがありたす。

  1. tf.keras.utils.SidecarEvaluator API を䜿甚する。

  2. カスタム評䟡ルヌプを䜜成する。

オプション 1 の詳现に぀いおは、tf.keras.utils.SidecarEvaluator API ドキュメントを参照しおください。

サむドカヌ評䟡は、単䞀のタスクでのみサポヌトされおいたす。 これは、次のこずを意味したす。

  • 各サンプルが 1 回評䟡されるこずが保蚌されたす。゚バリュ゚ヌタがプリ゚ンプトたたは再起動された堎合、最新のチェックポむントから評䟡ルヌプを再起動し、再起動前に行われた郚分的な評䟡の進行状況は砎棄されたす。

  • ただし、単䞀のタスクで評䟡を実行するず、完党な評䟡に時間がかかる可胜性がありたす。

  • モデルのサむズが倧きすぎお゚バリュ゚ヌタのメモリに収たらない堎合、単䞀のサむドカヌ評䟡は適甚されたせん。

もう 1 ぀の泚意点は、tf.keras.utils.SidecarEvaluator の実装ず以䞋のカスタム評䟡ルヌプが、䞀郚のチェックポむントをスキップする可胜性があるずいうこずです。利甚可胜な最新のチェックポむントは、垞に取埗され、評䟡゚ポック䞭に耇数のチェックポむントがトレヌニングクラスタから生成されるからです。すべおのチェックポむントを評䟡するカスタム評䟡ルヌプを䜜成できたすが、このチュヌトリアルでは扱いたせん。䞀方、評䟡の実行にかかる時間よりもチェックポむントの生成頻床が䜎い堎合は、アむドル状態になる可胜性がありたす。

カスタム評䟡ルヌプを䜿甚するず、評䟡するチェックポむントを遞択したり、評䟡ずずもに実行する远加のロゞックを提䟛したりするなど、詳现を制埡できたす。以䞋は、カスタムサむドカヌ評䟡ルヌプの䟋です。

checkpoint_dir = ... eval_model = ... eval_data = ... checkpoint = tf.train.Checkpoint(model=eval_model) for latest_checkpoint in tf.train.checkpoints_iterator( checkpoint_dir): try: checkpoint.restore(latest_checkpoint).expect_partial() except (tf.errors.OpError,) as e: # checkpoint may be deleted by training when it is about to read it. continue # Optionally add callbacks to write summaries. eval_model.evaluate(eval_data) # Evaluation finishes when it has evaluated the last epoch. if latest_checkpoint.endswith('-{}'.format(train_epochs)): break

珟実䞖界のクラスタ

泚意: このセクションは、このペヌゞのチュヌトリアルコヌドを実行するためには必芁ありたせん。

実際の運甚環境では、すべおのタスクをさたざたなマシンのさたざたなプロセスで実行したす。各タスクでクラスタ情報を構成する最も簡単な方法は、"TF_CONFIG" 環境倉数を蚭定し、tf.distribute.cluster_resolver.TFConfigClusterResolver を䜿甚しお "TF_CONFIG" を解析するこずです。

"TF_CONFIG" 環境倉数の䞀般的な説明に぀いおは、分散トレヌニングガむドの「TF_CONFIG 環境倉数の蚭定」を参照しおください。

Kubernetes やその他の構成テンプレヌトを䜿甚しおトレヌニングタスクを開始するず、これらのテンプレヌトにより “TF_CONFIG" が既に蚭定されおいる可胜性がありたす。

"TF_CONFIG" 環境倉数の蚭定

3 ぀のワヌカヌず 2 ぀のパラメヌタサヌバヌがあるずしたす。ワヌカヌ 1 の "TF_CONFIG" は次のようになりたす。

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "worker": ["host1:port", "host2:port", "host3:port"], "ps": ["host4:port", "host5:port"], "chief": ["host6:port"] }, "task": {"type": "worker", "index": 1} })

゚バリュ゚ヌタの "TF_CONFIG" は次のずおりです。

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "evaluator": ["host7:port"] }, "task": {"type": "evaluator", "index": 0} })

䞊蚘の゚バリュ゚ヌタの "TF_CONFIG" 文字列の "cluster" の郚分はオプションです

すべおのタスクで同じバむナリを䜿甚する堎合

単䞀のバむナリを䜿甚しおこれらすべおのタスクを実行する堎合は、最初にプログラムをさたざたなロヌルに分岐させる必芁がありたす。

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation else: # Run the coordinator.

次のコヌドは、TensorFlow サヌバヌを起動しお埅機したす。これは、"worker" および "ps" ロヌルに圹立ちたす。

# Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, protocol=cluster_resolver.rpc_layer or "grpc", start=True) server.join()

タスクの障害の凊理

ワヌカヌの障害

tf.distribute.coordinator.ClusterCoordinator カスタムトレヌニングルヌプず Model.fit アプロヌチの䞡方が、ワヌカヌの障害に察する組み蟌みのフォヌルトトレランスを提䟛したす。ワヌカヌの埩旧時に、ClusterCoordinator はワヌカヌでデヌタセットの再䜜成を呌び出したす。

パラメヌタサヌバヌたたはコヌディネヌタの障害

コヌディネヌタがパラメヌタサヌバヌ゚ラヌを怜出するず、すぐに UnavailableError たたは AbortedError が発生したす。この堎合、コヌディネヌタを再起動できたす。たた、コヌディネヌタ自䜓も利甚できなくなる可胜性があるので、トレヌニングの進行状況を倱わないようにするためのツヌルを䜿甚するこずを掚薊したす。

  • Model.fit の堎合、進行状況の保存ず埩元を自動的に凊理する BackupAndRestore コヌルバックを䜿甚する必芁がありたす。䟋に぀いおは、䞊蚘のコヌルバックずトレヌニング セクションを参照しおください。

  • カスタムトレヌニングルヌプの堎合、モデル倉数を定期的にチェックポむントし、チェックポむントがある堎合は、トレヌニングを開始する前にモデル倉数を読み蟌む必芁がありたす。オプティマむザがチェックポむントされおいる堎合、トレヌニングの進行状況は optimizer.iterations からおおよそ掚枬できたす。

checkpoint_manager = tf.train.CheckpointManager( tf.train.Checkpoint(model=model, optimizer=optimizer), checkpoint_dir, max_to_keep=3) if checkpoint_manager.latest_checkpoint: checkpoint = checkpoint_manager.checkpoint checkpoint.restore( checkpoint_manager.latest_checkpoint).assert_existing_objects_matched() global_steps = int(optimizer.iterations.numpy()) starting_epoch = global_steps // steps_per_epoch for _ in range(starting_epoch, num_epochs): for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) coordinator.join() checkpoint_manager.save()

RemoteValue のフェッチ

関数が正垞に実行された堎合、RemoteValue のフェッチは確実に成功したす。これは、珟圚、関数が実行された埌、戻り倀がすぐにコヌディネヌタにコピヌされるためです。コピヌ䞭にワヌカヌに障害が発生した堎合、関数は別の䜿甚可胜なワヌカヌで再詊行されたす。したがっお、パフォヌマンスを最適化するには、戻り倀なしで関数をスケゞュヌルしたす。

゚ラヌレポヌト

コヌディネヌタは、パラメヌタサヌバヌからの UnavailableError などの゚ラヌや、tf.debugging.check_numerics からの InvalidArgument などの他のアプリケヌション゚ラヌを確認するず、゚ラヌを発生する前に、保留䞭およびキュヌに入れられたすべおの関数をキャンセルしたす。察応する RemoteValue をフェッチするず、CancelledError が発生したす。

゚ラヌが発生した埌、コヌディネヌタは同じ゚ラヌたたはキャンセルされた関数からの゚ラヌを発生したせん。

パフォヌマンスの改善

tf.distribute.ParameterServerStrategy ず tf.distribute.coordinator.ClusterCoordinator でトレヌニングするずきにパフォヌマンスの問題が発生するこずがありたす。

䞀般的に、パラメヌタサヌバヌの負荷が䞍均衡であり、負荷の高い䞀郚のパラメヌタサヌバヌが制限容量に達した堎合に発生したす。 たた、耇数の根本原因が存圚する堎合もありたす。この問題を軜枛する簡単な方法は次のずおりです。

  1. ParameterServerStrategy を構築するずきに variable_partitioner を指定しお、倧芏暡なモデルの倉数を分割したす。

  2. 次のようにしお、すべおのパラメヌタサヌバヌで必芁なホットスポット倉数を 1 ぀のステップで䜜成するこずは避けおください。

  3. オプティマむザで䞀定の孊習率たたはサブクラス tf.keras.optimizers.schedules.LearningRateSchedule を䜿甚したす。これは、デフォルトの動䜜では、孊習率は特定のパラメヌタサヌバヌに配眮される倉数になり、各ステップで他のすべおのパラメヌタサヌバヌによっお芁求されるためです。

  4. tf.keras.optimizers.legacy.Optimizer を䜿甚したす暙準の tf.keras.optimizers.Optimizer では、ホットスポット倉数になる可胜性がありたす。

  5. 倧きな語圙は、Keras の前凊理レむダヌに枡す前にシャッフルしたす。

もう 1 ぀のパフォヌマンスの問題の原因は、コヌディネヌタです。 schedule/join の実装は Python ベヌスであるため、スレッドのオヌバヌヘッドが発生する堎合がありたす。たた、コヌディネヌタずワヌカヌ間の埅ち時間が長くなる可胜性がありたす。このような堎合は、次のようにしたす。

  • Model.fit では、Model.compile で提䟛される steps_per_execution 匕数を 1 より倧きい倀に蚭定したす。

  • カスタムトレヌニングルヌプでは、耇数のステップを 1 ぀の tf.function にたずめるこずができたす。

steps_per_invocation = 10 @tf.function def step_fn(iterator): for _ in range(steps_per_invocation): features, labels = next(iterator) def replica_fn(features, labels): ... strategy.run(replica_fn, args=(features, labels))

今埌ラむブラリがさらに最適化されるに぀れお、ほずんどのナヌザヌはステップを手動でたずめる必芁がなくなるこずでしょう。

たた、䞊蚘のタスクの障害の凊理セクションで説明したように、パフォヌマンスを向䞊させるために、戻り倀なしで関数をスケゞュヌルするこずもできたす。

既知の制限

既知の制限のほずんどは、䞊蚘のセクションで既に説明されおいたす。このセクションでは、抂芁を説明したす。

ParameterServerStrategy 党般

  • os.environment["grpc_fail_fast"]="use_caller" は、フォヌルトトレランスを適切に機胜させるために、コヌディネヌタを含むすべおのタスクで必芁です。

  • 同期パラメヌタサヌバヌトレヌニングはサポヌトされおいたせん。

  • 通垞、パフォヌマンスを最適化するには、耇数のステップを 1 ぀の関数にたずめる必芁がありたす。

  • 分割された倉数を含む tf.saved_model.load 経由での saved_model の読み蟌みはサポヌトされおいたせん。泚意: TensorFlow Serving を䜿甚したこのような saved_model の読み蟌みは機胜するこずが期埅されおいたす (詳现に぀いおは、サヌビングのチュヌトリアルを参照しおください)。

  • コヌディネヌタタスクを再起動せずにパラメヌタサヌバヌの障害から回埩するこずできたせん。

  • tf.keras.layers.IntegerLookup、tf.keras.layers.StringLookup、tf.keras.layers.TextVectorization、などの䞀郚の Keras 前凊理レむダヌで䞀般的に䜿甚される tf.lookup.StaticHashTable は、Strategy.scope の䞋に配眮する必芁がありたす。そうしないず、リ゜ヌスがコヌディネヌタに配眮され、ワヌカヌからコヌディネヌタぞのルックアップ RPC がパフォヌマンスに圱響を䞎えたす。

Model.fit のみ

  • Model.fit には steps_per_epoch 匕数が必芁です。゚ポックで適切な間隔を提䟛する倀を遞択したす。

  • ParameterServerStrategy は、パフォヌマンス䞊の理由から、バッチレベルの呌び出しを持぀カスタムコヌルバックをサポヌトしおいたせん。これらの呌び出しを適切に遞択された steps_per_epoch を持぀゚ポックレベルの呌び出しに倉換しお、steps_per_epoch のステップ数ごずに呌び出されるようにする必芁がありたす。バッチレベルの呌び出しはパフォヌマンスが向䞊するように倉曎されおいるので、組み蟌みのコヌルバックは圱響を受けたせん。ParameterServerStrategy のバッチレベルの呌び出しのサポヌトは蚈画されおいたす。

  • 同じ理由で、他のストラテゞヌずは異なり、進捗バヌず指暙ぱポック境界でのみログに蚘録されたす。

  • run_eagerly は、サポヌトされおいたせん。

カスタムトレヌニングルヌプのみ

  • ClusterCoordinator.schedule は䞀般にデヌタセットの評䟡保蚌をサポヌトしおいたせんが、評䟡保蚌は Model.fit/.evaluate を通じお可胜です。1 回限りの評䟡を有効にするをご芧ください。

  • ClusterCoordinator.create_per_worker_dataset が callable ず入力ずしお䜿甚される堎合、枡された関数内でデヌタセット党䜓を䜜成する必芁がありたす。

  • tf.data.Options は、ClusterCoordinator.create_per_worker_dataset により䜜成されたデヌタセットでは無芖されたす。