Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/pt-br/guide/data_performance.ipynb
25115 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Desempenho melhor com a API tf.data

Visão geral

As GPUs e TPUs diminuem drasticamente o tempo necessário para executar um único passo de treinamento. Para conseguir o pico de desempenho, é necessário ter um pipeline de entrada eficiente que entregue os dados para o próximo passo antes da conclusão do passo atual. A API tf.data ajuda a criar pipelines de entrada flexíveis e eficientes. Este documento demonstra como usar a API tf.data para criar pipelines de entrada do TensorFlow com desempenho muito alto.

Antes de continuar, confira o guia Crie pipelines de entrada do TensorFlow para aprender a usar a API tf.data.

Configuração

import tensorflow as tf import time

Neste tutorial, você vai fazer a interação de um dataset e mensurar o desempenho. Pode ser difícil criar referenciais de desempenho que possam ser reproduzidos. Diferentes fatores afetam a capacidade de reprodução, como:

  • Carga atual da CPU

  • Tráfego da rede

  • Mecanismos complexos, como cache

Para ter um referencial que possa ser reproduzido, você criará um exemplo artificial.

Dataset

Comece definindo uma classe que herde de tf.data.Dataset chamada ArtificialDataset. Este dataset:

  • Gera num_samples amostras (o padrão é 3)

  • Repousa por um tempo antes do primeiro item para simular a abertura de um arquivo

  • Repousa por um tempo antes de gerar cada item para simular a leitura de dados de um arquivo

class ArtificialDataset(tf.data.Dataset): def _generator(num_samples): # Opening the file time.sleep(0.03) for sample_idx in range(num_samples): # Reading data (line, record) from the file time.sleep(0.015) yield (sample_idx,) def __new__(cls, num_samples=3): return tf.data.Dataset.from_generator( cls._generator, output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64), args=(num_samples,) )

Este dataset é similar ao tf.data.Dataset.range, com um atraso fixo no começo de cada amostra e entre as amostras.

Loop de treinamento

Agora, escreva um loop de treinamento simulado que mensure quanto tempo demora para fazer a interação de um dataset. O tempo de treinamento é simulado.

def benchmark(dataset, num_epochs=2): start_time = time.perf_counter() for epoch_num in range(num_epochs): for sample in dataset: # Performing a training step time.sleep(0.01) print("Execution time:", time.perf_counter() - start_time)

Otimize o desempenho

Para demonstrar como o desempenho pode ser otimizado, você vai melhorar o desempenho de ArtificialDataset.

Estratégia ingênua

Comece com um pipeline ingênuo, sem usar truques, fazendo a iteração do dataset como ele está.

benchmark(ArtificialDataset())

Por baixo dos panos, veja como o tempo de execução foi gasto:

Data execution time plot - a naive method

O gráfico mostra que fazer um passo de treinamento envolve:

  • Abrir um arquivo, se ainda não tiver sido aberto.

  • Buscar uma entrada de dados no arquivo.

  • Usar os dados para o treinamento.

Entretanto, em uma implementação síncrona ingênua como esta, embora seu pipeline esteja buscando os dados, o modelo está ocioso. Em contrapartida, enquanto o modelo está sendo treinado, o pipeline de entrada está ocioso. Portanto, o tempo do passo de treinamento é a soma dos tempos de abertura, leitura e treinamento.

As próximas seções expandem esse pipeline de entrada, ilustrando as práticas recomendadas para criar pipelines de entrada do TensorFlow com bom desempenho.

Pré-busca

A pré-busca faz a sobreposição entre o pré-processamento e a execução do modelo em um passo de treinamento. Enquanto o modelo está executando o passo de treinamento s, o pipeline de entrada está lendo os dados da etapa s+1. Ao fazer isso, o tempo do passo é reduzido para o tempo máximo (em vez da soma) do treinamento e o tempo que leva para extrair os dados.

A API tf.data conta com a transformação tf.data.Dataset.prefetch, que pode ser usada para desacoplar o tempo quando os dados são gerados do tempo quando os dados são consumidos. Especificamente, a transformação usa um thread em segundo plano e um buffer interno para fazer a pré-busca de elementos do dataset de entrada antes do momento em que são solicitados. O número de elementos da pré-busca deve ser igual ao número de lotes consumidos por um único passo de treinamento (ou possivelmente maior). Você pode ajustar esse valor manualmente ou defini-lo como tf.data.AUTOTUNE, o que fará o runtime do tf.data ajustar o valor dinamicamente em tempo de execução.

Observe que a transformação de pré-busca traz benefícios sempre que há uma oportunidade de fazer a sobreposição entre o trabalho de um "gerador" e o trabalho de um "consumidor".

benchmark( ArtificialDataset() .prefetch(tf.data.AUTOTUNE) )

Data execution time plot - prefetching method

Agora, conforme mostrado pelo gráfico do tempo de execução dos dados, enquanto o passo de treinamento está sendo executado com a amostra 0, o pipeline de entrada está lendo os dados da amostra 1, e assim por diante.

Paralelização da extração de dados

Em um ambiente real, os dados de entrada poderão estar armazenados remotamente (por exemplo, no Google Cloud Storage ou HDFS). Um pipeline de dataset que tenha bom desempenho ao ler dados localmente poderá sofrer gargalos de I/O ao ler dados remotamente devido às seguintes diferenças entre armazenamento local e remoto:

  • Time-to-first-byte (tempo até o primeiro byte): a leitura do primeiro byte de um arquivo em um armazenamento remoto pode ter um tempo com ordens de magnitude maior em relação ao armazenamento local.

  • Taxa de transferência de leitura: embora geralmente os armazenamentos remotos ofereçam grande largura de banda agregada, a leitura de um único arquivo poderá conseguir utilizar somente uma pequena fração dessa largura de banda.

Além disso, quando os bytes brutos forem carregados na memória, também poderá ser necessário desserializar e/ou decodificar os dados (por exemplo, protobuf), o que requer computação adicional. Essa sobrecarga está presente independentemente de os dados serem armazenados local ou remotamente, mas pode ser pior no armazenamento remoto se não for feita uma pré-busca eficiente dos dados.

Para mitigar o impacto das diversas sobrecargas de extração dos dados, a transformação tf.data.Dataset.interleave pode ser usada para paralelizar o passo de carregamento dos dados, intercalando o conteúdo de outros datasets (como leitores de arquivos de dados). O número de datasets a serem sobrepostos pode ser especificado pelo argumento cycle_length, enquanto o nível de paralelismo pode ser especificado pelo argumento num_parallel_calls. De maneira similar à transformação prefetch, a transformação interleave tem suporte ao tf.data.AUTOTUNE, que delegará a decisão sobre o nível de paralelismo usado para o runtime do tf.data.

Intercalação sequencial

Os argumentos padrão da transformação tf.data.Dataset.interleave fazem a intercalação de uma única amostra de dois datasets sequencialmente.

benchmark( tf.data.Dataset.range(2) .interleave(lambda _: ArtificialDataset()) )

Data execution time plot - sequential interleave

Esse gráfico do tempo de execução dos dados demonstra o comportamento da transformação interleave, buscando amostras dos dois datasets disponíveis de forma alternada. Entretanto, não há nenhuma melhoria de desempenho aqui.

Intercalação paralela

Agora, use o argumento num_parallel_calls da transformação interleave, que carrega diversos datasets em paralelo, diminuindo o tempo de espera para a abertura dos arquivos.

benchmark( tf.data.Dataset.range(2) .interleave( lambda _: ArtificialDataset(), num_parallel_calls=tf.data.AUTOTUNE ) )

Data execution time plot - parallel interleave method

Desta vez, conforme exibido no gráfico de tempo de execução dos dados, a leitura dos dois datasets está paralelizada, diminuindo o tempo global de processamento dos dados.

Paralelização da transformação de dados

Ao preparar os dados, talvez seja necessário pré-processar os elementos de entrada. Para isso, a API tf.data conta com a transformação tf.data.Dataset.map, que aplica uma função definida pelo usuário a cada elemento do dataset de entrada. Como os elementos de entrada são independentes entre si, o pré-processamento pode ser paralelizado em diversos núcleos das CPUs. Para que isso seja possível, de forma similar às transformações prefetch e interleave, a transformação map conta com o argumento num_parallel_calls para especificar o nível de paralelismo.

A escolha do melhor valor para o argumento num_parallel_calls depende do hardware, das características dos dados de treinamento (como tamanho e formato), do custo da função de mapeamento e dos outros processamentos sendo feitos na CPU ao mesmo tempo. Uma heurística simples é usar o número de núcleos de CPU disponíveis. Entretanto, assim como para as transformações prefetch e interleave, a transformação map tem suporte ao tf.data.AUTOTUNE, que delegará a decisão sobre o nível de paralelismo usado para o runtime do tf.data.

def mapped_function(s): # Do some hard pre-processing tf.py_function(lambda: time.sleep(0.03), [], ()) return s

Mapeamento sequencial

Comece usando a transformação map sem paralelismo como um exemplo de linha de base.

benchmark( ArtificialDataset() .map(mapped_function) )

Data execution time plot - sequential mapping method

Quanto à estratégia ingênua, aqui, conforme mostrado pelo gráfico, os tempos gastos para abrir, ler, fazer o pré-processamento (mapear) e fazer os passos de treinamento são somados para uma única iteração.

Mapeamento paralelo

Agora, use a mesma função de pré-processamento, mas aplique-a a diversas amostras de forma paralela.

benchmark( ArtificialDataset() .map( mapped_function, num_parallel_calls=tf.data.AUTOTUNE ) )

Data execution time - parallel mapping

Conforme o gráfico dos dados demonstra, os passos de pré-processamento se sobrepõem, diminuindo o tempo geral de uma única iteração.

Cache

A transformação tf.data.Dataset.cache pode fazer cache de um dataset, seja na memória ou no armazenamento local, o que evitará que algumas operações (como abertura de arquivos e leitura de dados) sejam executadas em cada época.

benchmark( ArtificialDataset() .map( # Apply time consuming operations before cache mapped_function ).cache( ), 5 )

Data execution time - cached dataset method

Aqui, o gráfico do tempo de execução dos dados mostra que, ao fazer cache de um dataset, as transformações antes da transformação cache (como abertura de arquivos e leitura de dados) são executadas somente durante a primeira época. As épocas subsequentes reutilizarão os dados armazenados em cache pela transformação cache.

Se a função definida pelo usuário passada para a transformação map for cara, aplique a transformação cache após a transformação map, desde que o dataset resultante ainda caiba na memória ou no armazenamento local. Se a função definida pelo usuário aumentar o espaço necessário para armazenar o dataset para além da capacidade de cache, aplique-a após a transformação cache ou considere fazer o pré-processamento dos dados antes do trabalho de treinamento para reduzir o uso de recursos.

Vetorização do mapeamento

Fazer uma chamada à função definida pelo usuário passada para a transformação map traz uma sobrecarga relacionada ao agendamento e à execução dessa função. Vetorize a função definida pelo usuário (ou seja, ela deve operar um lote de entradas ao mesmo tempo) e aplique a transformação batch antes da transformação map.

Para ilustrar essa prática recomendada, seu dataset artificial não é adequado. O atraso de agendamento é de cerca de 10 microssegundos (10e-6 segundos), muito menor do que as dezenas de milissegundos usadas no ArtificialDataset e, portanto, é difícil observar o impacto.

Neste exemplo, use a função base tf.data.Dataset.range e simplifique o loop de treinamento para a forma mais simples.

fast_dataset = tf.data.Dataset.range(10000) def fast_benchmark(dataset, num_epochs=2): start_time = time.perf_counter() for _ in tf.data.Dataset.range(num_epochs): for _ in dataset: pass tf.print("Execution time:", time.perf_counter() - start_time) def increment(x): return x+1

Mapeamento de escalar

fast_benchmark( fast_dataset # Apply function one item at a time .map(increment) # Batch .batch(256) )

Data execution time - scalar map method

O gráfico acima ilustra o que está acontecendo (com menos amostras) usando o método de mapeamento de escalar. Ele mostra que a função mapeada é aplicada a cada amostra. Embora essa função seja muito rápida, há uma certa sobrecarga que impacta o desempenho do tempo de execução.

Mapeamento vetorizado

fast_benchmark( fast_dataset .batch(256) # Apply function on a batch of items # The tf.Tensor.__add__ method already handle batches .map(increment) )

Data execution time - vectorized map method

Desta vez, a função mapeada é chamada uma vez e aplicada a um lote de amostras. Como o gráfico do tempo de execução dos dados mostra, embora a função possa demorar mais tempo para executar, a sobrecarga aparece somente uma vez, aumentando o desempenho geral do tempo de execução.

Redução do volume de memória

Diversas transformações, incluindo interleave, prefetch e shuffle, mantêm um buffer interno de elementos. Se a função definida pelo usuário passada para a transformação map alterar o tamanho dos elementos, então a ordem da transformação de mapeamento e das transformações que fazem buffer dos elementos afeta o uso de memória. De forma geral, escolha a ordem que resulte no menor volume de memória, a menos que uma ordem diferente seja desejável por questões de desempenho.

Cache das computações parciais

É recomendável fazer o cache do dataset após a transformação map, a não ser que essa transformação faça os dados ficarem grandes demais e não caberem na memória. Uma contrapartida é se sua função mapeada puder ser dividida em duas partes: uma que consuma tempo e outra que consuma memória. Neste caso, você pode encadear as transformações da seguinte forma:

dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)

Desta forma, a parte que consome tempo é executada somente durante a primeira época, e você evita usar espaço do cache demais.

Resumo das práticas recomendadas

Veja um resumo das práticas recomendadas para criar pipelines de entrada do TensorFlow com bom desempenho:

Reprodução dos números

Observação: o restante deste notebook fala sobre como reproduzir os números acima. Fique à vontade para mexer no código, mas entendê-lo não é essencial para este tutorial.

Para aprofundar sua compreensão da API tf.data.Dataset, você pode fazer seus próprios pipelines. Veja abaixo o código usado para gerar as imagens deste guia. Pode ser um bom ponto de partida, mostrando alguns caminhos alternativos para dificuldades comuns, como:

  • Capacidade de reprodução do tempo de execução

  • Execução adiantada (eager) das funções mapeadas

  • Transformação interleave que pode ser chamada

import itertools from collections import defaultdict import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt

Dataset

Similar ao ArtificialDataset, você pode criar um dataset que retorne o tempo gasto em cada passo.

class TimeMeasuredDataset(tf.data.Dataset): # OUTPUT: (steps, timings, counters) OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32) OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3)) _INSTANCES_COUNTER = itertools.count() # Number of datasets generated _EPOCHS_COUNTER = defaultdict(itertools.count) # Number of epochs done for each dataset def _generator(instance_idx, num_samples): epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx]) # Opening the file open_enter = time.perf_counter() time.sleep(0.03) open_elapsed = time.perf_counter() - open_enter for sample_idx in range(num_samples): # Reading data (line, record) from the file read_enter = time.perf_counter() time.sleep(0.015) read_elapsed = time.perf_counter() - read_enter yield ( [("Open",), ("Read",)], [(open_enter, open_elapsed), (read_enter, read_elapsed)], [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)] ) open_enter, open_elapsed = -1., -1. # Negative values will be filtered def __new__(cls, num_samples=3): return tf.data.Dataset.from_generator( cls._generator, output_types=cls.OUTPUT_TYPES, output_shapes=cls.OUTPUT_SHAPES, args=(next(cls._INSTANCES_COUNTER), num_samples) )

Este dataset conta com amostras de formato [[2, 1], [2, 2], [2, 3]] e tipo [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]. Cada amostra é:

( [("Open"), ("Read")], [(t0, d), (t0, d)], [(i, e, -1), (i, e, s)] )

Em que:

  • Open e Read são identificadores de passos

  • t0 é o timestamp de quando o passo correspondente começou

  • d é o tempo gasto no passo correspondente

  • i é o índice da instância

  • e é o índice da época (número de vezes em que foi feita a iteração do dataset)

  • s é o índice da amostra

Loop de iteração

Complique um pouco mais o loop de iteração fazendo a agregação de todos os tempos, o que funcionará apenas com datasets que gerem amostras, conforme detalhado acima.

def timelined_benchmark(dataset, num_epochs=2): # Initialize accumulators steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string) times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32) values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32) start_time = time.perf_counter() for epoch_num in range(num_epochs): epoch_enter = time.perf_counter() for (steps, times, values) in dataset: # Record dataset preparation informations steps_acc = tf.concat((steps_acc, steps), axis=0) times_acc = tf.concat((times_acc, times), axis=0) values_acc = tf.concat((values_acc, values), axis=0) # Simulate training time train_enter = time.perf_counter() time.sleep(0.01) train_elapsed = time.perf_counter() - train_enter # Record training informations steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0) times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0) values_acc = tf.concat((values_acc, [values[-1]]), axis=0) epoch_elapsed = time.perf_counter() - epoch_enter # Record epoch informations steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0) times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0) values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0) time.sleep(0.001) tf.print("Execution time:", time.perf_counter() - start_time) return {"steps": steps_acc, "times": times_acc, "values": values_acc}

Método de plotagem

Por fim, defina uma função que consiga plotar uma linha do tempo dados os valores retornados pela função timelined_benchmark.

def draw_timeline(timeline, title, width=0.5, annotate=False, save=False): # Remove invalid entries (negative times, or empty steps) from the timelines invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0] steps = timeline['steps'][invalid_mask].numpy() times = timeline['times'][invalid_mask].numpy() values = timeline['values'][invalid_mask].numpy() # Get a set of different steps, ordered by the first time they are encountered step_ids, indices = np.stack(np.unique(steps, return_index=True)) step_ids = step_ids[np.argsort(indices)] # Shift the starting time to 0 and compute the maximal time value min_time = times[:,0].min() times[:,0] = (times[:,0] - min_time) end = max(width, (times[:,0]+times[:,1]).max() + 0.01) cmap = mpl.cm.get_cmap("plasma") plt.close() fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0}) fig.suptitle(title) fig.set_size_inches(17.0, len(step_ids)) plt.xlim(-0.01, end) for i, step in enumerate(step_ids): step_name = step.decode() ax = axs[i] ax.set_ylabel(step_name) ax.set_ylim(0, 1) ax.set_yticks([]) ax.set_xlabel("time (s)") ax.set_xticklabels([]) ax.grid(which="both", axis="x", color="k", linestyle=":") # Get timings and annotation for the given step entries_mask = np.squeeze(steps==step) serie = np.unique(times[entries_mask], axis=0) annotations = values[entries_mask] ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66) if annotate: for j, (start, width) in enumerate(serie): annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])]) ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation, horizontalalignment='left', verticalalignment='center') if save: plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")

Use encapsuladores para a função mapeada

Para executar a função mapeada em um contexto eager, você precisa encapsulá-la dentro de uma chamada tf.py_function.

def map_decorator(func): def wrapper(steps, times, values): # Use a tf.py_function to prevent auto-graph from compiling the method return tf.py_function( func, inp=(steps, times, values), Tout=(steps.dtype, times.dtype, values.dtype) ) return wrapper

Comparação dos pipelines

_batch_map_num_items = 50 def dataset_generator_fun(*args): return TimeMeasuredDataset(num_samples=_batch_map_num_items)

Ingênuo

@map_decorator def naive_map(steps, times, values): map_enter = time.perf_counter() time.sleep(0.001) # Time consuming step time.sleep(0.0001) # Memory consuming step map_elapsed = time.perf_counter() - map_enter return ( tf.concat((steps, [["Map"]]), axis=0), tf.concat((times, [[map_enter, map_elapsed]]), axis=0), tf.concat((values, [values[-1]]), axis=0) ) naive_timeline = timelined_benchmark( tf.data.Dataset.range(2) .flat_map(dataset_generator_fun) .map(naive_map) .batch(_batch_map_num_items, drop_remainder=True) .unbatch(), 5 )

Otimizado

@map_decorator def time_consuming_map(steps, times, values): map_enter = time.perf_counter() time.sleep(0.001 * values.shape[0]) # Time consuming step map_elapsed = time.perf_counter() - map_enter return ( tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1), tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1), tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1) ) @map_decorator def memory_consuming_map(steps, times, values): map_enter = time.perf_counter() time.sleep(0.0001 * values.shape[0]) # Memory consuming step map_elapsed = time.perf_counter() - map_enter # Use tf.tile to handle batch dimension return ( tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1), tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1), tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1) ) optimized_timeline = timelined_benchmark( tf.data.Dataset.range(2) .interleave( # Parallelize data reading dataset_generator_fun, num_parallel_calls=tf.data.AUTOTUNE ) .batch( # Vectorize your mapped function _batch_map_num_items, drop_remainder=True) .map( # Parallelize map transformation time_consuming_map, num_parallel_calls=tf.data.AUTOTUNE ) .cache() # Cache data .map( # Reduce memory usage memory_consuming_map, num_parallel_calls=tf.data.AUTOTUNE ) .prefetch( # Overlap producer and consumer works tf.data.AUTOTUNE ) .unbatch(), 5 )
draw_timeline(naive_timeline, "Naive", 15)
draw_timeline(optimized_timeline, "Optimized", 15)