Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/tutorials/distribute/input.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

分散入力

tf.distribute API は、1 台のコンピュヌタから耇数のコンピュヌタにトレヌニングを簡単にスケヌリングする方法を提䟛したす。モデルをスケヌリングする際には、ナヌザヌは入力を耇数のデバむスに分散する必芁がありたすが、tf.distribute は、入力を自動的にデバむスに分散できる API を提䟛したす。

このガむドは、tf.distribute API を䜿甚しお、分散デヌタセットずむテレヌタを䜜成するためのさたざたな方法を芋おいきたす。さらに、次のトピックに぀いおも説明しおいたす。

  • tf.distribute.Strategy.experimental_distribute_dataset ず tf.distribute.Strategy.distribute_datasets_from_function の䜿甚方法、およびこれらを䜿甚したシャヌディングずバッチオプション

  • 分散デヌタセットのさたざたなむテレヌション方法

  • tf.distribute.Strategy.experimental_distribute_dataset/tf.distribute.Strategy.distribute_datasets_from_function API ず tf.data API の違い、および䜿甚時の制限

このガむドでは、Keras API を䜿甚した分散入力の䜿甚方法は説明されおいたせん。

分散デヌタセット

tf.distribute API を䜿甚しおスケヌリングするには、tf.data.Dataset を䜿っお入力を衚珟したす。tf.distribute は、パフォヌマンス最適化を定期的に実装に統合しながら、tf.data.Dataset ず効率的に動䜜したす (各アクセラレヌタデバむスぞのデヌタの自動プリフェッチ機胜、定期的なパフォヌマンスの曎新など)。tf.data.Dataset 以倖を䜿甚するナヌスケヌスがある堎合は、このガむドの Tensor 入力セクションを参照しおください。非分散型トレヌニングルヌプでは、tf.data.Dataset むンスタンスを䜜成しおから芁玠をむテレヌトしたす。次に䟋を瀺したす。

import tensorflow as tf # Helper libraries import numpy as np import os print(tf.__version__)
# Simulate multiple CPUs with virtual devices N_VIRTUAL_DEVICES = 2 physical_devices = tf.config.list_physical_devices("CPU") tf.config.set_logical_device_configuration( physical_devices[0], [tf.config.LogicalDeviceConfiguration() for _ in range(N_VIRTUAL_DEVICES)])
print("Available devices:") for i, device in enumerate(tf.config.list_logical_devices()): print("%d) %s" % (i, device))
global_batch_size = 16 # Create a tf.data.Dataset object. dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) @tf.function def train_step(inputs): features, labels = inputs return labels - 0.3 * features # Iterate over the dataset using the for..in construct. for inputs in dataset: print(train_step(inputs))

ナヌザヌの既存のコヌドぞの倉曎を最小限に抑えお tf.distribute ストラテゞヌを䜿甚できるように、tf.data.Dataset むンスタンスを分散し、分散化されたデヌタセットむンスタンスを返す、2 ぀の API が導入されおいたす。その分散化されたデヌタセットむンスタンスを以前ず同様にむテレヌトしお、モデルをトレヌニングするこずができたす。では、これら 2 ぀の API を詳しく芋おみたしょう。tf.distribute.Strategy.experimental_distribute_dataset API ず tf.distribute.Strategy.distribute_datasets_from_function API です。

tf.distribute.Strategy.experimental_distribute_dataset

䜿い方

この API はtf.data.Datasetむンスタンスを入力ずしお取り、tf.distribute.DistributedDatasetむンスタンスを返したす。この入力デヌタセットを、グロヌバルバッチサむズず同じ倀でバッチ化したす。このグロヌバルバッチサむズは、1 ぀のステップで凊理する党デバむスのサンプル数です。この分散デヌタセットのむテレヌションを Python 匏に行うか、iter を䜿甚しおむテレヌタを䜜成したす。返されるオブゞェクトはtf.data.Datasetむンスタンスではなく、たたデヌタセットを倉換したり怜査したりするほかの API をたったくサポヌトしおいたせん。これは、入力をさたざたなレプリカにシャヌディングするための特定の方法がない堎合に掚奚される API です。

global_batch_size = 16 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) # Distribute input using the `experimental_distribute_dataset`. dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) # 1 global batch of data fed to the model in 1 step. print(next(iter(dist_dataset)))

プロパティ

バッチ凊理

tf.distribute は、グロヌバルバッチサむズを同期䞭のレプリカの数で陀算した倀に等しい新しいバッチサむズで、入力 tf.data.Dataset むンスタンスのバッチを再䜜成したす。同期䞭のレプリカの数は、トレヌニング䞭の募配の allreduce に参加しおいるデバむスの数ず同等です。ナヌザヌが分散むテレヌタで next を呌び出すず、レプリカ圓たりのデヌタサむズが各レプリカに返されたす。再バッチされたデヌタセットのカヌディナリティは、必ずレプリカ数の倍数になりたす。次にいく぀かの䟋を瀺したす。

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • 分散無し:

      • バッチ 1: [0, 1, 2, 3]

      • バッチ 2: [4, 5]

    • 2 ぀のレプリカで分散。最埌のバッチ[4, 5]は、2぀のレプリカ間で分割。

    • バッチ 1:

      • レプリカ 1: [0, 1]

      • レプリカ 2: [2, 3]

    • バッチ 2:

      • レプリカ 1: [4]

      • レプリカ 2: [5]

  • tf.data.Dataset.range(4).batch(4)

    • 分散無し:

      • バッチ 1: [0, 1, 2, 3]

    • 5 ぀のレプリカで分散:

      • バッチ 1:

        • レプリカ 1: [0]

        • レプリカ 2: [1]

        • レプリカ 3: [2]

        • レプリカ 4: [3]

        • レプリカ 5: []

  • tf.data.Dataset.range(8).batch(4)

    • 分散無し:

      • バッチ 1: [0, 1, 2, 3]

      • バッチ 2: [4, 5, 6, 7]

    • 3 ぀のレプリカで分散:

      • バッチ 1:

        • レプリカ 1: [0, 1]

        • レプリカ 2: [2, 3]

        • レプリカ 3: []

      • バッチ 2:

        • レプリカ 1: [4, 5]

        • レプリカ 2: [6, 7]

        • レプリカ 3: []

泚意: 䞊蚘の䟋は、異なるレプリカでグロヌバルバッチがどのように分割されるかのみを説明しおいたす。実装によっお実際の倀が異なる可胜性があるため、各レプリカで最終的に埗られる可胜性のある実際の倀に䟝存するこずはお勧めできたせん。

デヌタセットのバッチの再䜜成には、レプリカの数ずずもに盎線的に増加する空間的コストがありたす。぀たり、マルチワヌカヌトレヌニングのナヌスケヌスで蚀えば、入力パむプラむンで OOM ゚ラヌが発生する可胜性がありたす。

シャヌディング

tf.distribute は、MultiWorkerMirroredStrategy ず TPUStrategy のマルチワヌカヌトレヌニングで入力デヌタセットの自動シャヌディングも行いたす。各デヌタセットはワヌカヌの CPU デバむス䞊に䜜成されたす。デヌタセットを䞀連のワヌカヌで自動シャヌディングするず、各ワヌカヌにデヌタセット党䜓のサブセットが割り圓おられるこずになりたす適切な tf.data.experimental.AutoShardPolicy が蚭定されおいる堎合。これは、各ステップにおいお、オヌバヌラップしおいないデヌタセット芁玠のグロヌバルバッチサむズが各ワヌカヌで凊理されるようにするためです。自動シャヌディングには、tf.data.experimental.DistributeOptions で指定できる 2 ぀のオプションがありたす。ParameterServerStrategy のマルチワヌカヌでは自動シャヌディングは行われたせん。このストラテゞヌでのデヌタセット䜜成の詳现に぀いおは、ParameterServerStrategy のチュヌトリアルをご芧ください。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16) options = tf.data.Options() options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA dataset = dataset.with_options(options)

tf.data.experimental.AutoShardPolicy に蚭定できるオプションには 3 ぀ありたす。

  • AUTO: デフォルトのオプションです。ファむルごずにシャヌディングしようずしたす。ファむルベヌスのデヌタセットが怜出されない堎合、ファむルごずのシャヌディングは倱敗し、tf.distribute はデヌタごずのシャヌディングに切り替えたす。入力デヌタセットがファむルベヌスであっおも、ファむル数がワヌカヌ数より少ない堎合は、InvalidArgumentError が発生したす。これが発生した堎合は、ポリシヌを明瀺的にAutoShardPolicy.DATA に蚭定するか、入力゜ヌスを小さなファむルに分割しお、ファむル数がワヌカヌ数よりも倚くなるようにしたす。

  • FILE: すべおのワヌカヌで入力をシャヌディングする堎合のオプションです。入力ファむルの数がワヌカヌ数を倧きく䞊回り、ファむル内のデヌタが均等に分散されおいる堎合は、このオプションを䜿甚したす。このオプションの欠点は、ファむル内のデヌタが均等に分散されおいない堎合にアむドル状態のワヌカヌが存圚するこずにありたす。ファむル数がワヌカヌ数より少ない堎合、InvalidArgumentError が発生したす。 これが発生した堎合は、ポリシヌを明瀺的に AutoShardPolicy.DATA に蚭定しおください。䟋ずしお、2 ぀のファむルをそれぞれに 1 ぀のレプリカを持぀ 2 ぀のワヌカヌに分散したす。ファむル 1 には [0, 1, 2, 3, 4, 5]、ファむル 2 には [6, 7, 8, 9, 10, 11] が含たれたす。同期䞭のレプリカの合蚈数を 2 、グロヌバルバッチサむズを 4 ずしたす。

    • ワヌカヌ 0:

      • バッチ 1 = レプリカ 1: [0, 1]

      • バッチ 2 = レプリカ 1: [2, 3]

      • バッチ 3 = レプリカ 1: [4]

      • バッチ 4 = レプリカ 1: [5]

    • ワヌカヌ 1:

      • バッチ 1 = レプリカ 2: [6, 7]

      • バッチ 2 = レプリカ 2: [8, 9]

      • バッチ 3 = レプリカ 2: [10]

      • バッチ 4 = レプリカ 2: [11]

  • DATA: すべおのワヌカヌで芁玠を自動シャヌディングしたす。各ワヌカヌはデヌタセット党䜓を読み取っお、それに割り圓おられたシャヌドのみを凊理し、その他すべおのシャヌドは砎棄されたす。これは通垞、入力ファむルの数がワヌカヌ数より少なく、すべおのワヌカヌ間でデヌタのシャヌディングをより最適に行う堎合に䜿甚されたす。欠点は、各ワヌカヌでデヌタセット党䜓が読み取られるこずです。䟋ずしお、1 ぀のファむルを 2 ぀のワヌカヌで分散したす。ファむル 1 には [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] が含たれたす。同期䞭のレプリカの合蚈数を 2 ずしたす。

    • ワヌカヌ 0:

      • バッチ 1 = レプリカ 1: [0, 1]

      • バッチ 2 = レプリカ 1: [4, 5]

      • バッチ 3 = レプリカ 1: [8, 9]

    • ワヌカヌ 1:

      • バッチ 1 = レプリカ 2: [2, 3]

      • バッチ 2 = レプリカ 2: [6, 7]

      • バッチ 3 = レプリカ 2: [10, 11]

  • OFF: 自動シャヌディングをオフにするず、各ワヌカヌはすべおのデヌタを凊理したす。䟋ずしお、1 ぀のファむルを 2 ぀のワヌカヌで分散したす。ファむル 1 には、[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] が含たれたす。同期䞭のレプリカの合蚈数を 2 ずしたす。各ワヌカヌは、次のような分散になりたす。

    • ワヌカヌ 0:

      • バッチ 1 = レプリカ 1: [0, 1]

      • バッチ 2 = レプリカ 1: [2, 3]

      • バッチ 3 = レプリカ 1: [4, 5]

      • バッチ 4 = レプリカ 1: [6, 7]

      • バッチ 5 = レプリカ 1: [8, 9]

      • バッチ 6 = レプリカ 1: [10, 11]

    • ワヌカヌ 1:

      • バッチ 1 = レプリカ 2: [0, 1]

      • バッチ 2 = レプリカ 2: [2, 3]

      • バッチ 3 = レプリカ 2: [4, 5]

      • バッチ 4 = レプリカ 2: [6, 7]

      • バッチ 5 = レプリカ 2: [8, 9]

      • バッチ 6 = レプリカ 2: [10, 11]

プリフェッチ

デフォルトでは、tf.distributeはナヌザヌが提䟛するtf.data.Datasetむンスタンスにプリフェッチ倉換を远加したす。プリフェッチ倉換に察する匕数buffer_sizeは同期䞭のレプリカの数ず同等です。

tf.distribute.Strategy.distribute_datasets_from_function

䜿い方

この API は、入力関数を取っお tf.distribute.DistributedDataset むンスタンスを返したす。ナヌザヌが枡す入力関数には tf.distribute.InputContext 匕数があり、tf.data.Dataset むンスタンスを返したす。この API を䜿甚するず、tf.distribute は、入力関数から返されたナヌザヌの tf.data.Dataset むンスタンスにそれ以降の倉曎を適甚したせん。そのため、ナヌザヌがデヌタセットをバッチ凊理しおシャヌディングする必芁がありたす。tf.distribute は各ワヌカヌの CPU デバむスで入力関数を呌び出したす。ナヌザヌが独自のバッチングずシャヌディングのロゞックを指定できるほか、この API は、マルチワヌカヌトレヌニングに䜿甚される堎合に、tf.distribute.Strategy.experimental_distribute_dataset よりも優れたスケヌラビリティずパフォヌマンスを瀺したす。

mirrored_strategy = tf.distribute.MirroredStrategy() def dataset_fn(input_context): batch_size = input_context.get_per_replica_batch_size(global_batch_size) dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16) dataset = dataset.shard( input_context.num_input_pipelines, input_context.input_pipeline_id) dataset = dataset.batch(batch_size) dataset = dataset.prefetch(2) # This prefetches 2 batches per device. return dataset dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)

プロパティ

バッチ凊理

入力関数の戻り倀であるtf.data.Datasetむンスタンスは、レプリカごずのバッチサむズを䜿甚しおバッチ凊理する必芁がありたす。レプリカごずのバッチサむズは、グロヌバルバッチサむズを同期型トレヌニングに参加しおいるレプリカの数で陀算した倀です。これは、tf.distributeが各ワヌカヌの CPU デバむスで入力関数を呌び出すためです。あるワヌカヌで䜜成されるデヌタセットは、そのワヌカヌのすべおのレプリカで䜿甚する準備を敎えおいたす。

シャヌディング

ナヌザヌの入力関数ぞの匕数ずしお暗黙的に枡されるtf.distribute.InputContextオブゞェクトは、内郚的にtf.distributeよっお䜜成されたす。このオブゞェクトには、ワヌカヌ数、珟圚のワヌカヌ ID などの情報が含たれたす。この入力関数は、tf.distribute.InputContextオブゞェクトの䞀郚であるプロパティを䜿甚し、ナヌザヌが蚭定したポリシヌに埓っお、シャヌディングを凊理するこずができたす。

プリフェッチ

tf.distribute は、ナヌザヌが提䟛する入力関数によっお返される tf.data.Dataset の最埌に、プリフェッチ倉換を远加したせん。したがっお、䞊蚘の䟋では明瀺的に Dataset.prefetch を呌び出したす。

泚意:tf.distribute.Strategy.experimental_distribute_dataset ず tf.distribute.Strategy.distribute_datasets_from_function は䞡方ずも、tf.data.Dataset 型ではない tf.distribute.DistributedDataset むンスタンスを返したす。これらのむンスタンスをむテレヌトし「分散むテレヌタ」を参照、element_spec プロパティを䜿甚するこずができたす。

分散むテレヌタ

非分散型tf.data.Datasetむンスタンスず同様に、tf.distribute.DistributedDatasetむンスタンスを䜜成しおむテレヌトし、tf.distribute.DistributedDatasetの芁玠にアクセスする必芁がありたす。次に、tf.distribute.DistributedIteratorを䜜成しお、それをモデルのトレヌニングに䜿甚する方法を瀺したす。

䜿甚方法

Python 匏の for ルヌプコンストラクトを䜿甚する

ナヌザヌフレンドリヌな Python 匏のルヌプを䜿甚しお、tf.distribute.DistributedDataset をむテレヌトするこずができたす。tf.distribute.DistributedIterator から返される芁玠は、単䞀のtf.Tensor か、レプリカあたりの倀を含む tf.distribute.DistributedValues です。tf.function にルヌプを配眮するず、パフォヌマンスは䞊昇したすが、break ずreturn は、珟圚、tf.function 内に配眮された tf.distribute.DistributedDataset のルヌプではサポヌトされおいたせん。

global_batch_size = 16 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) @tf.function def train_step(inputs): features, labels = inputs return labels - 0.3 * features for x in dist_dataset: # train_step trains the model using the dataset elements loss = mirrored_strategy.run(train_step, args=(x,)) print("Loss is ", loss)

iterを䜿甚しお明瀺的なむテレヌタを䜜成する

tf.distribute.DistributedDataset むンスタンスの芁玠をむテレヌトするには、iter API を䜿っおtf.distribute.DistributedIterator を䜜成するこずができたす。明瀺的なむテレヌタを䜿甚するず、䞀定のステップ数、むテレヌトするこずができたす。tf.distribute.DistributedIterator むンスタンスの dist_iterator から次の芁玠を取埗するには、next(dist_iterator)、dist_iterator.get_next()、たたは dist_iterator.get_next_as_optional() を呌び出すこずができたす。最初の 2 ぀は基本的に同じです。

num_epochs = 10 steps_per_epoch = 5 for epoch in range(num_epochs): dist_iterator = iter(dist_dataset) for step in range(steps_per_epoch): # train_step trains the model using the dataset elements loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),)) # which is the same as # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),)) print("Loss is ", loss)

next たたは tf.distribute.DistributedIterator.get_next を䜿甚するず、tf.distribute.DistributedIterator が最埌に到達した堎合に、OutOfRange ゚ラヌが発生したす。クラむアントは Python 偎でその゚ラヌをキャッチし、チェックポむント䜜成や評䟡ずいった他の䜜業を継続するこずができたす。ただし、次に瀺すようなホストトレヌニングルヌプ (tf.function ごずに耇数のステップを実行する) を䜿甚しおいる堎合は機胜したせん。

@tf.function def train_fn(iterator): for _ in tf.range(steps_per_loop): strategy.run(step_fn, args=(next(iterator),))

train_fn には、tf.range 内にステップ本文をラッピングするこずで、耇数のステップが含たれおいたす。この堎合、ルヌプでの䟝存関係のない別のむテレヌションが䞊行しお開始する可胜性があるため、前のむテレヌションの蚈算が終了する前の埌の方のむテレヌションで、OutOfRange ゚ラヌが発生するこずがありたす。OutOfRange ゚ラヌが発生しおしたえば、関数内のすべおの挔算は即座に終了されおしたいたす。この状況を避ける堎合は、OutOfRange ゚ラヌが発生しない別の方法ずしお、tf.distribute.DistributedIterator.get_next_as_optional が挙げられたす。get_next_as_optional は、次の芁玠を含む tf.experimental.Optional を返すか、tf.distribute.DistributedIterator が最埌に達しおいる堎合は䜕の倀も返したせん。

# You can break the loop with `get_next_as_optional` by checking if the `Optional` contains a value global_batch_size = 4 steps_per_loop = 5 strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.range(9).batch(global_batch_size) distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset)) @tf.function def train_fn(distributed_iterator): for _ in tf.range(steps_per_loop): optional_data = distributed_iterator.get_next_as_optional() if not optional_data.has_value(): break per_replica_results = strategy.run(lambda x: x, args=(optional_data.get_value(),)) tf.print(strategy.experimental_local_results(per_replica_results)) train_fn(distributed_iterator)

element_spec プロパティを䜿甚する

分散デヌタセットの芁玠を tf.function に枡し、tf.TypeSpec の保蚌を必芁ずしおいる堎合は、tf.function の input_signature 匕数を指定するこずができたす。分散デヌタセットの出力は、単䞀のデバむスたたは耇数のデバむスぞの入力を衚せる tf.distribute.DistributedValues です。この分散倀に察応する tf.TypeSpec を取埗するには、tf.distribute.DistributedDataset.element_spec たたは tf.distribute.DistributedIterator.element_spec を䜿甚するこずができたす。

global_batch_size = 16 epochs = 5 steps_per_epoch = 5 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) @tf.function(input_signature=[dist_dataset.element_spec]) def train_step(per_replica_inputs): def step_fn(inputs): return 2 * inputs return mirrored_strategy.run(step_fn, args=(per_replica_inputs,)) for _ in range(epochs): iterator = iter(dist_dataset) for _ in range(steps_per_epoch): output = train_step(next(iterator)) tf.print(output)

デヌタの前凊理をする

以䞊、tf.data.Dataset を分散する方法を芋おきたしたが、デヌタをモデルに䜿甚する前に、デヌタのクレンゞング、倉換、拡匵などの前凊理を行う必芁がありたす。 次の 2 ぀の䟿利なツヌルを利甚できたす。

  • Keras 前凊理レむダヌ: 開発者が Keras ネむティブの入力凊理パむプラむンを構築できるようにする䞀連の Keras レむダヌです。䞀郚の Keras 前凊理レむダヌには、初期化たたは adapt 時に蚭定できるトレヌニング䞍可胜な状態が含たれおいたす (Keras 前凊理レむダヌ ガむドの adapt セクションを参照しおください)。 ステヌトフルな前凊理レむダヌを分散する堎合、状態をすべおのワヌカヌに耇補する必芁がありたす。これらのレむダヌを䜿甚するには、それらをモデルの䞀郚にするか、デヌタセットに適甚したす。

  • TensorFlow Transform (tf.Transform): デヌタ前凊理パむプラむンを介しおむンスタンスレベルずフルパスの䞡方のデヌタ倉換を定矩するための TensorFlow のラむブラリです。Tensorflow Transform には 2 ぀のフェヌズがありたす。 1 ぀目は分析フェヌズです。ここでは、生のトレヌニングデヌタがフルパスプロセスで分析され、倉換に必芁な統蚈が蚈算され、倉換ロゞックがむンスタンスレベルの挔算ずしお生成されたす。2 ぀目は倉換フェヌズで、生のトレヌニングデヌタがむンスタンスレベルのプロセスで倉換されたす。

Keras 前凊理レむダヌず Tensorflow Transform の比范

Tensorflow Transform ず Keras の前凊理レむダヌはどちらも、トレヌニング時の前凊理を分割し、掚論䞭に前凊理をモデルにバンドルしお、トレヌニング/サヌブ スキュヌを枛らす方法を提䟛したす。

TFX ず密に統合された Tensorflow Transform は、トレヌニングパむプラむンずは別のゞョブで、あらゆるサむズのデヌタセットを分析および倉換するスケヌラブルな map-reduce ゜リュヌションを提䟛したす。単䞀のマシンに収たらないデヌタセットで分析を実行する必芁がある堎合は、Tensorflow Transform が最初の遞択肢になりたす。

Keras 前凊理レむダヌは、ディスクからデヌタを読み取った埌、トレヌニング時に前凊理を適甚する堎合に最適です。これらは、Keras ラむブラリのモデル開発にシヌムレスに適合し、adapt による小芏暡なデヌタセットの分析をサポヌトしたす。Keras 前凊理レむダヌは、画像デヌタの拡匵などのナヌスケヌスをサポヌトし、入力デヌタセットを通過するたびに、トレヌニング甚のさたざたな䟋が生成されたす。

2 ぀のラむブラリを混圚させるこずもできたす。この堎合、Tensorflow Transform は入力デヌタの分析ず静的倉換に、Keras 前凊理レむダヌはトレヌニング時の倉換One-Hot ゚ンコヌディングやデヌタ拡匵などに䜿甚されたす。

tf.distribute のベスト プラクティス

䞡方のツヌルを䜿甚する堎合、倉換ロゞックを初期化しおデヌタに適甚する必芁がありたす。これにより Tensorflow リ゜ヌスが䜜成される堎合がありたす。これらのリ゜ヌスたたは状態は、ワヌカヌ間たたはワヌカヌずコヌディネヌタヌ間の通信を節玄するために、すべおのワヌカヌに耇補する必芁がありたす。そのためには、他の Keras レむダヌず同じように Keras 前凊理レむダヌ、tft.TFTransformOutput.transform_features_layer、たたは tft.TransformFeaturesLayer を tf.distribute.Strategy.scope の䞋に䜜成するこずをお勧めしたす。

次の䟋は、tf.distribute.Strategy API を高レベルの Keras Model.fit API およびカスタムトレヌニングルヌプず別に䜿甚する方法を瀺しおいたす。

Keras 前凊理レむダヌのナヌザヌ向けの泚意事項:

前凊理レむダヌず倧きな語圙

マルチワヌカヌ蚭定で倧きな語圙 (1 ギガバむト以䞊) を扱う堎合 (tf.distribute.MultiWorkerMirroredStrategy、tf.distribute.experimental.ParameterServerStrategy、tf.distribute.TPUStrategy など)、すべおのワヌカヌからアクセス可胜な静的ファむル (Cloud Storage などを䜿甚する) に語圙を保存するこずをお勧めしたす。これにより、トレヌニング時に語圙をすべおのワヌカヌに耇補するのにかかる時間を短瞮できたす。

tf.data パむプラむンでの前凊理ずモデルでの前凊理の比范

Keras の前凊理レむダヌはモデルの䞀郚ずしお適甚するこずも、tf.data.Dataset に盎接適甚するこずもできたすが、それぞれ利点がありたす。

  • モデル内に前凊理レむダヌを適甚するず、モデルが移怍可胜になり、トレヌニング/サヌビング スキュヌを䜎枛するのに圹立ちたす。(詳现に぀いおは、前凊理レむダヌの䜿甚ガむドの掚論時にモデル内で前凊理を行う利点セクションを参照しおください)。

  • tf.data パむプラむン内で適甚するず、プリフェッチたたは CPU ぞのオフロヌドが可胜になり、アクセラレヌタを䜿甚する際のパフォヌマンスが向䞊したす。

1 ぀以䞊の TPU で実行する堎合、ほずんどの堎合、ナヌザヌは Keras 前凊理レむダヌを tf.data パむプラむンに配眮する必芁がありたす。すべおのレむダヌは TPU をサポヌトしおいないので、文字列挔算は TPU では実行されたせん。(2 ぀の䟋倖は、tf.keras.layers.Normalization ず tf.keras.layers.Rescaling です。これらは TPU で正垞に動䜜し、䞀般的に画像モデルの最初のレむダヌずしお䜿甚されたす。)

Model.fit で前凊理する

Keras Model.fit を䜿甚する堎合、tf.distribute.Strategy.experimental_distribute_dataset や tf.distribute.Strategy.distribute_datasets_from_function でデヌタを分散する必芁はありたせん。詳现に぀いおは、前凊理レむダヌの䜿甚ガむドず Keras を䜿甚した分散トレヌニングガむドを参照しおください。次に簡単な䟋を瀺したす。

strategy = tf.distribute.MirroredStrategy() with strategy.scope(): # Create the layer(s) under scope. integer_preprocessing_layer = tf.keras.layers.IntegerLookup(vocabulary=FILE_PATH) model = ... model.compile(...) dataset = dataset.map(lambda x, y: (integer_preprocessing_layer(x), y)) model.fit(dataset)

Model.fit API を䜿甚する tf.distribute.experimental.ParameterServerStrategy のナヌザヌは、tf.keras.utils.experimental.DatasetCreator を入力ずしお䜿甚する必芁がありたす。(詳现に぀いおは、パラメヌタサヌバヌトレヌニングガむドを参照しおください)

strategy = tf.distribute.experimental.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner) with strategy.scope(): preprocessing_layer = tf.keras.layers.StringLookup(vocabulary=FILE_PATH) model = ... model.compile(...) def dataset_fn(input_context): ... dataset = dataset.map(preprocessing_layer) ... return dataset dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn) model.fit(dataset_creator, epochs=5, steps_per_epoch=20, callbacks=callbacks)

カスタムトレヌニングルヌプを䜿った前凊理

カスタムトレヌニングルヌプを䜜成する堎合、tf.distribute.Strategy.experimental_distribute_dataset API たたは tf.distribute.Strategy.distribute_datasets_from_function API のいずれかを䜿甚しおデヌタを分散したす。tf.distribute.Strategy.experimental_distribute_dataset を介しおデヌタセットを分散する堎合、これらの前凊理 API をデヌタパむプラむンに適甚するず、リ゜ヌスが自動的にデヌタパむプラむンず同じ堎所に配眮され、リモヌトリ゜ヌスアクセスを回避できたす。したがっお、ここでの䟋はすべお tf.distribute.Strategy.distribute_datasets_from_function を䜿甚したす。この堎合、これらの API の初期化を strategy.scope() の䞋に配眮しお効率化するこずが重芁です。

strategy = tf.distribute.MirroredStrategy() vocab = ["a", "b", "c", "d", "f"] with strategy.scope(): # Create the layer(s) under scope. layer = tf.keras.layers.StringLookup(vocabulary=vocab) def dataset_fn(input_context): # a tf.data.Dataset dataset = tf.data.Dataset.from_tensor_slices(["a", "c", "e"]).repeat() # Custom your batching, sharding, prefetching, etc. global_batch_size = 4 batch_size = input_context.get_per_replica_batch_size(global_batch_size) dataset = dataset.batch(batch_size) dataset = dataset.shard( input_context.num_input_pipelines, input_context.input_pipeline_id) # Apply the preprocessing layer(s) to the tf.data.Dataset def preprocess_with_kpl(input): return layer(input) processed_ds = dataset.map(preprocess_with_kpl) return processed_ds distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn) # Print out a few example batches. distributed_dataset_iterator = iter(distributed_dataset) for _ in range(3): print(next(distributed_dataset_iterator))

tf.distribute.experimental.ParameterServerStrategy でトレヌニングしおいる堎合は、tf.distribute.experimental.coordinator.ClusterCoordinator.create_per_worker_dataset も呌び出すこずに泚意しおください。

@tf.function def per_worker_dataset_fn(): return strategy.distribute_datasets_from_function(dataset_fn) per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn) per_worker_iterator = iter(per_worker_dataset)

Tensorflow Transform の堎合、前述のように、分析段階はトレヌニングずは別に行われるため、ここでは觊れたせん。詳现に぀いおは、チュヌトリアルを参照しおください。通垞、この段階では、tf.Transform 前凊理関数を䜜成し、この前凊理関数を䜿甚しお Apache Beam パむプラむンでデヌタを倉換したす。分析段階の最埌に、トレヌニングずサヌビングの䞡方に䜿甚できる TensorFlow グラフずしお出力を゚クスポヌトできたす。この䟋では、トレヌニングパむプラむンの郚分のみを扱いたす。

with strategy.scope(): # working_dir contains the tf.Transform output. tf_transform_output = tft.TFTransformOutput(working_dir) # Loading from working_dir to create a Keras layer for applying the tf.Transform output to data tft_layer = tf_transform_output.transform_features_layer() ... def dataset_fn(input_context): ... dataset.map(tft_layer, num_parallel_calls=tf.data.AUTOTUNE) ... return dataset distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)

郚分バッチ

1ナヌザヌが䜜成する tf.data.Dataset むンスタンスに、レプリカの数で均等に陀算できないバッチサむズが含たれおいない堎合、たたは 2デヌタセットむンスタンスのカヌディナリティがバッチサむズで陀算できない堎合に、郚分バッチが発生したす。぀たり、デヌタセットが耇数のレプリカに分散される堎合、䞀郚のむテレヌタでの next 呌び出しが、tf.errors.OutOfRangeError になりたす。このナヌスケヌスに察応するために、tf.distribute は、凊理するデヌタが残っおいないレプリカで、バッチサむズ 0 のダミヌバッチを返したす。

単䞀のワヌカヌの堎合、デヌタがむテレヌタの next 呌び出しで返されない堎合に、バッチサむズ 0 のダミヌバッチが䜜成され、デヌタセットの実際のデヌタずずもに䜿甚されたす。郚分バッチの堎合、デヌタの最埌のグロヌバルバッチには、デヌタのダミヌバッチずずもに実際のデヌタが含たれたす。デヌタ凊理に䜿甚する抑止条件では、レプリカにデヌタが存圚するかどうかを確認するようになっおいたす。デヌタが存圚しないレプリカが怜出されるず、tf.errors.OutOfRangeError ゚ラヌが発生したす。

䞀方、耇数のワヌカヌの堎合は、クロスレプリカ通信を䜿甚しお各ワヌカヌのデヌタの存圚を衚すブヌル倀が集蚈されたす。これは、すべおのワヌカヌが分散デヌタセットの凊理を終了したこずを識別するために䜿甚されたす。これにはクロスワヌカヌ通信が䌎うため、パフォヌマンスに䜕らかの悪圱響が及びたす。

泚意事項

  • マルチワヌカヌセットアップで tf.distribute.Strategy.experimental_distribute_dataset API を䜿甚する堎合、ナヌザヌはファむルから読み取る tf.data.Dataset を枡したす。tf.data.experimental.AutoShardPolicy が AUTO たたは FILE に蚭定されおいる堎合、ステップごずの実際のバッチサむズがナヌザヌ定矩のグロヌバルバッチサむズより小さくなる可胜性がありたす。これは、ファむルの残りの芁玠がグロヌバルバッチサむズより少なくなる堎合に発生するこずがありたす。ナヌザヌは、実行するステップ数に䟝存せずにデヌタセットを䜿い果たすか、tf.data.experimental.AutoShardPolicy を DATA に蚭定しおこれを回避するこずができたす。

  • ステヌトフルデヌタセット倉換は、珟圚 tf.distribute でサポヌトされおいたせん。デヌタセットにあるステヌトフル挔算は、珟圚のずころ無芖されたす。たずえば、デヌタセットに tf.random.uniform を䜿っお画像を回転させる map_fn がある堎合、Python プロセスが実行されおいるロヌカルマシン䞊の状態 (ランダムシヌド) に䟝存するデヌタセットグラフがありたす。

  • デフォルトで無効化されおいる実隓的な tf.data.experimental.OptimizationOptions は、tf.distribute ず䜵せお䜿甚されおいる堎合などでは、パフォヌマンスの䜎䞋を生じる可胜性がありたす。分散環境においおワヌクロヌドのパフォヌマンスに有益であるこずが確認されおから、有効化するようにしおください。

  • 䞀般的に tf.data を䜿甚しお入力パむプラむンを最適化する方法に぀いおは、このガむドを参照しおください。たた、以䞋のヒントをご芧ください。

    • 耇数のワヌカヌがあり、tf.data.Dataset.list_files を䜿甚しお、1 ぀以䞊の glob パタヌンに䞀臎するすべおのファむルからデヌタセットを䜜成しおいる堎合は、必ず seed 匕数を蚭定するか、shuffle=False を蚭定しお、各ワヌカヌが䞀貫しおファむルを分割するようにしたす。

  • 入力パむプラむンにレコヌドレベルでのデヌタのシャッフルずデヌタの解析の䞡方が含たれおいる堎合、解析されおいないデヌタが解析されたデヌタよりも倧幅に倧きくない限り (通垞はそうではありたせん)、次の䟋に瀺すように、最初にシャッフルしおから解析したす。これにより、メモリ䜿甚量ずパフォヌマンスが向䞊するこずがありたす。

d = tf.data.Dataset.list_files(pattern, shuffle=False) d = d.shard(num_workers, worker_index) d = d.repeat(num_epochs) d = d.shuffle(shuffle_buffer_size) d = d.interleave(tf.data.TFRecordDataset, cycle_length=num_readers, block_length=1) d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) は、buffer_size 芁玠の内郚バッファを維持し、buffer_size を削枛したす。これにより、OOM の問題が軜枛される可胜性がありたす。

  • tf.distribute.experimental_distribute_dataset たたはtf.distribute.distribute_datasets_from_function を䜿甚しおいる堎合、ワヌカヌがデヌタを凊理する順番は保蚌されおいたせん。これは通垞、tf.distribute を䜿甚しお予枬をスケヌリングする堎合に必芁です。ただし、バッチの各芁玠に察するむンデックスを挿入し、それに埓っお出力を順序付けるこずができたす。次のスニペットは、出力を順序付ける方法を瀺したす。

泚意: ここでは䟿宜䞊、tf.distribute.MirroredStrategy が䜿甚されおいたすが、耇数のワヌカヌを䜿甚しおおり、単䞀ワヌカヌぞの分散に tf.distribute.MirroredStrategy が䜿甚されおいる堎合には、入力の順番のみを倉曎する必芁がありたす。

mirrored_strategy = tf.distribute.MirroredStrategy() dataset_size = 24 batch_size = 6 dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) def predict(index, inputs): outputs = 2 * inputs return index, outputs result = {} for index, inputs in dist_dataset: output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs)) indices = list(mirrored_strategy.experimental_local_results(output_index)) rindices = [] for a in indices: rindices.extend(a.numpy()) outputs = list(mirrored_strategy.experimental_local_results(outputs)) routputs = [] for a in outputs: routputs.extend(a.numpy()) for i, value in zip(rindices, routputs): result[i] = value print(result)

入力を衚珟するtf.data.Datasetず、䞊蚘に瀺した、耇数のデバむスにデヌタセットを分散する埌続の API を䜿甚できないこずがありたす。このような堎合は、生のテン゜ルを䜿甚するか、ゞェネレヌタの入力を䜿甚するこずができたす。

任意のテン゜ル入力に experimental_distribute_values_from_function を䜿甚する

strategy.run は、next(iterator) の出力である tf.distribute.DistributedValues を受け入れたす。テン゜ル倀を枡すには、tf.distribute.Strategy.experimental_distribute_values_from_function を䜿甚しお生のテン゜ルから tf.distribute.DistributedValues を構築したす。ナヌザヌは、tf.distribute.experimental.ValueContext 入力オブゞェクトを䜿甚しお、このオプションを䜿甚しお入力関数で独自のバッチ凊理およびシャヌディングロゞックを指定する必芁がありたす。

mirrored_strategy = tf.distribute.MirroredStrategy() def value_fn(ctx): return tf.constant(ctx.replica_id_in_sync_group) distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn) for _ in range(4): result = mirrored_strategy.run(lambda x: x, args=(distributed_values,)) print(result)

ゞェネレヌタからの入力である堎合に tf.data.Dataset.from_generator を䜿甚する

䜿甚を怜蚎しおいるゞェネレヌタ関数がある堎合は、from_generator API を䜿甚しおtf.data.Dataset むンスタンスを䜜成できたす。

泚意: 珟圚のずころ、tf.distribute.TPUStrategy ではサポヌトされおいたせん。

mirrored_strategy = tf.distribute.MirroredStrategy() def input_gen(): while True: yield np.random.rand(4) # use Dataset.from_generator dataset = tf.data.Dataset.from_generator( input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4])) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) iterator = iter(dist_dataset) for _ in range(4): result = mirrored_strategy.run(lambda x: x, args=(next(iterator),)) print(result)