Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/es-419/federated/openmined2020/openmined_conference_2020.ipynb
25118 views
Kernel: Python 3

Antes de empezar

Para editar el bloc de notas de colaboración, vaya a "File" -> "Save a copy in Drive" ("Archivo" -> "Guardar una copia en Drive") y edite su copia.

Antes de empezar, ejecute lo que se encuentra a continuación, para asegurarse de que el entorno esté preparado correctamente. Si no ve un mensaje de inicio, para más instrucciones, consulte la guía de instalación.

#@title Upgrade tensorflow_federated and load TensorBoard #@test {"skip": true} !pip install --quiet --upgrade tensorflow-federated !pip install --quiet --upgrade nest-asyncio import nest_asyncio nest_asyncio.apply() %load_ext tensorboard import sys if not sys.warnoptions: import warnings warnings.simplefilter("ignore")
#@title import collections from matplotlib import pyplot as plt from IPython.display import display, HTML, IFrame import numpy as np import tensorflow as tf import tensorflow_federated as tff tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) np.random.seed(0) def greetings(): display(HTML('<b><font size="6" color="#ff00f4">Greetings, virtual tutorial participants!</font></b>')) return True l = tff.federated_computation(greetings)()

TensorFlow federado para clasificación de imágenes

Experimentemos con el aprendizaje federado en una simulación. En este tutorial usamos el ejemplo de entrenamiento clásico MNIST para presentar la capa de API de aprendizaje federado (FL, por sus siglas en inglés) de TFF, tff.learning; un conjunto de interfaces que se puede utilizar para realizar distintos tipos de tareas de aprendizaje federado, como un entrenamiento federado, con respecto a los modelos implementados por TensorFlow provistos por usuarios.

Estructura del tutorial

Entrenaremos un modelo para clasificar imágenes con el conjunto de datos clásico MNIST y aplicaremos el aprendizaje de red neuronal para clasificar dígitos de imágenes. En este caso, simularemos aprendizaje federado con los datos de entrenamiento distribuidos en diferentes dispositivos.

Secciones

  1. Carga de las bibliotecas de TFF.

  2. Exploración y preprocesamiento del conjunto de datos EMNIST federado.

  3. Creación de un modelo.

  4. Configuración del proceso del cálculo de promedio federado para entrenamiento.

  5. Análisis de las métricas de entrenamiento.

  6. Configuración del cálculo de evaluación federada.

  7. Análisis de las métricas de evaluación.

Preparación de los datos de entrada

Empecemos con los datos. Para poner en práctica el aprendizaje federado es necesario contar con un conjunto de datos federados; es decir, una colección de datos de múltiples usuarios. Los datos federados normalmente son no i.i.d., lo que presenta un grupo de problemas particulares. Normalmente, los usuarios tienen diferentes distribuciones de datos que dependen de los patrones de uso.

A fin de facilitar la experimentación, sembramos el repositorio de TFF con algunos conjuntos de datos.

A continuación, compartimos cómo podemos cargar nuestro conjunto de datos de muestra.

# Code for loading federated data from TFF repository emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Los conjuntos de datos devueltos por load_data() son instancias de tff.simulation.datasets.ClientData, una interfaz que permite enumerar los conjuntos de usuarios para construir un tf.data.Dataset que representa los datos de un usuario en particular y para consultar la estructura de elementos individuales.

Exploremos el conjunto de datos.

len(emnist_train.client_ids)
# Let's look at the shape of our data example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) example_dataset.element_spec
# Let's select an example dataset from one of our simulated clients example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) # Your code to get an example element from one client: example_element = next(iter(example_dataset)) example_element['label'].numpy()
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal') plt.grid(False) _ = plt.show()

Exploración de los datos que no tienen una distribución iid

## Example MNIST digits for one client f = plt.figure(figsize=(20,4)) j = 0 for e in example_dataset.take(40): plt.subplot(4, 10, j+1) plt.imshow(e['pixels'].numpy(), cmap='gray', aspect='equal') plt.axis('off') j += 1
# Number of examples per layer for a sample of clients f = plt.figure(figsize=(12,7)) f.suptitle("Label Counts for a Sample of Clients") for i in range(6): ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i]) k = collections.defaultdict(list) for e in ds: k[e['label'].numpy()].append(e['label'].numpy()) plt.subplot(2, 3, i+1) plt.title("Client {}".format(i)) for j in range(10): plt.hist(k[j], density=False, bins=[0,1,2,3,4,5,6,7,8,9,10])
# Let's play around with the emnist_train dataset. # Let's explore the non-iid charateristic of the example data. for i in range(5): ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i]) k = collections.defaultdict(list) for e in ds: k[e['label'].numpy()].append(e['pixels'].numpy()) f = plt.figure(i, figsize=(12,5)) f.suptitle("Client #{}'s Mean Image Per Label".format(i)) for j in range(10): mn_img = np.mean(k[j],0) plt.subplot(2, 5, j+1) plt.imshow(mn_img.reshape((28,28)))#,cmap='gray') plt.axis('off') # Each client has different mean images -- each client will be nudging the model # in their own directions.

Preprocesamiento de los datos

Como los datos ya son un tf.data.Dataset, el preprocesamiento se puede cumplir con transformaciones de conjuntos de datos. Consulte aquí por más detalle sobre estas transformaciones.

NUM_CLIENTS = 10 NUM_EPOCHS = 5 BATCH_SIZE = 20 SHUFFLE_BUFFER = 100 PREFETCH_BUFFER=10 def preprocess(dataset): def batch_format_fn(element): """Flatten a batch `pixels` and return the features as an `OrderedDict`.""" return collections.OrderedDict( x=tf.reshape(element['pixels'], [-1, 784]), y=tf.reshape(element['label'], [-1, 1])) return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch( BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

Verifiquemos si funcionó.

preprocessed_example_dataset = preprocess(example_dataset) sample_batch = tf.nest.map_structure(lambda x: x.numpy(), next(iter(preprocessed_example_dataset))) sample_batch

Aquí, presentamos una función ayudante simple que construirá una lista de conjuntos de datos (a partir de un conjunto dado de usuarios) como entrada a una ronda de entrenamiento o evaluación.

def make_federated_data(client_data, client_ids): return [ preprocess(client_data.create_tf_dataset_for_client(x)) for x in client_ids ]

Ahora, ¿cómo elegimos a los clientes?

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS] # Your code to get the federated dataset here for the sampled clients: federated_train_data = make_federated_data(emnist_train, sample_clients) print('Number of client datasets: {l}'.format(l=len(federated_train_data))) print('First dataset: {d}'.format(d=federated_train_data[0]))

Creación de un modelo con Keras

Si usa Keras, probablemente ya tenga el código que construye un modelo Keras. A continuación, mostramos un ejemplo de un modelo simple que bastará para nuestro propósito.

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ])

Entrenamiento centralizado con Keras

## Centralized training with keras --------------------------------------------- # This is separate from the TFF tutorial, and demonstrates how to train a # Keras model in a centralized fashion (contrasting training in a federated env) (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() # Preprocess the data (these are NumPy arrays) x_train = x_train.reshape(60000, 784).astype("float32") / 255 y_train = y_train.astype("float32") mod = create_keras_model() mod.compile( optimizer=tf.keras.optimizers.RMSprop(), loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()] ) h = mod.fit( x_train, y_train, batch_size=64, epochs=2 ) # ------------------------------------------------------------------------------

Entrenamiento federado con un modelo Keras

A fin de usar cualquier modelo con TFF, hay que encapsularlo (wrap) en una instancia de la interfaz del tff.learning.Model.

Aquí hallará más métricas Keras para agregar.

def model_fn(): # We _must_ create a new model here, and _not_ capture it from an external # scope. TFF will call this within different graph contexts. keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=preprocessed_example_dataset.element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Entrenamiento del modelo sobre datos federados

Ahora que tenemos un modelo encapsulado como tff.learning.Model para usarlo con TFF, podemos dejar que TFF construya un algoritmo de promedio federado "Federated Averaging" si invocamos la función ayudante tff.learning.build_federated_averaging_process, como se muestra a continuación.

iterative_process = tff.learning.build_federated_averaging_process( model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02), # Add server optimizer here! server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

¿Qué acaba de suceder? TFF ha construido un par de cálculos federados y los empaquetó en un tff.templates.IterativeProcess en los cuales estos cálculos se encuentran en forma de un par de propiedades initialize y next.

Por lo general, un proceso iterativo será provocado por un bucle de control como el siguiente:

def initialize(): ... def next(state): ... iterative_process = IterativeProcess(initialize, next) state = iterative_process.initialize() for round in range(num_rounds): state = iterative_process.next(state)

Invoquemos el cálculo initialize para construir el estado del servidor.

state = iterative_process.initialize()

El segundo par de cálculos federados, next, representa a una ronda simple de cálculo de promedio federado de un modelo nuevo actualizado en el servidor, que está compuesta por el envío del estado del servidor (incluidos los parámetros del modelo) a los clientes, el entrenamiento en el dispositivo sobre sus datos locales, las actualizaciones del modelo de recolección y el cálculo del promedio y la producción.

Ejecutemos una ronda simple de entrenamiento y observemos los resultados. Podemos usar los datos federados que ya hemos generado (arriba) para una muestra de usuarios.

# Run one single round of training. state, metrics = iterative_process.next(state, federated_train_data) print('round 1, metrics={}'.format(metrics['train']))

Ejecutemos algunas rondas más. Tal como lo señalamos antes, normalmente en esta instancia, elegiríamos un subconjunto de los datos de simulación a partir de una muestra de usuarios seleccionada de forma aleatoria para cada ronda, a fin de simular una implementación realista en la cual los usuarios vienen y van continuamente. Pero en estas notas interactivas, con propósito demostrativo, simplemente reutilizaremos los mismos usuarios, para que el sistema converja rápidamente.

NUM_ROUNDS = 11 for round_num in range(2, NUM_ROUNDS): state, metrics = iterative_process.next(state, federated_train_data) print('round {:2d}, metrics={}'.format(round_num, metrics['train']))

La pérdida de entrenamiento disminuye después de cada ronda de entrenamiento federado. Es señal de que el modelo está convergiendo. Hay algunas salvedades importantes relacionadas con estas métricas de entrenamiento, pero para conocerlas, consulte más adelante la sección Evaluación en este tutorial.

##Se muestran las métricas del modelo en TensorBoard. Luego, observemos las métricas de estos cálculos federados en TensorBoard.

Comencemos por crear un directorio y el escritor de resúmenes correspondiente en el que se redactarán las métricas.

#@test {"skip": true} import os import shutil logdir = "/tmp/logs/scalars/training/" if os.path.exists(logdir): shutil.rmtree(logdir) # Your code to create a summary writer: summary_writer = tf.summary.create_file_writer(logdir) state = iterative_process.initialize()

Grafiquemos las métricas escalares relevantes con el mismo escritor de resúmenes.

#@test {"skip": true} with summary_writer.as_default(): for round_num in range(1, NUM_ROUNDS): state, metrics = iterative_process.next(state, federated_train_data) for name, value in metrics['train'].items(): tf.summary.scalar(name, value, step=round_num)

Comencemos por TensorBoard, con el directorio de registros raíz especificado arriba. La carga de los datos puede demorar algunos segundos.

#@test {"skip": true} %tensorboard --logdir /tmp/logs/scalars/ --port=0

A fin de ver las métricas de evaluación del mismo modo, se puede crear una carpeta de evaluación por separado, como "logs/scalars/eval", para escribir en TensorBoard.

Evaluación

Para llevar a cabo la evaluación sobre los datos federados, se puede construir otro cálculo federado diseñado para este propósito, con la función tff.learning.build_federated_evaluation y pasar el constructor del modelo como un argumento.

# Construct federated evaluation computation here: evaluation = tff.learning.build_federated_evaluation(model_fn)

Ahora compilemos una muestra de prueba de datos federados y volvamos a ejecutar la evaluación de los datos de prueba. Los datos provendrán de una muestra diferente de usuarios y de un conjunto de datos retenidos (held-out) distintivos.

import random shuffled_ids = emnist_test.client_ids.copy() random.shuffle(shuffled_ids) sample_clients = shuffled_ids[0:NUM_CLIENTS] federated_test_data = make_federated_data(emnist_test, sample_clients) len(federated_test_data), federated_test_data[0]
# Run evaluation on the test data here, using the federated model produced from # training: test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)

De este modo, se concluye con el tutorial. Le aconsejamos jugar con distintos parámetros (p. ej., los tamaños de los lotes, la cantidad de usuarios, las épocas, las tasas de aprendizaje, etc.), para modificar el código que figura arriba a fin de simular el entrenamiento con muestras aleatorias de usuarios en cada ronda. También le recomendamos explorar los otros tutoriales que hemos desarrollado.

Creación de los propios algoritmos de aprendizaje federado

En los tutoriales anteriores aprendimos a configurar las canalizaciones de los datos y del modelo. Además las usamos para realizar entrenamientos federados con la API tff.learning API.

Por supuesto, esto es solamente la punta del iceberg en la investigación sobre el aprendizaje federado. En este tutorial analizaremos cómo implementar los algoritmos de aprendizaje federado sin delegar a la API tff.learning. Con este tutorial, pretendemos lograr lo siguiente:

Objetivos:

  • Entender la estructura general de los algoritmos de aprendizaje federado.

  • Explorar el núcleo federado de TFF.

  • Usar el núcleo federado para implementar directamente el cálculo del promedio federado.

Preparación de los datos de entrada

Primero, cargamos y preprocesamos el conjunto de datos EMNIST incluido en TFF. Básicamente usamos el mismo código que se utilizó en el primer tutorial.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
NUM_CLIENTS = 10 BATCH_SIZE = 20 def preprocess(dataset): def batch_format_fn(element): """Flatten a batch of EMNIST data and return a (features, label) tuple.""" return (tf.reshape(element['pixels'], [-1, 784]), tf.reshape(element['label'], [-1, 1])) return dataset.batch(BATCH_SIZE).map(batch_format_fn)
client_ids = np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False) federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x)) for x in client_ids ]

Preparación del modelo

Usamos el mismo modelo del primer tutorial, que tiene una sola capa oculta, seguida por una capa softmax.

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ])

Encapsulamos (wrap) este modelo Keras como un tff.learning.Model.

def model_fn(): keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=federated_train_data[0].element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Personalización del algoritmo de aprendizaje federado

Si bien la API tff.learning permite que uno cree muchas variantes del cálculo del promedio federado, hay otros algoritmos federados que no se adaptan perfectamente a este marco de trabajo. Por ejemplo, tal vez le convenga agregar algoritmos de regularización, recorte (clipping) u otros más complicados como el entrenamiento GAN federado. Probablemente, por otra parte, lo que le resulte interesante sea el análisis federado.

Para estos algoritmos más avanzados, deberemos escribir nuestro propio algoritmo personalizado de aprendizaje federado.

En general, los algoritmos de aprendizaje federado están compuestos por cuatro partes principales:

  1. Un paso para la emisión (broadcast) del servidor al cliente.

  2. Un paso para la actualización del cliente local.

  3. Un paso para la carga del cliente al servidor.

  4. Un paso para la actualización del servidor.

En TFF, un algoritmo federado, normalmente, está representado por un IterativeProcess. Simplemente, es una clase que contiene las funciones initialize_fn y next_fn. initialize_fn se usará para inicializar el servidor y next_fn realizará una ronda de comunicación del cálculo de promedio federado. Escribamos un esquema sobre cómo debería lucir de nuestro proceso iterativo para FedAvg.

Primero, hay una función para inicializar que simplemente crea tff.learning.Model y devuelve sus pesos entrenables.

def initialize_fn(): model = model_fn() return model.weights.trainable

Esta función tiene buen aspecto, pero como verá más adelante, deberemos hacerle una pequeña modificación para convertirla en un cálculo de TFF.

También nos convendrá realizar el esquema de next_fn.

def next_fn(server_weights, federated_dataset): # Broadcast the server weights to the clients. server_weights_at_client = broadcast(server_weights) # Each client computes their updated weights. client_weights = client_update(federated_dataset, server_weights_at_client) # The server averages these updates. mean_client_weights = mean(client_weights) # The server updates its model. server_weights = server_update(mean_client_weights) return server_weights

Centrémonos en implementar estos cuatro componentes por separado. Primero, enfoquémonos en las partes que se pueden implementar en TensorFlow puro, a saber, los pasos relacionados con el cliente y el servidor.

Bloques de TensorFlow

Actualización del cliente

Usaremos nuestro tff.learning.Model para hacer el entrenamiento del cliente, esencialmente, del mismo modo en que se entrenaría un modelo de TF. En particular, usaremos tf.GradientTape para calcular el gradiente en lotes de datos y luego aplicarlo con un client_optimizer.

Tenga en cuenta que cada instancia de tff.learning.Model tiene un atributo de weights con dos subatributos:

  • trainable: una lista de tensores correspondientes a las capas entrenables.

  • non_trainable: una lista de tensores correspondientes a las capas no entrenables.

Para cumplir con nuestro objetivo, solamente usaremos pesos entrenables (ya que nuestro modelo únicamente tiene los de este tipo).

@tf.function def client_update(model, dataset, server_weights, client_optimizer): """Performs training (using the server model weights) on the client's dataset.""" # Initialize the client model with the current server weights. client_weights = model.weights.trainable # Assign the server weights to the client model. tf.nest.map_structure(lambda x, y: x.assign(y), client_weights, server_weights) # Use the client_optimizer to update the local model. for batch in dataset: with tf.GradientTape() as tape: # Compute a forward pass on the batch of data outputs = model.forward_pass(batch) # Compute the corresponding gradient grads = tape.gradient(outputs.loss, client_weights) grads_and_vars = zip(grads, client_weights) # Apply the gradient using a client optimizer. client_optimizer.apply_gradients(grads_and_vars) return client_weights

Actualización del servidor

Para la actualización del servidor se necesitará incluso menos esfuerzo. Implementaremos el cálculo de promedios federados "vainilla", en el que los pesos del modelo del servidor se reemplazan por el promedio de los pesos del modelo del cliente. Una vez más, solamente nos centraremos en los pesos entrenables.

@tf.function def server_update(model, mean_client_weights): """Updates the server model weights as the average of the client model weights.""" model_weights = model.weights.trainable # Assign the mean client weights to the server model. tf.nest.map_structure(lambda x, y: x.assign(y), model_weights, mean_client_weights) return model_weights

Tenga en cuenta que para el fragmento anterior es claramente un exceso; ya que, sencillamente, se podría simplificar con la devolución de mean_client_weights. Sin embargo, en las implementaciones más avanzadas del cálculo de promedio federado se usa mean_client_weights con técnicas más sofisticadas, como momentum o adaptabilidad.

Hasta el momento, solamente hemos escrito código puro de TensorFlow. El motivo es el diseño, ya que TFF permite usar gran parte del código de TensorFlow con el que ya estamos familiarizados. Sin embargo, deberá especificar la lógica de orquestación; es decir, la que dicta lo que el servidor emite (broadcast) al cliente y que el cliente carga en el servidor.

El núcleo federado de TFF será indispensable.

Introducción al núcleo federado

El núcleo federado (FC, por sus siglas en inglés) es un conjunto de interfaces de bajo nivel que sirve como base para la API tff.learning. Sin embargo, estas interfaces no se limitan al aprendizaje. De hecho, se pueden usar para análisis y muchos otros cálculos de datos distribuidos.

A un alto nivel, el núcleo federado es un entorno de desarrollo que permite expresar de manera compacta la lógica de programación para combinar código de TensorFlow con los operadores de comunicación distribuidos (como las sumas y las emisiones distribuidas). El objetivo es brindarles a los investigadores y especialistas el control explícito de la comunicación distribuida en sus sistemas, sin requerir de otros detalles para la implementación (tales como la especificación de los intercambios de mensajes de red punto a punto).

Un punto clave es que TFF está diseñado para la preservación de la privacidad. Por lo tanto, permite el control explícito del sitio donde residen los datos, para prevenir la acumulación indeseada de datos en el lugar del servidor centralizado.

Datos federados

Del mismo modo que el concepto de "tensor" en TensorFlow es fundamental, el concepto de los "datos federados" es clave en TFF. Se refiere a una colección de elementos de datos alojados en un grupo de dispositivos en un sistema distribuido (p. ej., las bases de datos de clientes o los pesos del modelo del servidor). La colección entera de valores de todos los dispositivos se representa con un solo valor federado.

Por ejemplo, supongamos que hay dispositivos clientes y que cada uno tiene un flotante que representa la temperatura de un sensor. Esos flotantes se pueden representar como flotante federado de la siguiente manera:

federated_float_on_clients = tff.type_at_clients(tf.float32)

Los tipos federados son especificados por un tipo de T de los miembros que lo componen (p. ej., tf.float32) y un grupo de dispositivos G. Nos centraremos en aquellos casos en que G es tff.CLIENTS o tff.SERVER. Un tipo federado como tal se representa con {T}@G, como se muestra a continuación.

str(federated_float_on_clients)

¿Por qué nos interesan tanto las ubicaciones? El objetivo clave de TFF es el de facilitar la escritura de código que se podría implementar en un sistema distribuido real. Significa que es vital razonar con respecto a qué subconjuntos de dispositivos ejecutan qué códigos y dónde residen las diferentes porciones de datos.

TFF se centra en tres cosas: en los datos, en dónde se ubican los datos y en cómo se transforman esos datos. Las primeras dos se encuentran encapsuladas dentro de los tipos federados, mientras que la última, en cálculos federados.

Cálculos federados

TFF es un entorno de programación funcional fuertemente tipado cuyas unidades básicas son cálculos federados. Son porciones de lógica que aceptan valores federados como entrada y devuelven valores federados como salida.

Por ejemplo, supongamos que quisiéramos calcular el promedio de temperaturas en los sensores de nuestro cliente. Podríamos definir lo siguiente (con nuestro flotante federado):

@tff.federated_computation(tff.type_at_clients(tf.float32)) def get_average_temperature(client_temperatures): return tff.federated_mean(client_temperatures)

Uno podría preguntarse, en qué difiere esto del decorador tf.function de TensorFlow. La respuesta determinante es que el código generado por tff.federated_computation no es un código de TensorFlow ni de Python. Es una especificación de un sistema distribuido en un lenguaje pegamento interno independiente de plataformas.

Si bien es cierto que puede sonar complicado, se puede pensar en los cálculos TFF como funciones con firmas de tipo bien definidas. Estas firmas de tipo se pueden consultar directamente.

str(get_average_temperature.type_signature)

Este tff.federated_computation acepta argumentos del tipo federado <float32>@CLIENTS y devuelve valores del mismo tipo <float32>@SERVER. Los cálculos federados también pueden ir de servidor a cliente, de cliente a cliente o de servidor a servidor. Los cálculos federados además se pueden componer como las funciones normales, siempre y cuando haya coincidencia entre las firmas de tipo.

Para facilitar el desarrollo, TFF permite invocar un tff.federated_computation como una función Python. Por ejemplo, podemos llamar lo siguiente:

get_average_temperature([68.5, 70.3, 69.8])

Los cálculos sin ejecución eager y con TensorFlow

Hay dos restricciones fundamentales para tener en cuenta. La primera, es que cuando un intérprete Python encuentra un decorador tff.federated_computation, la función se rastrea una vez y se serializa para futuros usos. Por lo tanto, los cálculos TFF son fundamentalmente non-eager (no utilizan ejecución eager). Este comportamiento es, en cierto modo, análogo al del decorador tf.function en TensorFlow.

La segunda, es que un cálculo federado solamente puede estar compuesto por operadores federados ( como tff.federated_mean), no puede contener operaciones de TensorFlow. Hay que confinar el código de TensorFlow a bloques decorados con tff.tf_computation. El código TensorFlow más común, directamente, se puede decorar, como la siguiente función que toma un número y le agrega 0.5.

@tff.tf_computation(tf.float32) def add_half(x): return tf.add(x, 0.5)

Estas también son firmas de tipo, pero sin ubicaciones. Por ejemplo, se puede llamar lo siguiente:

str(add_half.type_signature)

Observamos entonces, la gran diferencia que hay entre tff.federated_computation y tff.tf_computation. El primero tiene ubicaciones explícitas, mientras que el segundo no.

Podemos usar bloques tff.tf_computation en cálculos federados para ubicaciones específicas. Creemos una función que agregue un medio (add half), pero solamente a flotantes federados de clientes. Podemos hacerlo con tff.federated_map, que aplica un tff.tf_computation dado y, a la vez, preserva la ubicación.

@tff.federated_computation(tff.type_at_clients(tf.float32)) def add_half_on_clients(x): return tff.federated_map(add_half, x)

Esta función es casi idéntica a add_half, excepto porque solamente acepta valores con ubicación en tff.CLIENTS y devuelve valores con la misma ubicación. Podemos observarlo en su firma de tipo:

str(add_half_on_clients.type_signature)

En resumen:

  • TFF opera sobre valores federados.

  • Cada valor federado tiene un tipo federado, con un tipo (p. ej., tf.float32) y una ubicación (p. ej., tff.CLIENTS).

  • Los valores federados se pueden transformar con cálculos federados, que se deben decorar con tff.federated_computation y una firma de tipo federado.

  • El código TensorFlow debe estar contenido en bloques con decoradores tff.tf_computation.

  • Estos bloques, después se pueden incorporar en cálculos federados.

Creación de los propios algoritmos de aprendizaje federado (parte 2)

Ahora que ya tenemos una idea de lo que es el núcleo federado, podemos crear un algoritmo propio de aprendizaje federado. Recordemos que antes (arriba) ya definimos un initialize_fn y next_fn para nuestro algoritmo. El next_fn usará client_update y server_update que ya definimos con código de TensorFlow puro.

Sin embargo, para hacer nuestro algoritmo con un cálculo federado, necesitaremos que tanto next_fn como initialize_fn sean tff.federated_computation.

Bloques de TensorFlow federado

Creación del cálculo de inicialización

La función de inicializar será bastante simple: deberemos crear un modelo con model_fn. Sin embargo, recuerde que debemos separar nuestro código de TensorFlow con tff.tf_computation.

@tff.tf_computation def server_init(): model = model_fn() return model.weights.trainable

Entonces, ahora, podemos pasarlo directamente a cálculo federado con tff.federated_value.

@tff.federated_computation def initialize_fn(): return tff.federated_value(server_init(), tff.SERVER)

Creación de next_fn

El código de actualización de cliente y servidor ahora se puede usar para escribir el algoritmo real. Primero, se transformará el client_update en un tff.tf_computation que acepta un conjunto de datos del cliente y los pesos del servidor, y sale un tensor de pesos del cliente actualizado.

Necesitaremos los tipos correspondientes que decoren adecuadamente nuestra función. Afortunadamente, el tipo de pesos del servidor se puede extraer directamente desde nuestro modelo.

whimsy_model = model_fn() tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)

Observemos la firma de tipo del conjunto de datos. Recordemos que tomamos imágenes de 28 por 28 (con etiquetas de enteros) y las aplanamos.

str(tf_dataset_type)

También podemos extraer el tipo de pesos del modelo con nuestra función server_init, que figura más arriba.

model_weights_type = server_init.type_signature.result

Al examinar la firma de tipo, podrá ver la arquitectura del modelo.

str(model_weights_type)

Ahora podemos crear nuestro propio tff.tf_computation para la actualización del cliente.

@tff.tf_computation(tf_dataset_type, model_weights_type) def client_update_fn(tf_dataset, server_weights): model = model_fn() client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01) return client_update(model, tf_dataset, server_weights, client_optimizer)

La versión tff.tf_computation de la actualización del servidor se puede definir de un modo similar, con los tipos que ya hemos extraído.

@tff.tf_computation(model_weights_type) def server_update_fn(mean_client_weights): model = model_fn() return server_update(model, mean_client_weights)

Por último, pero no menos importante, deberemos crear el tff.federated_computation que une todo. Esta función aceptará dos valores federados, uno correspondiente a los pesos del servidor (con la ubicación tff.SERVER) y otro correspondiente a los conjuntos de datos del cliente (con la ubicación tff.CLIENTS).

Tenga en cuenta que ambos tipos ya han sido definidos más arriba. Simplemente debemos darles la ubicación adecuada con tff.FederatedType.

federated_server_type = tff.type_at_server(model_weights_type) federated_dataset_type = tff.type_at_clients(tf_dataset_type)

¿Recuerda los 4 elementos de un algoritmo de aprendizaje federado?

  1. Un paso para la emisión (broadcast) del servidor al cliente.

  2. Un paso para la actualización del cliente local.

  3. Un paso para la carga del cliente al servidor.

  4. Un paso para la actualización del servidor.

Ahora que hemos creado lo anterior, cada parte se puede representar de forma compacta como una sola línea de código TFF. Esta simplicidad es el motivo por el cual hemos debido prestar suma atención a la especificación de cosas como los tipos federados.

@tff.federated_computation(federated_server_type, federated_dataset_type) def next_fn(server_weights, federated_dataset): # Broadcast the server weights to the clients. server_weights_at_client = tff.federated_broadcast(server_weights) # Each client computes their updated weights. client_weights = tff.federated_map( client_update_fn, (federated_dataset, server_weights_at_client)) # The server averages these updates. mean_client_weights = tff.federated_mean(client_weights) # The server updates its model. server_weights = tff.federated_map(server_update_fn, mean_client_weights) return server_weights

Ahora tenemos un tff.federated_computation tanto para la inicialización del algoritmo como para la ejecución de un paso del algoritmo. Para terminarlo, pasaremos estos elementos a tff.templates.IterativeProcess.

federated_algorithm = tff.templates.IterativeProcess( initialize_fn=initialize_fn, next_fn=next_fn )

Observemos la *firma de tipo * de las funciones initialize y next de nuestro proceso iterativo.

str(federated_algorithm.initialize.type_signature)

Refleja el hecho de que federated_algorithm.initialize es una función no argumentativa que devuelve un modelo de una sola capa (con una matriz de peso de 784 por 10, y 10 unidades de sesgo).

str(federated_algorithm.next.type_signature)

Aquí, podemos ver que federated_algorithm.next acepta un modelo de servidor y datos del cliente, y devuelve un modelo de servidor actualizado.

Evaluación del algoritmo

Ejecutemos algunas rondas y veamos cómo cambia la pérdida. Primero, definiremos una función de evaluación con el modo centralizado referido en el segundo tutorial.

En primer lugar, creamos un conjunto de datos de evaluación centralizado y luego aplicamos el mismo preprocesamiento que usamos para los datos de entrenamiento.

Tenga en cuenta que solamente take (tomamos) los primeros 1000 elementos, para eficiencia en los cálculos. Pero que normalmente usaríamos el conjunto completo de los datos de prueba.

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients().take(1000) central_emnist_test = preprocess(central_emnist_test)

A continuación, escribiremos una función que acepte un estado del servidor y usaremos Keras para evaluar el conjunto de datos de prueba. Si está familiarizado con tf.Keras, todo esto le resultará conocido; de todos modos, preste particular atención al uso de set_weights.

def evaluate(server_state): keras_model = create_keras_model() keras_model.compile( loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()] ) keras_model.set_weights(server_state) keras_model.evaluate(central_emnist_test)

Ahora, inicialicemos nuestro algoritmo y evaluemos el conjunto de prueba.

server_state = federated_algorithm.initialize() evaluate(server_state)

Entrenemos durante algunas rondas y veamos si cambia algo.

for round in range(15): server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)

Observamos una disminución leve en la función de pérdida. Si bien el salto es pequeño, solamente hemos realizado 15 rondas de entrenamiento y sobre un subconjunto reducido de clientes. Para ver mejores resultados, probablemente debamos hacer cientos o miles de rondas.

Modificación del algoritmo

En este punto, detengámonos a pensar sobre lo que hemos logrado. Hemos implementado el cálculo promedio federado directamente mediante la combinación de código de TensorFlow puro (para las actualizaciones del cliente y del servidor) con cálculos federados del núcleo federado de TFF.

Para realizar un aprendizaje más sofisticado, simplemente podemos alterar lo que hicimos arriba. En particular, editando el código de TF puro mencionado podemos cambiar la manera en que el cliente realiza el entrenamiento o cómo el servidor actualiza su modelo.

Desafío: agregar recorte (clipping) de gradiente a la función client_update.