Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/pt-br/tutorials/distribute/parameter_server_training.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Treinamento de servidor de parâmetros com ParameterServerStrategy

Visão geral

O treinamento de servidor de parâmetros é um método de Paralelo de Dados comum para fazer o treinamento de modelos em diversas máquinas.

Um cluster de treinamento de servidor de parâmetros é composto por workers e servidores de parâmetros. As variáveis são criadas nos servidores de parâmetros e são lidas e atualizadas pelos workers em cada passo. Por padrão, os workers leem e atualizam essas variáveis de forma independente, sem sincronizá-las entre si. É por isso que o treinamento de servidor de parâmetros às vezes é chamado de treinamento assíncrono.

No TensorFlow 2, o treinamento de servidor de parâmetros é possibilitado pela classe tf.distribute.ParameterServerStrategy, que distribui os passos de treinamento em um cluster que pode ter até milhares de workers (acompanhados por servidores de parâmetros).

Métodos de treinamento disponíveis

Há dois métodos de treinamento principais disponíveis:

Cluster com trabalhos e tarefas

Não importa a API escolhida (Model.fit ou loop de treinamento personalizado), o treinamento distribuído no TensorFlow 2 envolve um 'cluster' com diversos 'jobs' (trabalhos), e cada trabalho pode ter uma ou mais 'task's (tarefas).

Ao usar o treinamento de servidor de parâmetros, recomenda-se ter:

  • Um trabalho coordenador (que tem o nome chief)

  • Diversos trabalhos worker (com nome worker)

  • Diversos trabalhos servidor de parâmetros (com nome ps)

O coordenador cria recursos, envia tarefas de treinamento, escreve checkpoints e lida com as falhas das tarefas. Os workers e servidores de parâmetros executam instâncias de tf.distribute.Server, que escutam solicitações do coordenador.

Treinamento de servidor de parâmetros com a API Model.fit

O treinamento de servidor de parâmetros com a API Model.fit requer que o coordenador use um objeto tf.distribute.ParameterServerStrategy. Similar ao uso de Model.fit sem estratégia ou com outras estratégias, o fluxo de trabalho inclui criar e compilar o modelo, preparar os callbacks e fazer uma chamada a Model.fit.

Treinamento de servidor de parâmetros com um loop de treinamento personalizado

Com loops de treinamento personalizado, a classe tf.distribute.coordinator.ClusterCoordinator é o componente essencial usado para o coordenador.

  • A classe ClusterCoordinator precisa funcionar em conjunto com um objeto tf.distribute.ParameterServerStrategy.

  • O objeto tf.distribute.Strategy é necessário para fornecer as informações do cluster e é usado para definir um passo de treinamento, conforme demonstrado em Treinamento personalizado com tf.distribute.Strategy.

  • Então, o objeto ClusterCoordinator envia a execução desses passos de treinamento para os workers remotos.

A API mais importante fornecida pelo objeto ClusterCoordinator é schedule:

  • A API schedule enfileira uma tf.function e retorna um RemoteValue com previsão futura imediatamente.

  • As funções enfileiradas serão enviadas para workers remotos em threads em segundo plano, e seus RemoteValues serão preenchidos de forma assíncrona.

  • Como schedule não requer atribuição de workers, a tf.function passada pode ser executada em qualquer worker.

  • Se o worker no qual ele é executado ficar indisponível antes da conclusão, a função será refeita em outro worker disponível.

  • Devido a isso e ao fato de a execução de função não ser atômica, uma mesma chamada à função pode ser executada mais de uma vez.

Além de enviar mais funções remotas, o ClusterCoordinator também ajuda a criar datasets em todos os workers e a reconstruir esses datasets quando um worker se recupera após uma falha.

Organização do tutorial

Este tutorial será dividido em dois caminhos, Model.fit e loop de treinamento personalizado, e você pode escolher o mais adequado para suas particularidades. Seções que não comecem com "Treinamento com X" são aplicáveis aos dois caminhos.

!pip install portpicker
#@title import multiprocessing import os import random import portpicker import tensorflow as tf

Configuração do cluster

Conforme mencionado acima, um cluster de treinamento de servidor de parâmetros requer uma tarefa coordenadora que execute seu programa de treinamento, um ou vários workers e tarefas de servidor de parâmetros que executem servidores do TensorFlow, tf.distribute.Server, e possivelmente uma tarefa de avaliação adicional que execute uma avaliação secundária (confira a seção Avaliação secundária abaixo). Os requisitos para configurá-los são:

  • A tarefa coordenadora precisa saber os endereços e portas de todos os outros servidores do TensorFlow, exceto do avaliador.

  • Os workers e servidores de parâmetros precisam saber qual porta devem escutar. Por questões de simplicidade, geralmente você pode passar as informações completas do cluster ao criar servidores do TensorFlow nessas tarefas.

  • A tarefa de avaliação não precisa saber a configuração do cluster de treinamento. Caso saiba, ela não deve tentar estabelecer conexão com o cluster do treinamento.

  • Os workers e servidores de parâmetros devem ter os tipos de tarefa "worker" e "ps", respectivamente. O coordenador deve usar "chief" como o tipo de tarefa por questões de compatibilidade legada.

Neste tutorial, você criará um cluster dentro do processo para que o todo o treinamento de servidor de parâmetros possa ser executado no Colab. Você aprenderá a configurar clusters reais em uma seção posterior.

Cluster dentro do processo

Você começará criando vários servidores do TensorFlow antecipadamente e estabelecerá conexão com eles posteriormente. Isso é feito apenas para fins de demonstração neste tutorial. Em treinamentos reais, os servidores serão iniciados nas máquinas "worker" e "ps".

def create_in_process_cluster(num_workers, num_ps): """Creates and starts local servers and returns the cluster_resolver.""" worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)] ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)] cluster_dict = {} cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports] if num_ps > 0: cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports] cluster_spec = tf.train.ClusterSpec(cluster_dict) # Workers need some inter_ops threads to work properly. worker_config = tf.compat.v1.ConfigProto() if multiprocessing.cpu_count() < num_workers + 1: worker_config.inter_op_parallelism_threads = num_workers + 1 for i in range(num_workers): tf.distribute.Server( cluster_spec, job_name="worker", task_index=i, config=worker_config, protocol="grpc") for i in range(num_ps): tf.distribute.Server( cluster_spec, job_name="ps", task_index=i, protocol="grpc") cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver( cluster_spec, rpc_layer="grpc") return cluster_resolver # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" NUM_WORKERS = 3 NUM_PS = 2 cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

A configuração do cluster dentro do processo é usada com frequência no teste de unidade, como aqui.

Outra opção para testes locais é iniciar processos na máquina local. Confira um exemplo dessa estratégia no guia Treinamento multiworker com o Keras.

Criação de instância de ParameterServerStrategy

Antes de você conferir todo o código de treinamento, vamos instanciar um objeto de tf.distribute.ParameterServerStrategy. Isso é necessário tanto para o caso do Model.fit quanto do loop de treinamento personalizado. O argumento variable_partitioner será explicado na seção Fragmentação de variável.

variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner)

Para usar as GPUs para treinamento, aloque GPUs visíveis a cada worker. ParameterServerStrategy usará todas as GPUs disponíveis em cada worker, com a restrição de que todos os workers devem ter o mesmo número de GPUs disponíveis.

Fragmentação de variável

Fragmentação variável refere-se a dividir uma variável em diversas variáveis menores, chamadas de fragmentos. A fragmentação de variável pode ser útil para distribuir a carga da rede ao acessar esses fragmentos. Também é útil para distribuir computação e armazenamento de uma variável normal em diversos servidores de parâmetros ao, por exemplo, usar embeddings muito grandes que possam não caber na memória de uma única máquina.

Para ativar a fragmentação de variável, você pode passar variable_partitioner ao construir um objeto ParameterServerStrategy. variable_partitioner será chamado toda vez em que uma variável for criada e deve retornar o número de fragmentos em cada dimensão da variável. Alguns variable_partitioners prontos para uso são fornecidos, como tf.distribute.experimental.partitioners.MinSizePartitioner. Recomenda-se usar particionadores baseados em tamanho, como tf.distribute.experimental.partitioners.MinSizePartitioner, para evitar particionar variáveis pequenas, o que poderia causar um impacto negativo na velocidade de treinamento do modelo.

Quando um variable_partitioner é passado e você criar uma variável diretamente dentro de Strategy.scope, a variável se tornará um tipo container com uma propriedade variables, que fornece acesso à lista de fragmentos. Na maioria dos casos, esse container será convertido automaticamente em um Tensor concatenando-se todos os fragmentos. Como consequência, ele pode ser usado como uma variável normal. Por outro lado, alguns métodos do TensorFlow, como tf.nn.embedding_lookup, oferecem uma implementação eficiente para esse tipo container e, nesses métodos, a concatenação automática será evitada.

Confira mais detalhes nos documentos da API de tf.distribute.ParameterServerStrategy.

Treinamento com Model.fit

O Keras oferece uma API de treinamento fácil de usar por meio do Model.fit que trata o loop de treinamento em segundo plano, com a flexibilidade de um train_step que pode ser sobrescrito e callbacks, que oferecem funcionalidades como salvamento de checkpoints ou salvamento de resumos para o TensorBoard. Com o Model.fit, o mesmo código de treinamento pode ser usado com outras estratégias com uma troca simples do objeto de estratégia.

Dados de entrada

O Model.fit do Keras com tf.distribute.ParameterServerStrategy aceita dados de entrada na forma de um tf.data.Dataset, tf.distribute.DistributedDataset ou tf.keras.utils.experimental.DatasetCreator, sendo que Dataset é a opção recomendada pela facilidade de uso. Porém, se você tiver problemas de memória ao usar Dataset, talvez precise usar DatasetCreator com um argumento dataset_fn que pode ser chamado (confira mais detalhes na documentação da API de tf.keras.utils.experimental.DatasetCreator).

Se você transformar seu dataset em um tf.data.Dataset, deve usar Dataset.shuffle e Dataset.repeat, conforme demonstrado no exemplo de código abaixo.

  • O Model.fit do Keras com o treinamento de servidor de parâmetros pressupõe que cada worker recebe o mesmo dataset, exceto quando ele é misturado de forma diferente. Portanto, ao fazer uma chamada a Dataset.shuffle, você garante ainda mais iterações dos dados.

  • Como os workers não sincronizam, eles podem terminar o processamento dos datasets em momentos diferentes. Dessa forma, a maneira mais fácil de definir épocas para o treinamento de servidor de parâmetros é usando Dataset.repeat – que repete um dataset indefinidamente quando a chamada é feita sem um argumento – e especificando o argumento steps_per_epoch na chamada a Model.fit.

Consulte a seção "Treinamento dos fluxos de trabalho" do guia do tf.data para ver mais detalhes de shuffle e repeat.

global_batch_size = 64 x = tf.random.uniform((10, 10)) y = tf.random.uniform((10,)) dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat() dataset = dataset.batch(global_batch_size) dataset = dataset.prefetch(2)

Se você criar o dataset com tf.keras.utils.experimental.DatasetCreator, o código em dataset_fn será chamado no dispositivo de entrada, que geralmente é a CPU, em cada uma das máquinas de worker.

Construção e compilação do modelo

Agora, você criará um tf.keras.Model – um modelo tf.keras.models.Sequential trivial para fins de demonstração – seguido por uma chamada Model.compile para incorporar componentes, como um otimizador, métricas e outros parâmetros, como steps_per_execution:

with strategy.scope(): model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)]) model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)

Callbacks e treinamento

Antes de fazer uma chamada a Model.fit do Keras para o treinamento real, prepare todos os callbacks necessários para tarefas comuns, como:

  • tf.keras.callbacks.ModelCheckpoint: salva o modelo com uma determinada frequência, como após cada época.

  • tf.keras.callbacks.BackupAndRestore: faz backup do modelo e do número da época atual para proporcionar tolerância a falhas, se o cluster passar por uma indisponibilidade (como anulamento e interrupção). Então, você pode restaurar o estado de treinamento ao reiniciar após a falha de um trabalho e continuar o treinamento pelo começo da época interrompida.

  • tf.keras.callbacks.TensorBoard: escreve periodicamente logs do modelo em arquivos de resumo que podem ser visualizados na ferramenta TensorBoard.

Observação: por questões de desempenho, callbacks personalizados não podem ter os callbacks no nível de lote sobrescritos quando usados com ParameterServerStrategy. Modifique seus callbacks personalizados para que sejam chamados no nível de época e ajuste steps_per_epoch para um valor adequado. Além disso, steps_per_epoch é um argumento necessário para Model.fit quando usado com ParameterServerStrategy.

working_dir = "/tmp/my_working_dir" log_dir = os.path.join(working_dir, "log") ckpt_filepath = os.path.join(working_dir, "ckpt") backup_dir = os.path.join(working_dir, "backup") callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=log_dir), tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath), tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir), ] model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)

Uso direto com ClusterCoordinator (opcional)

Mesmo se você optar pelo caminho de treinamento com Model.fit, pode, opcionalmente, instanciar um objeto tf.distribute.coordinator.ClusterCoordinator para agendar outras funções que você deseja executar nos workers. Confira a seção Treinamento com um loop de treinamento personalizado para ver mais detalhes e exemplos.

Treinamento com um loop de treinamento personalizado

O uso de loops de treinamento personalizado com tf.distribute.Strategy proporciona excelente flexibilidade para definir os loops de treinamento. Com o ParameterServerStrategy definido acima (como strategy), você usará um tf.distribute.coordinator.ClusterCoordinator para enviar a execução dos passos de treinamento para os workers remotos.

Em seguida, você criará um modelo, definirá um dataset e definirá uma função de passos da mesma forma que no loop de treinamento com outros tf.distribute.Strategys. Confira mais detalhes no tutorial Treinamento personalizado com tf.distribute.Strategy.

Para garantir uma pré-busca eficiente do dataset, use as APIs de criação de dataset distribuído recomendadas na seção Envio dos passos de treinamento para os workers remotos abaixo. Além disso, faça uma chamada a Strategy.run dentro de worker_fn para usar ao máximo as GPUs alocadas aos workers. As outras etapas são as mesmas para treinamento com ou sem GPUs.

Vamos criar esses componentes nos seguintes passos:

Configuração dos dados

Primeiro, escreva uma função que crie um dataset.

Se você quiser pré-processar os dados com as camadas de pré-processamento do Keras ou com as camadas do TensorFlow Transform, crie essas camadas fora de dataset_fn e dentro de Strategy.scope, como faria para qualquer outra camada do Keras. Isso é necessário porque dataset_fn será encapsulado em uma função tf.function e depois executado em cada worker para gerar o pipeline de dados.

Se você não seguir o procedimento acima, a criação das camadas poderá criar estados do TensorFlow que serão levados da tf.function para o coordenador. Dessa forma, acessá-las nos workers poderia ocasionar chamadas RPC repetitivas entre o coordenador e os workers, causando uma lentidão considerável.

Ao colocar as camadas dentro de Strategy.scope, elas serão criadas em todos os workers. Então, você aplicará a transformação dentro de dataset_fn por meio de tf.data.Dataset.map. Confira Pré-processamento de dados no tutorial Entrada distribuída para ver mais informações sobre pré-processamento de dados com entrada distribuída.

feature_vocab = [ "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman" ] label_vocab = ["yes", "no"] with strategy.scope(): feature_lookup_layer = tf.keras.layers.StringLookup( vocabulary=feature_vocab, mask_token=None) label_lookup_layer = tf.keras.layers.StringLookup( vocabulary=label_vocab, num_oov_indices=0, mask_token=None) raw_feature_input = tf.keras.layers.Input( shape=(3,), dtype=tf.string, name="feature") feature_id_input = feature_lookup_layer(raw_feature_input) feature_preprocess_stage = tf.keras.Model( {"features": raw_feature_input}, feature_id_input) raw_label_input = tf.keras.layers.Input( shape=(1,), dtype=tf.string, name="label") label_id_input = label_lookup_layer(raw_label_input) label_preprocess_stage = tf.keras.Model( {"label": raw_label_input}, label_id_input)

Gere exemplos em um dataset:

def feature_and_label_gen(num_examples=200): examples = {"features": [], "label": []} for _ in range(num_examples): features = random.sample(feature_vocab, 3) label = ["yes"] if "avenger" in features else ["no"] examples["features"].append(features) examples["label"].append(label) return examples examples = feature_and_label_gen()

Depois, crie o dataset de treinamento encapsulado em um dataset_fn:

def dataset_fn(_): raw_dataset = tf.data.Dataset.from_tensor_slices(examples) train_dataset = raw_dataset.map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(200).batch(32).repeat() return train_dataset

Criação do modelo

Agora, crie o modelo e outros objetos. Lembre-se de criar todas as variáveis dentro de Strategy.scope.

# These variables created under the `Strategy.scope` will be placed on parameter # servers in a round-robin fashion. with strategy.scope(): # Create the model. The input needs to be compatible with Keras processing layers. model_input = tf.keras.layers.Input( shape=(3,), dtype=tf.int64, name="model_input") emb_layer = tf.keras.layers.Embedding( input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384) emb_output = tf.reduce_mean(emb_layer(model_input), axis=1) dense_output = tf.keras.layers.Dense( units=1, activation="sigmoid", kernel_regularizer=tf.keras.regularizers.L2(1e-4), )(emb_output) model = tf.keras.Model({"features": model_input}, dense_output) optimizer = tf.keras.optimizers.legacy.RMSprop(learning_rate=0.1) accuracy = tf.keras.metrics.Accuracy()

Vamos confirmar que o uso de FixedShardsPartitioner divida todas as variáveis em dois fragmentos e que cada fragmento seja atribuído a um servidor de parâmetros diferente:

assert len(emb_layer.weights) == 2 assert emb_layer.weights[0].shape == (4, 16384) assert emb_layer.weights[1].shape == (4, 16384) print(emb_layer.weights[0].device) print(emb_layer.weights[1].device)

Definição do passo de treinamento

A terceira etapa é criar o passo de treinamento encapsulado em uma função tf.function:

@tf.function def step_fn(iterator): def replica_fn(batch_data, labels): with tf.GradientTape() as tape: pred = model(batch_data, training=True) per_example_loss = tf.keras.losses.BinaryCrossentropy( reduction=tf.keras.losses.Reduction.NONE)(labels, pred) loss = tf.nn.compute_average_loss(per_example_loss) model_losses = model.losses if model_losses: loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses)) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) accuracy.update_state(labels, actual_pred) return loss batch_data, labels = next(iterator) losses = strategy.run(replica_fn, args=(batch_data, labels)) return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Na função de passo de treinamento acima, chamar Strategy.run e Strategy.reduce em step_fn pode oferecer suporte a múltiplas GPUs por worker. Se os workers tiverem GPUs alocadas, Strategy.run distribuirá os datasets em múltiplas réplicas (GPUs). Suas chamadas paralelas para tf.nn.compute_average_loss() computam a média da perda nas réplicas (GPUs) de um worker, independente do número total de workers.

Envio dos passos de treinamento para os workers remotos

Após todas as computações serem definidas por ParameterServerStrategy, você usará a classe tf.distribute.coordinator.ClusterCoordinator para criar recursos e distribuir os passos de treinamento para os workers remotos.

Primeiro, crie um objeto ClusterCoordinator e passe o objeto "strategy":

coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)

Depois, crie um dataset e um iterador por worker usando a API ClusterCoordinator.create_per_worker_dataset, que replica o dataset em todos os workers. Em per_worker_dataset_fn abaixo, recomenda-se encapsular dataset_fn em strategy.distribute_datasets_from_function para permitir uma pré-busca eficiente nas GPUs de forma transparente.

@tf.function def per_worker_dataset_fn(): return strategy.distribute_datasets_from_function(dataset_fn) per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn) per_worker_iterator = iter(per_worker_dataset)

A etapa final é distribuir a computação para os workers remotos usando ClusterCoordinator.schedule:

  • O método schedule enfileira uma tf.function e retorna um RemoteValue com previsão futura imediatamente. As funções enfileiradas serão enviadas para workers remotos em threads em segundo plano, e RemoteValue será preenchido de forma assíncrona.

  • O método join (ClusterCoordinator.join) pode ser usado para aguardar até que todas as funções agendadas sejam executadas.

num_epochs = 4 steps_per_epoch = 5 for i in range(num_epochs): accuracy.reset_states() for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) # Wait at epoch boundaries. coordinator.join() print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

Veja como você pode buscar o resultado de um RemoteValue:

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,)) print("Final loss is %f" % loss.fetch())

Outra opção é iniciar todos os passos e fazer alguma outra coisa enquanto aguarda a conclusão:

for _ in range(total_steps): coordinator.schedule(step_fn, args=(per_worker_iterator,)) while not coordinator.done(): time.sleep(10) # Do something like logging metrics or writing checkpoints.

Confira o fluxo de trabalho de treinamento e serviço completo para esse exemplo específico neste teste.

Mais detalhes sobre a criação do dataset

O dataset no código acima é criado usando a API ClusterCoordinator.create_per_worker_dataset. Ela cria um dataset por worker e retorna um objeto container. Você pode fazer uma chamada ao método iter para criar um iterador por worker. Você terá um iterador por worker, e a fatia correspondente de um worker será substituída no argumento de entrada da função passado ao método ClusterCoordinator.schedule antes que a função seja executada em um worker específico.

O método ClusterCoordinator.schedule pressupõe que todos os workers sejam equivalentes e, portanto, também pressupõe que os datasets em diferentes workers sejam os mesmos (exceto pelo fato de poderem ter sido misturados de forma diferente). Dessa forma, também recomenda-se repetir os datasets e agendar um número finito de passos em vez de contar com o recebimento do erro OutOfRangeError de um dataset.

Outra observação importante: os datasets de tf.data não têm suporte à serialização e desserialização implícitas após cada tarefa. Portanto, é importante criar o dataset inteiro dentro da função passada a ClusterCoordinator.create_per_worker_dataset. A API create_per_worker_dataset também pode receber diretamente um tf.data.Dataset ou tf.distribute.DistributedDataset como entrada.

Avaliação

As duas principais estratégias para fazer a avaliação com o treinamento tf.distribute.ParameterServerStrategy são a avaliação embutida e a avaliação secundária. Cada uma tem suas vantagens e desvantagens, conforme descrito abaixo. O método de avaliação embutida é recomendado se você não tiver uma preferência específica. Para usuários que usam Model.fit, Model.evaluate usa avaliação embutida (distribuída) em segundo plano.

Avaliação embutida

Neste método, o coordenador alterna entre treinamento e avaliação e, portanto, é chamado de avaliação embutida.

Existem vários benefícios da avaliação embutida. Por exemplo:

  • É compatível com modelos de avaliação e datasets de avaliação grandes que uma única tarefa não consegue realizar.

  • Os resultados da avaliação podem ser usados para tomar decisões para treinar a próxima época (por exemplo, se o treinamento deve ser interrompido antecipadamente).

Existem duas formas de implementar a avaliação embutida: avaliação direta e avaliação distribuída.

  • Avaliação direta: para modelos e datasets de avaliação pequenos, o coordenador pode executar a avaliação diretamente no modelo distribuído, com o dataset de avaliação no coordenador:

eval_dataset = tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).batch(8) eval_accuracy = tf.keras.metrics.Accuracy() for batch_data, labels in eval_dataset: pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) print("Evaluation accuracy: %f" % eval_accuracy.result())
  • Avaliação distribuída: para modelos ou datasets grandes em que não seria viável executar diretamente no coordenador, a tarefa do coordenador pode distribuir tarefas de avaliação para os workers pelos métodos ClusterCoordinator.schedule/ClusterCoordinator.join:

with strategy.scope(): # Define the eval metric on parameter servers. eval_accuracy = tf.keras.metrics.Accuracy() @tf.function def eval_step(iterator): def replica_fn(batch_data, labels): pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) batch_data, labels = next(iterator) strategy.run(replica_fn, args=(batch_data, labels)) def eval_dataset_fn(): return tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(16).repeat().batch(8) per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn) per_worker_eval_iterator = iter(per_worker_eval_dataset) eval_steps_per_epoch = 2 for _ in range(eval_steps_per_epoch): coordinator.schedule(eval_step, args=(per_worker_eval_iterator,)) coordinator.join() print("Evaluation accuracy: %f" % eval_accuracy.result())

Ativação da avaliação "exatamente uma vez"

Os métodos schedule e join de tf.distribute.coordinator.ClusterCoordinator não oferecem suporte às semânticas de "garantias de análise" e "exatamente uma vez" por padrão. Em outras palavras, no exemplo acima, não há garantia de que todos os exemplos de avaliação em um dataset serão avaliados exatamente uma vez; alguns poderão não ser analisados, e alguns poderão ser avaliados diversas vezes.

A avaliação "exatamente uma vez" poderá ser preferível para reduzir a variância da avaliação entre as épocas e para melhorar a seleção de modelo por interrupção antecipada, ajuste de hiperparâmetros ou outros métodos. Existem diferentes formas de ativar a avaliação "exatamente uma vez":

  • Com um fluxo de trabalho Model.fit/.evaluate, é possível ativá-la adicionando um argumento a Model.compile. Confira a documentação do argumento pss_evaluation_shards.

  • A API de serviço tf.data pode ser usada para permitir a análise "exatamente uma vez" para a avaliação usando ParameterServerStrategy (confira a seção Fragmentação dinâmica da documentação da API de tf.data.experimental.service).

  • A avaliação secundária fornece a avaliação "exatamente uma vez" por padrão, já que a avaliação é feita em somente uma máquina. Entretanto, isso poderá ser muito mais lento do que fazer a avaliação distribuída em vários workers.

A primeira opção, com o uso de Model.compile, é a solução sugerida para a maioria dos usuários.

A avaliação "exatamente uma vez" tem algumas limitações:

  • Não há suporte à escrita de um loop de avaliação distribuído personalizado com uma garantia de análise "exatamente uma vez". Registre um problema no GitHub caso precise de suporte.

  • Não pode lidar automaticamente com a computação de métricas que usem a API Layer.add_metric. Elas devem ser excluídas da avaliação ou convertidas em objetos Metric.

Avaliação secundária

Outro método para definir e executar um loop de avaliação no treinamento tf.distribute.ParameterServerStrategy é chamado de avaliação secundária, em que você cria uma tarefa de avaliação dedicada que lê checkpoints repetidamente e executa a avaliação no último checkpoint (confira mais detalhes sobre criação de checkpoints neste guia). As tarefas do coordenador e dos workers não gastam tempo com avaliação, então, para um número fixo de iterações, o tempo de treinamento geral deverá ser menor do que ao usar outros métodos de avaliação. Entretanto, requer uma tarefa de avaliação adicional e criação periódica de checkpoints para acionar a avaliação.

Para escrever um loop de avaliação para a avaliação secundária, você tem duas opções:

  1. Usar a API tf.keras.utils.SidecarEvaluator.

  2. Criar um loop de avaliação personalizado.

Consulte a documentação da API tf.keras.utils.SidecarEvaluator para ver mais detalhes sobre primeira opção.

A avaliação secundária é compatível somente com uma única tarefa. Portanto:

  • Há uma garantia de que cada exemplo será avaliado uma vez. Caso o avaliador seja interrompido ou reiniciado, ele reinicia o loop de avaliação pelo último checkpoint, e o progresso parcial de avaliação feito antes da reinicialização é descartado.

  • Entretanto, ao executar a avaliação em uma única tarefa, uma avaliação completa pode levar um longo tempo.

  • Se o tamanho do modelo for grande demais e não couber na memória do avaliador, a avaliação secundária única não será aplicável.

Outra ressalva é que a implementação tf.keras.utils.SidecarEvaluator e o loop de avaliação personalizado abaixo podem pular alguns checkpoints, pois eles sempre escolhem o último checkpoint disponível e, durante uma época de avaliação, diversos checkpoints podem ser gerados pelo cluster de treinamento. Você pode escrever um loop de avaliação personalizado que avalie cada checkpoint, mas isso não é discutido neste tutorial. Por outro lado, ele pode ficar ocioso se os checkpoints forem produzidos com uma frequência inferior ao tempo necessário para executar a avaliação.

Um loop de avaliação personalizado oferece mais controle dos detalhes, como escolher qual checkpoint será avaliado ou incluir lógica adicional para ser executada junto com a avaliação. Veja abaixo um possível loop de avaliação secundária personalizado:

checkpoint_dir = ... eval_model = ... eval_data = ... checkpoint = tf.train.Checkpoint(model=eval_model) for latest_checkpoint in tf.train.checkpoints_iterator( checkpoint_dir): try: checkpoint.restore(latest_checkpoint).expect_partial() except (tf.errors.OpError,) as e: # checkpoint may be deleted by training when it is about to read it. continue # Optionally add callbacks to write summaries. eval_model.evaluate(eval_data) # Evaluation finishes when it has evaluated the last epoch. if latest_checkpoint.endswith('-{}'.format(train_epochs)): break

Clusters no mundo real

Observação: esta seção não é necessária para executar o código do tutorial nesta página.

Em um ambiente de produção real, você executará todas as tarefas em processos diferentes, em máquinas diferentes. A maneira mais simples de configurar as informações do cluster em cada tarefa é definindo variáveis de ambiente "TF_CONFIG" e usando tf.distribute.cluster_resolver.TFConfigClusterResolver para processar "TF_CONFIG".

Confira a descrição geral de variáveis de ambiente "TF_CONFIG" em "Configuração da variável de ambienteTF_CONFIG" no guia Treinamento distribuído.

Se você iniciar suas tarefas de treinamento usando Kubernetes ou outros modelos de configuração, provavelmente a variável de ambiente "TF_CONFIG" já foi configurada para você.

Configuração da variável de ambiente "TF_CONFIG"

Vamos supor que você tenha três workers e dois servidores de parâmetros. Portanto, "TF_CONFIG" do worker 1 pode ser:

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "worker": ["host1:port", "host2:port", "host3:port"], "ps": ["host4:port", "host5:port"], "chief": ["host6:port"] }, "task": {"type": "worker", "index": 1} })

"TF_CONFIG" do avaliador pode ser:

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "evaluator": ["host7:port"] }, "task": {"type": "evaluator", "index": 0} })

A parte "cluster" na string "TF_CONFIG" acima para o avaliador é opcional.

Se você usar o mesmo binário para todas as tarefas

Se você preferir executar todas essas tarefas usando um único binário, precisará deixar seu programa se dividir em diferentes funções no começo:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation else: # Run the coordinator.

O código abaixo inicia um servidor do TensorFlow e aguarda, o que é útil para as funções "worker" e "ps":

# Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, protocol=cluster_resolver.rpc_layer or "grpc", start=True) server.join()

Tratamento de falhas das tarefas

Falha dos workers

Tanto a estratégia de loop de treinamento personalizado tf.distribute.coordinator.ClusterCoordinator quanto Model.fit têm uma tolerância a falhas integrada em caso de falha dos workers. Quando o worker se recupera, ClusterCoordinator chama a recriação do dataset nos workers.

Falha do servidor de parâmetros ou do coordenador

Entretanto, quando o coordenador observar um erro no servidor de parâmetros, gerará um erro UnavailableError ou AbortedError imediatamente. Nesse caso, você pode reiniciar o coordenador. O coordenador em si também pode ficar indisponível. Portanto, recomenda-se usar certas ferramentas para não perder o progresso do treinamento:

  • Para Model.fit, você deve usar um callback BackupAndRestore, que trata o salvamento e a restauração do progresso automaticamente. Confira um exemplo na seção Callbacks e treinamento acima.

  • Para um loop de treinamento personalizado, você deve fazer o checkpoint das variáveis do modelo periodicamente e carregar as variáveis do modelo a partir de um checkpoint, se algum existir, antes de o treinamento começar. O progresso do treinamento pode ser inferido aproximadamente de optimizer.iterations, se for feito checkpoint de um otimizador:

checkpoint_manager = tf.train.CheckpointManager( tf.train.Checkpoint(model=model, optimizer=optimizer), checkpoint_dir, max_to_keep=3) if checkpoint_manager.latest_checkpoint: checkpoint = checkpoint_manager.checkpoint checkpoint.restore( checkpoint_manager.latest_checkpoint).assert_existing_objects_matched() global_steps = int(optimizer.iterations.numpy()) starting_epoch = global_steps // steps_per_epoch for _ in range(starting_epoch, num_epochs): for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) coordinator.join() checkpoint_manager.save()

Busca de RemoteValue

A busca de RemoteValue sempre será bem-sucedida se uma função for executada com êxito. Isso ocorre porque, atualmente, o valor de retorno é copiado imediatamente para o coordenador após a execução de uma função. Se qualquer worker falhar durante a cópia, a função será refeita em outro worker disponível. Portanto, se você quiser otimizar o desempenho, pode agendar funções sem um valor de retorno.

Relatório de erros

Quando o coordenador observa um erro, como UnavailableError, nos servidores de parâmetros ou outros erros de aplicação, como InvalidArgument em tf.debugging.check_numerics, ele cancelará todas as funções pendentes e enfileiradas antes de gerar o erro. A busca dos RemoteValues correspondentes gerará um erro CancelledError.

Após um erro ser gerado, o coordenador não gerará o mesmo erro ou qualquer outro erro a partir das funções canceladas.

Melhoria do desempenho

Há vários possíveis motivos para você se deparar com problemas de desempenho ao fazer o treinamento com tf.distribute.ParameterServerStrategy e tf.distribute.coordinator.ClusterCoordinator.

Um motivo comum é servidores de parâmetros com carga desbalanceada, em que alguns servidores de parâmetros com carga em excesso chegam à capacidade máxima. Também há diversas causas raiz. Veja alguns métodos simples para mitigar esse problema:

  1. Fragmente as variáveis grandes do modelo especificando variable_partitioner ao construir ParameterServerStrategy.

  2. Evite criar uma variável de hotspot exigida por todos os servidores de parâmetros em um único passo:

  3. Use uma taxa de aprendizado constante ou a subclasse tf.keras.optimizers.schedules.LearningRateSchedule nos otimizadores. Isso é feito porque o comportamento padrão é a taxa de aprendizado se tornar uma variável colocada em um servidor de parâmetros específico e solicitada por todos os outros servidores de parâmetros em cada passo.

  4. Use um tf.keras.optimizers.legacy.Optimizer (os tf.keras.optimizers.Optimizers padrão ainda podem acarretar variáveis de hotspot).

  5. Misture seus vocabulários grandes antes de passá-los às camadas de pré-processamento do Keras.

Outro possível motivo para problemas de desempenho é o coordenador. A implementação de schedule/join é baseada no Python e, portanto, pode haver sobrecarga de threads. Além disso, a latência entre o coordenador e os workers pode ser alta. Se esse for o caso:

  • Para Model.fit, você pode definir o argumento steps_per_execution fornecido em Model.compile como um valor maior do que 1.

  • Para um loop de treinamento personalizado, você pode agrupar diversos passos em uma única função tf.function:

steps_per_invocation = 10 @tf.function def step_fn(iterator): for _ in range(steps_per_invocation): features, labels = next(iterator) def replica_fn(features, labels): ... strategy.run(replica_fn, args=(features, labels))

À medida que a biblioteca for otimizada, espera-se que a maioria dos usuários não precise agrupar passos manualmente no futuro.

Além disso, uma dica para melhorar o desempenho é agendar funções sem um valor de retorno, conforme explicado na seção Tratamento de falhas das tarefas acima.

Limitações conhecidas

A maioria das limitações conhecidas já foram discutidas nas seções acima. Esta seção apresenta um resumo.

ParameterServerStrategy geral

  • os.environment["grpc_fail_fast"]="use_caller" é necessário em cada tarefa, incluindo o coordenador, para que a tolerância a falhas funcione corretamente.

  • Não há suporte a treinamento de servidor de parâmetros síncrono.

  • Geralmente, é necessário agrupar vários passos em uma única função para atingir o desempenho ideal.

  • Não há suporte ao carregamento de um saved_model por meio de tf.saved_model.load contendo variáveis fragmentadas. Espera-se que carregar um saved_model usando o TensorFlow Serving funcione (consulte mais detalhes no tutorial de serviço).

  • Não há suporte à recuperação de uma falha do servidor de parâmetros sem reiniciar tarefa do coordenador.

  • A criação de tf.lookup.StaticHashTable, usada com frequência por algumas camadas de pré-processamento do Keras, como tf.keras.layers.IntegerLookup, tf.keras.layers.StringLookup e tf.keras.layers.TextVectorization, deve ser feita dentro de Strategy.scope. Caso contrário, os recursos serão colocados no coordenador, e os RPCs de pesquisa dos workers para o coordenador terão consequências para o desempenho.

Especificidades de Model.fit

  • O argumento steps_per_epoch é exigido no Model.fit. Você pode selecionar um valor que forneça intervalos adequados em uma época.

  • ParameterServerStrategy não tem suporte a callbacks personalizados que tenham chamadas no nível de lote por questões de desempenho. Você deve converter essas chamadas em chamadas no nível de época e escolher um steps_per_epoch adequado para que as chamadas sejam feitas a cada steps_per_epoch passos. Os callbacks integrados não são afetados: suas chamadas no nível de lote foram modificadas de forma a terem bom desempenho. Há planos para incluir suporte a chamadas no nível de lote em ParameterServerStrategy.

  • Pelo mesmo motivo, diferentemente de outras estratégias, as barras e métricas de progresso são registradas somente após cada época.

  • Não há suporte a run_eagerly.

Especificidades do loop de treinamento personalizado

  • De forma geral, ClusterCoordinator.schedule não tem suporte a garantia de análise para um dataset, embora seja possível ter garantia de análise da avaliação com Model.fit/.evaluate. Confira Ativação da avaliação "exatamente uma vez".

  • Quando ClusterCoordinator.create_per_worker_dataset é usado com um callable como entrada, todo o dataset deve ser criado dentro da função passada a ele.

  • tf.data.Options é ignorado em um dataset criado por ClusterCoordinator.create_per_worker_dataset.