Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/es-419/io/tutorials/kafka.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Aprendizaje automático robusto en transmisión de datos con Kafka y Tensorflow-IO

Descripción general

Este tutorial se centra en la transmisión de datos desde un clúster de Kafka a un tf.data.Dataset que luego se usa junto con tf.keras para entrenamiento e inferencia.

Kafka es principalmente una plataforma distribuida de transmisión de eventos que proporciona transmisión de datos escalable y tolerante a fallas a través de canales de datos. Es un componente técnico esencial de una gran cantidad de empresas importantes donde la entrega de datos de misión crítica es un requisito principal.

NOTA: Una comprensión básica de los componentes de Kafka le ayudará a seguir el tutorial con facilidad.

NOTA: Se requiere un entorno de ejecución de Java para ejecutar este tutorial.

Preparación

Instalar los paquetes tensorflow-io y kafka necesarios

!pip install tensorflow-io !pip install kafka-python

Importar paquetes

import os from datetime import datetime import time import threading import json from kafka import KafkaProducer from kafka.errors import KafkaError from sklearn.model_selection import train_test_split import pandas as pd import tensorflow as tf import tensorflow_io as tfio

Validar importaciones tf y tfio

print("tensorflow-io version: {}".format(tfio.__version__)) print("tensorflow version: {}".format(tf.__version__))

Descargar y configurar instancias de Kafka y Zookeeper

Para fines de demostración, las siguientes instancias se configuran localmente:

  • Kafka (Brokers: 127.0.0.1:9092)

  • Zookeeper (Node: 127.0.0.1:2181)

!curl -sSOL https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz !tar -xzf kafka_2.13-3.1.0.tgz

Se usan las configuraciones predeterminadas (proporcionadas por Apache Kafka) para activar las instancias.

!./kafka_2.13-3.1.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.1.0/config/zookeeper.properties !./kafka_2.13-3.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.1.0/config/server.properties !echo "Waiting for 10 secs until kafka and zookeeper services are up and running" !sleep 10

Una vez que las instancias se inician como procesos demonio, busque con grep kafka en la lista de procesos. Los dos procesos de Java corresponden a las instancias de zookeeper y kafka.

!ps -ef | grep kafka

Cree los temas de Kafka con las siguientes especificaciones:

  • susy de entrenamiento: particiones = 1, factor de replicación = 1

  • susy de prueba: particiones=2, factor de replicación=1

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test

Describa el tema para obtener detalles sobre la configuración.

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test

El factor de replicación 1 indica que los datos no se están replicando. Esto se debe a la presencia de un único corredor en nuestra configuración de Kafka. En los sistemas de producción, la cantidad de servidores de arranque puede estar en el rango de cientos de nodos. Ahí es donde entra en escena la tolerancia a fallos mediante replicación.

Consulte los documentos para obtener más detalles.

Conjunto de datos SUSY

Kafka, al ser una plataforma de transmisión de eventos, permite escribir en ella datos de diversas fuentes. Por ejemplo:

  • Registros de tráfico web

  • Medidas astronómicas

  • Datos de sensores de IoT

  • Reseñas de productos y mucho más.

Para los fines de este tutorial, descargaremos el conjunto de datos SUSY e introduciremos los datos en Kafka manualmente. El objetivo de este problema de clasificación es distinguir entre un proceso de señal que produce partículas supersimétricas y un proceso de fondo que no lo hace.

!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

Explorar el conjunto de datos

La primera columna es la etiqueta de clase (1 para señal, 0 para fondo), seguida de las 18 características (8 características de bajo nivel y luego 10 características de alto nivel). Las primeras 8 características son propiedades cinemáticas que miden los detectores de partículas en el acelerador. Las últimas 10 características son funciones de las primeras 8 características. Estas son características de alto nivel derivadas por los físicos para ayudar a discriminar entre las dos clases.

COLUMNS = [ # labels 'class', # low-level features 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi', 'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi', 'missing_energy_magnitude', 'missing_energy_phi', # high-level derived features 'MET_rel', 'axial_MET', 'M_R', 'M_TR_2', 'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)' ]

El conjunto de datos completo consta de 5 millones de filas. Sin embargo, para los fines de este tutorial, consideraremos solo una fracción del conjunto de datos (100 000 filas) para dedicar menos tiempo a mover los datos y más tiempo a comprender la funcionalidad de la API.

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000) susy_df = next(susy_iterator) susy_df.head()
# Number of datapoints and columns len(susy_df), len(susy_df.columns)
# Number of datapoints belonging to each class (0: background noise, 1: signal) len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])

Dividir el conjunto de datos

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True) print("Number of training samples: ",len(train_df)) print("Number of testing sample: ",len(test_df)) x_train_df = train_df.drop(["class"], axis=1) y_train_df = train_df["class"] x_test_df = test_df.drop(["class"], axis=1) y_test_df = test_df["class"] # The labels are set as the kafka message keys so as to store data # in multiple-partitions. Thus, enabling efficient data retrieval # using the consumer groups. x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:])) y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:])) x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:])) y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
NUM_COLUMNS = len(x_train_df.columns) len(x_train), len(y_train), len(x_test), len(y_test)

Almacenar los datos de entrenamiento y de prueba en Kafka

El almacenamiento de datos en Kafka simula un entorno para la recuperación remota continua de datos con fines de entrenamiento e inferencia.

def error_callback(exc): raise Exception('Error while sendig data to kafka: {0}'.format(str(exc))) def write_to_kafka(topic_name, items): count=0 producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092']) for message, key in items: producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback) count+=1 producer.flush() print("Wrote {0} messages into topic: {1}".format(count, topic_name)) write_to_kafka("susy-train", zip(x_train, y_train)) write_to_kafka("susy-test", zip(x_test, y_test))

Definir el conjunto de datos de entrenamiento tfio

La clase IODataset se usa para transmitir datos desde Kafka a tensorflow. La clase hereda de tf.data.Dataset y, por lo tanto, tiene todas las funcionalidades útiles de tf.data.Dataset listas para usar.

def decode_kafka_item(item): message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(item.key) return (message, key) BATCH_SIZE=64 SHUFFLE_BUFFER_SIZE=64 train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0) train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE) train_ds = train_ds.map(decode_kafka_item) train_ds = train_ds.batch(BATCH_SIZE)

Construir y entrenar el modelo

# Set the parameters OPTIMIZER="adam" LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True) METRICS=['accuracy'] EPOCHS=10
# design/build the model model = tf.keras.Sequential([ tf.keras.layers.Input(shape=(NUM_COLUMNS,)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(256, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(1, activation='sigmoid') ]) print(model.summary())
# compile the model model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model model.fit(train_ds, epochs=EPOCHS)

Nota: No confunda el paso de entrenamiento con el entrenamiento en línea. Es un paradigma completamente diferente que se tratará en una sección más adelante.

Dado que solo se usa una fracción del conjunto de datos, nuestra precisión se limita a 78 % aproximadamente durante la fase de entrenamiento. Sin embargo, no dude en almacenar datos adicionales en Kafka para mejorar el rendimiento del modelo. Además, dado que el objetivo era simplemente demostrar la funcionalidad de los conjuntos de datos de tfio de kafka, se usó una red neuronal más pequeña y menos complicada. Sin embargo, se puede aumentar la complejidad del modelo, modificar la estrategia de aprendizaje, ajustar los hiperparámetros y más para seguir explorando. Para obtener un enfoque básico, consulte este artículo.

Inferir sobre los datos de prueba

Para inferir los datos de prueba adhiriéndose a la semántica de "exactamente una vez" junto con la tolerancia a fallas, se puede usar streaming.KafkaGroupIODataset.

Definir el conjunto de datos de prueba tfio

El parámetro stream_timeout se bloquea durante la duración especificada para que se transmitan nuevos puntos de datos al tema. Esto elimina la necesidad de crear nuevos conjuntos de datos si los datos se transmiten al tema de forma intermitente.

test_ds = tfio.experimental.streaming.KafkaGroupIODataset( topics=["susy-test"], group_id="testcg", servers="127.0.0.1:9092", stream_timeout=10000, configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], ) def decode_kafka_test_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) test_ds = test_ds.map(decode_kafka_test_item) test_ds = test_ds.batch(BATCH_SIZE)

Aunque esta clase se puede usar con fines de entrenamiento, existen advertencias que deben abordarse. Una vez que se leen todos los mensajes de Kafka y se confirman las últimas compensaciones mediante streaming.KafkaGroupIODataset, el consumidor no reinicia la lectura de los mensajes desde el principio. Por lo tanto, durante el entrenamiento, solo es posible entrenar para una única época con los datos fluyendo continuamente. Este tipo de funcionalidad tiene casos de uso limitados durante la fase de entrenamiento en los que, una vez que el modelo ha consumido un punto de datos, ya no es requerido y puede ser descartado.

Sin embargo, esta funcionalidad brilla cuando se trata de inferencia robusta con semántica de exactamente una vez.

Evaluar el rendimiento de los datos de prueba

res = model.evaluate(test_ds) print("test loss, test acc:", res)

Dado que la inferencia se basa en la semántica de "exactamente una vez", la evaluación en el conjunto de prueba solo se puede ejecutar una vez. Para volver a ejecutar la inferencia sobre los datos de prueba, se debe usar un nuevo grupo de consumidores.

Seguimiento del retraso de compensación del grupo de consumidores testcg

!./kafka_2.13-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg

Una vez que el current-offset coincide con el log-end-offset para todas las particiones, quiere decir que los consumidores han completado la extracción de todos los mensajes del tema de Kafka.

Aprendizaje en línea

El paradigma del aprendizaje automático en línea es un poco diferente de la forma tradicional/convencional de entrenar modelos de aprendizaje automático. En el primer caso, el modelo continúa aprendiendo/actualizando incrementalmente sus parámetros tan pronto como los nuevos puntos de datos estén disponibles y se espera que este proceso continúe indefinidamente. Esto es diferente a los últimos enfoques, donde el conjunto de datos es fijo y el modelo lo itera n veces. En el aprendizaje en línea, es posible que los datos que una vez consumió el modelo no estén disponibles para volver a entrenarlo.

Con streaming.KafkaBatchIODataset, ahora es posible entrenar los modelos de esta manera. Seguiremos usando nuestro conjunto de datos SUSY para demostrar esta funcionalidad.

El conjunto de datos de entrenamiento tfio para el aprendizaje en línea

streaming.KafkaBatchIODataset es similar a streaming.KafkaGroupIODataset en su API. Además, se recomienda usar el parámetro stream_timeout para configurar la duración durante la cual el conjunto de datos se bloqueará para recibir nuevos mensajes antes de que expire el tiempo de espera. En el siguiente ejemplo, el conjunto de datos está configurado con un stream_timeout de 10000 milisegundos. Esto implica que, después de que se hayan consumido todos los mensajes del tema, el conjunto de datos esperará 10 segundos adicionales antes de agotar el tiempo de espera y desconectarse del clúster de Kafka. Si se transmiten nuevos mensajes al tema antes de que expire el tiempo de espera, el consumo de datos y el entrenamiento del modelo se reanudan para esos puntos de datos recién consumidos. Para bloquearlo indefinidamente, puede establecerlo en -1.

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset( topics=["susy-train"], group_id="cgonline", servers="127.0.0.1:9092", stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1. configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], )

Cada elemento que genera online_train_ds es un tf.data.Dataset en sí mismo. Por lo tanto, todas las transformaciones estándar se pueden aplicar como de costumbre.

def decode_kafka_online_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) for mini_ds in online_train_ds: mini_ds = mini_ds.shuffle(buffer_size=32) mini_ds = mini_ds.map(decode_kafka_online_item) mini_ds = mini_ds.batch(32) if len(mini_ds) > 0: model.fit(mini_ds, epochs=3)

El modelo entrenado incrementalmente se puede guardar de forma periódica (según casos de uso) y se puede usar para inferir los datos de prueba en modo en línea o fuera de línea.

Nota: streaming.KafkaBatchIODataset y streaming.KafkaGroupIODataset aún se encuentran en fase experimental y puede mejorarse según los comentarios de los usuarios.

Referencias:

  • Baldi, P., P. Sadowski, and D. Whiteson. “Searching for Exotic Particles in High-energy Physics with Deep Learning.” Nature Communications 5 (July 2, 2014)

  • Conjunto de datos SUSY: https://archive.ics.uci.edu/ml/datasets/SUSY#