Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/es-419/tutorials/distribute/parameter_server_training.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Entrenamiento del servidor de parámetros con ParameterServerStrategy

Descripción general

El entrenamiento de servidores de parámetros es un método común de datos paralelos para ampliar el entrenamiento de modelos en varias máquinas.

Un clúster de entrenamiento de servidores de parámetros consiste en workers y servidores de parámetros. Las variables se crean en los servidores de parámetros y se leen y actualizan mediante los workers en cada paso. De forma predeterminada, los workers leen y actualizan estas variables de forma independiente, sin sincronizarse entre sí. Por eso, a veces, el entrenamiento tipo servidor de parámetros se denomina entrenamiento asíncrono.

En TensorFlow 2, el entrenamiento del servidor de parámetros se realiza mediante la clase tf.distribute.ParameterServerStrategy, que distribuye los pasos del entrenamiento a un clúster que escala hasta miles de workers (acompañados de servidores de parámetros).

Métodos de entrenamiento admitidos

Hay dos métodos principales de entrenamiento compatibles:

Un cluster con trabajos y tareas

Independientemente de la API elegida (Model.fit o un bucle de entrenamiento personalizado), el entrenamiento distribuido en TensorFlow 2 implica un 'cluster' con varios 'jobs', y cada uno de los jobs puede tener una o más 'tasks'.

Cuando se utiliza el entrenamiento del servidor de parámetros, se recomienda tener:

  • Un puesto de coordinador (que tiene el nombre de chief)

  • Múltiples puestos de trabajador (nombre del puesto worker)

  • Múltiples parámetros del servidor (nombre del puesto ps)

El coordinador crea recursos, despacha tareas de entrenamiento, escribe puntos de control y se ocupa de los fallos de las tareas. Los trabajadores y servidores de parámetros ejecutan instancias tf.distribute.Server que reciben solicitudes del coordinador.

Entrenamiento del servidor de parámetros con la API Model.fit

El entrenamiento del servidor de parámetros con la API Model.fit requiere que el coordinador utilice un objeto tf.distribute.ParameterServerStrategy. De forma similar al uso de Model.fit sin estrategia, o con otras estrategias, el flujo de trabajo implica crear y compilar el modelo, preparar las retrollamadas y llamar a Model.fit.

Entrenamiento del servidor de parámetros con un bucle de entrenamiento personalizado

Con bucles de entrenamiento personalizados, la clase tf.distribute.coordinator.ClusterCoordinator es el componente clave utilizado para el coordinador.

  • La clase ClusterCoordinator debe funcionar junto con un objeto tf.distribute.ParameterServerStrategy.

  • Este objeto tf.distribute.Strategy es necesario para proporcionar la información del conglomerado y se utiliza para definir un paso del entrenamiento, como se demuestra en el Entrenamiento personalizado con tf.distribute.Strategy.

  • A continuación, el objeto ClusterCoordinator envía la ejecución de estos pasos de entrenamiento a los workers remotos.

La API más importante que proporciona el objeto ClusterCoordinator es schedule:

  • La API schedule pone en espera una tf.function y devuelve un RemoteValue de tipo futuro inmediatamente.

  • Las funciones en la fila se enviarán a los workers remotos en hilos de fondo y sus RemoteValue se rellenarán de forma asíncrona.

  • Dado que schedule no requiere la asignación de un worker, la tf.function anterior puede ejecutarse en cualquier worker disponible.

  • Si el worker en el que se ejecuta deja de estar disponible antes de su finalización, la función se volverá a intentar en otro worker disponible.

  • Debido a este hecho y a que la ejecución de funciones no es atómica, una única llamada a una función puede ejecutarse más de una vez.

Además de despachar funciones remotas, el ClusterCoordinator también ayuda a crear conjuntos de datos en todos los workers y a reconstruir estos conjuntos de datos cuando un worker se recupera de un error.

Preparación del tutorial

El tutorial se bifurcará en Model.fit y rutas de bucle de entrenamiento personalizado, y podrá elegir la que se ajuste a sus necesidades. Las secciones distintas de "Entrenamiento con X" se aplican a ambas rutas.

!pip install portpicker
#@title import multiprocessing import os import random import portpicker import tensorflow as tf

Preparación del clúster

Como se mencionó anteriormente, un clúster de entrenamiento de servidor de parámetros requiere una tarea coordinadora que ejecute su programa de entrenamiento, uno o varios workers y tareas de servidor de parámetros que ejecuten servidores TensorFlow-tf.distribute.Server-y posiblemente una tarea de evaluación adicional que ejecute la evaluación sidecar (consulte la sección evaluación sidecar a continuación). Los requisitos necesarios para su configuración son:

  • La tarea coordinadora necesita conocer las direcciones y puertos de todos los demás servidores de TensorFlow, excepto la del evaluador.

  • Los workers y los servidores de parámetros necesitan saber en qué puerto deben escuchar. Para mayor simplicidad, normalmente puede pasar la información completa del clúster al crear servidores TensorFlow en estas tareas.

  • La tarea evaluadora no tiene por qué conocer la preparación del cluster de entrenamiento. Si la conoce, no debe intentar conectarse al clúster de entrenamiento.

  • Los workers y los servidores de parámetros deben tener como tipos de tarea "worker" y "ps", respectivamente. El coordinador debe utilizar "chief" como tipo de tarea por razones de legado.

En este tutorial, creará un clúster en proceso para que todo el entrenamiento del servidor de parámetros pueda ejecutarse en Colab. Aprenderá a configurar clusters reales en una sección posterior.

Clúster en proceso

Comenzará mediante la creación de varios servidores de TensorFlow por adelantado y se conectará a ellos más tarde. Tenga en cuenta que esto es sólo con el propósito de demostración de este tutorial, y en el entrenamiento real los servidores se iniciarán en máquinas "worker" y "ps".

def create_in_process_cluster(num_workers, num_ps): """Creates and starts local servers and returns the cluster_resolver.""" worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)] ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)] cluster_dict = {} cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports] if num_ps > 0: cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports] cluster_spec = tf.train.ClusterSpec(cluster_dict) # Workers need some inter_ops threads to work properly. worker_config = tf.compat.v1.ConfigProto() if multiprocessing.cpu_count() < num_workers + 1: worker_config.inter_op_parallelism_threads = num_workers + 1 for i in range(num_workers): tf.distribute.Server( cluster_spec, job_name="worker", task_index=i, config=worker_config, protocol="grpc") for i in range(num_ps): tf.distribute.Server( cluster_spec, job_name="ps", task_index=i, protocol="grpc") cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver( cluster_spec, rpc_layer="grpc") return cluster_resolver # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" NUM_WORKERS = 3 NUM_PS = 2 cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

La configuración de grupos en proceso se utiliza con frecuencia en las pruebas unitarias, como se muestra aquí.

Otra opción para realizar pruebas locales es lanzar procesos en la máquina local - consulte Entrenamiento multi-worker con Keras para ver un ejemplo de este enfoque.

Instanciar un ParameterServerStrategy

Antes de sumergirse en el código de entrenamiento, vamos a instanciar un objeto tf.distribute.ParameterServerStrategy. Tenga en cuenta que esto es necesario independientemente de si está procediendo con Model.fit o con un bucle de entrenamiento personalizado. El argumento variable_partitioner se explicará en la sección Fragmentación de variables.

variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner)

Para utilizar las GPU para realizar el entrenamiento, asigne GPU visibles a cada worker. ParameterServerStrategy utilizará todas las GPU disponibles en cada trabajador, con la restricción de que todos los trabajadores deben tener el mismo número de GPU disponibles.

Fragmentación de variables

La fragmentación de variables se refiere a la división de una variable en múltiples variables más pequeñas, que se denominan frgamentos. La fragmentación de variables puede ser útil para distribuir la carga de la red cuando se accede a estos fragmentos. También es útil para distribuir el cálculo y el almacenamiento de una variable normal entre varios servidores de parámetros, por ejemplo, cuando se utilizan incrustaciones muy grandes que quizá no quepan en la memoria de una sola máquina.

Para habilitar la fragmentación de variables, puede pasar un variable_partitioner al construir un objeto ParameterServerStrategy. El variable_partitioner se invocará cada vez que se cree una variable y se espera que devuelva el número de fragmentos a lo largo de cada dimensión de la variable. Se proporcionan algunos variable_partitionercomo tf.distribute.experimental.partitioners.MinSizePartitioner. Se recomienda utilizar particionadores basados en el tamaño como tf.distribute.experimental.partitioners.MinSizePartitioner para evitar particionar variables pequeñas, lo que podría tener un impacto negativo en la velocidad de entrenamiento del modelo.

Cuando se pasa un partitioner_variable, y se crea una variable directamente bajo Strategy.scope, la variable se convertirá en un tipo contenedor con una propiedad variables, la cual proporciona acceso a la lista de los fragmentos. En la mayoría de los casos, este contenedor se convertirá automáticamente en un tensor que concatena todos los fragmentos. Como resultado, puede utilizarse como una variable normal. Por otro lado, algunos métodos de TensorFlow como tf.nn.embedding_lookup proporcionan una implementación eficiente para este tipo de contenedores y en estos métodos se evitará la concatenación automática.

Consulte la documentación de la API de tf.distribute.ParameterServerStrategy para obtener más detalles.

Entrenamiento con Model.fit

Keras proporciona una API de entrenamiento fácil de usar a través de Model.fit que maneja el bucle de entrenamiento de manera encubierta, con la flexibilidad de un train_step anulable, y retrollamadas que proporcionan funcionalidades como guardar puntos de control o guardar resúmenes para TensorBoard. Con Model.fit, el mismo código de entrenamiento puede utilizarse con otras estrategias con un simple intercambio del objeto de estrategia.

Datos introducidos

Keras Model.fit con tf.distribute.ParameterServerStrategy puede tomar datos de entrada en forma de un tf.data.Dataset, tf. distribute.DistributedDataset, o un tf.keras.utils.experimental.DatasetCreator, siendo Dataset la opción recomendada por su facilidad de uso. Sin embargo, si tiene problemas de memoria al utilizar Dataset, puede que necesite utilizar DatasetCreator con un argumento dataset_fn invocable (consulte la documentación de la API tf.keras.utils.experimental.DatasetCreator para obtener más detalles).

Si transforma su conjunto de datos en un tf.data.Dataset, deberá utilizar Dataset.shuffle y Dataset.repeat, como se demuestra en el siguiente ejemplo de código.

  • Keras Model.fit con el parámetro server training asume que cada worker recibe el mismo conjunto de datos, excepto cuando se mezclan de forma diferente. Por lo tanto, al llamar a Dataset.shuffle, se aseguran iteraciones más uniformes sobre los datos.

  • Dado que los workers no se sincronizan, pueden terminar de procesar sus conjuntos de datos en momentos diferentes. Por lo tanto, la forma más sencilla de definir épocas con parámetros de entrenamiento del servidor es utilizar Dataset.repeat -que repite un conjunto de datos indefinidamente cuando se llama sin argumento- y especificar el argumento steps_per_epoch en la llamada Model.fit.

Consulte la sección "Flujos de trabajo de entrenamiento" de la guía tf.data para obtener más detalles sobre shuffle y repeat.

global_batch_size = 64 x = tf.random.uniform((10, 10)) y = tf.random.uniform((10,)) dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat() dataset = dataset.batch(global_batch_size) dataset = dataset.prefetch(2)

Si en cambio crea su conjunto de datos con tf.keras.utils.experimental.DatasetCreator, el código en dataset_fn se invocará en el dispositivo de entrada, que generalmente es el CPU, en cada una de las máquinas de trabajo.

Construcción y compilación de modelos

Ahora, creará un tf.keras.Model-un modelo trivial tf.keras.models.Sequential con fines de demostración-seguido de una llamada Model.compile para incorporar componentes, como un optimizador, métricas y otros parámetros como steps_per_execution:

with strategy.scope(): model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)]) model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)

Retrollamadas y entrenamiento

Antes de llamar a Keras Model.fit para el entrenamiento real, prepare cualquiera de las retrollamadas necesarias para realizar tareas comunes, como:

  • tf.keras.callbacks.ModelCheckpoint: guarda el modelo con cierta frecuencia, por ejemplo, después de cada época.

  • tf.keras.callbacks.BackupAndRestore: proporciona tolerancia ante errores haciendo una copia de seguridad del modelo y del número de época actual, si el clúster experimenta falta de disponibilidad (como abortar o adelantarse). Posteriormente, puede restaurar el estado de entrenamiento al reiniciar desde un error en el trabajo, y continuar el entrenamiento desde el principio de la época que se interrumpió.

  • tf.keras.callbacks.TensorBoard: escribe periódicamente los registros del modelo en archivos de resumen que pueden visualizarse en la herramienta TensorBoard.

Nota: Debido a consideraciones de rendimiento, las llamadas de retorno personalizadas no pueden tener llamadas de nivel de lote anuladas cuando se utilizan con ParameterServerStrategy. Modifique sus retrollamadas personalizadas para que sean llamadas a nivel de época y ajuste steps_per_epoch a un valor adecuado. Además, steps_per_epoch es un argumento obligatorio para Model.fit cuando se utiliza con ParameterServerStrategy.

working_dir = "/tmp/my_working_dir" log_dir = os.path.join(working_dir, "log") ckpt_filepath = os.path.join(working_dir, "ckpt") backup_dir = os.path.join(working_dir, "backup") callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=log_dir), tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath), tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir), ] model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)

Uso directo con ClusterCoordinator (opcional)

Incluso si elige la ruta de entrenamiento Model.fit, puede instanciar opcionalmente un objeto tf.distribute.coordinator.ClusterCoordinator para programar otras funciones que desee que se ejecuten en los workers. Consulte la sección Entrenamiento con un bucle de entrenamiento personalizado para obtener más detalles y ejemplos.

Entrenamiento con un bucle de entrenamiento personalizado

El uso de bucles de entrenamiento personalizados con tf.distribute.Strategy proporciona una gran flexibilidad para definir bucles de entrenamiento. Con el ParameterServerStrategy definido anteriormente (como strategy), utilizará un tf.distribute.coordinator.ClusterCoordinator para enviar la ejecución de los pasos de entrenamiento a los workers remotos.

A continuación, creará un modelo, definirá un conjunto de datos y definirá una función de paso, como ya lo hizo en el bucle de entrenamiento con otros tf.distribute.Strategy. Encontrará más detalles en el tutorial Entrenamiento personalizado con tf.distribute.Strategy.

Para garantizar una extracción previa eficiente de los conjuntos de datos, utilice las API recomendadas para la creación distribuida de conjuntos de datos que se mencionan en la sección Despacho de pasos de entrenamiento a trabajadores remotos que aparecerá más adelante. Además, asegúrese de llamar a Strategy.run dentro de worker_fn para aprovechar al máximo las GPU asignadas a los trabajadores. El resto de los pasos son los mismos para el entrenamiento con o sin una GPU.

Vamos a crear estos componentes en los siguientes pasos:

Prepare los datos

En primer lugar, escriba una función que cree un conjunto de datos.

Si desea preprocesar los datos con capas de preprocesamiento de Keras o capas de transformación de Tensorflow, cree estas capas fuera del dataset_fn y bajo Strategy.scope, como lo haría para cualquier otra capa de Keras. Esto se debe a que el dataset_fn se envolverá en una tf.function y después se ejecutará en cada worker para generar la canalización de datos.

Si no sigue el procedimiento anterior, la creación de las capas podría crear estados de Tensorflow que se elevarían de la tf.function al coordinador. En este caso, acceder a ellos desde los workers incurriría en llamadas RPC repetitivas entre el coordinador y los workers, y causaría una desaceleración significativa.

Si coloca las capas bajo Strategy.scope las creará en todos los workers. A continuación, aplicará la transformación dentro del dataset_fn mediante tf.data.Dataset.map. Consulte Preprocesamiento de datos en el tutorial Entrada distribuida para obtener más información sobre el preprocesamiento de datos con entrada distribuida.

feature_vocab = [ "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman" ] label_vocab = ["yes", "no"] with strategy.scope(): feature_lookup_layer = tf.keras.layers.StringLookup( vocabulary=feature_vocab, mask_token=None) label_lookup_layer = tf.keras.layers.StringLookup( vocabulary=label_vocab, num_oov_indices=0, mask_token=None) raw_feature_input = tf.keras.layers.Input( shape=(3,), dtype=tf.string, name="feature") feature_id_input = feature_lookup_layer(raw_feature_input) feature_preprocess_stage = tf.keras.Model( {"features": raw_feature_input}, feature_id_input) raw_label_input = tf.keras.layers.Input( shape=(1,), dtype=tf.string, name="label") label_id_input = label_lookup_layer(raw_label_input) label_preprocess_stage = tf.keras.Model( {"label": raw_label_input}, label_id_input)

Generar ejemplos simulados en un conjunto de datos:

def feature_and_label_gen(num_examples=200): examples = {"features": [], "label": []} for _ in range(num_examples): features = random.sample(feature_vocab, 3) label = ["yes"] if "avenger" in features else ["no"] examples["features"].append(features) examples["label"].append(label) return examples examples = feature_and_label_gen()

A continuación, cree el conjunto de datos de entrenamiento envuelto en un dataset_fn:

def dataset_fn(_): raw_dataset = tf.data.Dataset.from_tensor_slices(examples) train_dataset = raw_dataset.map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(200).batch(32).repeat() return train_dataset

Construir el modelo

Después, cree el modelo y los demás objetos. Asegúrese de crear todas las variables bajo Strategy.scope.

# These variables created under the `Strategy.scope` will be placed on parameter # servers in a round-robin fashion. with strategy.scope(): # Create the model. The input needs to be compatible with Keras processing layers. model_input = tf.keras.layers.Input( shape=(3,), dtype=tf.int64, name="model_input") emb_layer = tf.keras.layers.Embedding( input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384) emb_output = tf.reduce_mean(emb_layer(model_input), axis=1) dense_output = tf.keras.layers.Dense( units=1, activation="sigmoid", kernel_regularizer=tf.keras.regularizers.L2(1e-4), )(emb_output) model = tf.keras.Model({"features": model_input}, dense_output) optimizer = tf.keras.optimizers.legacy.RMSprop(learning_rate=0.1) accuracy = tf.keras.metrics.Accuracy()

Confirmemos que el uso de FixedShardsPartitioner dividió todas las variables en dos fragmentos y que cada uno de ellos se asignó a un servidor de parámetros diferente:

assert len(emb_layer.weights) == 2 assert emb_layer.weights[0].shape == (4, 16384) assert emb_layer.weights[1].shape == (4, 16384) print(emb_layer.weights[0].device) print(emb_layer.weights[1].device)

Definir el paso del entrenamiento

En tercer lugar, cree el paso de entrenamiento envuelto en una tf.function:

@tf.function def step_fn(iterator): def replica_fn(batch_data, labels): with tf.GradientTape() as tape: pred = model(batch_data, training=True) per_example_loss = tf.keras.losses.BinaryCrossentropy( reduction=tf.keras.losses.Reduction.NONE)(labels, pred) loss = tf.nn.compute_average_loss(per_example_loss) model_losses = model.losses if model_losses: loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses)) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) accuracy.update_state(labels, actual_pred) return loss batch_data, labels = next(iterator) losses = strategy.run(replica_fn, args=(batch_data, labels)) return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

En la función de pasos de entrenamiento anterior, llamar a Strategy.run y Strategy.reduce en el step_fn puede ser compatible con varias GPU por worker. Si los workers tienen GPUs asignadas, Strategy.run distribuirá los conjuntos de datos en múltiples réplicas (GPUs). Sus llamadas paralelas a tf.nn.compute_average_loss() calculan la media de la pérdida en las réplicas (GPU) de un trabajador, independientemente del número total de trabajadores.

Enviar pasos de entrenamiento a workers remotos

Una vez que haya definido todos los cálculos mediante ParameterServerStrategy, utilizará la clase tf.distribute.coordinator.ClusterCoordinator para crear recursos y distribuir los pasos de entrenamiento entre los workers remotos.

Primero creemos un objeto ClusterCoordinator y pasémosle el objeto de estrategia:

coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)

A continuación, cree un conjunto de datos por worker y un iterador mediante la API ClusterCoordinator.create_per_worker_dataset{/code0, que replica el conjunto de datos a todos los trabajadores. En el <code data-md-type="codespan">per_worker_dataset_fn siguiente, se recomienda envolver el dataset_fn en strategy.distribute_datasets_from_function para permitir una extracción previa eficiente a las GPU sin interrupciones.

@tf.function def per_worker_dataset_fn(): return strategy.distribute_datasets_from_function(dataset_fn) per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn) per_worker_iterator = iter(per_worker_dataset)

El último paso consiste en distribuir el cómputo a los workers remotos utilizando ClusterCoordinator.schedule:

  • El método schedule pone en espera una tf.function y devuelve un RemoteValue de tipo futuro inmediatamente. Las funciones en espera se enviarán a los workers remotos en hilos de fondo y el RemoteValue se rellenará de forma asíncrona.

  • El método join (ClusterCoordinator.join) puede utilizarse para esperar hasta que se ejecuten todas las funciones programadas.

num_epochs = 4 steps_per_epoch = 5 for i in range(num_epochs): accuracy.reset_states() for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) # Wait at epoch boundaries. coordinator.join() print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

A continuación le mostraremos cómo puede obtener el resultado de un RemoteValue:

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,)) print("Final loss is %f" % loss.fetch())

Alternativamente, puede lanzar todos los pasos y hacer algo mientras espera a que se completen:

for _ in range(total_steps): coordinator.schedule(step_fn, args=(per_worker_iterator,)) while not coordinator.done(): time.sleep(10) # Do something like logging metrics or writing checkpoints.

Para ver el flujo de trabajo completo de entrenamiento y servicio de este ejemplo en particular, consulte esta prueba.

Obtener más información sobre la creación de conjuntos de datos

El conjunto de datos del código anterior se crea utilizando la API ClusterCoordinator.create_per_worker_dataset. Crea un conjunto de datos por worker y devuelve un objeto contenedor. Puede llamar al método iter sobre él para crear un iterador por worker. El iterador por worker contiene un iterador por worker y la porción correspondiente de un worker se sustituirá en el argumento de entrada de la función pasada al método ClusterCoordinator.schedule antes de que la función se ejecute en un worker concreto.

El método ClusterCoordinator.schedule asume que los workers son equivalentes y, por lo tanto, asume que los conjuntos de datos en diferentes workers son los mismos (excepto que pueden mezclarse de forma diferente). Debido a esto, también se recomienda repetir los conjuntos de datos y programar un número finito de pasos en vez de confiar en recibir un OutOfRangeError de un conjunto de datos.

Otra nota importante es que los conjuntos de datos tf.data no admiten la serialización y deserialización implícitas más allá de los límites de las tareas. Así que es importante crear el conjunto de datos completo dentro de la función pasada a ClusterCoordinator.create_per_worker_dataset. La API create_per_worker_dataset también puede tomar directamente un tf.data.Dataset o tf.distribute.DistributedDataset como entrada.

Evaluación

Los dos enfoques principales para realizar la evaluación con tf.distribute.ParameterServerStrategy de entrenamiento son la evaluación en línea y la evaluación sidecar. Cada uno tiene sus pros y sus contras, como se describe a continuación. Se recomienda el método de evaluación en línea si no tiene ninguna preferencia. Para los usuarios que utilicen Model.fit, Model.evaluate utilice la evaluación en línea (distribuida) oculta.

Evaluación inline

En este método, el coordinador alterna entre el entrenamiento y la evaluación, por lo que se denomina evaluación inline.

La evaluación inline tiene varias ventajas. Por ejemplo:

  • Puede admitir grandes modelos de evaluación y conjuntos de datos de evaluación que una sola tarea no puede contener.

  • Los resultados de la evaluación pueden utilizarse para tomar decisiones para el entrenamiento de la siguiente época, por ejemplo, si se debe detener el entrenamiento antes de tiempo.

Hay dos formas de aplicar la evaluación inline: la evaluación directa y la evaluación distribuida.

  • Evaluación directa: Para modelos y conjuntos de datos de evaluación pequeños, el coordinador puede ejecutar la evaluación directamente en el modelo distribuido con el conjunto de datos de evaluación en el coordinador:

eval_dataset = tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).batch(8) eval_accuracy = tf.keras.metrics.Accuracy() for batch_data, labels in eval_dataset: pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) print("Evaluation accuracy: %f" % eval_accuracy.result())
  • Evaluación distribuida: Para modelos o conjuntos de datos de gran tamaño que no es factible ejecutar directamente en el coordinador, la tarea del coordinador puede distribuir las tareas de evaluación entre los workers por medio de los métodos ClusterCoordinator.schedule/ClusterCoordinator.join:

with strategy.scope(): # Define the eval metric on parameter servers. eval_accuracy = tf.keras.metrics.Accuracy() @tf.function def eval_step(iterator): def replica_fn(batch_data, labels): pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) batch_data, labels = next(iterator) strategy.run(replica_fn, args=(batch_data, labels)) def eval_dataset_fn(): return tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(16).repeat().batch(8) per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn) per_worker_eval_iterator = iter(per_worker_eval_dataset) eval_steps_per_epoch = 2 for _ in range(eval_steps_per_epoch): coordinator.schedule(eval_step, args=(per_worker_eval_iterator,)) coordinator.join() print("Evaluation accuracy: %f" % eval_accuracy.result())

Permitir la evaluación exactamente una vez

Los métodos schedule y join de tf.distribute.coordinator.ClusterCoordinator no admiten garantías de visita ni la semántica exactly-once predeterminada. En otras palabras, en el ejemplo anterior no hay ninguna garantía de que todos los ejemplos de evaluación de un conjunto de datos se evalúen exactamente una vez; es posible que algunos no se visiten y que otros se evalúen varias veces.

Se puede preferir la evaluación exactamente una vez para reducir la varianza de la evaluación entre épocas y mejorar la selección de modelos realizada mediante una detención temprana, el ajuste de hiperparámetros u otros métodos. Hay diferentes formas de habilitar la evaluación exactamente una vez:

  • Con un flujo de trabajo Model.fit/.evaluate, puede activarse si se agrega un argumento a Model.compile. Consulte la documentación sobre el argumento pss_evaluation_shards.

  • La API de servicio tf.data puede utilizarse para proporcionar una visita exactamente única para la evaluación cuando se utiliza ParameterServerStrategy (consulte la sección Dynamic Sharding en tf.data.experimental.service documentación de la API).

  • Sidecar evaluation proporciona de forma predeterminada la evaluación exactamente una vez, ya que la evaluación tiene lugar en una única máquina. Sin embargo, esto puede ser mucho más lento que realizar la evaluación distribuida entre muchos workers.

La primera opción, utilizando Model.compile, es la solución sugerida para la mayoría de los usuarios.

La evaluación exacta tiene algunas limitaciones:

  • No se permite escribir un bucle de evaluación distribuido personalizado con una garantía de visita de sólo una vez. Presente una incidencia en GitHub si necesita soporte para ello.

  • No puede administrar automáticamente el cálculo de las métricas que utilizan la API Layer.add_metric. Estas deben excluirse de la evaluación, o reelaborarse en objetos Metric.

Evaluación del Sidecar

Otro método para definir y ejecutar un bucle de evaluación en el entrenamiento tf.distribute.ParameterServerStrategy se denomina evaluación del sidecar, en el que se crea una tarea evaluadora específica que lee repetidamente los puntos de control y ejecuta la evaluación en el último punto de verificación (consulte esta guía para obtener más detalles sobre los puntos de verificación). Las tareas del coordinador y del worker no dedican tiempo a la evaluación, por lo que para un número fijo de iteraciones el tiempo total de entrenamiento debería ser menor que utilizando otros métodos de evaluación. Sin embargo, requiere una tarea evaluadora adicional y puntos de control periódicos para activar la evaluación.

Para escribir un bucle de evaluación para realizar una evaluación Sidecar, tiene dos opciones:

  1. Utilizar la API tf.keras.utils.SidecarEvaluator.

  2. Crear un bucle de evaluación personalizado.

Consulte la tf.keras.utils.SidecarEvaluator para obtener más detalles sobre la opción 1.

La evaluación del sidecar sólo es compatible con una única tarea. Esto significa que:

  • Se garantiza que cada ejemplo se evalúa una vez. En caso de que el evaluador se adelante o se reinicie, simplemente reiniciará el bucle de evaluación desde el último punto de verificación, y se descartará el progreso parcial de la evaluación realizado antes del reinicio.

  • Sin embargo, ejecutar la evaluación en una sola tarea implica que una evaluación completa puede llevar mucho tiempo.

  • Si el tamaño del modelo es demasiado grande para que quepa en la memoria de un evaluador, la evaluación del sidecar única no se puede aplicar.

Otra advertencia es que la implementación de tf.keras.utils.SidecarEvaluator, y el bucle de evaluación personalizado que aparece a continuación, pueden saltarse algunos puntos de verificación porque siempre recoge el último punto de verificación disponible, y durante una época de la evaluación, se pueden producir múltiples puntos de verificación desde el clúster de entrenamiento. Puede escribir un bucle de evaluación personalizado que evalúe cada punto de verificación, pero no se trata en este tutorial. Por otra parte, puede permanecer inactivo si los puntos de verificación se producen con menos frecuencia que el tiempo que se tarda en ejecutar la evaluación.

Un bucle de evaluación personalizado proporciona más control sobre los detalles, como elegir qué punto de verificación para evaluar, o proporcionar cualquier lógica adicional que se ejecute junto con la evaluación. A continuación se muestra un posible bucle de evaluación del sidecar personalizado:

checkpoint_dir = ... eval_model = ... eval_data = ... checkpoint = tf.train.Checkpoint(model=eval_model) for latest_checkpoint in tf.train.checkpoints_iterator( checkpoint_dir): try: checkpoint.restore(latest_checkpoint).expect_partial() except (tf.errors.OpError,) as e: # checkpoint may be deleted by training when it is about to read it. continue # Optionally add callbacks to write summaries. eval_model.evaluate(eval_data) # Evaluation finishes when it has evaluated the last epoch. if latest_checkpoint.endswith('-{}'.format(train_epochs)): break

Clústers en el mundo real

Nota: esta sección no es necesaria para ejecutar el código del tutorial en esta página.

En un entorno de producción real, ejecutará todas las tareas en procesos diferentes en máquinas diferentes. La forma más sencilla de configurar la información del clúster en cada tarea es establecer variables de entorno "TF_CONFIG" y utilizar un tf.distribute.cluster_resolver.TFConfigClusterResolver para analizar "TF_CONFIG".

Para obtener una descripción general de las variables de entorno "TF_CONFIG", consulte "Configuración de la variable de entorno TF_CONFIG" en la guía Entrenamiento distribuido.

Si inicia sus tareas de entrenamiento utilizando Kubernetes u otras plantillas de configuración, es probable que estas plantillas ya hayan establecido "TF_CONFIG" por usted.

Establezca la variable de entorno "TF_CONFIG".

Suponga que tiene 3 workers y 2 servidores de parámetros. Entonces el "TF_CONFIG" del worker 1 puede ser:

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "worker": ["host1:port", "host2:port", "host3:port"], "ps": ["host4:port", "host5:port"], "chief": ["host6:port"] }, "task": {"type": "worker", "index": 1} })

El "TF_CONFIG" del evaluador puede ser:

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "evaluator": ["host7:port"] }, "task": {"type": "evaluator", "index": 0} })

La parte "cluster" en la cadena "TF_CONFIG" anterior para el evaluador es opcional.

Si utiliza el mismo binario para todas las tareas

Si prefiere ejecutar todas estas tareas utilizando un único binario, tendrá que dejar que su programa se ramifique en diferentes funciones desde el principio:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation else: # Run the coordinator.

El siguiente código inicia un servidor de TensorFlow y espera, útil para los roles "worker" y "ps":

# Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, protocol=cluster_resolver.rpc_layer or "grpc", start=True) server.join()

Gestionar el error en la tarea

Error del worker

Tanto el bucle de entrenamiento personalizado tf.distribute.coordinator.ClusterCoordinator como el enfoque Model.fit proporcionan tolerancia ante errores incorporada para el fallo de los workers. Tras la recuperación de los workers, el ClusterCoordinator invoca la recreación del conjunto de datos en los workers.

Error del servidor de parámetros o del coordinador

Sin embargo, cuando el coordinador vea un error del servidor de parámetros, lanzará un UnavailableError o AbortedError inmediatamente. En este caso, puede reiniciar el coordinador. El propio coordinador también puede dejar de estar disponible. Por lo tanto, se recomiendan ciertas herramientas para no perder el progreso del entrenamiento:

  • Para Model.fit, debe utilizar una retrollamada BackupAndRestore, que se encarga de guardar y restaurar el progreso automáticamente. Consulte la sección anterior Retrollamadas y entrenamiento para obtener un ejemplo.

  • Para un bucle de entrenamiento personalizado, debe verificar las variables del modelo periódicamente y cargar las variables del modelo desde un punto de verificación, si lo hay, antes de iniciar el entrenamiento. El progreso del entrenamiento puede inferirse aproximadamente a partir de optimizer.iterations si un optimizador es sometido a un punto de verificación:

checkpoint_manager = tf.train.CheckpointManager( tf.train.Checkpoint(model=model, optimizer=optimizer), checkpoint_dir, max_to_keep=3) if checkpoint_manager.latest_checkpoint: checkpoint = checkpoint_manager.checkpoint checkpoint.restore( checkpoint_manager.latest_checkpoint).assert_existing_objects_matched() global_steps = int(optimizer.iterations.numpy()) starting_epoch = global_steps // steps_per_epoch for _ in range(starting_epoch, num_epochs): for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) coordinator.join() checkpoint_manager.save()

Extracción de un RemoteValue

Se garantiza el éxito de la extracción de un RemoteValue si una función se ejecuta correctamente. Esto se debe a que actualmente el valor de retorno se copia inmediatamente en el coordinador después de que se ejecute una función. Si se produce algún error en el worker durante la copia, la función se volverá a intentar en otro worker disponible. Por lo tanto, si desea optimizar el rendimiento, puede programar funciones sin valor de retorno.

Informe de errores

Una vez que el coordinador vea un error como UnavailableError de los servidores de parámetros u otros errores de aplicación como un InvalidArgument de tf.debugging.check_numerics, cancelará todas las funciones pendientes y en espera antes de lanzar el error. La obtención de sus correspondientes RemoteValuesuscitará un CancelledError.

Después de que se produzca un error, el coordinador no producirá el mismo error ni ningún error de las funciones anuladas.

Mejora del rendimiento

Hay varias razones posibles por las que puede experimentar problemas de rendimiento cuando se entrena con tf.distribute.ParameterServerStrategy y tf.distribute.coordinator.ClusterCoordinator.

Una razón común es que los servidores de parámetros tienen una carga desequilibrada y algunos servidores de parámetros muy cargados han alcanzado su capacidad. También puede haber múltiples causas de origen. Algunos métodos sencillos para mitigar este problema son:

  1. Fragmente sus variables de modelo de gran tamaño mediante la especificación de un variable_partitioner al construir un ParameterServerStrategy.

  2. Evite crear una variable hotspot que sea requerida por todos los servidores de parámetros en un solo paso, por ambos:

  3. Utilizar una tasa de aprendizaje constante o subclase tf.keras.optimizers.schedules.LearningRateSchedule en los optimizadores. Esto se debe a que el comportamiento predeterminado es que la tasa de aprendizaje se convertirá en una variable colocada en un servidor de parámetros concreto, y solicitada por todos los demás servidores de parámetros en cada paso); y

  4. Utilizando un tf.keras.optimizers.legacy.Optimizer (los tf.keras.optimizers.Optimizer estándar podrían seguir dando lugar a variables hotspot).

  5. Mezcle sus grandes vocabularios antes de pasarlos a las capas de preprocesamiento de Keras.

Otra posible razón de los problemas de rendimiento es el coordinador. La implementación de schedule/join está basada en Python y, por tanto, puede tener sobrecarga de hilos. Además, la latencia entre el coordinador y los trabajadores puede ser grande. Si este es el caso:

  • Para Model.fit, puede establecer el argumento steps_per_execution proporcionado en Model.compile con un valor superior a 1.

  • Para un bucle de entrenamiento personalizado, puede empaquetar varios pasos en una sola tf.function:

steps_per_invocation = 10 @tf.function def step_fn(iterator): for _ in range(steps_per_invocation): features, labels = next(iterator) def replica_fn(features, labels): ... strategy.run(replica_fn, args=(features, labels))

Conforme se vaya optimizando la biblioteca, es de esperar que la mayoría de los usuarios no tengan que empaquetar manualmente los pasos en el futuro.

Además, un pequeño truco para mejorar el rendimiento es programar funciones sin valor de retorno como se explica en la sección anterior gestionar el error en la tarea.

Limitaciones conocidas

La mayoría de las limitaciones conocidas ya se han tratado en las secciones anteriores. Esta sección ofrece un resumen.

ParameterServerStrategy general

  • os.environment["grpc_fail_fast"]="use_caller" es necesario en todas las tareas, incluido el coordinador, para que la tolerancia ante errores funcione correctamente.

  • No se admite el entrenamiento síncrono del servidor de parámetros.

  • Suele ser necesario empaquetar varios pasos en una sola función para lograr un rendimiento óptimo.

  • No es compatible cargar un saved_model mediante tf.saved_model.load que contenga variables fragmentadas. Tenga en cuenta que se espera que la carga de un saved_model de este tipo mediante TensorFlow Serving funcione (consulte el tutorial serving para obtener más detalles).

  • No es posible recuperarse de un error del servidor de parámetros sin reiniciar la tarea del coordinador.

  • La creación de tf.lookup.StaticHashTable, comúnmente empleada por algunas capas de preprocesamiento de Keras, como tf.keras.layers.IntegerLookup, tf. keras.layers.StringLookup, y tf.keras.layers.TextVectorization, deben colocarse en Strategy.scope. De lo contrario, los recursos se colocarán en el coordinador, y los RPC de búsqueda desde los trabajadores al coordinador tendrán implicaciones en el rendimiento.

Modelo.fit específicos

  • El argumento steps_per_epoch es necesario en Model.fit. Puede seleccionar un valor que proporcione intervalos adecuados en una época.

  • ParameterServerStrategy no tiene soporte para las retrollamadas personalizadas que tienen llamadas a nivel de lote por razones de rendimiento. Debe convertir esas llamadas en llamadas a nivel de época con steps_per_epoch adecuadamente escogidos, de forma que se llamen cada steps_per_epoch número de pasos. Las retrollamadas incorporadas no se ven afectadas: sus llamadas a nivel de lote se modificaron para que sean eficaces. Se está planificando el soporte de llamadas a nivel de lote para ParameterServerStrategy.

  • Por la misma razón, a diferencia de otras estrategias, las barras de progreso y las métricas sólo se registran en los límites de las épocas.

  • run_eagerly no es compatible.

Específicos del bucle de entrenamiento personalizado

  • ClusterCoordinator.schedule no permite garantías de visita para un conjunto de datos en general, aunque una garantía de visita para la evaluación es posible mediante Model.fit/.evaluate. Consulte Habilitar la evaluación exactamente una vez.

  • Cuando ClusterCoordinator.create_per_worker_dataset se utiliza con una llamada como entrada, el conjunto de datos completo debe crearse dentro de la función que se le transfiere.

  • tf.data.Options se ignora en un conjunto de datos creado por ClusterCoordinator.create_per_worker_dataset.