Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/pt-br/federated/openmined2020/openmined_conference_2020.ipynb
25118 views
Kernel: Python 3

Antes de começarmos

Para acessar o notebook no Colab, vá até "File (Arquivo)" -> "Save a copy in Drive (Salvar uma cópia no Drive)" e faça as alterações no notebook copiado.

Antes de começarmos, execute o código abaixo para que o ambiente seja configurado corretamente. Se não for exibida uma saudação, consulte as instruções de instalação.

#@title Upgrade tensorflow_federated and load TensorBoard #@test {"skip": true} !pip install --quiet --upgrade tensorflow-federated !pip install --quiet --upgrade nest-asyncio import nest_asyncio nest_asyncio.apply() %load_ext tensorboard import sys if not sys.warnoptions: import warnings warnings.simplefilter("ignore")
#@title import collections from matplotlib import pyplot as plt from IPython.display import display, HTML, IFrame import numpy as np import tensorflow as tf import tensorflow_federated as tff tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) np.random.seed(0) def greetings(): display(HTML('<b><font size="6" color="#ff00f4">Greetings, virtual tutorial participants!</font></b>')) return True l = tff.federated_computation(greetings)()

TensorFlow Federated para classificação de imagens

Vamos simular um aprendizado federado. Neste tutorial, usaremos o exemplo de treinamento MNIST clássico para apresentar a camada de API Federated Learning (FL, na sigla em inglês para aprendizado federado) do TFF, tff.learning – um conjunto de interfaces de alto nível que podem ser usadas para fazer tarefas comuns de aprendizado federado, como treinamento federado, usando modelos fornecidos por usuários e implementados no TensorFlow.

Organização do tutorial

Treinaremos um modelo para fazer classificação de imagens usando o dataset MNIST clássico, com o aprendizado de rede neutral para classificar dígitos em imagens. Neste caso, simularemos o aprendizado federado com os dados de treinamento distribuídos em diferentes dispositivos.

Seções

  1. Carregue as bibliotecas do TFF.

  2. Explore/pré-processe o dataset federado EMNIST.

  3. Crie um modelo.

  4. Configure o processo de agregação federada para o treinamento.

  5. Analise as métricas de treinamento.

  6. Configure a computação de avaliação federada.

  7. Analise as métricas de avaliação.

Prepare os dados de entrada

Vamos começar pelos dados. O aprendizado federado requer um conjunto federado de dados, ou seja, uma coleção de dados vindos de diversos usuários. Geralmente, os dados federados não são i.i.d. (independentes e identicamente distribuídos), o que traz desafios únicos. Tipicamente, os usuários têm distribuições diferentes de dados dependendo dos padrões de uso.

Para que possamos fazer experimentos, alimentamos o repositório do TFF com alguns datasets.

Veja como carregar nosso dataset de amostra:

# Code for loading federated data from TFF repository emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Os datasets retornados por load_data() são instâncias de tff.simulation.datasets.ClientData, uma interface que permite enumerar o conjunto de usuários para construir um tf.data.Dataset que represente os dados de um usuário específico e para consultar a estrutura de elementos individuais.

Vamos explorar o dataset.

len(emnist_train.client_ids)
# Let's look at the shape of our data example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) example_dataset.element_spec
# Let's select an example dataset from one of our simulated clients example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) # Your code to get an example element from one client: example_element = next(iter(example_dataset)) example_element['label'].numpy()
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal') plt.grid(False) _ = plt.show()

Explorando dados não independentes e identicamente distribuídos

## Example MNIST digits for one client f = plt.figure(figsize=(20,4)) j = 0 for e in example_dataset.take(40): plt.subplot(4, 10, j+1) plt.imshow(e['pixels'].numpy(), cmap='gray', aspect='equal') plt.axis('off') j += 1
# Number of examples per layer for a sample of clients f = plt.figure(figsize=(12,7)) f.suptitle("Label Counts for a Sample of Clients") for i in range(6): ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i]) k = collections.defaultdict(list) for e in ds: k[e['label'].numpy()].append(e['label'].numpy()) plt.subplot(2, 3, i+1) plt.title("Client {}".format(i)) for j in range(10): plt.hist(k[j], density=False, bins=[0,1,2,3,4,5,6,7,8,9,10])
# Let's play around with the emnist_train dataset. # Let's explore the non-iid charateristic of the example data. for i in range(5): ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i]) k = collections.defaultdict(list) for e in ds: k[e['label'].numpy()].append(e['pixels'].numpy()) f = plt.figure(i, figsize=(12,5)) f.suptitle("Client #{}'s Mean Image Per Label".format(i)) for j in range(10): mn_img = np.mean(k[j],0) plt.subplot(2, 5, j+1) plt.imshow(mn_img.reshape((28,28)))#,cmap='gray') plt.axis('off') # Each client has different mean images -- each client will be nudging the model # in their own directions.

Pré-processamento de dados

Como os dados já são um tf.data.Dataset, o pré-processamento pode ser feito usando-se transformações de datasets. Confira mais detalhes sobre essas transformações aqui.

NUM_CLIENTS = 10 NUM_EPOCHS = 5 BATCH_SIZE = 20 SHUFFLE_BUFFER = 100 PREFETCH_BUFFER=10 def preprocess(dataset): def batch_format_fn(element): """Flatten a batch `pixels` and return the features as an `OrderedDict`.""" return collections.OrderedDict( x=tf.reshape(element['pixels'], [-1, 784]), y=tf.reshape(element['label'], [-1, 1])) return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch( BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

Vamos verificar se funcionou.

preprocessed_example_dataset = preprocess(example_dataset) sample_batch = tf.nest.map_structure(lambda x: x.numpy(), next(iter(preprocessed_example_dataset))) sample_batch

Veja abaixo uma função helper simples que construirá uma lista de datasets a partir do conjunto fornecido de usuários como uma entrada para uma rodada de treinamento ou avaliação.

def make_federated_data(client_data, client_ids): return [ preprocess(client_data.create_tf_dataset_for_client(x)) for x in client_ids ]

Agora, como escolhemos os clientes?

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS] # Your code to get the federated dataset here for the sampled clients: federated_train_data = make_federated_data(emnist_train, sample_clients) print('Number of client datasets: {l}'.format(l=len(federated_train_data))) print('First dataset: {d}'.format(d=federated_train_data[0]))

Criando um modelo com o Keras

Se você estiver usando o Keras, provavelmente já tem algum código que construa um modelo do Keras. Veja abaixo um exemplo de um modelo simples que será suficiente para nosso tutorial.

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ])

Treinamento centralizado com o Keras

## Centralized training with keras --------------------------------------------- # This is separate from the TFF tutorial, and demonstrates how to train a # Keras model in a centralized fashion (contrasting training in a federated env) (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() # Preprocess the data (these are NumPy arrays) x_train = x_train.reshape(60000, 784).astype("float32") / 255 y_train = y_train.astype("float32") mod = create_keras_model() mod.compile( optimizer=tf.keras.optimizers.RMSprop(), loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()] ) h = mod.fit( x_train, y_train, batch_size=64, epochs=2 ) # ------------------------------------------------------------------------------

Treinamento federado usando um modelo do Keras

Para usar qualquer modelo com o TFF, ele precisa ser encapsulado em uma instância da interface tff.learning.Model.

Confira mais métricas do Keras aqui.

def model_fn(): # We _must_ create a new model here, and _not_ capture it from an external # scope. TFF will call this within different graph contexts. keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=preprocessed_example_dataset.element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Treinando o modelo com dados federados

Agora que temos um modelo encapsulado como tff.learning.Model para uso com o TFF, podemos deixar que o TFF construa um algoritmo de agregação federada por meio da invocação da função helper tff.learning.build_federated_averaging_process conforme mostrado abaixo.

iterative_process = tff.learning.build_federated_averaging_process( model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02), # Add server optimizer here! server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

O que aconteceu? O TFF construiu um par de computações federadas e as encapsulou em um tff.templates.IterativeProcess (processo iterativo), em que essas computações ficam disponíveis como um par de propriedades initialize (inicializar) e next (próximo).

Geralmente, um processo iterativo é feito por um loop de controle, como:

def initialize(): ... def next(state): ... iterative_process = IterativeProcess(initialize, next) state = iterative_process.initialize() for round in range(num_rounds): state = iterative_process.next(state)

Vamos invocar a computação initialize para construir o estado do servidor.

state = iterative_process.initialize()

O segundo item do par de computações federadas, next, representa uma única rodada da agregação federada, que consiste em enviar o estado do servidor (incluindo os parâmetros do modelo) para os clientes, treinamento no dispositivo usando os dados locals, coleta e agregação das atualizações do modelo e geração de um modelo atualizado no servidor.

Vamos executar uma única rodada de treinamento e ver os resultados. Podemos usar os dados federados já gerados acima para uma amostra de usuários.

# Run one single round of training. state, metrics = iterative_process.next(state, federated_train_data) print('round 1, metrics={}'.format(metrics['train']))

Vamos executar mais algumas rodadas. Conforme dito antes, geralmente, a esta altura, você deve escolher um subconjunto dos seus dados de simulação usando uma nova amostra de usuários selecionada aleatoriamente para cada rodada para simular uma implantação realista, em que os usuários chegam e saem o tempo todo. Porém, neste notebook interativo, para fins de demonstração, reutilizaremos os mesmos usuários para que a convergência do sistema seja rápida.

NUM_ROUNDS = 11 for round_num in range(2, NUM_ROUNDS): state, metrics = iterative_process.next(state, federated_train_data) print('round {:2d}, metrics={}'.format(round_num, metrics['train']))

A perda de treinamento está diminuindo a cada rodada de treinamento federado, indicando que o modelo está convergindo. Há algumas ressalvas importantes sobre essas métricas de treinamento, que são explicadas mais adiante, na seção Avaliação do tutorial.

##Exibindo métricas do modelo no TensorBoard Agora, vamos visualizar as métricas dessas computações federadas usando o Tensorboard.

Vamos começar criando o diretório e o gravador de resumo correspondente que gravará as mensagens.

#@test {"skip": true} import os import shutil logdir = "/tmp/logs/scalars/training/" if os.path.exists(logdir): shutil.rmtree(logdir) # Your code to create a summary writer: summary_writer = tf.summary.create_file_writer(logdir) state = iterative_process.initialize()

Plote as métricas de escalares relevantes com o mesmo gravador de resumo.

#@test {"skip": true} with summary_writer.as_default(): for round_num in range(1, NUM_ROUNDS): state, metrics = iterative_process.next(state, federated_train_data) for name, value in metrics['train'].items(): tf.summary.scalar(name, value, step=round_num)

Inicialize o TensorBoard com o diretório de log raiz especificado acima. O carregamento dos dados pode demorar alguns segundos.

#@test {"skip": true} %tensorboard --logdir /tmp/logs/scalars/ --port=0

Para ver as métricas de avaliação da mesma forma, você pode criar uma pasta eval separada, como "logs/scalars/eval", para gravar no TensorBoard.

Avaliação

Para fazer a avaliação de dados federados, você pode construir outra computação federada criada para esse fim por meio da função tff.learning.build_federated_evaluation e passando seu construtor do modelo como argumento.

# Construct federated evaluation computation here: evaluation = tff.learning.build_federated_evaluation(model_fn)

Agora, vamos compilar uma amostra de teste dos dados federados e executar a avaliação novamente para os dados de teste, que virão de uma amostra diferente de usuários, mas de um conjunto de dados externo distinto.

import random shuffled_ids = emnist_test.client_ids.copy() random.shuffle(shuffled_ids) sample_clients = shuffled_ids[0:NUM_CLIENTS] federated_test_data = make_federated_data(emnist_test, sample_clients) len(federated_test_data), federated_test_data[0]
# Run evaluation on the test data here, using the federated model produced from # training: test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)

Isso concluí este tutorial. Sugerimos que você faça experimentos com os parâmetros (por exemplo, tamanho de lote, número de usuários, épocas, taxas de aprendizado, etc.) para modificar o código acima a fim de simular o treinamento com amostras de usuários aleatórias em cada rodada e para explorar os outros tutoriais criados.

Crie seus próprios algoritmos de aprendizado federado

Nos tutoriais anteriores, aprendemos a configurar o modelo e os pipelines de dados, que serão usados para fazer o treinamento federado utilizando a API tff.learning.

Na pesquisa de aprendizado federado, essa é só a ponta do iceberg. Neste tutorial, vamos discutir como implementar algoritmos de aprendizado federado sem usar a API tff.learning. Nossos objetivos são os seguintes:

Objetivos:

  • Entender a estrutura geral dos algoritmos de aprendizado federado.

  • Explorar o Federated Core do TFF.

  • Usar o Federated Core para implementar a agregação federada diretamente.

Prepare os dados de entrada

Primeiro, carregamos e pré-processamos o dataset EMNIST incluído no TFF. Basicamente, usamos o mesmo código do primeiro tutorial.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
NUM_CLIENTS = 10 BATCH_SIZE = 20 def preprocess(dataset): def batch_format_fn(element): """Flatten a batch of EMNIST data and return a (features, label) tuple.""" return (tf.reshape(element['pixels'], [-1, 784]), tf.reshape(element['label'], [-1, 1])) return dataset.batch(BATCH_SIZE).map(batch_format_fn)
client_ids = np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False) federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x)) for x in client_ids ]

Prepare o modelo

Usamos o mesmo modelo do primeiro tutorial, que tem uma única camada oculta seguida por uma camada softmax.

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ])

Encapsulamos esse modelo do Keras como tff.learning.Model.

def model_fn(): keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=federated_train_data[0].element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Personalize o algoritmo de aprendizado federado

Embora a API tff.learning abarque diversas variantes da agregação federada, há diversos outros algoritmos que não são perfeitamente adequados para este framework. Por exemplo, talvez você queira acrescentar regularização, recorte ou algoritmos mais complicados, como treinamento federado de GAN. Talvez você também tenha interesse em análise federada.

Para esses algoritmos mais avançados, teremos que escrever nosso próprio algoritmo personalizado de aprendizado federado.

De forma geral, os algoritmos de aprendizado federado têm quatro componentes principais:

  1. Um passo de difusão servidor para cliente.

  2. Um passo de atualização do cliente local.

  3. Um passo de upload cliente para servidor.

  4. Um passo de atualização do servidor.

No TFF, geralmente representamos algoritmos federados como um IterativeProcess (processo iterativo), que é uma classe que contém initialize_fn e next_fn. A função initialize_fn é usada para inicializar o servidor, enquanto a função next_fn faz uma rodada de comunicação de agregação federada. Vamos escrever um esqueleto do nosso processo de iteração para FedAvg.

Primeiro, temos uma função de inicialização que simplesmente cria um tff.learning.Model e retorna seus pesos treináveis.

def initialize_fn(): model = model_fn() return model.weights.trainable

Essa função parece boa, mas, como veremos posteriormente, precisaremos fazer uma pequena modificação para transformá-la em uma computação do TFF.

Também queremos esboçar a função next_fn.

def next_fn(server_weights, federated_dataset): # Broadcast the server weights to the clients. server_weights_at_client = broadcast(server_weights) # Each client computes their updated weights. client_weights = client_update(federated_dataset, server_weights_at_client) # The server averages these updates. mean_client_weights = mean(client_weights) # The server updates its model. server_weights = server_update(mean_client_weights) return server_weights

Vamos nos concentrar na implementação desses quatro componentes separadamente. Primeiro, vamos nos concentrar nas partes que podem ser implementadas totalmente no TensorFlow, ou seja, os passos de atualização do cliente e do servidor.

Blocos do TensorFlow

Atualização do cliente

Usaremos nosso tff.learning.Model para o treinamento do cliente basicamente da mesma maneira que treinaríamos um modelo do TF. Especificamente, usaremos tf.GradientTape para computar o gradiente dos lotes de dados e depois aplicaremos esse gradiente usando um client_optimizer.

Observe que cada instância de tff.learning.Model tem um atributo weights (pesos) com dois subatributos:

  • trainable: lista dos tensores que correspondem a camadas treináveis.

  • non_trainable: lista dos tensores que correspondem a camadas não treináveis.

Para nossos objetivos, usaremos somente os pesos treináveis (pois nosso modelo tem somente esse tipo de peso).

@tf.function def client_update(model, dataset, server_weights, client_optimizer): """Performs training (using the server model weights) on the client's dataset.""" # Initialize the client model with the current server weights. client_weights = model.weights.trainable # Assign the server weights to the client model. tf.nest.map_structure(lambda x, y: x.assign(y), client_weights, server_weights) # Use the client_optimizer to update the local model. for batch in dataset: with tf.GradientTape() as tape: # Compute a forward pass on the batch of data outputs = model.forward_pass(batch) # Compute the corresponding gradient grads = tape.gradient(outputs.loss, client_weights) grads_and_vars = zip(grads, client_weights) # Apply the gradient using a client optimizer. client_optimizer.apply_gradients(grads_and_vars) return client_weights

Atualização do servidor

A atualização do servidor requer ainda menos esforços. Implementamos a agregação federada comum, em que apenas substituímos os pesos do modelo do servidor pela média dos pesos do modelo do cliente. Novamente, nos concentraremos somente nos pesos treináveis.

@tf.function def server_update(model, mean_client_weights): """Updates the server model weights as the average of the client model weights.""" model_weights = model.weights.trainable # Assign the mean client weights to the server model. tf.nest.map_structure(lambda x, y: x.assign(y), model_weights, mean_client_weights) return model_weights

Observe que o trecho de código acima é um exagero, pois poderíamos simplesmente retornar mean_client_weights. Entretanto, para implementações mais avançadas de agregação federada, poderíamos usar mean_client_weights com técnicas mais sofisticadas, como momento e adaptatividade.

Até agora, escrevemos somente código puro TensorFlow. Isso foi de propósito, pois o TFF permite o uso de muito código TensorFlow que já conhecemos. Porém, agora temos que especificar a lógica de orquestração, ou seja, a lógica que dita a difusão feita do servidor para o cliente e o upload do cliente para o servidor.

Isso requer o "Federated Core" do TFF.

Introdução ao Federated Core

O Federated Core (FC) é um conjunto de interfaces de baixo nível que serve como a base da API tff.learning. Porém, essas interfaces não estão limitadas ao aprendizado. De fato, podem ser usadas para análises e muitas outras computações de dados distribuídos.

De forma gera, o Federated Core é um ambiente de desenvolvimento que permite uma lógica de programa expressada compactamente para combinar código TensorFlow com operadores de comunicação distribuída (como somas e difusões distribuídas). O objetivo é fornecer aos pesquisadores e usuários controle explícito da comunicação distribuída em seus sistemas sem exigir os detalhes de implementação do sistema (como especificar trocas de mensagem da rede ponto a ponto).

Um ponto fundamental é que o TFF foi criado para preservar a privacidade. Portanto, ele permite o controle de onde os dados ficam para evitar o acúmulo indesejado de dados no servidor centralizado.

Dados federados

Similar ao conceito de "Tensor" no TensorFlow, um dos conceitos fundamentais, um conceito importante no TFF são os "dados federados", uma coleção de itens de dados hospedados em um grupo de dispositivos em um sistema distribuído (por exemplo, os datasets do cliente ou os pesos do modelo do servidor). Modelamos toda a coleção de itens de dados em todos os dispositivos como um único valor federado.

Por exemplo, vamos supor que tenhamos dispositivos clientes, cada um com um float representando a temperatura de um sensor. Poderíamos representá-lo como um float federado assim:

federated_float_on_clients = tff.type_at_clients(tf.float32)

Os tipos federados são especificados por um tipo T de seus membros constituintes (por exemplo, tf.float32) e um grupo G de dispositivos. Vamos nos concentrar nos casos em que G é tff.CLIENTS ou tff.SERVER. Um tipo federado como este é representado como {T}@G, conforme exibido abaixo.

str(federated_float_on_clients)

Por que essas colocações são tão importantes? Um objetivo essencial do TFF é permitir a escrita de código que possa ser implantado em um sistema distribuído real. Portanto, isso é essencial para decidir quais subconjuntos de dispositivos executam qual código e onde diferentes conjuntos de dados ficam.

O TFF se concentra em três aspectos: dados, onde os dados são colocados e como os dados estão sendo transformados. Os dois primeiros são encapsulados em tipos federados, enquanto o último é encapsulado em computações federadas.

Computações federadas

O TFF é um ambiente de programação funcional fortemente tipado, cujas unidades básicas são computações federadas, que são partes de lógica que recebem valores federados como entrada e retornam valores federados como saída.

Por exemplo, vamos supor que queiramos fazer a média da temperaturas dos nossos sensores do cliente. Podemos definir o seguinte (usando nosso float federado):

@tff.federated_computation(tff.type_at_clients(tf.float32)) def get_average_temperature(client_temperatures): return tff.federated_mean(client_temperatures)

Talvez você pergunte: como isso é diferente do decorador tf.function no TensorFlow? A resposta fundamental é que o código gerado por tff.federated_computation não é nem codigo TensorFlow nem código Python, é uma especificação de um sistema distribuído em uma linguagem glue interna independente de plataforma.

Embora isso possa parecer complicado, pense nas computações do TFF como funções com assinaturas de tipo bem definidas. Essas assinaturas de tipo podem ser consultadas diretamente.

str(get_average_temperature.type_signature)

tff.federated_computation recebe argumentos do tipo federado <float>@CLIENTS e retorna valores do tipo federado <float>@SERVER. As computações federadas também podem ir de servidor a cliente, de cliente a cliente e de servidor a servidor. As computações federadas também podem ser compostas como funções normais, desde que as assinaturas de tipo coincidam.

Para dar suporte ao desenvolvimento, o TFF permite invocar uma tff.federated_computation como uma função do Python. Por exemplo, podemos chamar:

get_average_temperature([68.5, 70.3, 69.8])

Computações não eager (não adiantadas) e o TensorFlow

É preciso ter em mente duas restrições importantes. Primeiro, quando o interpretador do Python encontra um decorador tff.federated_computation, é feito o tracing e a serialização da função uma vez para uso futuro. Portanto, as computações do TFF são fundamentalmente não eager. Esse comportamento é de certa forma análogo ao do decorador tf.function no TensorFlow.

Segundo, uma computação federada pode consistir somente de operadores federados (como tff.federated_mean), que não podem conter operações do TensorFlow. O código TensorFlow precisa estar confinado aos blocos decorados com tff.tf_computation. A maior parte do código comum do TensorFlow pode ser decorado diretamente, como a seguinte função que recebe um número e adiciona 0.5 a ele.

@tff.tf_computation(tf.float32) def add_half(x): return tf.add(x, 0.5)

Também há assinaturas de tipo, mas sem colocações. Por exemplo, podemos chamar:

str(add_half.type_signature)

Aqui vemos uma diferença importante entre tff.federated_computation e tff.tf_computation. O primeiro tem colocações explícitas, enquanto o segundo, não.

Podemos usar blocos tff.tf_computation em computações federadas por meio da especificação de colocações. Vamos criar uma função que adicione 0,5, mas somente a floats federados nos clientes. Podemos fazer isso usando tff.federated_map, que aplica uma determinada tff.tf_computation, preservando a colocação.

@tff.federated_computation(tff.type_at_clients(tf.float32)) def add_half_on_clients(x): return tff.federated_map(add_half, x)

Essa função é quase idêntica a add_half, exceto que recebe somente valores com colocação em tff.CLIENTS e retorna valores com a mesma colocação. Podemos ver isso em sua assinatura de tipo:

str(add_half_on_clients.type_signature)

Resumindo:

  • O TFF faz operações em valores federados.

  • Cada valor federado tem um tipo federado, com um tipo (por exemplo, tf.float32) e uma colocação (por exemplo, tff.CLIENTS).

  • Os valores federados podem ser transformados usando-se computações federadas, que devem ser decoradas com tff.federated_computation, e uma assinatura de tipo federado.

  • O código TensorFlow deve ficar contido em blocos com decoradores tff.tf_computation.

  • Esses blocos podem ser incorporados às computações federadas.

Crie seu próprio algoritmo de aprendizado federado (parte 2)

Agora que demos uma olhada no Federated Core, podemos criar nosso próprio algoritmo de aprendizado federado. Lembre-se de que definimos acima funções initialize_fn e next_fn para o algoritmo. next_fn usa client_update e server_update que definimos usando código puro TensorFlow.

Porém, para fazer do nosso algoritmo uma computação federada, precisamos que next_fn e initialize_fn sejam tff.federated_computations.

Blocos do TensorFlow Federated

Crie a computação de inicialização

A função de inicialização será bem simples: vamos criar um modelo usando model_fn. Porém, lembre-se de que precisamos separar o código TensorFlow usando tff.tf_computation.

@tff.tf_computation def server_init(): model = model_fn() return model.weights.trainable

Em seguida, podemos passá-la diretamente para uma computação federada usando tff.federated_value.

@tff.federated_computation def initialize_fn(): return tff.federated_value(server_init(), tff.SERVER)

Crie next_fn

Agora usaremos nosso código de atualização do cliente e do servidor para escrever o algoritmo em si. Primeiro, transformamos client_update em uma tff.tf_computation que receba datasets do cliente e pesos do servidor e gere como saída um tensor de pesos do cliente atualizados.

Precisamos dos tipos correspondentes para decorar corretamente a função. Felizmente, o tipo dos pesos do servidor pode ser extraído diretamente do modelo.

whimsy_model = model_fn() tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)

Vamos conferir a assinatura de tipo do dataset. Lembre-se de que pegamos imagens 28x28 (com rótulos inteiros) e as nivelamos.

str(tf_dataset_type)

Também podemos extrair o tipo de pesos do modelo usando a função server_init acima.

model_weights_type = server_init.type_signature.result

Ao avaliar a assinatura de tipos, poderemos ver a arquitetura do modelo.

str(model_weights_type)

Agora, vamos criar tff.tf_computation para a atualização do cliente.

@tff.tf_computation(tf_dataset_type, model_weights_type) def client_update_fn(tf_dataset, server_weights): model = model_fn() client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01) return client_update(model, tf_dataset, server_weights, client_optimizer)

A versão de tff.tf_computation da atualização do servidor pode ser definida de forma similar, usando os tipos que já extraímos.

@tff.tf_computation(model_weights_type) def server_update_fn(mean_client_weights): model = model_fn() return server_update(model, mean_client_weights)

Por último, precisamos criar a tff.federated_computation que junta tudo isso. Essa função receberá dois valores federados, um correspondente aos pesos do servidor (com a colocação tff.SERVER) e outro correspondente aos datasets do cliente (com colocação tff.CLIENTS).

Esses dois tipos foram definidos acima. Só precisamos definir a colocação correta para eles usando `tff.type_at_{server/clients}``.

federated_server_type = tff.type_at_server(model_weights_type) federated_dataset_type = tff.type_at_clients(tf_dataset_type)

Lembra-se dos quatro elementos de um algoritmo de aprendizado federado?

  1. Um passo de difusão servidor para cliente.

  2. Um passo de atualização do cliente local.

  3. Um passo de upload cliente para servidor.

  4. Um passo de atualização do servidor.

Agora que construímos tudo isso, cada parte pode ser representada compactamente como uma única linha de código do TFF. É por essa simplicidade que tomamos cuidado redobrado ao especificar os tipos federados.

@tff.federated_computation(federated_server_type, federated_dataset_type) def next_fn(server_weights, federated_dataset): # Broadcast the server weights to the clients. server_weights_at_client = tff.federated_broadcast(server_weights) # Each client computes their updated weights. client_weights = tff.federated_map( client_update_fn, (federated_dataset, server_weights_at_client)) # The server averages these updates. mean_client_weights = tff.federated_mean(client_weights) # The server updates its model. server_weights = tff.federated_map(server_update_fn, mean_client_weights) return server_weights

Agora temos uma tff.federated_computation para o algoritmo de inicialização e para executar um passo do algoritmo. Para finalizar o algoritmo, nós os passamos para tff.templates.IterativeProcess.

federated_algorithm = tff.templates.IterativeProcess( initialize_fn=initialize_fn, next_fn=next_fn )

Vamos conferir a assinatura de tipo das funções initialize e next do processo iterativo.

str(federated_algorithm.initialize.type_signature)

Isso reflete o fato de federated_algorithm.initialize ser uma função sem argumentos que retorna um modelo de uma única camada (com uma matriz de pesos 784 por 10 e 10 unidades de bias).

str(federated_algorithm.next.type_signature)

Aqui, vemos que federated_algorithm.next recebe um modelo do servidor e dados do cliente, e retorna um modelo do servidor atualizado.

Avalie o algoritmo

Vamos executar algumas rodadas e ver como a perda muda. Primeiro, vamos definir uma função de avaliação usando a estratégia centralizada discutida no segundo tutorial.

Primeiro, criamos um dataset de avaliação centralizado e depois aplicamos o mesmo pré-processamento usado para os dados de treinamento.

Observe que apenas recebemos (take) os primeiros 1.000 elementos por motivos de eficiência computacional, mas, geralmente, usaríamos todo o dataset de teste.

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients().take(1000) central_emnist_test = preprocess(central_emnist_test)

Em seguida, escrevemos uma função que receba um estado do servidor e use o Keras para fazer a avaliação para o dataset de teste. Se você já conhece tf.Keras, reconhecerá esse código, exceto pelo uso de set_weights.

def evaluate(server_state): keras_model = create_keras_model() keras_model.compile( loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()] ) keras_model.set_weights(server_state) keras_model.evaluate(central_emnist_test)

Agora, vamos inicializar o algoritmo e fazer a avaliação para o dataset de teste.

server_state = federated_algorithm.initialize() evaluate(server_state)

Vamos treinar por algumas rodadas e ver se algo muda.

for round in range(15): server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)

Vemos uma pequena diminuição na função de perda. Embora a mudança seja pequena, observe que só fizemos 10 rodadas de treinamento com um subconjunto de clientes pequeno. Para obter resultados melhores, talvez precisemos de centenas ou milhares de rodadas.

Modificação do algoritmo

Agora, vamos parar e pensar no que fizemos. Implementamos agregação federada diretamente pela combinação de código puro TensorFlow (para as atualizações do servidor e do cliente) com computações federadas usando o Federated Core do TFF.

Para fazer um aprendizado mais sofisticado, basta alterar o que temos acima. Especificamente, ao editar o código puro TF acima, podemos mudar como o cliente faz o treinamento ou como o servidor atualiza seu modelo.

Desafio: acrescente recorte de gradiente à função client_update.