Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/pt-br/guide/core/distribution.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Treinamento distribuído com Core APIs e DTensor

Introdução

Este notebook usa as APIs de baixo nível do TensorFlow Core e o DTensor para demonstrar um exemplo de treinamento distribuído paralelo de dados. Acesse a visão geral das APIs principais para saber mais sobre o TensorFlow Core e os casos de uso pretendidos. Consulte o guia Visão geral do DTensor e o tutorial Treinamento distribuído com DTensors para saber mais sobre o DTensor.

Este exemplo usa o mesmo modelo e otimizador mostrado no tutorial sobre perceptrons multicamadas. Consulte esse tutorial primeiro para se sentir confortável ao escrever um workflow de aprendizado de máquina completo com as APIs Core.

Observação: o DTensor ainda é uma API experimental do TensorFlow, o que significa que seus recursos estão disponíveis para teste e se destinam ao uso apenas em ambientes de teste.

Visão geral do treinamento paralelo de dados com DTensor

Antes de construir um MLP que suporte distribuição, reserve um momento para explorar os fundamentos do DTensor para treinamento paralelo de dados.

O DTensor permite que você execute treinamento distribuído entre dispositivos para melhorar a eficiência, confiabilidade e escalabilidade. O DTensor distribui o programa e os tensores de acordo com as diretivas de sharding por meio de um procedimento denominado expansão de programa único, dados múltiplos (SPMD - Single Program, Multiple Data). Uma variável de uma camada que reconhece o DTensor é criada como dtensor.DVariable, e os construtores dos objetos da camada que reconhece o DTensor recebem entradas Layout adicionais, além dos parâmetros usuais da camada.

As principais ideias por trás do treinamento paralelo de dados são as seguintes:

  • Variáveis ​​do modelo são replicadas em N dispositivos cada.

  • Um lote global é dividido em N lotes por réplica.

  • Cada lote por réplica é treinado no dispositivo de réplica.

  • O gradiente é reduzido antes que os dados de ponderação sejam executados coletivamente em todas as réplicas.

  • O treinamento paralelo de dados fornece velocidade quase linear em relação ao número de dispositivos

Configuração

O DTensor faz parte da versão 2.9.0 do TensorFlow.

#!pip install --quiet --upgrade --pre tensorflow
import matplotlib from matplotlib import pyplot as plt # Preset Matplotlib figure sizes. matplotlib.rcParams['figure.figsize'] = [9, 6]
import tensorflow as tf import tensorflow_datasets as tfds from tensorflow.experimental import dtensor print(tf.__version__) # Set random seed for reproducible results tf.random.set_seed(22)

Configure 8 CPUs virtuais para este experimento. O DTensor também pode ser usado com dispositivos GPU ou TPU. Dado que este notebook utiliza dispositivos virtuais, a aceleração obtida com o treinamento distribuído não é perceptível.

def configure_virtual_cpus(ncpu): phy_devices = tf.config.list_physical_devices('CPU') tf.config.set_logical_device_configuration(phy_devices[0], [ tf.config.LogicalDeviceConfiguration(), ] * ncpu) configure_virtual_cpus(8) DEVICES = [f'CPU:{i}' for i in range(8)] devices = tf.config.list_logical_devices('CPU') device_names = [d.name for d in devices] device_names

O dataset MNIST

O dataset está disponível em TensorFlow Datasets. Divida os dados em datasets de treinamento e teste. Use apenas 5.000 exemplos para treinamento e teste para economizar tempo.

train_data, test_data = tfds.load("mnist", split=['train[:5000]', 'test[:5000]'], batch_size=128, as_supervised=True)

Pré-processamento dos dados

Pré-processe os dados remodelando-os para serem bidimensionais e redimensionando-os para caber no intervalo unitário, [0,1].

def preprocess(x, y): # Reshaping the data x = tf.reshape(x, shape=[-1, 784]) # Rescaling the data x = x/255 return x, y train_data, test_data = train_data.map(preprocess), test_data.map(preprocess)

Construção do MLP

Construa um modelo MLP com camadas compatíveis com DTensor.

A camada densa

Comece criando um módulo de camada densa que suporte DTensor. A função dtensor.call_with_layout pode ser usada para chamar uma função que recebe uma entrada do DTensor e produz uma saída do DTensor. Isto é útil para inicializar uma variável DTensor, dtensor.DVariable, com uma função compatível com TensorFlow.

class DenseLayer(tf.Module): def __init__(self, in_dim, out_dim, weight_layout, activation=tf.identity): super().__init__() # Initialize dimensions and the activation function self.in_dim, self.out_dim = in_dim, out_dim self.activation = activation # Initialize the DTensor weights using the Xavier scheme uniform_initializer = tf.function(tf.random.stateless_uniform) xavier_lim = tf.sqrt(6.)/tf.sqrt(tf.cast(self.in_dim + self.out_dim, tf.float32)) self.w = dtensor.DVariable( dtensor.call_with_layout( uniform_initializer, weight_layout, shape=(self.in_dim, self.out_dim), seed=(22, 23), minval=-xavier_lim, maxval=xavier_lim)) # Initialize the bias with the zeros bias_layout = weight_layout.delete([0]) self.b = dtensor.DVariable( dtensor.call_with_layout(tf.zeros, bias_layout, shape=[out_dim])) def __call__(self, x): # Compute the forward pass z = tf.add(tf.matmul(x, self.w), self.b) return self.activation(z)

O modelo sequencial MLP

Agora crie um módulo MLP que execute as camadas densas sequencialmente.

class MLP(tf.Module): def __init__(self, layers): self.layers = layers def __call__(self, x, preds=False): # Execute the model's layers sequentially for layer in self.layers: x = layer(x) return x

Realizar treinamento "paralelo a dados" com o DTensor é equivalente à estratégia tf.distribute.MirroredStrategy. Paraisto, cada dispositivo executará o mesmo modelo num fragmento do lote de dados. Então você precisará do seguinte:

  • Uma malha dtensor.Mesh com uma única dimensão de "batch"

  • Um dtensor.Layout para todos os pesos que os replica na malha (usando dtensor.UNSHARDED para cada eixo)

  • Um dtensor.Layout para os dados que dividem a dimensão do lote pela malha

Crie uma malha DTensor que consiste numa única dimensão de lote, onde cada dispositivo se torna uma réplica que recebe um fragmento do lote global. Use esta malha para instanciar um modo MLP com a seguinte arquitetura:

Passo para frente: ReLU (784 x 700) x ReLU (700 x 500) x Softmax (500 x 10)

mesh = dtensor.create_mesh([("batch", 8)], devices=DEVICES) weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh) input_size = 784 hidden_layer_1_size = 700 hidden_layer_2_size = 500 hidden_layer_2_size = 10 mlp_model = MLP([ DenseLayer(in_dim=input_size, out_dim=hidden_layer_1_size, weight_layout=weight_layout, activation=tf.nn.relu), DenseLayer(in_dim=hidden_layer_1_size , out_dim=hidden_layer_2_size, weight_layout=weight_layout, activation=tf.nn.relu), DenseLayer(in_dim=hidden_layer_2_size, out_dim=hidden_layer_2_size, weight_layout=weight_layout)])

Métricas de treinamento

Use a função de perda de entropia cruzada e a métrica de exatidão para o treinamento.

def cross_entropy_loss(y_pred, y): # Compute cross entropy loss with a sparse operation sparse_ce = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=y_pred) return tf.reduce_mean(sparse_ce) def accuracy(y_pred, y): # Compute accuracy after extracting class predictions class_preds = tf.argmax(y_pred, axis=1) is_equal = tf.equal(y, class_preds) return tf.reduce_mean(tf.cast(is_equal, tf.float32))

Otimizador

O uso de um otimizador pode trazer como resultado uma convergência significativamente mais rápida em comparação com o método do gradiente descendente padrão. O otimizador Adam foi implementado abaixo e configurado para ser compatível com o DTensor. Para usar otimizadores Keras com o DTensor, veja o módulo experimental tf.keras.dtensor.experimental.optimizers.

class Adam(tf.Module): def __init__(self, model_vars, learning_rate=1e-3, beta_1=0.9, beta_2=0.999, ep=1e-7): # Initialize optimizer parameters and variable slots self.model_vars = model_vars self.beta_1 = beta_1 self.beta_2 = beta_2 self.learning_rate = learning_rate self.ep = ep self.t = 1. self.v_dvar, self.s_dvar = [], [] # Initialize optimizer variable slots for var in model_vars: v = dtensor.DVariable(dtensor.call_with_layout(tf.zeros, var.layout, shape=var.shape)) s = dtensor.DVariable(dtensor.call_with_layout(tf.zeros, var.layout, shape=var.shape)) self.v_dvar.append(v) self.s_dvar.append(s) def apply_gradients(self, grads): # Update the model variables given their gradients for i, (d_var, var) in enumerate(zip(grads, self.model_vars)): self.v_dvar[i].assign(self.beta_1*self.v_dvar[i] + (1-self.beta_1)*d_var) self.s_dvar[i].assign(self.beta_2*self.s_dvar[i] + (1-self.beta_2)*tf.square(d_var)) v_dvar_bc = self.v_dvar[i]/(1-(self.beta_1**self.t)) s_dvar_bc = self.s_dvar[i]/(1-(self.beta_2**self.t)) var.assign_sub(self.learning_rate*(v_dvar_bc/(tf.sqrt(s_dvar_bc) + self.ep))) self.t += 1. return

Empacotamento de dados

Comece escrevendo uma função helper para transferir dados para o dispositivo. Esta função deve usar dtensor.pack para enviar (e apenas enviar) o shard do lote global que se pretende usar para uma réplica ao dispositivo que suporta a réplica. Para simplificar, suponha um aplicativo de cliente único.

Em seguida, escreva uma função que use esta função auxiliar para compactar os lotes de dados de treinamento em DTensors fragmentados (sharded) ao longo do (primeiro) eixo do lote. Isto garante que o DTensor distribua uniformemente os dados de treinamento para a dimensão da malha em 'lote'. Observe que no DTensor, o tamanho do lote sempre se refere ao tamanho global do lote; portanto, o tamanho do lote deve ser escolhido de forma que possa ser dividido igualmente pelo tamanho da dimensão da malha do lote. Estão planejadas APIs DTensor adicionais para simplificar a integração tf.data, portanto, fique atento.

def repack_local_tensor(x, layout): # Repacks a local Tensor-like to a DTensor with layout # This function assumes a single-client application x = tf.convert_to_tensor(x) sharded_dims = [] # For every sharded dimension, use tf.split to split the along the dimension. # The result is a nested list of split-tensors in queue[0]. queue = [x] for axis, dim in enumerate(layout.sharding_specs): if dim == dtensor.UNSHARDED: continue num_splits = layout.shape[axis] queue = tf.nest.map_structure(lambda x: tf.split(x, num_splits, axis=axis), queue) sharded_dims.append(dim) # Now you can build the list of component tensors by looking up the location in # the nested list of split-tensors created in queue[0]. components = [] for locations in layout.mesh.local_device_locations(): t = queue[0] for dim in sharded_dims: split_index = locations[dim] # Only valid on single-client mesh. t = t[split_index] components.append(t) return dtensor.pack(components, layout) def repack_batch(x, y, mesh): # Pack training data batches into DTensors along the batch axis x = repack_local_tensor(x, layout=dtensor.Layout(['batch', dtensor.UNSHARDED], mesh)) y = repack_local_tensor(y, layout=dtensor.Layout(['batch'], mesh)) return x, y

Treinamento

Escreva uma função rastreável que execute uma única etapa de treinamento com base num lote de dados. Esta função não requer nenhuma anotação especial do DTensor. Escreva também uma função que execute uma etapa de teste e retorne as métricas de desempenho apropriadas.

@tf.function def train_step(model, x_batch, y_batch, loss, metric, optimizer): # Execute a single training step with tf.GradientTape() as tape: y_pred = model(x_batch) batch_loss = loss(y_pred, y_batch) # Compute gradients and update the model's parameters grads = tape.gradient(batch_loss, model.trainable_variables) optimizer.apply_gradients(grads) # Return batch loss and accuracy batch_acc = metric(y_pred, y_batch) return batch_loss, batch_acc @tf.function def test_step(model, x_batch, y_batch, loss, metric): # Execute a single testing step y_pred = model(x_batch) batch_loss = loss(y_pred, y_batch) batch_acc = metric(y_pred, y_batch) return batch_loss, batch_acc

Agora, treine o modelo MLP para 3 épocas com tamanho de lote de 128.

# Initialize the training loop parameters and structures epochs = 3 batch_size = 128 train_losses, test_losses = [], [] train_accs, test_accs = [], [] optimizer = Adam(mlp_model.trainable_variables) # Format training loop for epoch in range(epochs): batch_losses_train, batch_accs_train = [], [] batch_losses_test, batch_accs_test = [], [] # Iterate through training data for x_batch, y_batch in train_data: x_batch, y_batch = repack_batch(x_batch, y_batch, mesh) batch_loss, batch_acc = train_step(mlp_model, x_batch, y_batch, cross_entropy_loss, accuracy, optimizer) # Keep track of batch-level training performance batch_losses_train.append(batch_loss) batch_accs_train.append(batch_acc) # Iterate through testing data for x_batch, y_batch in test_data: x_batch, y_batch = repack_batch(x_batch, y_batch, mesh) batch_loss, batch_acc = test_step(mlp_model, x_batch, y_batch, cross_entropy_loss, accuracy) # Keep track of batch-level testing batch_losses_test.append(batch_loss) batch_accs_test.append(batch_acc) # Keep track of epoch-level model performance train_loss, train_acc = tf.reduce_mean(batch_losses_train), tf.reduce_mean(batch_accs_train) test_loss, test_acc = tf.reduce_mean(batch_losses_test), tf.reduce_mean(batch_accs_test) train_losses.append(train_loss) train_accs.append(train_acc) test_losses.append(test_loss) test_accs.append(test_acc) print(f"Epoch: {epoch}") print(f"Training loss: {train_loss.numpy():.3f}, Training accuracy: {train_acc.numpy():.3f}") print(f"Testing loss: {test_loss.numpy():.3f}, Testing accuracy: {test_acc.numpy():.3f}")

Avaliação de desempenho

Comece escrevendo uma função de plotagem para visualizar a perda e a precisão do modelo durante o treinamento.

def plot_metrics(train_metric, test_metric, metric_type): # Visualize metrics vs training Epochs plt.figure() plt.plot(range(len(train_metric)), train_metric, label = f"Training {metric_type}") plt.plot(range(len(test_metric)), test_metric, label = f"Testing {metric_type}") plt.xlabel("Epochs") plt.ylabel(metric_type) plt.legend() plt.title(f"{metric_type} vs Training Epochs");
plot_metrics(train_losses, test_losses, "Cross entropy loss")
plot_metrics(train_accs, test_accs, "Accuracy")

Salvando o modelo

A integração de tf.saved_model e DTensor ainda está em desenvolvimento. A partir do TensorFlow 2.9.0, tf.saved_model aceita apenas modelos DTensor com variáveis ​​totalmente replicadas. Como solução alternativa, você pode converter um modelo DTensor num modelo totalmente replicado recarregando um checkpoint. No entanto, depois que um modelo é salvo, todas as anotações do DTensor são perdidas e as assinaturas salvas só podem ser usadas com Tensores regulares. Este tutorial será atualizado para mostrar a integração assim que ela estiver solidificada.

Conclusão

Este notebook forneceu uma visão geral do treinamento distribuído com DTensor e as APIs TensorFlow Core. Aqui estão mais algumas dicas que podem ajudar:

Para obter mais exemplos de uso das APIs Core do TensorFlow, confira o guia . Se você quiser saber mais sobre como carregar e preparar dados, consulte os tutoriais sobre carregamento de dados de imagem ou carregamento de dados CSV.