Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/zh-cn/io/tutorials/kafka.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

使用 Kafka 和 Tensorflow-IO 在流式传输数据上实现稳健的机器学习

概述

本教程重点介绍将数据从 Kafka 集群流式传输到 tf.data.Dataset,随后与 tf.keras 结合进行训练和推断。

Kafka 在根本上是一个分布式事件流式传输平台,用于在数据流水线之间提供可扩展和可容错的流式传输数据。它是将任务关键型数据交付作为主要需求的众多大型企业的重要技术组件。

:对 kafka 组件的基本了解可以帮助您更轻松地学习本教程。

**注:**运行本教程需要 Java 运行时环境。

设置

安装要求的 tensorflow-io 和 kafka 软件包

!pip install tensorflow-io !pip install kafka-python

导入软件包

import os from datetime import datetime import time import threading import json from kafka import KafkaProducer from kafka.errors import KafkaError from sklearn.model_selection import train_test_split import pandas as pd import tensorflow as tf import tensorflow_io as tfio

验证 tf 和 tfio 导入

print("tensorflow-io version: {}".format(tfio.__version__)) print("tensorflow version: {}".format(tf.__version__))

下载和设置 Kafka 与 Zookeeper 实例

为了进行演示,我们在本地设置以下实例:

  • Kafka(代理:127.0.0.1:9092)

  • Zookeeper(节点:127.0.0.1:2181)

!curl -sSOL https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz !tar -xzf kafka_2.13-3.1.0.tgz

使用默认配置(由 Apache Kafka 提供)可以加快实例设置速度。

!./kafka_2.13-3.1.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.1.0/config/zookeeper.properties !./kafka_2.13-3.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.1.0/config/server.properties !echo "Waiting for 10 secs until kafka and zookeeper services are up and running" !sleep 10

在实例作为守护进程启动后,在进程列表中查找 kafka 字符串。两个 Java 进程分别与 Zookeeper 和 Kafka 实例对应。

!ps -ef | grep kafka

按照以下规范创建 Kafka 主题:

  • susy-train:partitions=1,replication-factor=1

  • susy-test:partitions=2,replication-factor=1

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test

在配置上详细描述主题

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test

复制因子 1 表示不复制数据。这是因为我们的 Kafka 设置中只有一个代理。在生产系统中,启动服务器的数量可以在 100 个节点的范围内。这就是使用复制实现容错的原因。

如需了解详情,请参阅文档

SUSY 数据集

作为事件流式传输平台,来自各种源的数据都可以写入 Kafka。例如:

  • 网络流量日志

  • 天文测量数据

  • IoT 传感器数据

  • 产品评论等。

为了学习本教程,我们要下载 SUSY 数据集,并将数据手动馈送给 Kafka。此分类问题的目的是区分产生超对称粒子的信号处理与不产生超对称粒子的背景处理。

!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

探索数据集

第一列是类标签(1 为信号,0 为背景),后跟 18 个特征(先是 8 个低级特征,然后是 10 个高级特征)。前 8 个特征是加速器中的粒子检测器测得的运动特性。后 10 个特征是前 8 个特征的函数。这些是物理学家用来区分这两个类的高级特征。

COLUMNS = [ # labels 'class', # low-level features 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi', 'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi', 'missing_energy_magnitude', 'missing_energy_phi', # high-level derived features 'MET_rel', 'axial_MET', 'M_R', 'M_TR_2', 'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)' ]

整个数据集有 500 万行。不过,为了便于学习本教程,我们只考虑其中一小部分(100,000 行),这样,我们可以少花一些时间来移动数据,而将更多时间用来理解 API 的功能。

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000) susy_df = next(susy_iterator) susy_df.head()
# Number of datapoints and columns len(susy_df), len(susy_df.columns)
# Number of datapoints belonging to each class (0: background noise, 1: signal) len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])

拆分数据集

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True) print("Number of training samples: ",len(train_df)) print("Number of testing sample: ",len(test_df)) x_train_df = train_df.drop(["class"], axis=1) y_train_df = train_df["class"] x_test_df = test_df.drop(["class"], axis=1) y_test_df = test_df["class"] # The labels are set as the kafka message keys so as to store data # in multiple-partitions. Thus, enabling efficient data retrieval # using the consumer groups. x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:])) y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:])) x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:])) y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
NUM_COLUMNS = len(x_train_df.columns) len(x_train), len(y_train), len(x_test), len(y_test)

在 Kafka 中存储训练和测试数据

在 Kafka 中存储数据模拟了一种用于训练和推断目的的连续远程数据检索的环境。

def error_callback(exc): raise Exception('Error while sendig data to kafka: {0}'.format(str(exc))) def write_to_kafka(topic_name, items): count=0 producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092']) for message, key in items: producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback) count+=1 producer.flush() print("Wrote {0} messages into topic: {1}".format(count, topic_name)) write_to_kafka("susy-train", zip(x_train, y_train)) write_to_kafka("susy-test", zip(x_test, y_test))

定义 tfio 训练数据集

利用 IODataset 类将数据从 Kafka 流式传输到 TensorFlow。此类继承自 tf.data.Dataset,因此,它原生具有 tf.data.Dataset 的所有有用功能。

def decode_kafka_item(item): message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(item.key) return (message, key) BATCH_SIZE=64 SHUFFLE_BUFFER_SIZE=64 train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0) train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE) train_ds = train_ds.map(decode_kafka_item) train_ds = train_ds.batch(BATCH_SIZE)

构建并训练模型

# Set the parameters OPTIMIZER="adam" LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True) METRICS=['accuracy'] EPOCHS=10
# design/build the model model = tf.keras.Sequential([ tf.keras.layers.Input(shape=(NUM_COLUMNS,)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(256, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(1, activation='sigmoid') ]) print(model.summary())
# compile the model model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model model.fit(train_ds, epochs=EPOCHS)

:请不要将训练步骤与在线训练混淆,后者是一个完全不同的范例,将在后面的部分进行介绍。

由于只利用了数据集中的一小部分数据,因此,在训练阶段,准确率只有约 78%。但是,您随时可以在 Kafka 中存储额外的数据,从而提高模型性能。另外,由于我们的目的只是为了演示 tfio Kafka 数据集的功能,因此,我们使用的是一个相对较小且不太复杂的神经网络。不过,为了达到研究目的,您可以提高此模型的复杂性,修改学习策略,调节超参数等。有关基线方式,请参阅此文章

在测试数据上进行推断

为了在测试数据上进行推断时遵循“仅执行一次的”语义和容错,我们可以利用 streaming.KafkaGroupIODataset 类。

定义 tfio 测试数据集

对于要流式传输到主题的新数据点,stream_timeout 参数可以在指定时间期限内阻止数据传输。这样,如果数据以断续方式流式传输到主题中,则不需要创建新数据集。

test_ds = tfio.experimental.streaming.KafkaGroupIODataset( topics=["susy-test"], group_id="testcg", servers="127.0.0.1:9092", stream_timeout=10000, configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], ) def decode_kafka_test_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) test_ds = test_ds.map(decode_kafka_test_item) test_ds = test_ds.batch(BATCH_SIZE)

虽然此类可用于训练目的,但还有几点需要注意。从 Kafka 读取所有消息并使用 streaming.KafkaGroupIODataset 提交最新的补偿后,使用者不会从头开始重新读取消息。因此,在训练时,只有在数据不断流入的情况下,才能训练单个周期。在训练阶段,这种功能的用例有限,当模型使用数据点后,将不再需要该数据点,并且可将其丢弃。

但是,当利用仅执行一次的语义实现稳健推断时,此功能的表现非常出色。

在测试数据上评估性能

res = model.evaluate(test_ds) print("test loss, test acc:", res)

由于推断基于“仅执行一次”语义,因此,测试集上的评估只能运行一次。为了在测试数据上再次运行推断,应使用一个新的使用者组。

跟踪 testcg 使用者组的补偿滞后

!./kafka_2.13-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg

一旦所有分区的 current-offsetlog-end-offset 匹配,则表示使用者已完成从 Kafka 主题提取所有消息。

在线学习

在线机器学习范例与训练机器学习模型的传统模式略有不同。对于前者,只要有新数据点,模型就会持续递增式学习/更新其参数,并且此过程预计永不停止。这与后者有所不同,传统模式的数据集是确定的,因此,模型只会在数据集上迭代 n 次。对于在线学习,数据一旦被模型使用,则不能再用于训练。

通过利用 streaming.KafkaBatchIODataset,现在能够以这种方式训练模型。我们继续使用 SUSY 数据集来演示此功能。

用于在线学习的 tfio 训练数据集

在 API 中,streaming.KafkaBatchIODatasetstreaming.KafkaGroupIODataset 相似。此外,建议使用 stream_timeout 参数来配置数据集在超时之前阻止新消息的持续时间。下面的实例将数据集的 stream_timeout 配置为 10000 毫秒。这意味着,使用来自主题的所有消息后,数据集将再等待 10 秒,然后会超时并断开与 kafka 集群的连接。如果新消息在超时之前流式传输到主题,则对于新使用的数据点,数据使用和模型训练会恢复。要无限期阻止,请将其设置为 -1

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset( topics=["susy-train"], group_id="cgonline", servers="127.0.0.1:9092", stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1. configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], )

online_train_ds 生成的每一项都是一个自包含的 tf.data.Dataset。因此,所有标准转换均可照常应用。

def decode_kafka_online_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) for mini_ds in online_train_ds: mini_ds = mini_ds.shuffle(buffer_size=32) mini_ds = mini_ds.map(decode_kafka_online_item) mini_ds = mini_ds.batch(32) if len(mini_ds) > 0: model.fit(mini_ds, epochs=3)

增量式训练的模型能够以周期性方式(基于用例)保存,并且可以用于在在线或离线模式下对测试数据进行推断。

注:streaming.KafkaBatchIODatasetstreaming.KafkaGroupIODataset 仍处于试验阶段, 尚有根据用户反馈进行改进的空间。

参考文献: