Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/pt-br/tutorials/distribute/input.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Entrada distribuída

As APIs de tf.distribute oferecem uma maneira fácil de os usuários aumentarem a escala do treinamento de uma máquina para várias. Ao aumentar a escala do modelo, os usuários também precisam distribuir a entrada em diversos dispositivos. tf.distribute fornece APIs que permitem distribuir automaticamente sua entrada entre os dispositivos.

Este guia mostrará as diferentes formas de criar um dataset distribuído e iteradores usando as APIs de tf.distribute. Além disso, os seguintes tópicos serão discutidos:

  • Uso, fragmentação e opções de divisão em lotes ao usar tf.distribute.Strategy.experimental_distribute_dataset e tf.distribute.Strategy.distribute_datasets_from_function.

  • Diferentes formas de fazer a iteração do dataset distribuído.

  • Diferenças entre as APIs tf.distribute.Strategy.experimental_distribute_dataset/tf.distribute.Strategy.distribute_datasets_from_function e APIs de tf.data, bem como qualquer limitação que os usuários poderão ter ao usá-las.

Este guia não aborda o uso de entrada distribuída com APIs do Keras.

Datasets distribuídos

Para usar APIs de tf.distribute para aumentar a escala, use tf.data.Dataset para representar a entrada. tf.distribute funciona de forma eficiente com tf.data.Dataset — por exemplo, pela pré-busca automática no dispositivo acelerador e atualizações de desempenho regulares. Se você tiver um caso de uso que exija a utilização de algo diferente de tf.data.Dataset, confira a seção Entradas de tensor deste guia. Em um loop de treinamento não distribuído, primeiro crie uma instância de tf.data.Dataset e depois faça a iteração dos elementos. Por exemplo:

import tensorflow as tf # Helper libraries import numpy as np import os print(tf.__version__)
# Simulate multiple CPUs with virtual devices N_VIRTUAL_DEVICES = 2 physical_devices = tf.config.list_physical_devices("CPU") tf.config.set_logical_device_configuration( physical_devices[0], [tf.config.LogicalDeviceConfiguration() for _ in range(N_VIRTUAL_DEVICES)])
print("Available devices:") for i, device in enumerate(tf.config.list_logical_devices()): print("%d) %s" % (i, device))
global_batch_size = 16 # Create a tf.data.Dataset object. dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) @tf.function def train_step(inputs): features, labels = inputs return labels - 0.3 * features # Iterate over the dataset using the for..in construct. for inputs in dataset: print(train_step(inputs))

Para permitir que os usuários utilizem a estratégia do tf.distribute com mudanças mínimas do código existente, duas APIs foram lançadas, que distribuem uma instância de tf.data.Dataset e retornam um objeto do dataset distribuído. Então, um usuário pode fazer a iteração dessa instância do dataset distribuído e treinar o modelo conforme mencionado. Agora, vejamos maiores detalhes das duas APIs – tf.distribute.Strategy.experimental_distribute_dataset e tf.distribute.Strategy.distribute_datasets_from_function:

tf.distribute.Strategy.experimental_distribute_dataset

Uso

Esta API recebe uma instância de tf.data.Dataset como entrada e retorna uma instância de tf.distribute.DistributedDataset. A divisão em lotes do dataset de entrada deve ser igual ao tamanho global de lote. O tamanho global de lote é o número de amostras que você deseja processar em todos os dispositivos no passo 1. Você pode fazer a iteração desse dataset distribuído com Pythonic ou criar um iterador usando iter. O objeto retornado não é uma instância de tf.data.Dataset e não é compatível com nenhuma outra API que transforma ou inspeciona o dataset de alguma forma. Essa é a API recomendada se você não quiser fragmentar a entrada em diferentes réplicas de forma específica.

global_batch_size = 16 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) # Distribute input using the `experimental_distribute_dataset`. dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) # 1 global batch of data fed to the model in 1 step. print(next(iter(dist_dataset)))

Propriedades

Divisão em lotes

tf.distribute recria lotes da instância de tf.data.Dataset de entrada com um novo tamanho de lote igual ao tamanho global de lote dividido pelo número de réplicas em sincronia. O número de réplicas em sincronia é igual ao número de dispositivos que participam da redução total de gradiente durante o treinamento. Quando um usuário faz uma chamada a next no iterador distribuído, um tamanho de dados do lote por réplica é retornado em cada réplica. A cardinalidade do dataset dividido em lotes sempre será múltiplo do número de réplicas. Veja alguns exemplos:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Sem distribuição:

      • Lote 1: [0, 1, 2, 3]

      • Lote 2: [4, 5]

    • Com distribuição em 2 réplicas. O último lote ([4, 5]) é dividido entre 2 réplicas.

    • Lote 1:

      • Réplica 1: [0, 1]

      • Réplica 2: [2, 3]

    • Lote 2:

      • Réplica 1: [4]

      • Réplica 2: [5]

  • tf.data.Dataset.range(4).batch(4)

    • Sem distribuição:

      • Lote 1: [0, 1, 2, 3]

    • Com distribuição em 5 réplicas:

      • Lote 1:

        • Réplica 1: [0]

        • Réplica 2: [1]

        • Réplica 3: [2]

        • Réplica 4: [3]

        • Réplica 5: []

  • tf.data.Dataset.range(8).batch(4)

    • Sem distribuição:

      • Lote 1: [0, 1, 2, 3]

      • Lote 2: [4, 5, 6, 7]

    • Com distribuição em 3 réplicas:

      • Lote 1:

        • Réplica 1: [0, 1]

        • Réplica 2: [2, 3]

        • Réplica 3: []

      • Lote 2:

        • Réplica 1: [4, 5]

        • Réplica 2: [6, 7]

        • Réplica 3: []

Observação: os exemplos acima apenas ilustram como um lote global é dividido em diferentes réplicas. Não é recomendável depender dos valores reais que vão para cada réplica, pois eles podem mudar dependendo da implementação.

A nova divisão em lotes do dataset tem uma complexidade de espaço que aumenta linearmente com o número de réplicas. Portanto, para o caso de uso de treinamento multiworker, o pipeline de entrada pode se deparar com erros de falta de memória (OOM, na sigla em inglês).

Fragmentação

tf.distribute também fragmenta automaticamente o dataset de entrada no treinamento multiworker com MultiWorkerMirroredStrategy e TPUStrategy. Cada dataset é criado no dispositivo com CPU do worker. Com a fragmentação automática de um dataset em um conjunto de workers, é atribuído a cada worker um subconjunto de todo o dataset (se a política tf.data.experimental.AutoShardPolicy certa estiver definida). Isso é feito para garantir que, em cada passo, um tamanho global de lote de elementos não sobrepostos do dataset seja processado por cada worker. A fragmentação automática tem opções diferentes, que podem ser especificadas usando-se tf.data.experimental.DistributeOptions. É importante salientar que não há fragmentação automática no treinamento multiworker com ParameterServerStrategy, e mais informações sobre a criação de datasets com essa estratégia estão disponíveis no tutorial ParameterServerStrategy.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16) options = tf.data.Options() options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA dataset = dataset.with_options(options)

Há três opções diferentes que você pode definir para a política tf.data.experimental.AutoShardPolicy:

  • AUTO (automática): esta é a opção padrão, em que será feita uma tentativa de fragmentar por FILE (arquivo). A tentativa de fragmentar por FILE falha se não for detectado um dataset baseado em arquivos. Nesse caso, tf.distribute reverterá para a fragmentação por DATA (dados). Se o dataset de entrada for baseado em arquivos, mas o número de arquivos for menor do que o número de workers, será gerado um erro InvalidArgumentError. Se isso acontecer, defina a política explicitamente como AutoShardPolicy.DATA ou divida a fonte de entrada em arquivos menores de tal forma que o número de arquivos seja maior do que o número de workers.

  • FILE (arquivo): esta é a opção se você quiser fragmentar os arquivos de entrada em todos os workers. Você deve usar esta opção se o número de arquivos de entrada for muito maior do que o número de workers e se os dados nos arquivos estiverem distribuídos de maneira uniforme. A desvantagem desta opção é ficar com workers ociosos se os dados nos arquivos não estiverem distribuídos de maneira uniforme. Se o número de arquivos for menor do que o número de workers, será gerado um erro InvalidArgumentError. Se isso acontecer, defina a política explicitamente como AutoShardPolicy.DATA. Por exemplo, vamos distribuir 2 arquivos em 2 workers, com 1 réplica cada. O arquivo 1 contém [0, 1, 2, 3, 4, 5] e o arquivo 2 contém [6, 7, 8, 9, 10, 11]. O número total de réplicas em sincronia será 2, e o tamanho global de lote será 4.

    • Worker 0:

      • Lote 1 = Réplica 1: [0, 1]

      • Lote 2 = Réplica 1: [2, 3]

      • Lote 3 = Réplica 1: [4]

      • Lote 4 = Réplica 1: [5]

    • Worker 1:

      • Lote 1 = Réplica 2: [6, 7]

      • Lote 2 = Réplica 2: [8, 9]

      • Lote 3 = Réplica 2: [10]

      • Lote 4 = Réplica 2: [11]

  • DATA (dados): os elementos serão fragmentados em todos os workers. Cada worker lerá todo o dataset e processará somente o fragmento atribuído a ele. Todos os outros fragmentos serão descartados. Geralmente, isso é usado se o número de arquivos de entrada for menor do que o número de workers e se você quiser uma melhor fragmentação de dados em todos os workers. A desvantagem é que todo o dataset será lido em cada worker. Por exemplo, vamos distribuir 1 arquivo em 2 workers. O arquivo 1 contém [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. O número total de réplicas em sincronia será 2.

    • Worker 0:

      • Lote 1 = Réplica 1: [0, 1]

      • Lote 2 = Réplica 1: [4, 5]

      • Lote 3 = Réplica 1: [8, 9]

    • Worker 1:

      • Lote 1 = Réplica 2: [2, 3]

      • Lote 2 = Réplica 2: [6, 7]

      • Lote 3 = Réplica 2: [10, 11]

  • OFF (desativada): se você desativar a fragmentação automática, cada worker processará todos os dados. Por exemplo, vamos distribuir 1 arquivo em 2 workers. O arquivo 1 contém [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. O número total de réplicas em sincronia será 2. Então, cada worker verá a seguinte distribuição:

    • Worker 0:

      • Lote 1 = Réplica 1: [0, 1]

      • Lote 2 = Réplica 1: [2, 3]

      • Lote 3 = Réplica 1: [4, 5]

      • Lote 4 = Réplica 1: [6, 7]

      • Lote 5 = Réplica 1: [8, 9]

      • Lote 6 = Réplica 1: [10, 11]

    • Worker 1:

      • Lote 1 = Réplica 2: [0, 1]

      • Lote 2 = Réplica 2: [2, 3]

      • Lote 3 = Réplica 2: [4, 5]

      • Lote 4 = Réplica 2: [6, 7]

      • Lote 5 = Réplica 2: [8, 9]

      • Lote 6 = Réplica 2: [10, 11]

Pré-busca

Por padrão, tf.distribute adiciona uma transformação de pré-busca ao final da instância de tf.data.Dataset fornecida pelo usuário. O argumento da transformação de pré-busca, que é buffer_size, é igual ao número de réplicas em sincronia.

tf.distribute.Strategy.distribute_datasets_from_function

Uso

Esta API recebe uma função de entrada e devolve uma instância de tf.distribute.DistributedDataset. A função de entrada que o usuário passa tem um argumento tf.distribute.InputContext e deve retornar uma instância de tf.data.Dataset. Com esta API, tf.distribute não faz nenhuma outra mudança na instância de tf.data.Dataset do usuário retornada pela função de entrada. É responsabilidade do usuário criar os lotes e fragmentar o dataset. tf.distribute faz uma chamada à função de entrada no dispositivo com CPU de cada worker. Além de permitir que os usuários especifiquem sua própria lógica de divisão lotes e fragmentação, esta API também apresenta escalabilidade e desempenho melhores em comparação a tf.distribute.Strategy.experimental_distribute_dataset quando usada para treinamento multiworker.

mirrored_strategy = tf.distribute.MirroredStrategy() def dataset_fn(input_context): batch_size = input_context.get_per_replica_batch_size(global_batch_size) dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16) dataset = dataset.shard( input_context.num_input_pipelines, input_context.input_pipeline_id) dataset = dataset.batch(batch_size) dataset = dataset.prefetch(2) # This prefetches 2 batches per device. return dataset dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)

Propriedades

Divisão em lotes

A instância tf.data.Dataset, que é o valor de retorno da função de entrada, deve ser dividida em lotes usando o tamanho de lote por réplica. O tamanho de lote por réplica é igual ao tamanho global de lote dividido pelo número de réplicas que fazem parte do treinamento sincronizado. Isso ocorre pelo fato de tf.distribute fazer uma chamada à função de entrada no dispositivo com CPU de cada worker. O dataset criado em um determinado worker deve estar pronto para uso por todas as réplicas nesse worker.

Fragmentação

O objeto tf.distribute.InputContext que é passado implicitamente como argumento para a função de entrada do usuário é criado por tf.distribute em segundo plano. Ele tem informações sobre o número de workers, ID do worker atual, etc. A função de entrada pode realizar a fragmentação conforme as políticas definidas pelo usuário utilizando essas propriedades, que fazem parte do objeto tf.distribute.InputContext.

Pré-busca

tf.distribute não adiciona uma transformação de pré-busca ao final do tf.data.Dataset retornado pela função de entrada fornecida pelo usuário, então você deve fazer uma chamada explicita a Dataset.prefetch no exemplo acima.

Observação: tanto tf.distribute.Strategy.experimental_distribute_dataset quanto tf.distribute.Strategy.distribute_datasets_from_function retornam instâncias de tf.distribute.DistributedDataset que não são do tipo tf.data.Dataset. Você pode fazer a iteração dessas instâncias (conforme exibido na seção Iteradores distribuídos) e usar a propriedade element_spec.

Iteradores distribuídos

De forma similar às instâncias de tf.data.Dataset não distribuídas, você precisará criar um iterador nas instâncias de tf.distribute.DistributedDataset para fazer a iteração e acessar os elementos em tf.distribute.DistributedDataset. Veja abaixo de que maneira você pode criar um tf.distribute.DistributedIterator e usá-lo para treinar seu modelo:

Usos

Uso de um Pythonic para constructo de loop

Você pode usar um loop Pythonic amigável para o usuário para fazer a iteração do tf.distribute.DistributedDataset. Os elementos retornados pelo tf.distribute.DistributedIterator podem ser um único tf.Tensor ou um tf.distribute.DistributedValues, que contém um valor por réplica. Ao colocar o loop dentro de uma tf.function, o desempenho aumenta. Entretanto, os comandos break e return não estarão disponíveis para um loop de um tf.distribute.DistributedDataset que é colocado dentro de uma tf.function.

global_batch_size = 16 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) @tf.function def train_step(inputs): features, labels = inputs return labels - 0.3 * features for x in dist_dataset: # train_step trains the model using the dataset elements loss = mirrored_strategy.run(train_step, args=(x,)) print("Loss is ", loss)

Uso de iter para criar um iterador explícito

Para fazer a iteração dos elementos em uma instância de tf.distribute.DistributedDataset, você pode criar um tf.distribute.DistributedIterator usando a API iter nele. Com um iterador explícito, você pode fazer a iteração em um número fixo de passos. Para obter o próximo elemento do dist_iterator de uma instância de tf.distribute.DistributedIterator, você pode fazer uma chamada a next(dist_iterator), dist_iterator.get_next() ou dist_iterator.get_next_as_optional(). Os dois últimos são basicamente a mesma coisa:

num_epochs = 10 steps_per_epoch = 5 for epoch in range(num_epochs): dist_iterator = iter(dist_dataset) for step in range(steps_per_epoch): # train_step trains the model using the dataset elements loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),)) # which is the same as # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),)) print("Loss is ", loss)

Com next ou tf.distribute.DistributedIterator.get_next, se o tf.distribute.DistributedIterator tiver chegado ao fim, será exibido um erro OutOfRange (fora do intervalo). O cliente pode capturar o erro no Python e continuar fazendo outros trabalhos, como criação de checkpoints e avaliação. Entretanto, isso não funcionará se você estiver usando um loop de treinamento de host (por exemplo, executando diversos passos por tf.function), desta forma:

@tf.function def train_fn(iterator): for _ in tf.range(steps_per_loop): strategy.run(step_fn, args=(next(iterator),))

Este exemplo train_fn contém diversos passos com o encapsulamento do corpo do passo dentro de um tf.range. Neste caso, iterações diferentes no loop sem nenhuma dependência poderiam começar em paralelo, então um erro OutOfRange pode ser gerado em iterações posteriores, antes de a computação de iterações anteriores terminar. Quando um erro OutOfRange é gerado, todas as operações da função são encerradas imediatamente. Se você deseja evitar esse tipo de situação, uma alternativa que não gera um erro OutOfRange é tf.distribute.DistributedIterator.get_next_as_optional. get_next_as_optional retorna um tf.experimental.Optional, que contém o próximo elemento ou nenhum valor se tf.distribute.DistributedIterator tiver chegado ao fim.

# You can break the loop with `get_next_as_optional` by checking if the `Optional` contains a value global_batch_size = 4 steps_per_loop = 5 strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.range(9).batch(global_batch_size) distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset)) @tf.function def train_fn(distributed_iterator): for _ in tf.range(steps_per_loop): optional_data = distributed_iterator.get_next_as_optional() if not optional_data.has_value(): break per_replica_results = strategy.run(lambda x: x, args=(optional_data.get_value(),)) tf.print(strategy.experimental_local_results(per_replica_results)) train_fn(distributed_iterator)

Uso da propriedade element_spec

Se você passar os elementos de um dataset distribuído para uma tf.function e quiser um tf.TypeSpec garantido, pode especificar o argumento input_signature da tf.function. A saída de um dataset distribuído é tf.distribute.DistributedValues, que pode representar a entrada de um único dispositivo ou de vários. Para obter o tf.TypeSpec correspondente a esse valor distribuído, você pode usar tf.distribute.DistributedDataset.element_spec ou tf.distribute.DistributedIterator.element_spec.

global_batch_size = 16 epochs = 5 steps_per_epoch = 5 mirrored_strategy = tf.distribute.MirroredStrategy() dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) @tf.function(input_signature=[dist_dataset.element_spec]) def train_step(per_replica_inputs): def step_fn(inputs): return 2 * inputs return mirrored_strategy.run(step_fn, args=(per_replica_inputs,)) for _ in range(epochs): iterator = iter(dist_dataset) for _ in range(steps_per_epoch): output = train_step(next(iterator)) tf.print(output)

Pré-processamento de dados

Até agora, você aprendeu como distribuir um tf.data.Dataset. Porém, antes que os dados estejam prontos para o modelo, eles precisam ser pré-processados para limpeza, transformação ou aumento, por exemplo. Veja dois conjuntos de ferramentas úteis:

  • [Camadas de pré-processamento do Keras](https://www.tensorflow.org/guide/keras/preprocessing_layers): conjunto de camadas do Keras que permitem aos desenvolvedores criar pipelines de processamento de entrada nativos do Keras. Algumas camadas de pré-processamento do Keras contêm estados não treináveis, que podem ser definidos na inicialização ou `adapt`ados (consulte a seção `adaptar` do [guia de camadas de pré-processamento do Keras](https://www.tensorflow.org/guide/keras/preprocessing_layers)). Ao distribuir camadas de pré-processamento stateful, os estados devem ser replicados em todos os workers. Para usar essas camadas, você pode torná-las parte do modelo ou aplicá-las aos datasets.
  • TensorFlow Transform (tf.Transform): biblioteca do TensorFlow que permite definir uma transformação de dados tanto no nível de instância quanto de passos completos por meio de pipelines de pré-processamento de dados. O TensorFlow Transform tem duas fases. A primeira é a fase Analisar, em que os dados de treinamento não tratados são analisados em um processo de passos completos para computar as estatísticas necessárias para as transformações, e a lógica de transformação é gerada como operações no nível de instância. A segunda é a fase Transformar, em que os dados de treinamento não tratados são transformados em um processo no nível de instância.

Camadas de pré-processamento do Keras versus TensorFlow Transform

Tanto o TensorFlow Transform quanto as camadas de pré-processamento do Keras oferecem uma maneira de dividir o pré-processamento durante o treinamento e agrupar o pré-processamento com um modelo durante a inferência, reduzindo o desvio de treinamento/exibição.

O TensorFlow Transform, que está intimamente integrado ao TFX, oferece uma solução map-reduce escalável para analisar e transformar datasets de qualquer tamanho em um trabalho separado do pipeline de treinamento. Se você precisar fazer uma análise de um dataset que não caiba em uma única máquina, o TensorFlow Transform deve ser sua primeira opção.

As camadas de pré-processamento do Keras são mais adequadas para pré-processamento aplicado durante o treinamento, após ler os dados no disco. Elas são perfeitamente adequadas para o desenvolvimento de modelos na biblioteca do Keras. Elas permitem a análise de um dataset menor com o uso de adapt e possibilitam também casos de uso como ampliação de dados de imagens, em que cada passo do dataset de entrada gera exemplos diferentes para o treinamento.

As duas bibliotecas podem ser combinadas, em que o TensorFlow Transform é usado para análise e transformações estáticas dos dados de entrada, e as camadas de pré-processamento do Keras são usadas para transformações de treinamento-tempo (por exemplo, one-hot encoding ou ampliação de dados).

Melhores práticas com tf.distribute

Para trabalhar com as duas ferramentas, é preciso inicializar a lógica de transformação para aplicar aos dados, o que pode criar recursos do Tensorflow. Esses recursos ou estados devem ser replicados em todos os workers para salvar a comunicação inter-worker ou worker-coordenador. Para isso, recomenda-se que você crie camadas de pré-processamento do Keras, tft.TFTransformOutput.transform_features_layer, ou tft.TransformFeaturesLayer em tf.distribute.Strategy.scope, da mesma forma que faria para qualquer outra camada do Keras.

Os próximos exemplos demonstram o uso da API tf.distribute.Strategy com a API de alto nível do Keras, Model.fit, e um loop de treinamento personalizado separadamente.

Observações adicionais para usuários de camadas de pré-processamento do Keras:

Pré-processamento de camadas e vocabulários grandes

Ao lidar com vocabulários grandes (mais de 1 GB) em uma configuração multiworker (por exemplo, tf.distribute.MultiWorkerMirroredStrategy, tf.distribute.experimental.ParameterServerStrategy, tf.distribute.TPUStrategy), recomenda-se salvar o vocabulário em um arquivo estático acessível por todos os workers (por exemplo, com armazenamento em nuvem). Dessa forma, será reduzido o tempo gasto ao replicar o vocabulário em todos os workers durante o treinamento.

Pré-processamento no pipeline de tf.data versus no modelo

Embora as camadas de pré-processamento do Keras possam ser aplicadas tanto como parte do modelo quanto diretamente a um tf.data.Dataset, cada opção tem sua vantagem:

  • Ao aplicar as camadas de pré-processamento no modelo, o seu modelo fica mais portátil, e isso ajuda a reduzir o desvio de treinamento/exibição. (Confira mais detalhes na seção Benefícios de fazer o pré-processamento dentro do modelo no momento da inferência do guia Trabalhando com camadas de pré-processamento)

  • Ao aplicar no pipeline de tf.data, é possível fazer a pré-busca e o descarregamento na CPU, o que costuma oferecer melhor desempenho ao usar aceleradores.

Ao executar em uma ou mais TPUs, os usuários devem quase sempre colocar as camadas de pré-processamento do Keras no pipeline de tf.data, pois nem todas as camadas têm suporte a TPUs, e operações com strings não são executadas em TPUs. (As duas exceções são tf.keras.layers.Normalization e tf.keras.layers.Rescaling, que são executadas sem problemas em TPUs e usadas com frequência como a primeira camada em um modelo de imagens.)

Pré-processamento com Model.fit

Ao usar o Model.fit do Keras, você não precisa distribuir dados com tf.distribute.Strategy.experimental_distribute_dataset nem tf.distribute.Strategy.distribute_datasets_from_function. Confira mais detalhes no guia Trabalhando com camadas de pré-processamento e no guia Treinamento distribuído com o Keras. Veja um exemplo resumido abaixo:

strategy = tf.distribute.MirroredStrategy() with strategy.scope(): # Create the layer(s) under scope. integer_preprocessing_layer = tf.keras.layers.IntegerLookup(vocabulary=FILE_PATH) model = ... model.compile(...) dataset = dataset.map(lambda x, y: (integer_preprocessing_layer(x), y)) model.fit(dataset)

Usuários de tf.distribute.experimental.ParameterServerStrategy com a API Model.fit precisam usar um tf.keras.utils.experimental.DatasetCreator como a entrada. (Confira mais detalhes no guia Treinamento de servidor de parâmetros)

strategy = tf.distribute.experimental.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner) with strategy.scope(): preprocessing_layer = tf.keras.layers.StringLookup(vocabulary=FILE_PATH) model = ... model.compile(...) def dataset_fn(input_context): ... dataset = dataset.map(preprocessing_layer) ... return dataset dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn) model.fit(dataset_creator, epochs=5, steps_per_epoch=20, callbacks=callbacks)

Pré-processamento com um loop de treinamento personalizado

Ao escrever um loop de treinamento personalizado, você distribuirá os dados com a API tf.distribute.Strategy.experimental_distribute_dataset ou com a API tf.distribute.Strategy.distribute_datasets_from_function. Se você distribuir o dataset por meio de tf.distribute.Strategy.experimental_distribute_dataset, a aplicação dessas APIs de pré-processamento em seu pipeline de dados deixará automaticamente os recursos co-localizados com o pipeline de dados, evitando acesso remoto aos recursos. Portanto, todos os nossos exemplos usarão tf.distribute.Strategy.distribute_datasets_from_function e, neste caso, é essencial colocar a inicialização dessas APIs em strategy.scope() para maior eficiência:

strategy = tf.distribute.MirroredStrategy() vocab = ["a", "b", "c", "d", "f"] with strategy.scope(): # Create the layer(s) under scope. layer = tf.keras.layers.StringLookup(vocabulary=vocab) def dataset_fn(input_context): # a tf.data.Dataset dataset = tf.data.Dataset.from_tensor_slices(["a", "c", "e"]).repeat() # Custom your batching, sharding, prefetching, etc. global_batch_size = 4 batch_size = input_context.get_per_replica_batch_size(global_batch_size) dataset = dataset.batch(batch_size) dataset = dataset.shard( input_context.num_input_pipelines, input_context.input_pipeline_id) # Apply the preprocessing layer(s) to the tf.data.Dataset def preprocess_with_kpl(input): return layer(input) processed_ds = dataset.map(preprocess_with_kpl) return processed_ds distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn) # Print out a few example batches. distributed_dataset_iterator = iter(distributed_dataset) for _ in range(3): print(next(distributed_dataset_iterator))

É importante salientar que, se você estiver fazendo o treinamento com tf.distribute.experimental.ParameterServerStrategy, também fará uma chamada a tf.distribute.experimental.coordinator.ClusterCoordinator.create_per_worker_dataset

@tf.function def per_worker_dataset_fn(): return strategy.distribute_datasets_from_function(dataset_fn) per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn) per_worker_iterator = iter(per_worker_dataset)

Para o TensorFlow Transform, conforme mencionado acima, a fase Analisar é feita separadamente do treinamento e, portanto, foi omitida aqui. Confira as instruções detalhadas no tutorial. Geralmente, essa fase inclui a criação de uma função de pré-processamento tf.Transform e a transformação dos dados em um pipeline do Apache Beam com essa função de pré-processamento. No final da fase Analisar, a saída pode ser exportada como um grafo do Tensorflow, que você pode usar tanto para treinamento quanto para produção. Nosso exemplo abrange somente a parte do pipeline de treinamento:

with strategy.scope(): # working_dir contains the tf.Transform output. tf_transform_output = tft.TFTransformOutput(working_dir) # Loading from working_dir to create a Keras layer for applying the tf.Transform output to data tft_layer = tf_transform_output.transform_features_layer() ... def dataset_fn(input_context): ... dataset.map(tft_layer, num_parallel_calls=tf.data.AUTOTUNE) ... return dataset distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)

Lotes parciais

Os lotes parciais são encontrados: 1) Quando instâncias do tf.data.Dataset criadas pelos usuários podem conter tamanhos de lote que não geram um número inteiro ao serem divididos pelo número de réplicas; ou 2) quando a cardinalidade da instância do dataset não é divisível pelo tamanho do lote. Portanto, quando o dataset é distribuído em várias réplicas, a chamada next em alguns iteradores pode resultar em um erro tf.errors.OutOfRangeError. Para tratar esse caso de uso, tf.distribute retorna lotes simulados com tamanho de lote igual a 0 para as réplicas que não têm mais dados a processar.

Para o caso com um único worker, se os dados não forem retornados pela chamada next no iterador, lotes simulados de tamanho de lote igual a 0 serão criados e usados junto com os dados reais no dataset. No caso de lotes parciais, o último lote global de dados conterá dados reais junto com lotes simulados de dados. Agora, a condição de parada dos dados de processamento verifica se alguma das réplicas tem dados. Se não houver dados em nenhuma das réplicas, será gerado um erro tf.errors.OutOfRangeError.

Para o caso multiworker, o valor booleano que representa a presença de dados em cada um dos workers é agregado usando a comunicação entre réplicas, e ele é usado para identificar se todos os workers terminaram o processamento do dataset distribuído. Como isso envolve comunicação entre workers, há uma certa perda de desempenho.

Ressalvas

  • Ao usar as APIs tf.distribute.Strategy.experimental_distribute_dataset em uma configuração multiworker, você passa um tf.data.Dataset que lê os arquivos. Se a política tf.data.experimental.AutoShardPolicy estiver definida como AUTO (automática) ou FILE (arquivo), o tamanho real do lote por passo pode ser menor do que aquele que você definiu como o tamanho global de lote. Isso pode acontecer quando os elementos restantes no arquivo são menores do que o tamanho global de lote. Você pode esgotar o dataset sem depender do número de passos a serem executados ou, alternativamente, definir tf.data.experimental.AutoShardPolicy como DATA (dados).

  • Atualmente, não há suporte a transformações do dataset stateful usando o tf.distribute, e qualquer operação stateful que o dataset possa ter é ignorada. Por exemplo, se o seu dataset tiver um map_fn que use tf.random.uniform para girar uma imagem, então você tem um grafo de dataset que depende de estados (ou seja, a semente aleatória) na máquina local na qual o processo Python está sendo executado.

  • Opções tf.data.experimental.OptimizationOptions experimentais que são desativadas por padrão podem causar uma degradação de desempenho em determinados contextos (como quando usadas em conjunto com tf.distribute). Você pode ativá-las somente após validar que elas aumentam o desempenho para sua carga de trabalho em uma configuração distribuída.

  • Confira este guia para ver como otimizar seu pipeline de entrada com o tf.data, de forma geral. Algumas dicas adicionais:

    • Se você tiver vários workers e estiver usando tf.data.Dataset.list_files para criar um dataset com todos os arquivos que correspondam a um ou mais padrões glob, lembre-se de definir o argumento seed ou definir shuffle=False para que cada worker fragmente o arquivo de forma consistente.

  • Caso o seu pipeline de entrada inclua tanto a mistura de dados no nível de registro quanto o processamento de dados, a menos que os dados não processados sejam consideravelmente maiores do que os dados processados (o que não costuma ser o caso), misture os dados primeiro e depois faça o processamento, conforme exibido no exemplo abaixo. Isso pode trazer benefícios para o uso de memória e o desempenho.

d = tf.data.Dataset.list_files(pattern, shuffle=False) d = d.shard(num_workers, worker_index) d = d.repeat(num_epochs) d = d.shuffle(shuffle_buffer_size) d = d.interleave(tf.data.TFRecordDataset, cycle_length=num_readers, block_length=1) d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) mantém um buffer interno de elementos buffer_size e, portanto, reduzir o buffer_size pode mitigar o problema de falta de memória (OOM).

  • A ordem de processamento dos dados pelos workers ao usar tf.distribute.experimental_distribute_dataset ou tf.distribute.distribute_datasets_from_function não é garantida. Geralmente, isso é necessário se você estiver usando tf.distribute para aumentar a escala da previsão. Entretanto, você pode inserir um índice para cada elemento do lote e ordenar as saídas. O trecho de código abaixo é um exemplo de como ordenar saídas.

Observação: tf.distribute.MirroredStrategy é usado aqui apenas por conveniência. Você só precisa reordenar as entradas ao usar diversos workers, mas tf.distribute.MirroredStrategy é usada para distribuir o treinamento em um único worker.

mirrored_strategy = tf.distribute.MirroredStrategy() dataset_size = 24 batch_size = 6 dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) def predict(index, inputs): outputs = 2 * inputs return index, outputs result = {} for index, inputs in dist_dataset: output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs)) indices = list(mirrored_strategy.experimental_local_results(output_index)) rindices = [] for a in indices: rindices.extend(a.numpy()) outputs = list(mirrored_strategy.experimental_local_results(outputs)) routputs = [] for a in outputs: routputs.extend(a.numpy()) for i, value in zip(rindices, routputs): result[i] = value print(result)

Às vezes, os usuários não podem usar um tf.data.Dataset para representar a entrada e, consequentemente, as APIs mencionadas acima para distribuir o dataset em diversos dispositivos. Nesses casos, você pode usar tensores brutos ou entradas de um gerador.

Uso de experimental_distribute_values_from_function para entradas de tensor arbitrárias

strategy.run aceita tf.distribute.DistributedValues, que é a saída de next(iterator). Para passar os valores dos tensores, use tf.distribute.Strategy.experimental_distribute_values_from_function para construir tf.distribute.DistributedValues a partir de tensores brutos. Com essa opção, o usuário precisará especificar sua própria lógica de divisão em lotes e fragmentação na função de entrada, o que pode ser feito usando o objeto de entrada tf.distribute.experimental.ValueContext.

mirrored_strategy = tf.distribute.MirroredStrategy() def value_fn(ctx): return tf.constant(ctx.replica_id_in_sync_group) distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn) for _ in range(4): result = mirrored_strategy.run(lambda x: x, args=(distributed_values,)) print(result)

Uso de tf.data.Dataset.from_generator se a sua entrada for a partir de um gerador

Se você tiver uma função geradora que queira usar, pode criar uma instância de tf.data.Dataset usando a API from_generator.

Observação: no momento, isso não é compatível com tf.distribute.TPUStrategy.

mirrored_strategy = tf.distribute.MirroredStrategy() def input_gen(): while True: yield np.random.rand(4) # use Dataset.from_generator dataset = tf.data.Dataset.from_generator( input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4])) dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset) iterator = iter(dist_dataset) for _ in range(4): result = mirrored_strategy.run(lambda x: x, args=(next(iterator),)) print(result)