Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/pt-br/io/tutorials/kafka.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Aprendizado de máquina robusto com transmissão de dados usando o Kafka e o Tensorflow IO

Visão geral

Este tutorial aborda a transmissão de dados de um cluster Kafka para um tf.data.Dataset que depois é usado em conjunto com tf.keras para treinamento e inferência.

Kafka é principalmente uma plataforma de transmissão de eventos distribuída que conta com transmissão de dados com escalabilidade e tolerância a falhas entre pipelines de dados. É um componente técnico essencial de diversas grandes empresas em que a entrega de dados críticos é um requisito importante.

OBSERVAÇÃO: ter uma compreensão básica dos componentes do Kafka ajudará a acompanhar o tutorial com facilidade.

OBSERVAÇÃO: é necessário ter um ambiente de runtime do Java para este tutorial.

Configuração

Instale os pacotes do TensorFlow IO e Kafka obrigatórios

!pip install tensorflow-io !pip install kafka-python

Importe os pacotes

import os from datetime import datetime import time import threading import json from kafka import KafkaProducer from kafka.errors import KafkaError from sklearn.model_selection import train_test_split import pandas as pd import tensorflow as tf import tensorflow_io as tfio

Valide as importações de tf e tfio

print("tensorflow-io version: {}".format(tfio.__version__)) print("tensorflow version: {}".format(tf.__version__))

Baixe e configure as instâncias do Kafka e do Zookeeper

Para fins de demonstração, as seguintes instâncias são configuradas localmente:

  • Kafka (Brokers: 127.0.0.1:9092)

  • Zookeeper (Nó: 127.0.0.1:2181)

!curl -sSOL https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz !tar -xzf kafka_2.13-3.1.0.tgz

Use as configurações padrão (fornecidas pelo Apache Kafka) para iniciar as instâncias.

!./kafka_2.13-3.1.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.1.0/config/zookeeper.properties !./kafka_2.13-3.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.1.0/config/server.properties !echo "Waiting for 10 secs until kafka and zookeeper services are up and running" !sleep 10

Após as instâncias serem inicializadas como processos daemon, use o comando grep kafka na lista de processos. Os dois processos Java correspondem às instâncias do Kafka e do Zookeeper.

!ps -ef | grep kafka

Crie os tópicos Kafka com as seguintes especificações:

  • susy-train: partitions=1, replication-factor=1

  • susy-test: partitions=2, replication-factor=1

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test

Descreva o tópico para ver os detalhes da configuração:

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test

O fator de replicação 1 indica que os dados não estão sendo replicados devido à presença de um único broker em nossa configuração do Kafka. Em sistemas de produção, o número dos servidores de inicialização pode chegar a centenas de nós. É aí que entra a tolerância a falhas usando replicação.

Confira mais detalhes na documentação.

Dataset SUSY

Como o Kafka é uma plataforma de transmissão de dados, permite que dados de várias fontes sejam gravados nele. Por exemplo:

  • Logs de tráfego web

  • Medidas astronômicas

  • Dados de sensores IoT

  • Avaliações de produtos e muito mais

Neste tutorial, vamos baixar o dataset SUSY e alimentar o Kafka com dados manualmente. O objetivo deste problema de classificação é distinguir entre um processo de sinal que produz partículas supersimétricas e um processo em segundo plano que não as produz.

!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

Explore o dataset

A primeira coluna é o rótulo de classe (1 para sinal, 0 para segundo plano), seguida por 18 características (8 de baixo nível seguidas por 10 de alto nível). As primeiras 8 características são propriedades cinemáticas mensuradas pelos detectores de partículas no acelerador. As últimas 10 características são funções das 8 primeiras e são características de alto nível derivadas pelos físicos para ajudar a diferenciar entre as duas classes.

COLUMNS = [ # labels 'class', # low-level features 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi', 'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi', 'missing_energy_magnitude', 'missing_energy_phi', # high-level derived features 'MET_rel', 'axial_MET', 'M_R', 'M_TR_2', 'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)' ]

O dataset inteiro contém 5 milhões de linhas. Entretanto, para este tutorial, vamos considerar somente uma fração do dataset (100 mil linhas) para que seja gasto menos tempo movimentando os dados e mais tempo compreendendo a funcionalidade da API.

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000) susy_df = next(susy_iterator) susy_df.head()
# Number of datapoints and columns len(susy_df), len(susy_df.columns)
# Number of datapoints belonging to each class (0: background noise, 1: signal) len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])

Divida o dataset

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True) print("Number of training samples: ",len(train_df)) print("Number of testing sample: ",len(test_df)) x_train_df = train_df.drop(["class"], axis=1) y_train_df = train_df["class"] x_test_df = test_df.drop(["class"], axis=1) y_test_df = test_df["class"] # The labels are set as the kafka message keys so as to store data # in multiple-partitions. Thus, enabling efficient data retrieval # using the consumer groups. x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:])) y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:])) x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:])) y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
NUM_COLUMNS = len(x_train_df.columns) len(x_train), len(y_train), len(x_test), len(y_test)

Armazene os dados de treinamento e teste no Kafka

Ao armazenar os dados no Kafka, é simulado um ambiente para recuperação contínua de dados remotos para treinamento e inferência.

def error_callback(exc): raise Exception('Error while sendig data to kafka: {0}'.format(str(exc))) def write_to_kafka(topic_name, items): count=0 producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092']) for message, key in items: producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback) count+=1 producer.flush() print("Wrote {0} messages into topic: {1}".format(count, topic_name)) write_to_kafka("susy-train", zip(x_train, y_train)) write_to_kafka("susy-test", zip(x_test, y_test))

Defina o dataset de treinamento tfio

A classe IODataset é usada para transmitir dados do Kafka para o TensorFlow. A classe herda de tf.data.Dataset e, portanto, tem todas as funcionalidades úteis de tf.data.Dataset de forma integrada.

def decode_kafka_item(item): message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(item.key) return (message, key) BATCH_SIZE=64 SHUFFLE_BUFFER_SIZE=64 train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0) train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE) train_ds = train_ds.map(decode_kafka_item) train_ds = train_ds.batch(BATCH_SIZE)

Compile e treine o modelo

# Set the parameters OPTIMIZER="adam" LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True) METRICS=['accuracy'] EPOCHS=10
# design/build the model model = tf.keras.Sequential([ tf.keras.layers.Input(shape=(NUM_COLUMNS,)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(256, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(1, activation='sigmoid') ]) print(model.summary())
# compile the model model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model model.fit(train_ds, epochs=EPOCHS)

Observação: não confunda o passo de treinamento com o treinamento online. É um paradigma inteiramente diferente que será abordado em uma seção mais adiante.

Como somente uma fração do dataset está sendo utilizada, nossa exatidão está limitada a cerca de 78% durante a fase de treinamento. Porém, fique à vontade para adicionar dados adicionais ao Kafka para melhorar o desempenho do modelo. Além disso, como o objetivo era apenas demonstrar a funcionalidade dos datasets tfio no Kafka, uma rede neural menor e menos complicada foi usada. Porém, é possível aumentar a complexidade do modelo, modificar a estratégia de aprendizado, ajustar os hiperparâmetros, etc., para fins de experimentação. Confira uma estratégia de linha de base neste artigo.

Faça a inferência com os dados de teste

Para fazer a inferência com os dados de teste se atendo à semântica "exatamente uma vez" juntamente com tolerância a falhas, pode-se usar streaming.KafkaGroupIODataset.

Defina o dataset de teste tfio

O parâmetro stream_timeout impede que novos pontos de dados sejam transmitidos para o tópico pela duração fornecida, o que remove a necessidade de criar novos datasets se os dados forem transmitidos para o tópico de forma intermitente.

test_ds = tfio.experimental.streaming.KafkaGroupIODataset( topics=["susy-test"], group_id="testcg", servers="127.0.0.1:9092", stream_timeout=10000, configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], ) def decode_kafka_test_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) test_ds = test_ds.map(decode_kafka_test_item) test_ds = test_ds.batch(BATCH_SIZE)

Embora essa classe possa ser usada para fins de treinamento, é preciso tratar algumas ressalvas. Quando todas as mensagens são lidas no Kafka e os últimos deslocamentos são realizados usando-se streaming.KafkaGroupIODataset, o consumidor não reinicia a leitura de mensagens do começo. Portanto, durante o treinamento, só é possível treinar uma única época com dados chegando continuamente. Esse tipo de funcionalidade tem casos de uso limitados durante a fase de treinamento, em que, após um ponto de dados ser consumido pelo modelo, ele não é mais necessário e pode ser descartado.

Entretanto, essa funcionalidade é excelente para inferência robusta com semântica "exatamente uma vez".

Avalie o desempenho com os dados de teste

res = model.evaluate(test_ds) print("test loss, test acc:", res)

Como a inferência é baseada na semântica "exatamente uma vez", a avaliação com o conjunto de teste pode ser executada somente uma vez. Para executar a inferência novamente com os dados de teste, um novo grupo consumidor deve ser usado.

Monitore o atraso de deslocamento do grupo consumidor testcg

!./kafka_2.13-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg

Quando current-offset coincide com log-end-offset para todas as partições, isso indica que os consumidores concluíram a busca de todas as mensagens do tópico Kafka.

Treinamento online

O paradigma de aprendizado de máquina online é um pouco diferente da forma tradicional/convencional de treinar modelos de aprendizado de máquina. No primeiro caso, o modelo continua aprendendo/atualizando incrementalmente seus parâmetros assim que novos pontos de dados ficam disponíveis, e espera-se que esse processo continue indefinidamente. Isso é diferente da segunda estratégia, em que o dataset é fixo e o modelo faz a iteração n vezes. No aprendizado online, após os dados serem consumidos pelo modelo, talvez eles não fiquem disponíveis novamente para treinamento.

Ao utilizar streaming.KafkaBatchIODataset, agora é possível treinar os modelos dessa forma. Vamos continuar usando o dataset SUSY para demonstrar essa funcionalidade.

Dataset de treinamento tfio para aprendizado online

streaming.KafkaBatchIODataset é similar streaming.KafkaGroupIODataset em sua API. Além disso, é recomendável utilizar o parâmetro stream_timeout (tempo limite da transmissão) para configurar a duração em que o dataset impedirá novas mensagens antes de atingir o tempo limite. No caso abaixo, o dataset está configurado com stream_timeout de 10000 milissegundos. Isso implica que, após todas as mensagens do tópico terem sido consumidas, o dataset aguardará 10 segundos antes de atingir o tempo limite e se desconectar do cluster Kafka. Se novas mensagens forem transmitidas para o tópico antes do tempo limite, o consumo de dados e o treinamento do modelo são retomados para esses pontos de dados recém-consumidos. Para impedir por tempo indeterminado, defina como -1.

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset( topics=["susy-train"], group_id="cgonline", servers="127.0.0.1:9092", stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1. configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], )

Todo item que online_train_ds gera é um tf.data.Dataset. Portanto, todas as transformações padrão podem ser aplicadas como sempre.

def decode_kafka_online_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) for mini_ds in online_train_ds: mini_ds = mini_ds.shuffle(buffer_size=32) mini_ds = mini_ds.map(decode_kafka_online_item) mini_ds = mini_ds.batch(32) if len(mini_ds) > 0: model.fit(mini_ds, epochs=3)

O modelo treinado de forma incremental pode ser salvo periodicamente (baseado nos casos de uso) e pode ser utilizado para fazer inferência com os dados de teste nos modos online e offline.

Observação: streaming.KafkaBatchIODataset e streaming.KafkaGroupIODataset ainda estão em fase experimental e podem ser melhorados com base no feedback dos usuários.

Referências:

  • Baldi, P., P. Sadowski e D. Whiteson. “Searching for Exotic Particles in High-energy Physics with Deep Learning.” (Busca de partículas exóticas em física de alta energia com aprendizado profundo). Nature Communications 5 (2 de julho de 2014)

  • Dataset SUSY: https://archive.ics.uci.edu/ml/datasets/SUSY#