Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/es-419/guide/data_performance.ipynb
25115 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Mejor rendimiento con la API tf.data

Visión general

Las GPU y las TPU pueden reducir radicalmente el tiempo necesario para ejecutar un solo paso de entrenamiento. Alcanzar el máximo rendimiento requiere un canal de entrada eficiente que proporcione datos para el siguiente paso antes de que el paso actual haya finalizado. La API tf.data ayuda a construir canalizaciones de entrada flexibles y eficientes. Este documento muestra cómo usar la API tf.data API para construir canalizaciones de entrada TensorFlow de alto rendimiento.

Antes de continuar, consulte la guía Construir canalizaciones de entrada de TensorFlow para aprender a usar la API tf.data.

Preparación

import tensorflow as tf import time

A lo largo de esta guía, iterará sobre un conjunto de datos y medirá el rendimiento. La realización de pruebas de rendimiento reproducibles puede resultar difícil. Entre los distintos factores que afectan a la reproducibilidad se incluyen:

  • La carga actual de la CPU

  • El tráfico de red

  • Mecanismos complejos, como la caché

Para obtener un punto de referencia reproducible, construirá un ejemplo artificial.

El conjunto de datos

Primero, defina una clase que herede de tf.data.Dataset llamada ArtificialDataset. Este conjunto de datos:

  • Genera num_samples muestreos (predeterminado 3)

  • Se duerme durante algún tiempo antes del primer artículo para simular la apertura de un archivo

  • Duerme durante algún tiempo antes de producir cada artículo para simular la lectura de datos de un archivo

class ArtificialDataset(tf.data.Dataset): def _generator(num_samples): # Opening the file time.sleep(0.03) for sample_idx in range(num_samples): # Reading data (line, record) from the file time.sleep(0.015) yield (sample_idx,) def __new__(cls, num_samples=3): return tf.data.Dataset.from_generator( cls._generator, output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64), args=(num_samples,) )

Este conjunto de datos es similar al de tf.data.Dataset.range, añadiendo un retraso fijo al principio y entre cada muestreo.

El bucle de entrenamiento

Después, escriba un bucle de entrenamiento simulado que mida el tiempo que se tarda en iterar sobre un conjunto de datos. El tiempo de entrenamiento es simulado.

def benchmark(dataset, num_epochs=2): start_time = time.perf_counter() for epoch_num in range(num_epochs): for sample in dataset: # Performing a training step time.sleep(0.01) print("Execution time:", time.perf_counter() - start_time)

Optimizar el rendimiento

Para mostrar cómo se puede optimizar el rendimiento, mejorará el rendimiento del ArtificialDataset.

El acercamiento ingenuo

Comience con una canalización ingenua sin usar trucos, iterando sobre el conjunto de datos tal cual.

benchmark(ArtificialDataset())

En el fondo, así es como se empleó su tiempo de ejecución:

Gráfico de tiempo de ejecución de datos: método ingenuo

El gráfico muestra que realizar un paso de entrenamiento implica:

  • Abrir un archivo si aún no se ha abierto

  • Extraer una entrada de datos del archivo

  • Usar los datos para el entrenamiento

Sin embargo, en una implementación síncrona ingenua como ésta, mientras su canalización extrae los datos, su modelo permanece inactivo. A la inversa, mientras su modelo se está entrenando, la canalización de entrada está inactiva. El tiempo del paso de entrenamiento es, por tanto, la suma de los tiempos de apertura, lectura y entrenamiento.

Las siguientes secciones desarrollan esta canalización de entrada, ilustrando las prácticas recomendadas de diseño de canalizaciones de entrada TensorFlow de alto rendimiento.

Preextracción

La preextracción se superpone al preprocesamiento y a la ejecución del modelo de un paso de entrenamiento. Mientras el modelo ejecuta el paso de entrenamiento s, la canalización de entrada lee los datos para el paso s+1. De este modo se reduce el tiempo del paso al máximo (en lugar de la suma) entre el entrenamiento y el tiempo que se tarda en extraer los datos.

La API tf.data ofrece la transformación tf.data.Dataset.prefetch. Puede usarse para desacoplar el tiempo en que se producen los datos del tiempo en que se consumen. En concreto, la transformación usa un subproceso en segundo plano y un búfer interno para preextraer elementos del conjunto de datos de entrada antes de que se soliciten. El número de elementos a preextraer debe ser igual (o posiblemente mayor) que el número de lotes consumidos por un único paso de entrenamiento. Puede ajustar este valor manualmente o configurarlo como tf.data.AUTOTUNE, lo que solicitará al runtime de tf.data que ajuste el valor dinámicamente en tiempo de ejecución.

Tenga en cuenta que la transformación de preextracción ofrece beneficios siempre que exista la oportunidad de superponer el trabajo de un "productor" con el de un "consumidor".

benchmark( ArtificialDataset() .prefetch(tf.data.AUTOTUNE) )

Gráfico de tiempo de ejecución de datos: método de preextracción

Ahora, como muestra el gráfico de tiempo de ejecución de datos, mientras se realiza el paso de entrenamiento para la muestra 0, la canalización de entrada está leyendo los datos de la muestra 1, y así sucesivamente.

Paralelización de la extracción de datos

En un entorno real, los datos de entrada pueden almacenarse de forma remota (por ejemplo, en Google Cloud Storage o HDFS). Una canalización de conjuntos de datos que funciona bien cuando lee los datos localmente podría sufrir cuellos de botella en la E/S al leer datos remotamente debido a las siguientes diferencias entre el almacenamiento local y el remoto:

  • Tiempo hasta el primer byte: La lectura del primer byte de un archivo desde un almacenamiento remoto puede tardar órdenes de magnitud más que desde un almacenamiento local.

  • Rendimiento de lectura: Aunque el almacenamiento remoto suele ofrecer un gran ancho de banda agregado, es posible que la lectura de un único archivo sólo pueda utilizar una pequeña fracción de este ancho de banda.

Además, una vez que los bytes en bruto se cargan en la memoria, también puede ser necesario deserializar y/o descifrar los datos (por ejemplo, protobuf), lo que requiere un cálculo adicional. Esta carga se produce tanto si los datos se almacenan local como remotamente, pero puede ser peor en el caso remoto si los datos no se preextraen de forma eficaz.

Para mitigar el impacto de los diversos gastos generales de la extracción de datos, se puede usar la transformación tf.data.Dataset.interleave para paralelizar el paso de carga de datos, intercalando el contenido de otros conjuntos de datos (como los lectores de archivos de datos). El número de conjuntos de datos a superponer puede especificarse mediante el argumento cycle_length, mientras que el nivel de paralelismo puede especificarse mediante el argumento num_parallel_calls. Parecida a la transformación prefetch, la transformación interleave admite tf.data.AUTOTUNE, que delegará la decisión sobre qué nivel de paralelismo usar al runtime de tf.data.

Intercalación secuencial

Los argumentos predeterminados de la transformación tf.data.Dataset.interleave hacen que intercalen muestras individuales de dos conjuntos de datos de forma secuencial.

benchmark( tf.data.Dataset.range(2) .interleave(lambda _: ArtificialDataset()) )

Data execution time plot - sequential interleave

Este gráfico de tiempo de ejecución de datos permite exhibir el comportamiento de la transformación interleave, extrayendo muestreos alternativamente de los dos conjuntos de datos disponibles. Sin embargo, no supone ninguna mejora del rendimiento.

Intercalado paralelo

Ahora, use el argumento num_parallel_calls de la transformación interleave. Esto carga varios conjuntos de datos en paralelo, reduciendo el tiempo de espera para la apertura de los archivos.

benchmark( tf.data.Dataset.range(2) .interleave( lambda _: ArtificialDataset(), num_parallel_calls=tf.data.AUTOTUNE ) )

Gráfico de tiempo de ejecución de datos: método de intercalado paralelo

Esta vez, como se ve en el gráfico de tiempo de ejecución de datos, la lectura de los dos conjuntos de datos se realiza en paralelo, lo que reduce el tiempo de procesamiento global de los datos.

Paralelización de la transformación de datos

Al preparar los datos, puede ser necesario preprocesar los elementos de entrada. Para ello, la API tf.data ofrece la transformación tf.data.Dataset.map, que aplica una función definida por el usuario a cada elemento del conjunto de datos de entrada. Dado que los elementos de entrada son independientes entre sí, el preprocesamiento puede paralelizarse en varios núcleos de CPU. Para que ello sea posible, análogamente a las transformaciones prefetch e interleave, la transformación map facilita el argumento num_parallel_calls para especificar el nivel de paralelismo.

Elegir el mejor valor para el argumento num_parallel_calls depende de su hardware, de las características de sus datos de entrenamiento (como su tamaño y forma), del costo de su función de mapeo y de qué otros procesos están teniendo lugar en la CPU al mismo tiempo. Una heurística sencilla es usar el número de núcleos de CPU disponibles. Sin embargo, al igual que para la transformación prefetch e interleave, la transformación map admite tf.data.AUTOTUNE que delegará la decisión sobre qué nivel de paralelismo usar al runtime de tf.data.

def mapped_function(s): # Do some hard pre-processing tf.py_function(lambda: time.sleep(0.03), [], ()) return s

Mapeo secuencial

Empiece usando la transformación map sin paralelismo como ejemplo de línea de referencia.

benchmark( ArtificialDataset() .map(mapped_function) )

Gráfico de tiempo de ejecución de datos: método ingenuo

En cuanto al enfoque ingenuo, aquí, como muestra el gráfico, los tiempos empleados en los pasos de apertura, lectura, preprocesamiento (mapeado) y entrenamiento se suman para una única iteración.

Mapeo paralelo

Ahora, use la misma función de preprocesamiento pero aplíquela en paralelo en varias muestras.

benchmark( ArtificialDataset() .map( mapped_function, num_parallel_calls=tf.data.AUTOTUNE ) )

Tiempo de ejecución de datos: mapeo paralelo

Como demuestra el gráfico de datos, los pasos de preprocesamiento se superponen, lo que reduce el tiempo total de una sola iteración.

Almacenamiento en caché

La transformación tf.data.Dataset.cache puede almacenar en caché un conjunto de datos, ya sea en memoria o en almacenamiento local. Esto evitará que algunas operaciones (como la apertura de archivos y la lectura de datos) se ejecuten durante cada época.

benchmark( ArtificialDataset() .map( # Apply time consuming operations before cache mapped_function ).cache( ), 5 )

Tiempo de ejecución de los datos: método de conjunto de datos en caché

Aquí, el gráfico del tiempo de ejecución de los datos muestra que cuando se almacena en caché un conjunto de datos, las transformaciones anteriores a la de cache (como la apertura del archivo y la lectura de los datos) se ejecutan sólo durante la primera época. Las épocas siguientes reutilizarán los datos almacenados en caché por la transformación cache.

Si la función definida por el usuario que se pasa a la transformación map es costosa, aplique la transformación cache después de la transformación map siempre que el conjunto de datos resultante aún pueda caber en la memoria o en el almacenamiento local. Si la función definida por el usuario aumenta el espacio necesario para almacenar el conjunto de datos más allá de la capacidad de la caché, aplíquela después de la transformación cache o considere la posibilidad de preprocesar sus datos antes del trabajo de entrenamiento para reducir el uso de recursos.

Mapeo vectorizante

Invocar una función definida por el usuario pasada a la transformación map tiene sobrecarga relacionada con la programación y ejecución de la función definida por el usuario. Vectorice la función definida por el usuario (es decir, haga que opere sobre un lote de entradas a la vez) y aplique la transformación batch antes de la transformación map.

Para ilustrar esta práctica recomendada, no es adecuado su conjunto de datos artificial. El retraso de planificación es de unos 10 microsegundos (10e-6 segundos), mucho menos que las decenas de milisegundos usadas en el ArtificialDataset, por lo que su impacto es difícil de ver.

Para este ejemplo, use la función base tf.data.Dataset.range y simplifique el bucle de entrenamiento a su forma más simple.

fast_dataset = tf.data.Dataset.range(10000) def fast_benchmark(dataset, num_epochs=2): start_time = time.perf_counter() for _ in tf.data.Dataset.range(num_epochs): for _ in dataset: pass tf.print("Execution time:", time.perf_counter() - start_time) def increment(x): return x+1

Mapeo escalar

fast_benchmark( fast_dataset # Apply function one item at a time .map(increment) # Batch .batch(256) )

Tiempo de ejecución de datos: método de mapeo escalar

El gráfico anterior ilustra lo que ocurre (con menos muestras) usando el método de mapeo escalar. Muestra que la función mapeada se aplica a cada muestra. Aunque esta función es muy rápida, tiene algunos gastos generales que repercuten en el rendimiento temporal.

Mapeado vectorizado

fast_benchmark( fast_dataset .batch(256) # Apply function on a batch of items # The tf.Tensor.__add__ method already handle batches .map(increment) )

Tiempo de ejecución de datos: método de mapeo vectorizado

Esta vez, la función mapeada se llama una vez y se aplica a un lote de muestra. El gráfico de tiempo de ejecución de datos muestra que, aunque la función puede tardar más tiempo en ejecutarse, la sobrecarga sólo aparece una vez, lo que mejora el rendimiento global del tiempo.

Reducir el espacio requerido en memoria

Una serie de transformaciones, incluyendo interleave, prefetch, y shuffle, mantienen un búfer interno de elementos. Si la función definida por el usuario que se pasa a la transformación map cambia el tamaño de los elementos, entonces el orden de la transformación map y de las transformaciones que almacenan elementos en búfer afecta al uso de la memoria. En general, elija el orden que resulte en un menor consumo de memoria, a menos que por rendimiento convenga un orden diferente.

Almacenamiento en caché de cómputos parciales

Se recomienda almacenar en caché el conjunto de datos después de la transformación map excepto si esta transformación hace que los datos sean demasiado grandes para caber en la memoria. Se puede compensar si su función mapeada se puede dividir en dos partes: una que consuma tiempo y otra que consuma memoria. En este caso, puede encadenar sus transformaciones como se indica a continuación:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

De esta forma, la parte que consume más tiempo sólo se ejecuta durante la primera época, y se evita usar demasiado espacio de la caché.

Resumen de prácticas recomendadas

Aquí va un resumen de las prácticas recomendadas para diseñar canalizaciones de entrada TensorFlow de alto rendimiento:

Reproducir las figuras

Nota: El resto de este bloc de notas trata sobre cómo reproducir las figuras anteriores. Siéntase libre de jugar con este código, pero entenderlo no es una parte esencial de este tutorial.

Para profundizar en la API tf.data.Dataset, puede jugar con sus propias canalizaciones. A continuación se muestra el código usado para trazar las imágenes de esta guía. Puede ser un buen punto de partida, mostrando algunas soluciones para dificultades comunes como:

  • Reproducibilidad en tiempo de ejecución

  • Eager execution de funciones mapeadas

  • Transformación interleave invocable

import itertools from collections import defaultdict import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt

El conjunto de datos

Igual que con ArtificialDataset, puede construir un conjunto de datos que devuelva el tiempo empleado en cada paso.

class TimeMeasuredDataset(tf.data.Dataset): # OUTPUT: (steps, timings, counters) OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32) OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3)) _INSTANCES_COUNTER = itertools.count() # Number of datasets generated _EPOCHS_COUNTER = defaultdict(itertools.count) # Number of epochs done for each dataset def _generator(instance_idx, num_samples): epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx]) # Opening the file open_enter = time.perf_counter() time.sleep(0.03) open_elapsed = time.perf_counter() - open_enter for sample_idx in range(num_samples): # Reading data (line, record) from the file read_enter = time.perf_counter() time.sleep(0.015) read_elapsed = time.perf_counter() - read_enter yield ( [("Open",), ("Read",)], [(open_enter, open_elapsed), (read_enter, read_elapsed)], [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)] ) open_enter, open_elapsed = -1., -1. # Negative values will be filtered def __new__(cls, num_samples=3): return tf.data.Dataset.from_generator( cls._generator, output_types=cls.OUTPUT_TYPES, output_shapes=cls.OUTPUT_SHAPES, args=(next(cls._INSTANCES_COUNTER), num_samples) )

Este conjunto de datos aporta muestreos de forma [[2, 1], [2, 2], [2, 3]] y de tipo [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]. Cada muestreo es:

( [("Open"), ("Read")], [(t0, d), (t0, d)], [(i, e, -1), (i, e, s)] )

Donde:

  • Open y Read son identificadores de pasos

  • t0 es la marca de tiempo en la que se inició el paso correspondiente

  • d es el tiempo empleado en el paso correspondiente

  • i es el índice de instancia

  • e es el índice de época (número de veces que se ha iterado el conjunto de datos)

  • s es el índice de muestras

El bucle de iteración

Haga el bucle de iteración un poco más complicado para agregar todos los tiempos. Esto sólo funcionará con conjuntos de datos que generen muestreos como se detalla más arriba.

def timelined_benchmark(dataset, num_epochs=2): # Initialize accumulators steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string) times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32) values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32) start_time = time.perf_counter() for epoch_num in range(num_epochs): epoch_enter = time.perf_counter() for (steps, times, values) in dataset: # Record dataset preparation informations steps_acc = tf.concat((steps_acc, steps), axis=0) times_acc = tf.concat((times_acc, times), axis=0) values_acc = tf.concat((values_acc, values), axis=0) # Simulate training time train_enter = time.perf_counter() time.sleep(0.01) train_elapsed = time.perf_counter() - train_enter # Record training informations steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0) times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0) values_acc = tf.concat((values_acc, [values[-1]]), axis=0) epoch_elapsed = time.perf_counter() - epoch_enter # Record epoch informations steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0) times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0) values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0) time.sleep(0.001) tf.print("Execution time:", time.perf_counter() - start_time) return {"steps": steps_acc, "times": times_acc, "values": values_acc}

El método de graficado

Por último, defina una función capaz de trazar una línea de tiempo dados los valores devueltos por la función timelined_benchmark.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False): # Remove invalid entries (negative times, or empty steps) from the timelines invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0] steps = timeline['steps'][invalid_mask].numpy() times = timeline['times'][invalid_mask].numpy() values = timeline['values'][invalid_mask].numpy() # Get a set of different steps, ordered by the first time they are encountered step_ids, indices = np.stack(np.unique(steps, return_index=True)) step_ids = step_ids[np.argsort(indices)] # Shift the starting time to 0 and compute the maximal time value min_time = times[:,0].min() times[:,0] = (times[:,0] - min_time) end = max(width, (times[:,0]+times[:,1]).max() + 0.01) cmap = mpl.cm.get_cmap("plasma") plt.close() fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0}) fig.suptitle(title) fig.set_size_inches(17.0, len(step_ids)) plt.xlim(-0.01, end) for i, step in enumerate(step_ids): step_name = step.decode() ax = axs[i] ax.set_ylabel(step_name) ax.set_ylim(0, 1) ax.set_yticks([]) ax.set_xlabel("time (s)") ax.set_xticklabels([]) ax.grid(which="both", axis="x", color="k", linestyle=":") # Get timings and annotation for the given step entries_mask = np.squeeze(steps==step) serie = np.unique(times[entries_mask], axis=0) annotations = values[entries_mask] ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66) if annotate: for j, (start, width) in enumerate(serie): annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])]) ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation, horizontalalignment='left', verticalalignment='center') if save: plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Usar contenedores para la función mapeada

Para ejecutar la función mapeada en un contexto eager, debe contenerlas dentro de una llamada tf.py_function.

def map_decorator(func): def wrapper(steps, times, values): # Use a tf.py_function to prevent auto-graph from compiling the method return tf.py_function( func, inp=(steps, times, values), Tout=(steps.dtype, times.dtype, values.dtype) ) return wrapper

Comparación de canalizaciones

_batch_map_num_items = 50 def dataset_generator_fun(*args): return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Ingénuo

@map_decorator def naive_map(steps, times, values): map_enter = time.perf_counter() time.sleep(0.001) # Time consuming step time.sleep(0.0001) # Memory consuming step map_elapsed = time.perf_counter() - map_enter return ( tf.concat((steps, [["Map"]]), axis=0), tf.concat((times, [[map_enter, map_elapsed]]), axis=0), tf.concat((values, [values[-1]]), axis=0) ) naive_timeline = timelined_benchmark( tf.data.Dataset.range(2) .flat_map(dataset_generator_fun) .map(naive_map) .batch(_batch_map_num_items, drop_remainder=True) .unbatch(), 5 )

Optimizado

@map_decorator def time_consuming_map(steps, times, values): map_enter = time.perf_counter() time.sleep(0.001 * values.shape[0]) # Time consuming step map_elapsed = time.perf_counter() - map_enter return ( tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1), tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1), tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1) ) @map_decorator def memory_consuming_map(steps, times, values): map_enter = time.perf_counter() time.sleep(0.0001 * values.shape[0]) # Memory consuming step map_elapsed = time.perf_counter() - map_enter # Use tf.tile to handle batch dimension return ( tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1), tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1), tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1) ) optimized_timeline = timelined_benchmark( tf.data.Dataset.range(2) .interleave( # Parallelize data reading dataset_generator_fun, num_parallel_calls=tf.data.AUTOTUNE ) .batch( # Vectorize your mapped function _batch_map_num_items, drop_remainder=True) .map( # Parallelize map transformation time_consuming_map, num_parallel_calls=tf.data.AUTOTUNE ) .cache() # Cache data .map( # Reduce memory usage memory_consuming_map, num_parallel_calls=tf.data.AUTOTUNE ) .prefetch( # Overlap producer and consumer works tf.data.AUTOTUNE ) .unbatch(), 5 )
draw_timeline(naive_timeline, "Naive", 15)
draw_timeline(optimized_timeline, "Optimized", 15)