Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/es-419/tutorials/distribute/input.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Entrada distribuida

Las API tf.distribute ofrecen a los usuarios una forma sencilla de escalar su entrenamiento de una sola máquina a varias. Cuando escalan su modelo, los usuarios también necesitan distribuir sus entradas entre varios dispositivos. tf.distribute proporciona APIs que le permiten distribuir automáticamente sus entradas entre los dispositivos.

Esta guía le mostrará las distintas formas de crear un conjunto de datos e iteradores distribuidos usando las API tf.distribute. Además, se tratarán los siguientes temas:

  • Opciones de uso, fragmentación y loteado al usar tf.distribute.Strategy.experimental_distribute_dataset y tf.distribute.Strategy.distribute_datasets_from_function.

  • Diferentes formas de iterar sobre el conjunto de datos distribuido.

  • Diferencias entre las API tf.distribute.Strategy.experimental_distribute_dataset/tf.distribute.Strategy.distribute_datasets_from_function y APIs tf.data, así como cualquier limitación que los usuarios puedan encontrar en su uso.

Esta guía no cubre el uso de la entrada distribuida con las API de Keras.

Conjuntos distribuidos de datos

Para usar las API tf.distribute, use tf.data.Dataset para representar su entrada. tf.distribute funciona eficazmente con tf.data.Dataset; por ejemplo, mediante la preextracción automática en cada dispositivo acelerador y actualizaciones periódicas del rendimiento. Si tiene un caso de uso donde necesita algo distinto de tf.data.Dataset, consulte la sección entradas de Tensor de esta guía. En primer lugar, cree una instancia de tf.data.Dataset en un bucle de entrenamiento no distribuido y, a continuación, itere sobre los elementos. Por ejemplo:

import tensorflow as tf # Helper libraries import numpy as np import os print(tf.__version__)
# Simulate multiple CPUs with virtual devices N_VIRTUAL_DEVICES = 2 physical_devices = tf.config.list_physical_devices("CPU") tf.config.set_logical_device_configuration( physical_devices[0], [tf.config.LogicalDeviceConfiguration() for _ in range(N_VIRTUAL_DEVICES)])
print("Available devices:") for i, device in enumerate(tf.config.list_logical_devices()): print("%d) %s" % (i, device))
global_batch_size = 16 # Create a tf.data.Dataset object. dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) @tf.function def train_step(inputs): features, labels = inputs return labels - 0.3 * features # Iterate over the dataset using the for..in construct. for inputs in dataset: print(train_step(inputs))

Para permitir a los usuarios usar la estrategia tf.distribute con cambios mínimos en el código existente de un usuario, se introdujeron dos API que distribuirían una instancia tf.data.Dataset y devolverían un objeto de conjunto de datos distribuido. Los usuarios podrían iterar sobre esta instancia de conjunto de datos distribuido y entrenar su modelo como antes. Veamos ahora con más detalle las dos API: tf.distribute.Strategy.experimental_distribute_dataset y tf.distribute.Strategy.distribute_datasets_from_function:

tf.distribute.Strategy.experimental_distribute_dataset

Uso

Esta API toma una instancia tf.data.Dataset como entrada y devuelve una instancia tf.distribute.DistributedDataset. Debe distribuir por lotes el conjunto de datos de entrada con un valor igual al tamaño global del lote, es decir, el número de muestras que quiere procesar en todos los dispositivos en 1 paso. Puede iterar sobre este conjunto de datos distribuido al estilo Python, o crear un iterador usando iter. El objeto devuelto no es una instancia de tf.data.Dataset y no admite otras API que transformen o inspeccionen el conjunto de datos de alguna forma. Ésta es la API recomendada si no tiene formas específicas de dividir su entrada en distintas réplicas.

global_batch_size = 16 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) # Distribute input using the `experimental_distribute_dataset`. dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) # 1 global batch of data fed to the model in 1 step. print(next(iter(dist_dataset)))

Propiedades

Distribución por lotes

tf.distribute vuelve a mezclar la instancia tf.data.Dataset de entrada con un nuevo tamaño de lote igual al tamaño de lote global dividido por el número de réplicas sincronizadas. El número de réplicas en sincronía es igual al número de dispositivos que participan en el gradiente allreduce durante el entrenamiento. Cuando un usuario llama a next en el iterador distribuido, se devuelve un tamaño de lote de datos por réplica en cada réplica. La cardinalidad del conjunto de datos devuelto siempre será múltiplo del número de réplicas. He aquí algunos ejemplos:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Sin distribución:

      • Lote 1: [0, 1, 2, 3]

      • Lote 2: [4, 5]

    • Con distribución sobre 2 réplicas. El último lote ([4, 5]) es dividido entre 2 réplicas.

    • Lote 1:

      • Réplica 1:[0, 1]

      • Réplica 2:[2, 3]

    • Lote 2:

      • Réplica 1: [4]

      • Réplica 2: [5]

  • tf.data.Dataset.range(4).batch(4)

    • Sin distribución:

      • Lote 1: [0, 1, 2, 3]

    • Con distribución sobre 5 réplicas:

      • Lote 1:

        • Réplica 1: [0]

        • Réplica 2: [1]

        • Réplica 3: [2]

        • Réplica 4: [3]

        • Réplica 5: []

  • tf.data.Dataset.range(8).batch(4)

    • Sin distribución:

      • Lote 1: [0, 1, 2, 3]

      • Lote 2: [4, 5, 6, 7]

    • Con distribución sobre 3 réplicas:

      • Lote 1:

        • Réplica 1: [0, 1]

        • Réplica 2: [2, 3]

        • Réplica 3: []

      • Lote 2:

        • Réplica 1: [4, 5]

        • Réplica 2: [6, 7]

        • Réplica 3: []

Nota: Los ejemplos anteriores sólo ilustran cómo se divide un lote global entre distintas réplicas. No es aconsejable confiar en los valores reales que pueden acabar en cada réplica, ya que pueden cambiar dependiendo de la implementación.

El reajuste del conjunto de datos tiene una complejidad espacial que crece linealmente con el número de réplicas. Por lo tanto, para el caso de uso de entrenamiento con varios trabajadores, la canalización de entrada puede tropezar con errores OOM.

Fragmentación

tf.distribute también fragmenta automáticamente el conjunto de datos de entrada en entrenamiento multitrabajador con MultiWorkerMirroredStrategy y TPUStrategy. Cada conjunto de datos se crea en el dispositivo CPU del trabajador. Si un conjunto de datos se fragmenta automáticamente entre un conjunto de trabajadores, se asigna a cada trabajador un subconjunto del conjunto de datos total (si se configura tf.data.experimental.AutoShardPolicy correctamente). Esto garantiza que, en cada paso, cada trabajador procese un tamaño de lote global de elementos del conjunto de datos que no se superpongan. Hay varias opciones diferentes para la fragmentación automática, que pueden especificarse usando tf.data.experimental.DistributeOptions. Tenga en cuenta que no hay fragmentación automática en el entrenamiento multitrabajador con ParameterServerStrategy. Para obtener más información sobre la creación de conjuntos de datos con esta estrategia, consulte el tutorial ParameterServerStrategy.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16) options = tf.data.Options() options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA dataset = dataset.with_options(options)

Hay tres opciones diferentes que puede establecer para la tf.data.experimental.AutoShardPolicy:

  • AUTO: Opción predeterminada, lo que significa que se intentará fragmentar por FILE (archivo). El intento de distribución FILE falla si no se detecta un conjunto de datos basado en archivos. tf.distribute recurrirá entonces a la fragmentación por DATA (datos). Note que si el conjunto de datos de entrada está basado en archivos, pero el número de archivos es menor que el número de trabajadores, se producirá un error InvalidArgumentError. Si esto ocurre, establezca explícitamente la política a AutoShardPolicy.DATA, o divida su fuente de entrada en archivos más pequeños de forma que el número de archivos sea mayor que el número de trabajadores.

  • FILE: Esta es la opción si quiere dividir los archivos de entrada entre todos los trabajadores. Esta opción debe usarse si el número de archivos de entrada es mucho mayor que el número de trabajadores y los datos de los archivos están distribuidos uniformemente. La desventaja de esta opción es que tendrá trabajadores ociosos si los datos de los archivos no están distribuidos uniformemente. Se producirá un InvalidArgumentError si el número de archivos es menor que el número de trabajadores. Si esto ocurre, fije la política explícitamente a AutoShardPolicy.DATA. Por ejemplo, supongamos que tenemos 2 archivos distribuidos entre 2 trabajadores, cada uno con 1 réplica. El archivo 1 contiene [0, 1, 2, 3, 4, 5] y el archivo 2 contiene [6, 7, 8, 9, 10, 11]. Sea 2 el número total de réplicas sincronizadas y 4 el tamaño global del lote.

    • Trabajador 0:

      • Lote 1 = Réplica 1: [0, 1]

      • Lote 2 = Réplica 1: [2, 3]

      • Lote 3 = Réplica 1: [4]

      • Lote 4 = Réplica 1: [5]

    • Trabajador 1:

      • Lote 1 = Réplica 2: [6, 7]

      • Lote 2 = Réplica 2: [8, 9]

      • Lote 3 = Réplica 2: [10]

      • Lote 4 = Réplica 2: [11]

  • DATA: Esta opción repartirá automáticamente los elementos entre todos los trabajadores. Cada uno de los trabajadores leerá todo el conjunto de datos y sólo procesará el fragmento que se le haya asignado. Todos los demás fragmentos se descartarán. Suele usarse si el número de archivos de entrada es menor que el número de trabajadores y se quiere una mejor distribución de los datos entre todos los trabajadores. El inconveniente es que se leerá todo el conjunto de datos en cada trabajador. Por ejemplo, distribuyamos 1 archivo entre 2 trabajadores. El archivo 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Que el número total de réplicas sincronizadas sea 2.

    • Trabajador 0:

      • Lote 1 = Réplica 1: [0, 1]

      • Lote 2 = Réplica 1: [4, 5]

      • Lote 3 = Réplica 1: [8, 9]

    • Trabajador 1:

      • Lote 1 = Réplica 2: [2, 3]

      • Lote 2 = Réplica 2: [6, 7]

      • Lote 3 = Réplica 2: [10, 11]

  • OFF: Si desactiva la fragmentación automática, cada trabajador procesará todos los datos. Por ejemplo, distribuyamos 1 archivo entre 2 trabajadores. El archivo 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Que el número total de réplicas sincronizadas sea 2. Entonces cada trabajador verá la siguiente distribución:

    • Trabajador 0:

      • Lote 1 = Réplica 1: [0, 1]

      • Lote 2 = Réplica 1: [2, 3]

      • Lote 3 = Réplica 1: [4, 5]

      • Lote 4 = Réplica 1: [6, 7]

      • Lote 5 = Réplica 1: [8, 9]

      • Lote 6 = Réplica 1: [10, 11]

    • Trabajador 1:

      • Lote 1 = Réplica 2: [0, 1]

      • Lote 2 = Réplica 2: [2, 3]

      • Lote 3 = Réplica 2: [4, 5]

      • Lote 4 = Réplica 2: [6, 7]

      • Lote 5 = Réplica 2: [8, 9]

      • Lote 6 = Réplica 2: [10, 11]

Preextracción

De forma predeterminada, tf.distribute añade una transformación de preextracción al final de la instancia tf.data.Dataset proporcionada por el usuario. El argumento de la transformación de preextracción, que es buffer_size, es igual al número de réplicas sincronizadas.

tf.distribute.Strategy.distribute_datasets_from_function

Uso

Esta API toma una función de entrada y devuelve una instancia tf.distribute.DistributedDataset. La función de entrada pasada por el usuario tiene un argumento tf.distribute.InputContext y debe devolver una instancia tf.data.Dataset. Con esta API, tf.distribute no realiza más cambios en la instancia tf.data.Dataset del usuario devuelta por la función de entrada. El usuario es responsable de distribuir por lotes y fragmentar el conjunto de datos. tf.distribute llama a la función de entrada en el dispositivo CPU de cada trabajador. Además de permitir a los usuarios especificar su propia lógica de distribución por lotes y fragmentación, esta API también muestra una mejor escalabilidad y rendimiento en comparación con tf.distribute.Strategy.experimental_distribute_dataset cuando se usa para el entrenamiento de varios trabajadores.

mirrored_strategy = tf.distribute.MirroredStrategy() def dataset_fn(input_context): batch_size = input_context.get_per_replica_batch_size(global_batch_size) dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16) dataset = dataset.shard( input_context.num_input_pipelines, input_context.input_pipeline_id) dataset = dataset.batch(batch_size) dataset = dataset.prefetch(2) # This prefetches 2 batches per device. return dataset dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)

Propiedades

Distribución por lotes

La instancia tf.data.Dataset que es el valor de retorno de la función de entrada debe distribuirse por lotes usando el tamaño de lote por réplica. El tamaño de lote por réplica es el tamaño de lote global dividido por el número de réplicas que participan en el entrenamiento sincronizado. Esto se debe a que tf.distribute llama a la función de entrada en el dispositivo CPU de cada uno de los trabajadores. El conjunto de datos que se crea en un determinado trabajador debería estar listo para ser usado por todas las réplicas de ese trabajador.

Fragmentación

El objeto tf.distribute.InputContext que se pasa implícitamente como argumento a la función de entrada del usuario es creado por tf.distribute en segundo plano. Lleva información sobre el número de trabajadores, el ID del trabajador actual, etc. Dicha función de entrada puede gestionar la fragmentación según las políticas establecidas por el usuario usando estas propiedades que forman parte del objeto tf.distribute.InputContext.

Preextracción

tf.distribute no añade una transformación de preextracción al final del tf.data.Dataset devuelto por la función de entrada proporcionada por el usuario, por lo que debe llamar explícitamente a Dataset.prefetch en el ejemplo anterior.

Nota: Tanto tf.distribute.Strategy.experimental_distribute_dataset como tf.distribute.Strategy.distribute_datasets_from_function devuelven instancias tf.distribute.DistributedDataset que no son de tipo tf.data.Dataset. Se puede iterar sobre estas instancias (como se muestra en la sección Iteradores distribuidos) usando la propiedad element_spec.

Iteradores distribuidos

Parecido a las instancias tf.data.Dataset no distribuidas, tendrá que crear un iterador en las instancias tf.distribute.DistributedDataset para iterar sobre ellas y acceder a los elementos en el tf.distribute.DistributedDataset. Éstas son las formas en que puede crear un tf.distribute.DistributedIterator y usarlo para entrenar su modelo:

Usos

Usar un constructo de bucle for tipo Python

Usando un bucle Python sencillo, puede iterar sobre el tf.distribute.DistributedDataset. Los elementos devueltos por el tf.distribute.DistributedIterator pueden ser un único tf.Tensor o un tf.distribute.DistributedValues que contenga un valor por réplica. Al poner el bucle dentro de una tf.function se obtendrá un aumento del rendimiento. Sin embargo, break y return no son compatibles actualmente con un bucle sobre un tf.distribute.DistributedDataset colocado dentro de una tf.function.

global_batch_size = 16 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) @tf.function def train_step(inputs): features, labels = inputs return labels - 0.3 * features for x in dist_dataset: # train_step trains the model using the dataset elements loss = mirrored_strategy.run(train_step, args=(x,)) print("Loss is ", loss)

Use iter para crear un iterador explícito

Si quiere iterar sobre los elementos de una instancia de tf.distribute.DistributedDataset, puede crear un tf.distribute.DistributedIterator con la API iter encima. Usando un iterador explícito, se puede iterar durante un número fijo de pasos. Para obtener el siguiente elemento de una instancia tf.distribute.DistributedIterator llamada dist_iterator, puede llamar a next(dist_iterator), dist_iterator.get_next() o dist_iterator.get_next_as_optional(). Los dos primeros son esencialmente lo mismo:

num_epochs = 10 steps_per_epoch = 5 for epoch in range(num_epochs): dist_iterator = iter(dist_dataset) for step in range(steps_per_epoch): # train_step trains the model using the dataset elements loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),)) # which is the same as # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),)) print("Loss is ", loss)

Con next o tf.distribute.DistributedIterator.get_next, si el tf.distribute.DistributedIterator ha llegado a su final, se lanzará un error OutOfRange. El cliente puede capturar el error en el lado Python y continuar con otras tareas, como revisar el punto de verificación y la evaluación. Sin embargo, esto no funcionará si se está usando un bucle de entrenamiento host (es decir, ejecutando varios pasos por tf.function), lo que se ve así:

@tf.function def train_fn(iterator): for _ in tf.range(steps_per_loop): strategy.run(step_fn, args=(next(iterator),))

Este ejemplo train_fn contiene múltiples pasos al envolver el cuerpo del paso dentro de un tf.range. En este caso, las distintas iteraciones del bucle sin dependencia podrían iniciarse en paralelo, por lo que se puede desencadenar un error OutOfRange en iteraciones posteriores antes de que finalice la computación de las iteraciones anteriores. Una vez lanzado un error OutOfRange, todas las operaciones de la función terminarán de inmediato. Si quiere evitar este caso, una alternativa que no lanza un error OutOfRange es tf.distribute.DistributedIterator.get_next_as_optional. get_next_as_optional devuelve un tf.experimental.Optional que contiene el siguiente elemento o ningún valor si el tf.distribute.DistributedIterator ha llegado al final.

# You can break the loop with `get_next_as_optional` by checking if the `Optional` contains a value global_batch_size = 4 steps_per_loop = 5 strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.range(9).batch(global_batch_size) distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset)) @tf.function def train_fn(distributed_iterator): for _ in tf.range(steps_per_loop): optional_data = distributed_iterator.get_next_as_optional() if not optional_data.has_value(): break per_replica_results = strategy.run(lambda x: x, args=(optional_data.get_value(),)) tf.print(strategy.experimental_local_results(per_replica_results)) train_fn(distributed_iterator)

Uso de la propiedad element_spec

Si pasa los elementos de un conjunto de datos distribuido a una tf.function y quiere una garantía de tf.TypeSpec, puede especificar el argumento input_signature de la tf.function. La salida de un conjunto de datos distribuido es tf.distribute.DistributedValues, que puede representar la entrada a un único dispositivo o a varios. Para obtener el tf.TypeSpec correspondiente a este valor distribuido, puede usar tf.distribute.DistributedDataset.element_spec o tf.distribute.DistributedIterator.element_spec.

global_batch_size = 16 epochs = 5 steps_per_epoch = 5 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) @tf.function(input_signature=[dist_dataset.element_spec]) def train_step(per_replica_inputs): def step_fn(inputs): return 2 * inputs return mirrored_strategy.run(step_fn, args=(per_replica_inputs,)) for _ in range(epochs): iterator = iter(dist_dataset) for _ in range(steps_per_epoch): output = train_step(next(iterator)) tf.print(output)

Preprocesamiento de datos

Hasta ahora, ha aprendido a distribuir un tf.data.Dataset. Pero antes de que los datos estén listos para el modelo, hay que preprocesarlos, por ejemplo, limpiarlos, transformarlos y aumentarlos. Dos grupos de estas útiles herramientas son:

  • Capas de preprocesamiento de Keras: un conjunto de capas Keras que permiten a los desarrolladores crear canalizaciones de procesamiento de entrada nativas de Keras. Algunas capas de preprocesamiento Keras contienen estados no entrenables, que pueden fijarse al inicio o modificarlas con adapt (consulte la sección adapt de la guía Capas de preprocesamiento Keras). Si distribuye capas de preprocesamiento con estados, éstos deben replicarse a todos los trabajadores. Para usar estas capas, puede hacer que formen parte del modelo o aplicarlas a los conjuntos de datos.

  • TensorFlow Transform (tf.Transform): una librería para TensorFlow que permite definir la transformación de datos tanto a nivel de instancia como de paso completo a través de canalizaciones de preprocesamiento de datos. Tensorflow Transform tiene dos fases. La primera es la fase Analizar, en la que los datos de entrenamiento sin procesar se analizan en un proceso de paso completo para calcular las estadísticas necesarias para las transformaciones, y la lógica de transformación se genera como operaciones a nivel de instancia. La segunda es la fase Transform, en la que los datos de entrenamiento brutos son transformados dentro de un proceso a nivel de instancia.

Capas de preprocesamiento Keras vs. Tensorflow Transform

Tanto Tensorflow Transform como las capas de preprocesamiento de Keras ofrecen el modo de dividir el preprocesamiento durante el entrenamiento y agrupar el preprocesamiento con un modelo durante la inferencia, reduciendo el sesgo entrenar/servir.

Tensorflow Transform, profundamente integrado con TFX, ofrece una solución escalable de mapeo-reducción para analizar y transformar conjuntos de datos de cualquier tamaño en un trabajo independiente del canal de entrenamiento. Si necesita un análisis de un conjunto de datos que no cabe en una sola máquina, Tensorflow Transform debe ser su primera opción.

Las capas de preprocesamiento de Keras están más orientadas al preprocesamiento aplicado durante el entrenamiento, después de leer los datos del disco. Se adaptan perfectamente al desarrollo de modelos en la librería Keras. Permiten analizar un conjunto de datos más pequeño mediante adapt y permiten casos de uso como el aumento de datos de imágenes, donde cada pasada sobre el conjunto de datos de entrada producirá ejemplos diferentes para el entrenamiento.

También se pueden mezclar las dos librerías, usando Tensorflow Transform para el análisis y las transformaciones estáticas de los datos de entrada, y las capas de preprocesamiento de Keras para las transformaciones en tiempo de entrenamiento (por ejemplo, la codificación en un solo paso o aumentación de datos).

Buenas prácticas con tf.distribute

Trabajar con ambas herramientas implica inicializar la lógica de transformación que se aplicará a los datos, lo que podría crear recursos Tensorflow. Estos recursos o estados deben replicarse a todos los trabajadores para guardar la comunicación entre trabajadores o trabajador-coordinador. Se recomienda para ello crear capas de preprocesamiento Keras, tft.TFTransformOutput.transform_features_layer, o tft.TransformFeaturesLayer bajo tf.distribute.Strategy.scope, igual que haría con cualquier otra capa Keras.

Los siguientes ejemplos demuestran el uso de la API tf.distribute.Strategy con la API de alto nivel de Keras Model.fit y con un bucle de entrenamiento personalizado por separado.

Notas adicionales para los usuarios de las capas de preprocesamiento Keras:

Capas de preprocesamiento y grandes vocabularios

Cuando trata con vocabularios grandes (más de un gigabyte) en un entorno multitrabajador (por ejemplo, tf.distribute.MultiWorkerMirroredStrategy, tf.distribute.experimental.ParameterServerStrategy, tf.distribute.TPUStrategy), se recomienda guardar el vocabulario en un archivo estático accesible desde todos los trabajadores (por ejemplo, con almacenamiento en la nube). Esto reducirá el tiempo dedicado a replicar el vocabulario a todos los trabajadores durante el entrenamiento.

Preprocesamiento en la canalización tf.data versus en el modelo

Mientras que las capas de preprocesamiento de Keras pueden aplicarse como parte del modelo o directamente a un tf.data.Dataset, cada una de las opciones viene con su ventaja:

  • Aplicar las capas de preprocesamiento dentro del modelo hace que su modelo sea portátil, y ayuda a reducir el sesgo de entrenamiento/servicio. (Para más detalles, consulte la sección Beneficios de realizar el preprocesamiento dentro del modelo en el momento de la inferencia de la guía Trabajar con capas de preprocesamiento)

  • Aplicarla dentro de la canalización tf.data permite preextraer o aliviar la carga de la CPU, lo que generalmente ofrece un mejor rendimiento cuando se usan aceleradores.

Cuando se ejecuta en una o más TPU, los usuarios casi siempre deben colocar las capas de preprocesamiento de Keras en la canalización tf.data, ya que no todas las capas son compatibles con las TPU, y las operaciones de cadena no se ejecutan en las TPU. (Las dos excepciones son tf.keras.layers.Normalization y tf.keras.layers.Rescaling, que se ejecutan sin problemas en las TPU y se suelen usar como primera capa en un modelo de imagen).

Preprocesamiento con Model.fit

Al usar Model.fit de Keras, no necesita distribuir los datos con tf.distribute.Strategy.experimental_distribute_dataset ni con tf.distribute.Strategy.distribute_datasets_from_function. Consulte la guía Trabajar con capas de preprocesamiento y la guía Entrenamiento distribuido con Keras para más detalles. Un ejemplo abreviado puede tener el siguiente aspecto:

strategy = tf.distribute.MirroredStrategy() with strategy.scope(): # Create the layer(s) under scope. integer_preprocessing_layer = tf.keras.layers.IntegerLookup(vocabulary=FILE_PATH) model = ... model.compile(...) dataset = dataset.map(lambda x, y: (integer_preprocessing_layer(x), y)) model.fit(dataset)

Los usuarios de tf.distribute.experimental.ParameterServerStrategy con la API Model.fit deben usar un tf.keras.utils.experimental.DatasetCreator como entrada. (Para más información, consulte la guía Entrenamiento del servidor de parámetros)

strategy = tf.distribute.experimental.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner) with strategy.scope(): preprocessing_layer = tf.keras.layers.StringLookup(vocabulary=FILE_PATH) model = ... model.compile(...) def dataset_fn(input_context): ... dataset = dataset.map(preprocessing_layer) ... return dataset dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn) model.fit(dataset_creator, epochs=5, steps_per_epoch=20, callbacks=callbacks)

Preprocesamiento con un bucle de entrenamiento personalizado

Cuando escriba un bucle de entrenamiento personalizado, distribuirá sus datos con la API tf.distribute.Strategy.experimental_distribute_dataset o la API tf.distribute.Strategy.distribute_datasets_from_function API. Si distribuye su conjunto de datos mediante tf.distribute.Strategy.experimental_distribute_dataset, cuando aplique estas API de preprocesamiento en su canalización de datos, los recursos se ubicarán automáticamente con la canalización de datos para evitar el acceso remoto a los recursos. Por ello, todos los ejemplos de aquí usarán tf.distribute.Strategy.distribute_datasets_from_function, en cuyo caso es crucial situar la inicialización de estas API bajo strategy.scope() por eficiencia:

strategy = tf.distribute.MirroredStrategy() vocab = ["a", "b", "c", "d", "f"] with strategy.scope(): # Create the layer(s) under scope. layer = tf.keras.layers.StringLookup(vocabulary=vocab) def dataset_fn(input_context): # a tf.data.Dataset dataset = tf.data.Dataset.from_tensor_slices(["a", "c", "e"]).repeat() # Custom your batching, sharding, prefetching, etc. global_batch_size = 4 batch_size = input_context.get_per_replica_batch_size(global_batch_size) dataset = dataset.batch(batch_size) dataset = dataset.shard( input_context.num_input_pipelines, input_context.input_pipeline_id) # Apply the preprocessing layer(s) to the tf.data.Dataset def preprocess_with_kpl(input): return layer(input) processed_ds = dataset.map(preprocess_with_kpl) return processed_ds distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn) # Print out a few example batches. distributed_dataset_iterator = iter(distributed_dataset) for _ in range(3): print(next(distributed_dataset_iterator))

Tenga en cuenta que si está entrenando con tf.distribute.experimental.ParameterServerStrategy, también llamará a tf.distribute.experimental.coordinator.ClusterCoordinator.create_per_worker_dataset

@tf.function def per_worker_dataset_fn(): return strategy.distribute_datasets_from_function(dataset_fn) per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn) per_worker_iterator = iter(per_worker_dataset)

Para Tensorflow Transform, como ya se ha mencionado, la etapa Analizar se realiza por separado del entrenamiento y, por tanto, se omite aquí. Consulte el tutorial para ver cómo hacerlo. Normalmente, esta etapa incluye la creación de una función de preprocesamiento tf.Transform y la transformación de los datos en una canalización Apache Beam con esta función de preprocesamiento. Para el final de la etapa Analizar, la salida puede exportarse como un grafo TensorFlow que puede usar tanto para entrenamiento como para servicio. Nuestro ejemplo sólo cubre la parte del canal de entrenamiento:

with strategy.scope(): # working_dir contains the tf.Transform output. tf_transform_output = tft.TFTransformOutput(working_dir) # Loading from working_dir to create a Keras layer for applying the tf.Transform output to data tft_layer = tf_transform_output.transform_features_layer() ... def dataset_fn(input_context): ... dataset.map(tft_layer, num_parallel_calls=tf.data.AUTOTUNE) ... return dataset distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)

Lotes parciales

Los lotes parciales aparecen cuando 1) las instancias tf.data.Dataset que crean los usuarios pueden contener tamaños de lote que no son divisibles uniformemente por el número de réplicas; o 2) cuando la cardinalidad de la instancia del conjunto de datos no es divisible por el tamaño del lote. Esto significa que cuando el conjunto de datos se distribuye en varias réplicas, la llamada next a algunos iteradores dará como resultado un tf.errors.OutOfRangeError. Para gestionar este caso de uso, tf.distribute devuelve lotes ficticios de tamaño 0 en las réplicas que no tienen más datos que procesar.

Para el caso de un solo trabajador, si los datos no son devueltos por la llamada next del iterador, se crean lotes ficticios de tamaño 0 y se usan junto con los datos reales del conjunto de datos. En el caso de lotes parciales, el último lote global de datos contendrá datos reales junto con lotes ficticios de datos. La condición de parada para procesar los datos comprueba ahora si alguna de las réplicas tiene datos. Si no hay datos en ninguna de las réplicas, obtendrá un tf.errors.OutOfRangeError.

En el caso de múltiples trabajadores, el valor booleano que representa la presencia de datos en cada uno de los trabajadores se agrega utilizando la comunicación entre réplicas y se usa para identificar si todos los trabajadores han terminado de procesar el conjunto de datos distribuido. Como esto implica la comunicación entre trabajadores, hay cierta sanción de rendimiento.

Precauciones

  • Al usar las APIs tf.distribute.Strategy.experimental_distribute_dataset con una configuración multitrabajador, pasa un tf.data.Dataset que lee de los archivos. Si la tf.data.experimental.AutoShardPolicy se configura en AUTO o FILE, el tamaño real del lote por paso puede ser menor que el que definió para el tamaño global del lote. Por ejemplo, esto ocurre cuando los elementos restantes del archivo son menores que el tamaño de lote global. Puede agotar el conjunto de datos sin depender del número de pasos a ejecutar, o configurar tf.data.experimental.AutoShardPolicy a DATA para evitarlo.

  • Las transformaciones con estado de los conjuntos de datos no se admiten actualmente con tf.distribute y se ignoran las operaciones con estado que pueda tener el conjunto de datos. Por ejemplo, si su conjunto de datos tiene un map_fn que usa tf.random.uniform para rotar una imagen, entonces tiene un grafo de conjunto de datos que depende del estado (es decir, de la semilla aleatoria) en la máquina local donde se está ejecutando el proceso python.

  • Las tf.data.experimental.OptimisationOptions experimentales, que están desactivadas por defecto, pueden degradar el rendimiento en determinados contextos, como al usarse con tf.distribute. Sólo debe habilitarlas si está seguro de que serán útiles para el rendimiento de su carga de trabajo en un entorno distribuido.

  • Consulte esta guía para saber cómo optimizar su canalización de entrada con tf.data en general. Algunos consejos adicionales:

    • Si tiene varios trabajadores y usa tf.data.Dataset.list_files para crear un conjunto de datos con todos los archivos que coincidan con uno o varios patrones glob, recuerde configurar el argumento seed o configurar shuffle=False para que cada trabajador desglose el archivo de forma consistente.

  • Si su canalización de entrada incluye tanto barajar los datos a nivel de registro como analizarlos, a menos que los datos no parseados sean significativamente mayores que los analizados (que no suele ser el caso), barajée primero y luego parsée, como se muestra en el siguiente ejemplo. Esto puede ayudarle a optimizar el uso de la memoria y el rendimiento.

d = tf.data.Dataset.list_files(pattern, shuffle=False) d = d.shard(num_workers, worker_index) d = d.repeat(num_epochs) d = d.shuffle(shuffle_buffer_size) d = d.interleave(tf.data.TFRecordDataset, cycle_length=num_readers, block_length=1) d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) mantienen un búfer interno de buffer_size elementos, por lo que reducir buffer_size podría aliviar el problema de OOM.

  • Si usas tf.distribute.experimental_distribute_dataset o tf.distribute.distribute_datasets_from_function, no se garantiza el orden en que los trabajadores procesan los datos. Normalmente, esto es necesario si usas tf.distribute para escalar la predicción. Sin embargo, puedes insertar un índice para cada elemento del lote y ordenar las salidas en consecuencia. El siguiente fragmento es un ejemplo de cómo ordenar las salidas.

Nota: aquí se usa tf.distribute.MirroredStrategy por comodidad. Sólo necesita reordenar las entradas al usar varios trabajadores, pero tf.distribute.MirroredStrategy se usa para distribuir el entrenamiento en un único trabajador.

mirrored_strategy = tf.distribute.MirroredStrategy() dataset_size = 24 batch_size = 6 dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) def predict(index, inputs): outputs = 2 * inputs return index, outputs result = {} for index, inputs in dist_dataset: output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs)) indices = list(mirrored_strategy.experimental_local_results(output_index)) rindices = [] for a in indices: rindices.extend(a.numpy()) outputs = list(mirrored_strategy.experimental_local_results(outputs)) routputs = [] for a in outputs: routputs.extend(a.numpy()) for i, value in zip(rindices, routputs): result[i] = value print(result)

Si se da el caso de que los usuarios no pueden usar un tf.data.Dataset para representar sus entradas y, posteriormente, las APIs mencionadas para distribuir el conjunto de datos a varios dispositivos, puede usar tensores en bruto o entradas de un generador.

Use experimental_distribute_values_from_function para entradas tensor arbitrarias

strategy.run acepta tf.distribute.DistributedValues, que es la salida de next(iterador). Para pasar los valores tensor, use tf.distribute.Strategy.experimental_distribute_values_from_function para construir tf.distribute.DistributedValues a partir de tensores en bruto. Con esta opción, el usuario tendrá que especificar su propia lógica de distribución en lotes y fragmentación en la función de entrada, lo que puede hacerse usando el objeto de entrada tf.distribute.experimental.ValueContext.

mirrored_strategy = tf.distribute.MirroredStrategy() def value_fn(ctx): return tf.constant(ctx.replica_id_in_sync_group) distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn) for _ in range(4): result = mirrored_strategy.run(lambda x: x, args=(distributed_values,)) print(result)

Use tf.data.Dataset.from_generator si su entrada viene de un generador

Si tiene una función generadora que quiera usar, puede crear una instancia tf.data.Dataset utilizando la API from_generator.

Nota: Esto no es compatible actualmente con tf.distribute.TPUStrategy.

mirrored_strategy = tf.distribute.MirroredStrategy() def input_gen(): while True: yield np.random.rand(4) # use Dataset.from_generator dataset = tf.data.Dataset.from_generator( input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4])) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) iterator = iter(dist_dataset) for _ in range(4): result = mirrored_strategy.run(lambda x: x, args=(next(iterator),)) print(result)