Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/es-419/tutorials/distribute/custom_training.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Entrenamiento personalizado con tf.distribute.Strategy

Este tutorial muestra cómo usar tf.distribute.Strategy, una API de TensorFlow que da una abstracción para distribuir su entrenamiento entre múltiples unidades de procesamiento (GPUs, múltiples máquinas o TPUs), con bucles de entrenamiento personalizados. Aquí entrenarás una red neuronal convolucional sencilla en el conjunto de datos Fashion MNIST, que contiene 70,000 imágenes de tamaño 28 x 28.

Los bucles de entrenamiento personalizados dan flexibilidad y un mayor control sobre el entrenamiento. También facilitan depurar el modelo y el bucle de entrenamiento.

# Import TensorFlow import tensorflow as tf # Helper libraries import numpy as np import os print(tf.__version__)

Descargar el conjunto de datos Fashion MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data() # Add a dimension to the array -> new shape == (28, 28, 1) # This is done because the first layer in our model is a convolutional # layer and it requires a 4D input (batch_size, height, width, channels). # batch_size dimension will be added later on. train_images = train_images[..., None] test_images = test_images[..., None] # Scale the images to the [0, 1] range. train_images = train_images / np.float32(255) test_images = test_images / np.float32(255)

Cree una estrategia para distribuir las variables y el grafo

¿Cómo funciona la estrategia tf.distribute.MirroredStrategy?

  • Todas las variables y el grafo del modelo se reproducen en las réplicas.

  • Las entradas se distribuyen uniformemente entre las réplicas.

  • Cada réplica calcula la pérdida y los gradientes de la entrada que ha recibido.

  • Se suman los gradientes de todas las réplicas para sincronizarlos.

  • Tras la sincronización, se realiza la misma actualización en las copias de las variables de cada réplica.

Nota: Puedes poner todo el código siguiente dentro de un único ámbito. Este ejemplo lo divide en varias celdas de código para ilustrarlo.

# If the list of devices is not specified in # `tf.distribute.MirroredStrategy` constructor, they will be auto-detected. strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

Configurar la canalización de entrada

BUFFER_SIZE = len(train_images) BATCH_SIZE_PER_REPLICA = 64 GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync EPOCHS = 10

Crea los conjuntos de datos y distribúyelos:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset) test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Crear el modelo

Crea un modelo usando tf.keras.Sequential. También puedes usar la API de subclase de modelos o la API funcional para hacerlo.

def create_model(): regularizer = tf.keras.regularizers.L2(1e-5) model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, 3, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Conv2D(64, 3, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Flatten(), tf.keras.layers.Dense(64, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.Dense(10, kernel_regularizer=regularizer) ]) return model
# Create a checkpoint directory to store the checkpoints. checkpoint_dir = './training_checkpoints' checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definir la función de pérdida

Recordemos que la función de pérdida consta de una o dos partes:

  • La pérdida por predicción mide lo alejadas que están las predicciones del modelo de las etiquetas de entrenamiento para un lote de ejemplos de entrenamiento. Se calcula para cada ejemplo etiquetado y luego se reduce para todo el lote calculando el valor promedio.

  • Opcionalmente, se pueden añadir términos de pérdida por regularización a la pérdida por predicción, para evitar que el modelo se ajuste en exceso a los datos del entrenamiento. Una elección común es la regularización L2, que añade un pequeño múltiplo fijo de la suma de cuadrados de todas las ponderaciones del modelo, independientemente del número de ejemplos. El modelo anterior usa la regularización L2 para demostrar su manejo en el bucle de entrenamiento a continuación.

Para el entrenamiento en una sola máquina con una sola GPU/CPU, esto funciona de la siguiente manera:

  • La pérdida por predicción se calcula para cada ejemplo del lote, se suma para todo el lote y luego se divide por el tamaño del lote.

  • La pérdida por regularización se añade a la pérdida por predicción.

  • El gradiente de la pérdida total se calcula respecto a cada ponderación del modelo, y el optimizador actualiza cada ponderación del modelo a partir del gradiente correspondiente.

Con tf.distribute.Strategy, el lote de entrada se divide entre las réplicas. Por ejemplo, supongamos que tiene 4 GPUs, cada una con una réplica del modelo. Un lote de 256 ejemplos de entrada se distribuye uniformemente entre las 4 réplicas, por lo que cada réplica recibe un lote de tamaño 64: Tenemos 256 = 4*64, o en general GLOBAL_BATCH_SIZE = num_replicas_in_sync * BATCH_SIZE_PER_REPLICA.

Cada réplica calcula la pérdida a partir de los ejemplos de entrenamiento que obtiene y calcula los gradientes de la pérdida respecto a cada ponderación del modelo. El optimizador se encarga de que estos gradientes se sumen en todas las réplicas antes de usarlos para actualizar las copias de las ponderaciones del modelo en cada réplica.

Entonces, ¿cómo se debe calcular la pérdida cuando se usa una tf.distribute.Strategy?

  • Cada réplica calcula la pérdida por predicción de todos los ejemplos que se le han distribuido, suma los resultados y los divide entre num_replicas_in_sync * BATCH_SIZE_PER_REPLICA, o lo que es lo mismo, GLOBAL_BATCH_SIZE.

  • Cada réplica computa la(s) pérdida(s) por regularización y las divide entre num_replicas_in_sync.

En comparación con el entrenamiento no distribuido, todos los términos de pérdida por cada réplica se reducen en un factor de 1/num_replicas_in_sync. Por otra parte, todos los términos de pérdida (o mejor dicho, sus gradientes) se suman a lo largo de ese número de réplicas antes de que el optimizador los aplique. En efecto, el optimizador en cada réplica usa los mismos gradientes que si se hubiera producido un cálculo no distribuido con GLOBAL_BATCH_SIZE. Esto es consistente con el comportamiento distribuido y no distribuido de Model.fit de Keras. Véase el tutorial Entrenamiento distribuido con Keras sobre cómo un mayor tamaño de lote global permite escalar la tasa de aprendizaje.

¿Cómo hacerlo en TensorFlow?

  • La reducción de pérdidas y el escalado se realizan automáticamente en Model.compile y Model.fit de Keras

  • Si está escribiendo un bucle de entrenamiento personalizado, como en este tutorial, debe sumar las pérdidas por cada ejemplo y dividir la suma entre el tamaño global del lote utilizando tf.nn.compute_average_loss, que toma las pérdidas por cada ejemplo y las ponderaciones opcionales de las muestras como argumentos y devuelve la pérdida escalada.

  • Si se usan las clases tf.keras.losses (como en el ejemplo siguiente), es necesario especificar explícitamente que la reducción de pérdidas sea una de las siguientes NONE o SUM. Los valores predeterminados AUTO y SUM_OVER_BATCH_SIZE no están permitidos fuera de Model.fit.

    • AUTO no está permitido porque el usuario debe pensar explícitamente qué reducción desea para asegurarse de que es correcta en el caso distribuido.

    • SUM_OVER_BATCH_SIZE no está permitido porque actualmente sólo dividiría entre el tamaño del lote por réplica, y dejaría la división entre el número de réplicas al usuario, lo que podría ser fácil de pasar por alto. Así que, en su lugar, tiene que hacer la reducción usted mismo de forma explícita.

  • Si está escribiendo un bucle de entrenamiento personalizado para un modelo con una lista no vacía de Model.losses (por ejemplo, regularizadores de ponderación), debe sumarlos y dividir la suma entre el número de réplicas. Puede hacerlo usando la función tf.nn.scale_regularization_loss. El propio código del modelo permanece ajeno al número de réplicas.

Sin embargo, los modelos pueden definir pérdidas de regularización dependientes de la entrada con API de Keras como Layer.add_loss(...) y Layer(activity_regularizer=...). Para Layer.add_loss(...), corresponde al código de modelado realizar la división de los términos sumados por cada ejemplo entre el tamaño del lote por cada réplica(!), por ejemplo, usando tf.math.reduce_mean().

with strategy.scope(): # Set reduction to `NONE` so you can do the reduction yourself. loss_object = tf.keras.losses.SparseCategoricalCrossentropy( from_logits=True, reduction=tf.keras.losses.Reduction.NONE) def compute_loss(labels, predictions, model_losses): per_example_loss = loss_object(labels, predictions) loss = tf.nn.compute_average_loss(per_example_loss) if model_losses: loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses)) return loss

Casos especiales

Los usuarios avanzados también deben tener en cuenta los siguientes casos especiales.

  • Los lotes de entrada más cortos que GLOBAL_BATCH_SIZE crean casos límite desagradables en varios lugares. En la práctica, suele funcionar mejor evitarlos permitiendo que los lotes abarquen los límites de las épocas usando Dataset.repeat().batch() y definiendo las épocas aproximadas por conteos de pasos, no por los extremos de los conjuntos de datos. Alternativamente, Dataset.batch(drop_remainder=True) mantiene la noción de época pero elimina algunos de los últimos ejemplos.

Como ilustración, este ejemplo va por el camino más difícil y permite lotes cortos, de modo que cada época de entrenamiento contenga cada ejemplo entrenado exactamente una vez.

¿Qué denominador debe usar tf.nn.compute_average_loss()?

* By default, in the example code above and equivalently in `Keras.fit()`, the sum of prediction losses is divided by `num_replicas_in_sync` times the actual batch size seen on the replica (with empty batches silently ignored). This preserves the balance between the prediction loss on the one hand and the regularization losses on the other hand. It is particularly appropriate for models that use input-dependent regularization losses. Plain L2 regularization just superimposes weight decay onto the gradients of the prediction loss and is less in need of such a balance. * In practice, many custom training loops pass as a constant Python value into `tf.nn.compute_average_loss(..., global_batch_size=GLOBAL_BATCH_SIZE)` to use it as the denominator. This preserves the relative weighting of training examples between batches. Without it, the smaller denominator in short batches effectively upweights the examples in those. (Before TensorFlow 2.13, this was also needed to avoid NaNs in case some replica received an actual batch size of zero.)

Ambas opciones son equivalentes si se evitan los lotes cortos, como se ha sugerido anteriormente.

  • Las labels multidimensionales requieren que se promedie la per_example_loss entre el número de predicciones en cada ejemplo. Considere una tarea de clasificación para todos los pixeles de una imagen de entrada, con predictions de forma (batch_size, H, W, n_classes) y labels de forma (batch_size, H, W). Tendrá que actualizar per_example_loss como: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

Precaución: Verifique la forma de su pérdida. Las funciones de pérdida en tf.losses/tf.keras.losses suelen devolver el promedio sobre la última dimensión de la entrada. Las clases de pérdida envuelven estas funciones. Pasar reduction=Reduction.NONE al crear una instancia de una clase de pérdida significa "ninguna reducción adicional". Para las pérdidas categóricas con una forma de entrada de ejemplo de [batch, W, H, n_clases] se reduce la dimensión n_clases. Para las pérdidas puntuales como losses.mean_squared_error o losses.binary_crossentropy incluye un eje ficticio de modo que [batch, W, H, 1] se reduzca a [batch, W, H]. Sin el eje ficticio, [batch, W, H] se reducirá incorrectamente a [batch, W].

Definir las métricas para controlar las pérdidas y la precisión

Estas métricas realizan un seguimiento de la pérdida por prueba y de la precisión del entrenamiento y de la prueba. Puedes usar .result() para obtener las estadísticas acumuladas en cualquier momento.

with strategy.scope(): test_loss = tf.keras.metrics.Mean(name='test_loss') train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='train_accuracy') test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='test_accuracy')

Bucle de entrenamiento

# A model, an optimizer, and a checkpoint must be created under `strategy.scope`. with strategy.scope(): model = create_model() optimizer = tf.keras.optimizers.Adam(learning_rate=0.001) checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs): images, labels = inputs with tf.GradientTape() as tape: predictions = model(images, training=True) loss = compute_loss(labels, predictions, model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) train_accuracy.update_state(labels, predictions) return loss def test_step(inputs): images, labels = inputs predictions = model(images, training=False) t_loss = loss_object(labels, predictions) test_loss.update_state(t_loss) test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it # with the distributed input. @tf.function def distributed_train_step(dataset_inputs): per_replica_losses = strategy.run(train_step, args=(dataset_inputs,)) return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) @tf.function def distributed_test_step(dataset_inputs): return strategy.run(test_step, args=(dataset_inputs,)) for epoch in range(EPOCHS): # TRAIN LOOP total_loss = 0.0 num_batches = 0 for x in train_dist_dataset: total_loss += distributed_train_step(x) num_batches += 1 train_loss = total_loss / num_batches # TEST LOOP for x in test_dist_dataset: distributed_test_step(x) if epoch % 2 == 0: checkpoint.save(checkpoint_prefix) template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, " "Test Accuracy: {}") print(template.format(epoch + 1, train_loss, train_accuracy.result() * 100, test_loss.result(), test_accuracy.result() * 100)) test_loss.reset_states() train_accuracy.reset_states() test_accuracy.reset_states()

Cosas a notar en el ejemplo anterior:

  • Iterar sobre el conjunto de datos train_dist_dataset y test_dist_dataset usando una construcción for x in ....

  • La pérdida escalada es el valor de retorno del distributed_train_step. Este valor se agrega entre réplicas usando la llamada tf.distribute.Strategy.reduce y luego entre lotes sumando el valor de retorno de las llamadas tf.distribute.Strategy.reduce.

  • tf.keras.Metrics debe actualizarse dentro de train_step y test_step que es ejecutado por tf.distribute.Strategy.run.

  • tf.distribute.Strategy.run devuelve resultados de cada réplica local de la estrategia, y hay varias formas de consumir este resultado. Puedes hacer tf.distribute.Strategy.reduce para obtener un valor agregado. También puedes hacer tf.distribute.Strategy.experimental_local_results para obtener la lista de valores contenidos en el resultado, uno por réplica local.

Restaurar el último punto de verificación y probar

Un modelo verificado con un tf.distribute.Strategy puede restaurarse con o sin estrategia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='eval_accuracy') new_model = create_model() new_optimizer = tf.keras.optimizers.Adam() test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function def eval_step(images, labels): predictions = new_model(images, training=False) eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model) checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir)) for images, labels in test_dataset: eval_step(images, labels) print('Accuracy after restoring the saved model without strategy: {}'.format( eval_accuracy.result() * 100))

Formas alternativas de iterar sobre un conjunto de datos

Usando iteradores

Si quieres iterar sobre un número determinado de pasos y no sobre todo el conjunto de datos, puedes crear un iterador utilizando la llamada iter y llamar explícitamente a next en el iterador. Puedes elegir iterar sobre el conjunto de datos tanto dentro como fuera de la tf.function. Aquí puedes ver un pequeño fragmento que demuestra la iteración sobre el conjunto de datos fuera de la tf.function usando un iterador.

for _ in range(EPOCHS): total_loss = 0.0 num_batches = 0 train_iter = iter(train_dist_dataset) for _ in range(10): total_loss += distributed_train_step(next(train_iter)) num_batches += 1 average_train_loss = total_loss / num_batches template = ("Epoch {}, Loss: {}, Accuracy: {}") print(template.format(epoch + 1, average_train_loss, train_accuracy.result() * 100)) train_accuracy.reset_states()

Iterar dentro de una tf.function

También puedes iterar sobre toda la entrada train_dist_dataset dentro de una tf.function usando la construcción for x in ... o creando iteradores igual que hiciste antes. El siguiente ejemplo muestra cómo envolver una época de entrenamiento con un decorador @tf.function e iterar sobre train_dist_dataset dentro de la función.

@tf.function def distributed_train_epoch(dataset): total_loss = 0.0 num_batches = 0 for x in dataset: per_replica_losses = strategy.run(train_step, args=(x,)) total_loss += strategy.reduce( tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) num_batches += 1 return total_loss / tf.cast(num_batches, dtype=tf.float32) for epoch in range(EPOCHS): train_loss = distributed_train_epoch(train_dist_dataset) template = ("Epoch {}, Loss: {}, Accuracy: {}") print(template.format(epoch + 1, train_loss, train_accuracy.result() * 100)) train_accuracy.reset_states()

Seguimiento de la pérdida por entrenamiento en las réplicas

Nota: Como regla general, debes usar tf.keras.Metrics para hacer un seguimiento de los valores por muestra y evitar los valores que se hayan agregado dentro de una réplica.

Puesto que se lleva a cabo un cálculo de escalado de pérdidas, no se recomienda usar tf.keras.metrics.Mean para hacer un seguimiento de la pérdida por entrenamiento en distintas réplicas.

Por ejemplo, si ejecutas un trabajo de entrenamiento con las siguientes características:

  • Dos réplicas

  • Se procesan dos muestreos en cada réplica

  • Valores de pérdida resultantes: [2, 3] y [4, 5] en cada réplica

  • Tamaño global del lote = 4

Con el escalado de pérdidas, calculas el valor de pérdida por muestra en cada réplica sumando los valores de pérdida y dividiéndolos por el tamaño global del lote. En este caso (2 + 3) / 4 = 1,25 y (4 + 5) / 4 = 2,25.

Si usas tf.keras.metrics.Mean para hacer un seguimiento de la pérdida en las dos réplicas, el resultado es distinto. En este ejemplo, al final tienes un total de 3.50 y un count de 2, lo que da como resultado total/count = 1.75 cuando se llama a result() en la métrica. La pérdida calculada con tf.keras.Metrics se escala con un factor adicional que es igual al número de réplicas sincronizadas.

Guía y ejemplos

Estos son algunos ejemplos de cómo usar la estrategia de distribución con bucles de entrenamiento personalizados:

  1. Guía de entrenamiento distribuido

  2. Ejemplo de DenseNet usando MirroredStrategy.

  3. Ejemplo de BERT entrenado usando MirroredStrategy y TPUStrategy. Este ejemplo es especialmente útil para entender cómo cargar desde un punto de verificación y generar puntos de verificación periódicos durante el entrenamiento distribuido, etc.

  4. Ejemplo de NCF entrenado usando MirroredStrategy que puede ser habilitado usando la bandera keras_use_ctl.

  5. Ejemplo de NMT entrenado usando MirroredStrategy.

Puedes encontrar más ejemplos en Ejemplos y tutoriales la Guía de estrategias de distribución.

Siguientes pasos