Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/es-419/guide/core/distribution.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Entrenamiento distribuido con las API del núcleo y DTensor

Introducción

Este bloc de notas utiliza las API de bajo nivel TensorFlow Core y DTensor para demostrar un ejemplo de entrenamiento distribuido paralelo de datos. Visite la sección Descripción general de las API Core para obtener más información sobre TensorFlow Core y sus casos de uso previstos. Consulte la guía Descripción general de DTensor y el tutorial Entrenamiento distribuido con DTensors para obtener más información sobre DTensor.

En este ejemplo se utiliza el mismo modelo y optimizador que se mostró en el tutorial perceptrones multicapa. Vea este tutorial primero para familiarizarse con la escritura del flujo de trabajo para aprendizaje automático de extremo a extremo con las API Core.

Nota: DTensor todavía es una API experimental de TensorFlow, lo cual significa que sus funciones están disponibles para pruebas, y su uso está previsto únicamente en entornos de prueba.

Descripción general del entrenamiento paralelo de datos con DTensor

Antes de construir un MLP que admita la distribución, tómese un momento para explorar los fundamentos de DTensor para el entrenamiento paralelo de datos.

DTensor permite ejecutar entrenamientos distribuidos por medio de dispositivos para mejorar la eficiencia, la confiabilidad y la escalabilidad. DTensor distribuye el programa y los tensores de acuerdo con las directivas de fragmentación mediante un procedimiento denominado Expansión de programa único, datos múltiples (SPMD). Una variable de una capa consciente de DTensor se crea como dtensor.DVariable, y los constructores de objetos de capa conscientes de DTensor toman entradas adicionales Layout además de los parámetros habituales de la capa.

Las ideas principales para el entrenamiento paralelo de datos son las siguientes:

  • Las variables del modelo se reproducen en N dispositivos cada una.

  • Un lote global se divide en N lotes por réplica.

  • Cada lote por réplica se entrena en el dispositivo de réplica.

  • El gradiente se reduce antes de que la ponderación de los datos se realice colectivamente en todas las réplicas.

  • El entrenamiento paralelo de datos ofrece una aceleración casi lineal con respecto al número de dispositivos.

Preparación

DTensor forma parte de la versión 2.9.0 de TensorFlow.

#!pip install --quiet --upgrade --pre tensorflow
import matplotlib from matplotlib import pyplot as plt # Preset Matplotlib figure sizes. matplotlib.rcParams['figure.figsize'] = [9, 6]
import tensorflow as tf import tensorflow_datasets as tfds from tensorflow.experimental import dtensor print(tf.__version__) # Set random seed for reproducible results tf.random.set_seed(22)

Configure 8 CPUs virtuales para este experimento. DTensor también se puede utilizar con dispositivos GPU o TPU. Dado que este bloc de notas utiliza dispositivos virtuales, el aumento de la velocidad obtenido del entrenamiento distribuido no es perceptible.

def configure_virtual_cpus(ncpu): phy_devices = tf.config.list_physical_devices('CPU') tf.config.set_logical_device_configuration(phy_devices[0], [ tf.config.LogicalDeviceConfiguration(), ] * ncpu) configure_virtual_cpus(8) DEVICES = [f'CPU:{i}' for i in range(8)] devices = tf.config.list_logical_devices('CPU') device_names = [d.name for d in devices] device_names

El conjunto de datos MNIST

El conjunto de datos está disponible en TensorFlow Datasets. Divida los datos en conjuntos de entrenamiento y prueba. Utilice solo 5,000 ejemplos para el entrenamiento y las pruebas, esto le permitirá ahorrar tiempo.

train_data, test_data = tfds.load("mnist", split=['train[:5000]', 'test[:5000]'], batch_size=128, as_supervised=True)

Preprocesamiento de los datos

Preprocese los datos dándoles forma bidimensional y reescalándolos para que se ajusten al intervalo unitario [0,1].

def preprocess(x, y): # Reshaping the data x = tf.reshape(x, shape=[-1, 784]) # Rescaling the data x = x/255 return x, y train_data, test_data = train_data.map(preprocess), test_data.map(preprocess)

Cree la MLP

Construya un modelo MLP con capas conscientes del DTensor.

La capa densa

Comience por crear un módulo de capa densa que soporte DTensor. La función dtensor.call_with_layout se puede utilizar para llamar a una función que toma una entrada DTensor y produce una salida DTensor. Esto es útil para inicializar una variable DTensor, dtensor.DVariable, con una función soportada por TensorFlow.

class DenseLayer(tf.Module): def __init__(self, in_dim, out_dim, weight_layout, activation=tf.identity): super().__init__() # Initialize dimensions and the activation function self.in_dim, self.out_dim = in_dim, out_dim self.activation = activation # Initialize the DTensor weights using the Xavier scheme uniform_initializer = tf.function(tf.random.stateless_uniform) xavier_lim = tf.sqrt(6.)/tf.sqrt(tf.cast(self.in_dim + self.out_dim, tf.float32)) self.w = dtensor.DVariable( dtensor.call_with_layout( uniform_initializer, weight_layout, shape=(self.in_dim, self.out_dim), seed=(22, 23), minval=-xavier_lim, maxval=xavier_lim)) # Initialize the bias with the zeros bias_layout = weight_layout.delete([0]) self.b = dtensor.DVariable( dtensor.call_with_layout(tf.zeros, bias_layout, shape=[out_dim])) def __call__(self, x): # Compute the forward pass z = tf.add(tf.matmul(x, self.w), self.b) return self.activation(z)

El modelo secuencial de MLP

Ahora cree un módulo MLP que ejecute las capas densas secuencialmente.

class MLP(tf.Module): def __init__(self, layers): self.layers = layers def __call__(self, x, preds=False): # Execute the model's layers sequentially for layer in self.layers: x = layer(x) return x

Realizar un entrenamiento "data-parallel" con DTensor es equivalente a tf.distribute.MirroredStrategy. Para ello cada dispositivo ejecutará el mismo modelo en un fragmento del lote de datos. Así que necesitará lo siguiente:

  • Un dtensor.Mesh con una única dimensión ""batch"

  • Un dtensor.Layout para todos los pesos que los replica en toda la malla (usando dtensor.UNSHARDED para cada eje)

  • Un dtensor.Layout para los datos que dividen la dimensión del lote a lo largo de la malla

Cree una malla DTensor que consista en una única dimensión de lote, donde cada dispositivo se convierte en una réplica que recibe un fragmento del lote global. Utilice esta malla para instanciar un modo MLP con la siguiente arquitectura:

Siguiente paso: ReLU (784 x 700) x ReLU (700 x 500) x Softmax (500 x 10)

mesh = dtensor.create_mesh([("batch", 8)], devices=DEVICES) weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh) input_size = 784 hidden_layer_1_size = 700 hidden_layer_2_size = 500 hidden_layer_2_size = 10 mlp_model = MLP([ DenseLayer(in_dim=input_size, out_dim=hidden_layer_1_size, weight_layout=weight_layout, activation=tf.nn.relu), DenseLayer(in_dim=hidden_layer_1_size , out_dim=hidden_layer_2_size, weight_layout=weight_layout, activation=tf.nn.relu), DenseLayer(in_dim=hidden_layer_2_size, out_dim=hidden_layer_2_size, weight_layout=weight_layout)])

Métricas de entrenamiento

Utilice la función de pérdida de entropía cruzada y la métrica de precisión para realizar el entrenamiento.

def cross_entropy_loss(y_pred, y): # Compute cross entropy loss with a sparse operation sparse_ce = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=y_pred) return tf.reduce_mean(sparse_ce) def accuracy(y_pred, y): # Compute accuracy after extracting class predictions class_preds = tf.argmax(y_pred, axis=1) is_equal = tf.equal(y, class_preds) return tf.reduce_mean(tf.cast(is_equal, tf.float32))

Optimizador

El uso de un optimizador puede resultar en una convergencia significativamente más rápida en comparación con el descenso del gradiente estándar. El optimizador Adam se implementa a continuación y se configuró para que sea compatible con DTensor. Para utilizar optimizadores de Keras con DTensor, consulte el módulo experimentaltf.keras.dtensor.experimental.optimizers.

class Adam(tf.Module): def __init__(self, model_vars, learning_rate=1e-3, beta_1=0.9, beta_2=0.999, ep=1e-7): # Initialize optimizer parameters and variable slots self.model_vars = model_vars self.beta_1 = beta_1 self.beta_2 = beta_2 self.learning_rate = learning_rate self.ep = ep self.t = 1. self.v_dvar, self.s_dvar = [], [] # Initialize optimizer variable slots for var in model_vars: v = dtensor.DVariable(dtensor.call_with_layout(tf.zeros, var.layout, shape=var.shape)) s = dtensor.DVariable(dtensor.call_with_layout(tf.zeros, var.layout, shape=var.shape)) self.v_dvar.append(v) self.s_dvar.append(s) def apply_gradients(self, grads): # Update the model variables given their gradients for i, (d_var, var) in enumerate(zip(grads, self.model_vars)): self.v_dvar[i].assign(self.beta_1*self.v_dvar[i] + (1-self.beta_1)*d_var) self.s_dvar[i].assign(self.beta_2*self.s_dvar[i] + (1-self.beta_2)*tf.square(d_var)) v_dvar_bc = self.v_dvar[i]/(1-(self.beta_1**self.t)) s_dvar_bc = self.s_dvar[i]/(1-(self.beta_2**self.t)) var.assign_sub(self.learning_rate*(v_dvar_bc/(tf.sqrt(s_dvar_bc) + self.ep))) self.t += 1. return

Empaquetado de datos

Comience escribiendo una función de ayuda para transferir datos al dispositivo. Esta función debería usar dtensor.pack para enviar (y sólo enviar) el fragmento del lote global que está destinado para una réplica al dispositivo que respalda la réplica. Para simplificar, supongamos una aplicación de un solo cliente.

Después, escriba una función que la utilice para empaquetar los lotes de datos de entrenamiento en DTensors fragmentados a lo largo del eje del lote (primero). Esto garantizará que DTensor distribuya uniformemente los datos del entrenamiento en la dimensión de la malla del "lote". Tenga en cuenta que en DTensor, el tamaño del lote siempre se refiere al tamaño global del lote; por lo tanto, el tamaño del lote debe ser elegido de tal manera que pueda ser dividido uniformemente por el tamaño de la dimensión de la malla del lote. Se están planeando APIs de DTensor adicionales para simplificar la integración de tf.data, así que permanezca atento.

def repack_local_tensor(x, layout): # Repacks a local Tensor-like to a DTensor with layout # This function assumes a single-client application x = tf.convert_to_tensor(x) sharded_dims = [] # For every sharded dimension, use tf.split to split the along the dimension. # The result is a nested list of split-tensors in queue[0]. queue = [x] for axis, dim in enumerate(layout.sharding_specs): if dim == dtensor.UNSHARDED: continue num_splits = layout.shape[axis] queue = tf.nest.map_structure(lambda x: tf.split(x, num_splits, axis=axis), queue) sharded_dims.append(dim) # Now you can build the list of component tensors by looking up the location in # the nested list of split-tensors created in queue[0]. components = [] for locations in layout.mesh.local_device_locations(): t = queue[0] for dim in sharded_dims: split_index = locations[dim] # Only valid on single-client mesh. t = t[split_index] components.append(t) return dtensor.pack(components, layout) def repack_batch(x, y, mesh): # Pack training data batches into DTensors along the batch axis x = repack_local_tensor(x, layout=dtensor.Layout(['batch', dtensor.UNSHARDED], mesh)) y = repack_local_tensor(y, layout=dtensor.Layout(['batch'], mesh)) return x, y

Entrenamiento

Escribe una función rastreable que ejecute un único paso de entrenamiento dado un lote de datos. Esta función no requiere ninguna anotación de DTensor especial. También escriba una función que ejecute un paso de prueba y devuelva las métricas de rendimiento apropiadas.

@tf.function def train_step(model, x_batch, y_batch, loss, metric, optimizer): # Execute a single training step with tf.GradientTape() as tape: y_pred = model(x_batch) batch_loss = loss(y_pred, y_batch) # Compute gradients and update the model's parameters grads = tape.gradient(batch_loss, model.trainable_variables) optimizer.apply_gradients(grads) # Return batch loss and accuracy batch_acc = metric(y_pred, y_batch) return batch_loss, batch_acc @tf.function def test_step(model, x_batch, y_batch, loss, metric): # Execute a single testing step y_pred = model(x_batch) batch_loss = loss(y_pred, y_batch) batch_acc = metric(y_pred, y_batch) return batch_loss, batch_acc

Ahora, entrene el modelo MLP durante 3 épocas con un tamaño de lote de 128.

# Initialize the training loop parameters and structures epochs = 3 batch_size = 128 train_losses, test_losses = [], [] train_accs, test_accs = [], [] optimizer = Adam(mlp_model.trainable_variables) # Format training loop for epoch in range(epochs): batch_losses_train, batch_accs_train = [], [] batch_losses_test, batch_accs_test = [], [] # Iterate through training data for x_batch, y_batch in train_data: x_batch, y_batch = repack_batch(x_batch, y_batch, mesh) batch_loss, batch_acc = train_step(mlp_model, x_batch, y_batch, cross_entropy_loss, accuracy, optimizer) # Keep track of batch-level training performance batch_losses_train.append(batch_loss) batch_accs_train.append(batch_acc) # Iterate through testing data for x_batch, y_batch in test_data: x_batch, y_batch = repack_batch(x_batch, y_batch, mesh) batch_loss, batch_acc = test_step(mlp_model, x_batch, y_batch, cross_entropy_loss, accuracy) # Keep track of batch-level testing batch_losses_test.append(batch_loss) batch_accs_test.append(batch_acc) # Keep track of epoch-level model performance train_loss, train_acc = tf.reduce_mean(batch_losses_train), tf.reduce_mean(batch_accs_train) test_loss, test_acc = tf.reduce_mean(batch_losses_test), tf.reduce_mean(batch_accs_test) train_losses.append(train_loss) train_accs.append(train_acc) test_losses.append(test_loss) test_accs.append(test_acc) print(f"Epoch: {epoch}") print(f"Training loss: {train_loss.numpy():.3f}, Training accuracy: {train_acc.numpy():.3f}") print(f"Testing loss: {test_loss.numpy():.3f}, Testing accuracy: {test_acc.numpy():.3f}")

Evaluación del desempeño

Empiece por escribir una función para visualizar las pérdidas y la precisión del modelo durante el entrenamiento.

def plot_metrics(train_metric, test_metric, metric_type): # Visualize metrics vs training Epochs plt.figure() plt.plot(range(len(train_metric)), train_metric, label = f"Training {metric_type}") plt.plot(range(len(test_metric)), test_metric, label = f"Testing {metric_type}") plt.xlabel("Epochs") plt.ylabel(metric_type) plt.legend() plt.title(f"{metric_type} vs Training Epochs");
plot_metrics(train_losses, test_losses, "Cross entropy loss")
plot_metrics(train_accs, test_accs, "Accuracy")

Guarde su modelo

La integración de tf.saved_model y DTensor está todavía en desarrollo. A partir de TensorFlow 2.9.0, tf.saved_model sólo acepta modelos de DTensor con variables totalmente replicadas. Como una solución, se puede convertir un modelo de DTensor a uno totalmente replicado mediante la recarga de un punto de control. Sin embargo, después de guardar un modelo, todas las anotaciones de DTensor se pierden y las firmas guardadas sólo se pueden utilizar con tensores normales. Este tutorial se actualizará para mostrar la integración una vez que se solidifique.

Conclusión

Este bloc de notas proporciona una visión general de la formación distribuida con DTensor y las APIs de TensorFlow Core. Aquí hay algunos consejos más que pueden ayudar:

Para obtener más ejemplos sobre el uso de las API de TensorFlow Core, consulte la guía. Si desea obtener más información sobre la carga y preparación de datos, consulte los tutoriales sobre la carga de datos de imagen o la carga de datos CSV.