Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/io/tutorials/kafka.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Kafka と TensorFlow-IO によるストリーミングデータのロバストな機械学習

概要

このチュートリアルでは、Kafka クラスタからtf.data.Datasetにデータをストリーミングし、tf.kerasと組み合わせてトレーニングと推論に使用することに焦点を当てています。

Kafka は主に分散型イベントストリーミング プラットフォームであり、データパイプラインにまたがって拡張可能でフォールトトレラントなストリーミングデータを提供します。Kafka はミッションクリティカルなデータ配信を基本的要件とする多くの大企業にとって、欠かすことのできない技術コンポーネントです。

注意: 基本的な Kafka コンポーネントの知識があれば、簡単にこのチュートリアルを進めることができます。

注意: このチュートリアルを実行するには、Java ランタイム環境が必要です。

セットアップ

必要な TensorFlow-IO と Kafka のパッケージをインストールする

!pip install tensorflow-io !pip install kafka-python

パッケージをインポートする

import os from datetime import datetime import time import threading import json from kafka import KafkaProducer from kafka.errors import KafkaError from sklearn.model_selection import train_test_split import pandas as pd import tensorflow as tf import tensorflow_io as tfio

インポートした TensorFlow と TensorFlow-IO を確認する

print("tensorflow-io version: {}".format(tfio.__version__)) print("tensorflow version: {}".format(tf.__version__))

Kafka と Zookeeper のインスタンスをダウンロードしてセットアップする

デモ用として、以下のインスタンスをローカルに設定します。

  • Kafka (Brokers: 127.0.0.1:9092)

  • Zookeeper (Node: 127.0.0.1:2181)

!curl -sSOL https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz !tar -xzf kafka_2.13-3.1.0.tgz

インスタンスをスピンアップするために、(Apache Kafka で提供されている)デフォルトの構成を使用します。

!./kafka_2.13-3.1.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.1.0/config/zookeeper.properties !./kafka_2.13-3.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.1.0/config/server.properties !echo "Waiting for 10 secs until kafka and zookeeper services are up and running" !sleep 10

インスタンスをデーモンプロセスとして起動し、プロセスリスト内のkafkaをグレップします。3 つの Java プロセスは Zookeeper、Kafka、スキーマレジストリのインスタンスに対応します。

!ps -ef | grep kafka

以下の仕様で Kafka トピックを作成します。

  • susy-train: パーティション =1、レプリケーション係数 =1

  • susy-test: パーティション =2、レプリケーション係数 =1

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test

構成の詳細のトピックを記述します。

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test

レプリケーション係数 1 は、データが複製されないことを示しています。これは Kafka のセットアップに単一のブローカーが存在するためです。本番システムでは、ブートストラップサーバー数が数百台のノード範囲になる場合があります。そこにレプリケーションを使用したフォールトトレランスの重要性があります。

詳細についてはドキュメントをご覧ください。

データセット

Kafka はイベントストリーミング プラットフォームなので、様々なソースからのデータを書き込むことができます。例えば以下のようなものです。

  • Web トラフィックのログ

  • 天文観測データ

  • IoT センサーのデータ

  • 商品レビュー、その他多数

本チュートリアルの目的に従い、SUSY データセットをダウンロードして手動で Kafka にデータを投入してみましょう。この分類問題の目標は、超対称性粒子を生成するシグナルプロセスとそうでないバックグラウンドプロセスを区別することです。

!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

データセットを探索する

第 1 列はクラスラベル(1 はシグナル、0 はバックグラウンド)、第 2 列は 18 個の特徴(8 個の低レベル特徴と 10 個の高レベル特徴)です。最初の 8 個の特徴は、加速器内の粒子検出器で測定された運動学的特性です。その後の 10 個の特徴は、最初の 8 個の特徴の関数です。それらは 2 つのクラスを識別するために物理学者が導き出した、高レベル特徴です。

COLUMNS = [ # labels 'class', # low-level features 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi', 'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi', 'missing_energy_magnitude', 'missing_energy_phi', # high-level derived features 'MET_rel', 'axial_MET', 'M_R', 'M_TR_2', 'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)' ]

データセット全体は 500 万行で構成されています。ただし、本チュートリアルではデータセットのごく 1 部(10万行分)のみを考慮することにして、データ移動に費やす時間を減らし、その分 API 機能を理解する時間が増やせるようにしましょう。

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000) susy_df = next(susy_iterator) susy_df.head()
# Number of datapoints and columns len(susy_df), len(susy_df.columns)
# Number of datapoints belonging to each class (0: background noise, 1: signal) len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])

データセットを分割する

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True) print("Number of training samples: ",len(train_df)) print("Number of testing sample: ",len(test_df)) x_train_df = train_df.drop(["class"], axis=1) y_train_df = train_df["class"] x_test_df = test_df.drop(["class"], axis=1) y_test_df = test_df["class"] # The labels are set as the kafka message keys so as to store data # in multiple-partitions. Thus, enabling efficient data retrieval # using the consumer groups. x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:])) y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:])) x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:])) y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
NUM_COLUMNS = len(x_train_df.columns) len(x_train), len(y_train), len(x_test), len(y_test)

Kafka にトレーニングデータとテストデータを格納する

Kafka にデータを格納して、トレーニングや推論の目的で、継続的なリモートデータ検索の環境をシミュレートします。

def error_callback(exc): raise Exception('Error while sendig data to kafka: {0}'.format(str(exc))) def write_to_kafka(topic_name, items): count=0 producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092']) for message, key in items: producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback) count+=1 producer.flush() print("Wrote {0} messages into topic: {1}".format(count, topic_name)) write_to_kafka("susy-train", zip(x_train, y_train)) write_to_kafka("susy-test", zip(x_test, y_test))

TensorFlow-IO トレーニングデータセットを定義する

Kafka から TensorFlow にデータをストリーミングするには、IODatasetクラスを利用します。このクラスはtf.data.Datasetを継承しているため、tf.data.Datasetの便利な機能をすべて備えています。

def decode_kafka_item(item): message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(item.key) return (message, key) BATCH_SIZE=64 SHUFFLE_BUFFER_SIZE=64 train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0) train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE) train_ds = train_ds.map(decode_kafka_item) train_ds = train_ds.batch(BATCH_SIZE)

モデルを構築してトレーニングする

# Set the parameters OPTIMIZER="adam" LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True) METRICS=['accuracy'] EPOCHS=10
# design/build the model model = tf.keras.Sequential([ tf.keras.layers.Input(shape=(NUM_COLUMNS,)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(256, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(1, activation='sigmoid') ]) print(model.summary())
# compile the model model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model model.fit(train_ds, epochs=EPOCHS)

注意: トレーニングステップとオンライントレーニングを混同しないでください。それらは全く異なるパラダイムです。

データセットのごく一部しか利用していないので、トレーニング段階での精度は 78% 以下に制限されます。ただし、Kafka にデータを追加保存するとモデルの性能が向上するので、自由にお試しください。また、ここでの目標は TensorFlow-IO Kafka データセットの機能の実演なので、小さくて複雑ではないニューラルネットワークを使用していますが、探索目的でモデルの複雑さを増したり、学習戦略を変更したり、ハイパーパラメータを調整したりすることも可能です。ベースラインのアプローチについては、こちらの記事を参照してください。

テストデータを推測する

streaming.KafkaGroupIODatasetクラスを利用して、推論を高速化かつ拡張可能にしましょう。

TensorFlow-IO テストデータセットを定義する

stream_timeout パラメーターは、新しいデータポイントがトピックにストリーミングされる特定の期間ブロックします。このため、データがトピックに断続的にストリーミングされる場合に新しいデータセットを作成する必要がありません。

test_ds = tfio.experimental.streaming.KafkaGroupIODataset( topics=["susy-test"], group_id="testcg", servers="127.0.0.1:9092", stream_timeout=10000, configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], ) def decode_kafka_test_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) test_ds = test_ds.map(decode_kafka_test_item) test_ds = test_ds.batch(BATCH_SIZE)

このクラスはトレーニング目的で使用することができますが、注意すべき点があります。すべてのメッセージが Kafka から読み込まれ、streaming.KafkaGroupIODatasetを使用して最新のオフセットがコミットされると、コンシューマがメッセージの読み込みを最初から再開することはありません。したがって、トレーニングしながらデータが継続的に流れ込んでいる状態で、1 エポック分だけのトレーニングが可能です。データポイントをモデルが消費すると、それはもう不要になり破棄されるため、この種の機能をトレーニングの段階で使用できるケースは限定的です。

しかし、正確 1 回 (exactly-once) セマンティクスを使用して推論をロバストする場合には、この機能が役に立ちます。

テストデータの性能を評価する

res = model.evaluate(test_ds) print("test loss, test acc:", res)

推論は 'exactly-once' セマンティクスに基づくため、テストセットでの評価は 1 回しか実行できません。もう一度テストデータで推論を実行するには、新しいコンシューマーグループを使用する必要があります。

testcgコンシューマグループのオフセットラグを追跡する

!./kafka_2.13-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg

current-offsetがすべてのパーティションのlog-end-offsetと一致すると、コンシューマが Kafka トピックからすべてのメッセージの取得を完了したことを意味します。

オンライン学習

オンライン機械学習の分野は、従来の一般的な機械学習モデルのトレーニングとは少し異なります。オンライン機械学習の場合は、モデルは、新しいデータポイントが利用できるようになるとすぐに学習してパラメーターを更新し、徐々に学習/更新を続けます。また、このプロセスは、無限に続行することが期待されています。これは、データセットが一定しており、モデルがそのデータセットを n 回イテレートする従来の機械学習とは異なります。オンライン学習では、データは一度だけモデルに使用されるため、二度とトレーニングで使用されることはありません。

streaming.KafkaBatchIODataset を使用すると、モデルをこのようにしてトレーニングできるようになりました。この機能を実演するために、SUSY データセットを使ってみましょう。

オンライン学習用の tfio トレーニングデータセット

streaming.KafkaBatchIODataset は、API の streaming.KafkaGroupIODataset に似ています。また、データセットがタイムアウトになるまでに新しいメッセージをブロックする時間を構成するには、stream_timeout パラメーターを使用することが推奨されます。以下のインスタンスでは、データセットはstream_timeout10000 ミリ秒として構成されています。つまり、トピックからのすべてのメッセージが消費されると、データセットはさらに 10秒間待機してから、タイムアウトして kafka クラスターから切断されるということです。タイムアウト前に新しいメッセージがストリーミングされると、それらの新たに消費されるデータポイントに関してデータの消費とモデルのトレーニングを再開します。無限にブロックするには、-1 に設定します。

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset( topics=["susy-train"], group_id="cgonline", servers="127.0.0.1:9092", stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1. configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], )

online_train_ds が生成するアイテムは、それ自体が tf.data.Dataset です。したがって、すべての標準変換を通常どおり適用することができます。

def decode_kafka_online_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) for mini_ds in online_train_ds: mini_ds = mini_ds.shuffle(buffer_size=32) mini_ds = mini_ds.map(decode_kafka_online_item) mini_ds = mini_ds.batch(32) if len(mini_ds) > 0: model.fit(mini_ds, epochs=3)

徐々にトレーニングされたモデルは、(ユースケースに合わせて)定期的に保存することが可能で、オンラインまたはオフラインモードで、テストデータの推論に使用できます。

注意: streaming.KafkaBatchIODatasetstreaming.KafkaGroupIODataset は実験的段階にあるため、ユーザーのフィードバックに応じて改善される予定です。

参照:

  • Baldi, P.、P. Sadowski、D. Whiteson「ディープラーニングを用いた高エネルギー物理学におけるエキゾチック粒子の探索」Nature Communications 5 (July 2, 2014)

  • SUSY データセット: https://archive.ics.uci.edu/ml/datasets/SUSY#