Path: blob/master/site/zh-cn/io/tutorials/kafka.ipynb
25118 views
Copyright 2020 The TensorFlow IO Authors.
使用 Kafka 和 Tensorflow-IO 在流式传输数据上实现稳健的机器学习
设置
安装要求的 tensorflow-io 和 kafka 软件包
导入软件包
验证 tf 和 tfio 导入
下载和设置 Kafka 与 Zookeeper 实例
为了进行演示,我们在本地设置以下实例:
Kafka(代理:127.0.0.1:9092)
Zookeeper(节点:127.0.0.1:2181)
使用默认配置(由 Apache Kafka 提供)可以加快实例设置速度。
在实例作为守护进程启动后,在进程列表中查找 kafka
字符串。两个 Java 进程分别与 Zookeeper 和 Kafka 实例对应。
按照以下规范创建 Kafka 主题:
susy-train:partitions=1,replication-factor=1
susy-test:partitions=2,replication-factor=1
在配置上详细描述主题
复制因子 1 表示不复制数据。这是因为我们的 Kafka 设置中只有一个代理。在生产系统中,启动服务器的数量可以在 100 个节点的范围内。这就是使用复制实现容错的原因。
如需了解详情,请参阅文档。
SUSY 数据集
作为事件流式传输平台,来自各种源的数据都可以写入 Kafka。例如:
网络流量日志
天文测量数据
IoT 传感器数据
产品评论等。
为了学习本教程,我们要下载 SUSY 数据集,并将数据手动馈送给 Kafka。此分类问题的目的是区分产生超对称粒子的信号处理与不产生超对称粒子的背景处理。
探索数据集
第一列是类标签(1 为信号,0 为背景),后跟 18 个特征(先是 8 个低级特征,然后是 10 个高级特征)。前 8 个特征是加速器中的粒子检测器测得的运动特性。后 10 个特征是前 8 个特征的函数。这些是物理学家用来区分这两个类的高级特征。
整个数据集有 500 万行。不过,为了便于学习本教程,我们只考虑其中一小部分(100,000 行),这样,我们可以少花一些时间来移动数据,而将更多时间用来理解 API 的功能。
拆分数据集
在 Kafka 中存储训练和测试数据
在 Kafka 中存储数据模拟了一种用于训练和推断目的的连续远程数据检索的环境。
定义 tfio 训练数据集
利用 IODataset
类将数据从 Kafka 流式传输到 TensorFlow。此类继承自 tf.data.Dataset
,因此,它原生具有 tf.data.Dataset
的所有有用功能。
构建并训练模型
注:请不要将训练步骤与在线训练混淆,后者是一个完全不同的范例,将在后面的部分进行介绍。
由于只利用了数据集中的一小部分数据,因此,在训练阶段,准确率只有约 78%。但是,您随时可以在 Kafka 中存储额外的数据,从而提高模型性能。另外,由于我们的目的只是为了演示 tfio Kafka 数据集的功能,因此,我们使用的是一个相对较小且不太复杂的神经网络。不过,为了达到研究目的,您可以提高此模型的复杂性,修改学习策略,调节超参数等。有关基线方式,请参阅此文章。
在测试数据上进行推断
为了在测试数据上进行推断时遵循“仅执行一次的”语义和容错,我们可以利用 streaming.KafkaGroupIODataset
类。
定义 tfio 测试数据集
对于要流式传输到主题的新数据点,stream_timeout
参数可以在指定时间期限内阻止数据传输。这样,如果数据以断续方式流式传输到主题中,则不需要创建新数据集。
虽然此类可用于训练目的,但还有几点需要注意。从 Kafka 读取所有消息并使用 streaming.KafkaGroupIODataset
提交最新的补偿后,使用者不会从头开始重新读取消息。因此,在训练时,只有在数据不断流入的情况下,才能训练单个周期。在训练阶段,这种功能的用例有限,当模型使用数据点后,将不再需要该数据点,并且可将其丢弃。
但是,当利用仅执行一次的语义实现稳健推断时,此功能的表现非常出色。
在测试数据上评估性能
由于推断基于“仅执行一次”语义,因此,测试集上的评估只能运行一次。为了在测试数据上再次运行推断,应使用一个新的使用者组。
跟踪 testcg
使用者组的补偿滞后
一旦所有分区的 current-offset
与 log-end-offset
匹配,则表示使用者已完成从 Kafka 主题提取所有消息。
在线学习
在线机器学习范例与训练机器学习模型的传统模式略有不同。对于前者,只要有新数据点,模型就会持续递增式学习/更新其参数,并且此过程预计永不停止。这与后者有所不同,传统模式的数据集是确定的,因此,模型只会在数据集上迭代 n
次。对于在线学习,数据一旦被模型使用,则不能再用于训练。
通过利用 streaming.KafkaBatchIODataset
,现在能够以这种方式训练模型。我们继续使用 SUSY 数据集来演示此功能。
用于在线学习的 tfio 训练数据集
在 API 中,streaming.KafkaBatchIODataset
与 streaming.KafkaGroupIODataset
相似。此外,建议使用 stream_timeout
参数来配置数据集在超时之前阻止新消息的持续时间。下面的实例将数据集的 stream_timeout
配置为 10000
毫秒。这意味着,使用来自主题的所有消息后,数据集将再等待 10 秒,然后会超时并断开与 kafka 集群的连接。如果新消息在超时之前流式传输到主题,则对于新使用的数据点,数据使用和模型训练会恢复。要无限期阻止,请将其设置为 -1
。
online_train_ds
生成的每一项都是一个自包含的 tf.data.Dataset
。因此,所有标准转换均可照常应用。
增量式训练的模型能够以周期性方式(基于用例)保存,并且可以用于在在线或离线模式下对测试数据进行推断。
注:streaming.KafkaBatchIODataset
和 streaming.KafkaGroupIODataset
仍处于试验阶段, 尚有根据用户反馈进行改进的空间。
参考文献:
Baldi, P., P. Sadowski, and D. Whiteson. “Searching for Exotic Particles in High-energy Physics with Deep Learning.” Nature Communications 5 (July 2, 2014)