Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/pt-br/tutorials/distribute/custom_training.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Treinamento personalizado com tf.distribute.Strategy

Este tutorial demonstra como usar o tf.distribute.Strategy, uma API do TensorFlow que fornece uma abstração para distribuir o treinamento em diversas unidades de processamento (GPUs, diversas máquinas ou TPUs), com loops de treinamento personalizado. Neste exemplo, você treinará uma rede neural convolucional simples no dataset Fashion MNIST, que contém mais de 70 mil imagens de tamanho 28x28 pixels.

Os loops de treinamento personalizados oferecem flexibilidade e um controle maior do treinamento. Além disso, facilitam a depuração do modelo e do loop de treinamento.

# Import TensorFlow import tensorflow as tf # Helper libraries import numpy as np import os print(tf.__version__)

Download do dataset Fashion MNIST

fashion_mnist = tf.keras.datasets.fashion_mnist (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data() # Add a dimension to the array -> new shape == (28, 28, 1) # This is done because the first layer in our model is a convolutional # layer and it requires a 4D input (batch_size, height, width, channels). # batch_size dimension will be added later on. train_images = train_images[..., None] test_images = test_images[..., None] # Scale the images to the [0, 1] range. train_images = train_images / np.float32(255) test_images = test_images / np.float32(255)

Criação de uma estratégia para distribuir as variáveis e o grafo

Como a estratégia tf.distribute.MirroredStrategy funciona?

  • Todas as variáveis e o modelo de grafos são replicados em todas as réplicas.

  • As entradas são distribuídas para as réplicas de maneira uniforme.

  • Cada réplica calcula a perda e os gradientes para a entrada recebida.

  • Os gradientes são sincronizados em todas as réplicas através da soma.

  • Após a sincronização, a mesma atualização é feita nas cópias das variáveis em cada réplica.

Observação: você pode colocar todo o código abaixo em um único escopo. Este exemplo divide-o em diversas células de código para fins de exemplificação.

# If the list of devices is not specified in # `tf.distribute.MirroredStrategy` constructor, they will be auto-detected. strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

Configuração de um pipeline de entrada

BUFFER_SIZE = len(train_images) BATCH_SIZE_PER_REPLICA = 64 GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync EPOCHS = 10

Crie e distribua os datasets:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset) test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

Criação do modelo

Crie um modelo usando tf.keras.Sequential. Também é possível usar a API de subclasse de modelos ou a API funcional para fazer isso.

def create_model(): regularizer = tf.keras.regularizers.L2(1e-5) model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, 3, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Conv2D(64, 3, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Flatten(), tf.keras.layers.Dense(64, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.Dense(10, kernel_regularizer=regularizer) ]) return model
# Create a checkpoint directory to store the checkpoints. checkpoint_dir = './training_checkpoints' checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

Definição da função de perda

Lembre-se de que a função de perda consiste em uma ou duas partes:

  • A perda de previsão mede a distância entre as previsões do modelo e os rótulos de treinamento para um lote de exemplos de treinamento. Ela é computada para cada exemplo rotulado e depois reduzida em todo o lote ao computar o valor médio.

  • Opcionalmente, termos de perda de regularização podem ser adicionados à perda de previsão, para evitar que o modelo faça overfitting dos dados de treinamento. Uma escolha comum é a regularização L2, que adiciona um pequeno múltiplo fixo da soma dos quadrados de todos os pesos do modelo, independente do número de exemplos. O modelo acima usa regularização L2 para demonstrar seu tratamento no loop de treinamento abaixo.

Para treinamento numa única máquina com uma única GPU/CPU, isto funciona da seguinte maneira:

  • A perda de previsão é computada para cada exemplo no lote, somada ao lote e depois dividida pelo tamanho do lote.

  • A perda de regularização é adicionada à perda de previsão.

  • O gradiente da perda total é computado com base em cada peso de modelo, e o otimizador atualiza cada peso de modelo a partir do gradiente correspondente.

Com tf.distribute.Strategy, o lote de entrada é dividido entre réplicas. Por exemplo, digamos que você tenha 4 GPUs, cada uma com uma réplica do modelo. Um lote de 256 exemplos de entrada é distribuído uniformemente pelas 4 réplicas, então cada réplica recebe um lote de tamanho 64: Temos 256 = 4*64, ou geralmente GLOBAL_BATCH_SIZE = num_replicas_in_sync * BATCH_SIZE_PER_REPLICA.

Cada réplica computa a perda dos exemplos de treinamento que obtém e calcula os gradientes da perda em relação ao peso de cada modelo. O otimizador cuida para que esses gradientes sejam somados nas réplicas antes de usá-los para atualizar as cópias dos pesos do modelo em cada réplica.

Então, como a perda deve ser calculada ao usar um tf.distribute.Strategy?

  • Cada réplica computa a perda de previsão para todos os exemplos distribuídos a ela, soma os resultados e os divide por num_replicas_in_sync * BATCH_SIZE_PER_REPLICA, ou equivalentemente, GLOBAL_BATCH_SIZE.

  • Cada réplica computa as perdas de regularização e as divide por num_replicas_in_sync.

Em comparação com o treinamento não distribuído, todos os termos de perda por réplica são reduzidos por um fator de 1/num_replicas_in_sync. Por outro lado, todos os termos de perda – ou melhor, seus gradientes – são somados nesse número de réplicas antes que o otimizador os aplique. Na verdade, o otimizador em cada réplica usa os mesmos gradientes como se uma computação não distribuída com GLOBAL_BATCH_SIZE tivesse acontecido. Isso é consistente com o comportamento distribuído e não distribuído de Keras Model.fit. Consulte o tutorial Treinamento distribuído com Keras sobre como um tamanho de lote global maior permite aumentar a taxa de aprendizado.

Como fazer isso no TensorFlow?

  • O escalonamento e a redução da perda são feitos automaticamente no Model.compile e Model.fit do Keras.

  • Se você estiver escrevendo um loop de treinamento personalizado, como neste tutorial, você deve somar as perdas por exemplo e dividir a soma pelo tamanho global do lote usando tf.nn.compute_average_loss, que leva as perdas por exemplo e pesos de amostra opcionais como argumentos e retorna a perda escalonada.

  • Se estiver usando classes tf.keras.losses (como no exemplo abaixo), a redução de perda precisa ser especificada explicitamente como NONE ou SUM. Os padrões AUTO e SUM_OVER_BATCH_SIZE não são permitidos fora de Model.fit.

    • AUTO não é permitido porque o usuário deve pensar explicitamente sobre qual redução deseja para ter certeza de que está correta no caso distribuído.

    • SUM_OVER_BATCH_SIZE não é permitido porque atualmente ele dividiria apenas pelo tamanho do lote por réplica e deixaria a divisão pelo número de réplicas para o usuário, o que pode ser fácil de esquecer. Então, em vez disso, você mesmo precisa fazer a redução explicitamente.

  • Se você estiver escrevendo um loop de treinamento personalizado para um modelo com uma lista não vazia de Model.losses (por exemplo, regularizadores de peso), você deve resumi-los e dividir a soma pelo número de réplicas. Você pode fazer isso usando a função tf.nn.scale_regularization_loss. O código do modelo permanece alheio ao número de réplicas.

No entanto, os modelos podem definir perdas de regularização dependentes de entrada com APIs Keras, como Layer.add_loss(...) e Layer(activity_regularizer=...). Para Layer.add_loss(...), cabe ao código de modelagem realizar a divisão dos termos somados por exemplo pelo tamanho do lote por réplica(!), por exemplo, usando tf.math.reduce_mean().

with strategy.scope(): # Set reduction to `NONE` so you can do the reduction yourself. loss_object = tf.keras.losses.SparseCategoricalCrossentropy( from_logits=True, reduction=tf.keras.losses.Reduction.NONE) def compute_loss(labels, predictions, model_losses): per_example_loss = loss_object(labels, predictions) loss = tf.nn.compute_average_loss(per_example_loss) if model_losses: loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses)) return loss

Casos especiais

Usuários avançados também devem considerar os seguintes casos especiais.

  • Lotes de entrada menores que GLOBAL_BATCH_SIZE criam casos específicos e desagradáveis ​​em diversos lugares. Na prática, muitas vezes funciona melhor evitá-los, permitindo que os lotes abranjam os limites da época usando Dataset.repeat().batch() e definindo épocas aproximadas por contagens de passos, e não pelas pontas do dataset. Alternativamente, Dataset.batch(drop_remainder=True) mantém a noção de época, mas descarta os últimos exemplos.

Como ilustração, considere este exemplo que segue o caminho mais difícil e permite lotes curtos, de modo que cada época de treinamento contenha cada exemplo de treinamento exatamente uma vez.

Qual denominador deve ser usado por tf.nn.compute_average_loss()?

* By default, in the example code above and equivalently in `Keras.fit()`, the sum of prediction losses is divided by `num_replicas_in_sync` times the actual batch size seen on the replica (with empty batches silently ignored). This preserves the balance between the prediction loss on the one hand and the regularization losses on the other hand. It is particularly appropriate for models that use input-dependent regularization losses. Plain L2 regularization just superimposes weight decay onto the gradients of the prediction loss and is less in need of such a balance. * In practice, many custom training loops pass as a constant Python value into `tf.nn.compute_average_loss(..., global_batch_size=GLOBAL_BATCH_SIZE)` to use it as the denominator. This preserves the relative weighting of training examples between batches. Without it, the smaller denominator in short batches effectively upweights the examples in those. (Before TensorFlow 2.13, this was also needed to avoid NaNs in case some replica received an actual batch size of zero.)

Ambas as opções são equivalentes se forem evitados lotes curtos, conforme sugerido acima.

  • labels multidimensionais exigem que você calcule a média da per_example_loss entre o número de previsões em cada exemplo. Considere uma tarefa de classificação para todos os pixels de uma imagem de entrada, com predictions de formato (batch_size, H, W, n_classes) e labels de formato (batch_size, H, W). Você precisará atualizar per_example_loss como: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

Cuidado: verifique o formato da sua perda. As funções de perda em tf.losses/tf.keras.losses costumam retornar a média da última dimensão da entrada. As classes de perda encapsulam essas funções. Se for passado reduction=Reduction.NONE ao criar uma instância de uma classe de perda, significa "nenhuma redução adicional". Para perdas categóricas com um formato de entrada do exemplo igual [batch, W, H, n_classes], a dimensão n_classes é reduzida. Para perdas pontuais, como losses.mean_squared_error ou losses.binary_crossentropy, inclua um eixo fictício para que [batch, W, H, 1] seja reduzido para [batch, W, H]. Sem o eixo fictício, [batch, W, H] será reduzido incorretamente para [batch, W].

Definição das métricas para monitorar a perda e a exatidão

Estas métricas monitoram a perda do teste e a precisão do treinamento e do teste. É possível usar .result() para obter as estatísticas acumuladas a qualquer momento.

with strategy.scope(): test_loss = tf.keras.metrics.Mean(name='test_loss') train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='train_accuracy') test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='test_accuracy')

Loop de treinamento

# A model, an optimizer, and a checkpoint must be created under `strategy.scope`. with strategy.scope(): model = create_model() optimizer = tf.keras.optimizers.Adam(learning_rate=0.001) checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs): images, labels = inputs with tf.GradientTape() as tape: predictions = model(images, training=True) loss = compute_loss(labels, predictions, model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) train_accuracy.update_state(labels, predictions) return loss def test_step(inputs): images, labels = inputs predictions = model(images, training=False) t_loss = loss_object(labels, predictions) test_loss.update_state(t_loss) test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it # with the distributed input. @tf.function def distributed_train_step(dataset_inputs): per_replica_losses = strategy.run(train_step, args=(dataset_inputs,)) return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) @tf.function def distributed_test_step(dataset_inputs): return strategy.run(test_step, args=(dataset_inputs,)) for epoch in range(EPOCHS): # TRAIN LOOP total_loss = 0.0 num_batches = 0 for x in train_dist_dataset: total_loss += distributed_train_step(x) num_batches += 1 train_loss = total_loss / num_batches # TEST LOOP for x in test_dist_dataset: distributed_test_step(x) if epoch % 2 == 0: checkpoint.save(checkpoint_prefix) template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, " "Test Accuracy: {}") print(template.format(epoch + 1, train_loss, train_accuracy.result() * 100, test_loss.result(), test_accuracy.result() * 100)) test_loss.reset_states() train_accuracy.reset_states() test_accuracy.reset_states()

Observações sobre o exemplo acima:

  • Faça a iteração de train_dist_dataset e test_dist_dataset usando um constructo for x in ....

  • A perda escalonada é o valor de retorno de distributed_train_step. Este valor é agregado entre as réplicas usando a chamada tf.distribute.Strategy.reduce e depois entre os lotes somando o valor de retorno das chamadas feitas a tf.distribute.Strategy.reduce.

  • tf.keras.Metrics deve ser atualizado dentro de train_step e test_step, que é executado por tf.distribute.Strategy.run.

  • tf.distribute.Strategy.run retorna resultados de cada réplica local da estratégia, e há diversas formas de usar esse resultado. Você pode usar tf.distribute.Strategy.reduce para obter um valor agregado. Além disso, pode usar tf.distribute.Strategy.experimental_local_results para obter a lista de valores contidos no resultado, um por réplica local.

Restauração do último checkpoint e teste

É possível restaurar um modelo com tf.distribute.Strategy que tenha um checkpoint, com ou sem uma estratégia.

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='eval_accuracy') new_model = create_model() new_optimizer = tf.keras.optimizers.Adam() test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function def eval_step(images, labels): predictions = new_model(images, training=False) eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model) checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir)) for images, labels in test_dataset: eval_step(images, labels) print('Accuracy after restoring the saved model without strategy: {}'.format( eval_accuracy.result() * 100))

Maneiras alternativas de fazer a iteração de um dataset

Uso de iteradores

Se você deseja fazer a iteração de um determinado número de passos e não do dataset inteiro, pode criar um iterador usando a chamada iter e chamando explicitamente next no iterador. Você pode optar por fazer a iteração do dataset tanto dentro como fora de tf.function. Veja um pequeno trecho de código que demonstra a iteração do dataset fora de tf.function usando um iterador.

for _ in range(EPOCHS): total_loss = 0.0 num_batches = 0 train_iter = iter(train_dist_dataset) for _ in range(10): total_loss += distributed_train_step(next(train_iter)) num_batches += 1 average_train_loss = total_loss / num_batches template = ("Epoch {}, Loss: {}, Accuracy: {}") print(template.format(epoch + 1, average_train_loss, train_accuracy.result() * 100)) train_accuracy.reset_states()

Iteração dentro de tf.function

Além disso, você pode fazer a iteração de toda a entrada train_dist_dataset dentro de tf.function usando o constructo for x in ... ou criando iteradores, conforme feito acima, O exemplo abaixo demonstra o encapsulamento de uma época de treinamento com um decorador @tf.function e fazendo a iteração de train_dist_dataset dentro da função.

@tf.function def distributed_train_epoch(dataset): total_loss = 0.0 num_batches = 0 for x in dataset: per_replica_losses = strategy.run(train_step, args=(x,)) total_loss += strategy.reduce( tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) num_batches += 1 return total_loss / tf.cast(num_batches, dtype=tf.float32) for epoch in range(EPOCHS): train_loss = distributed_train_epoch(train_dist_dataset) template = ("Epoch {}, Loss: {}, Accuracy: {}") print(template.format(epoch + 1, train_loss, train_accuracy.result() * 100)) train_accuracy.reset_states()

Monitoramento da perda de treinamento nas réplicas

Observação: como regra geral, você deve usar tf.keras.Metrics para monitorar os valores por amostra e evitar valores que foram agregados dentro de uma réplica.

Devido à computação de escalonamento de perda realizada, não é recomendável usar tf.keras.metrics.Mean para monitorar a perda de treinamento nas diferentes réplicas.

Por exemplo, se você executar um trabalho de treinamento com as seguintes características:

  • Duas réplicas

  • Duas amostras processadas em cada réplica

  • Valores de perda resultante: [2, 3] e [4, 5] em cada réplica

  • Tamanho global do lote = 4

Com o escalonamento da perda, você calcula o valor da perda por amostra em cada réplica somando os valores de perda e depois dividindo pelo tamanho global do lote. Neste caso: (2 + 3) / 4 = 1,25 e (4 + 5) / 4 = 2,25.

Se você utilizar tf.keras.metrics.Mean para monitorar a perda nas duas réplicas, o resultado será diferente. Neste exemplo, você fica com um total de 3,50 e uma count de 2, que resulta em total/count = 1,75 quando result() é chamado na métrica. A perda calculada com tf.keras.Metrics é escalonada por um fator adicional, que é igual ao número de réplicas na sincronização.

Guia e exemplos

Veja alguns exemplos de como usar a estratégia de distribuição com loops de treinamento personalizado:

  1. Guia de treinamento distribuído

  2. Exemplo de DenseNet usando MirroredStrategy.

  3. Exemplo de BERT treinado usando MirroredStrategy e TPUStrategy. Este exemplo é bastante útil para entender como carregar um checkpoint e gerar checkpoints periódicos durante o treinamento distribuído, etc.

  4. Exemplo de NCF treinado usando MirroredStrategy, que pode ser ativado utilizando o sinalizador keras_use_ctl.

  5. Exemplo de NMT treinado usando MirroredStrategy.

Confira mais exemplos na seção Exemplos e tutoriais do Guia de estratégia de distribuição.

Próximos passos