Path: blob/master/site/ja/io/tutorials/kafka.ipynb
25118 views
Copyright 2020 The TensorFlow IO Authors.
Kafka と TensorFlow-IO によるストリーミングデータのロバストな機械学習
概要
このチュートリアルでは、Kafka クラスタからtf.data.Dataset
にデータをストリーミングし、tf.keras
と組み合わせてトレーニングと推論に使用することに焦点を当てています。
Kafka は主に分散型イベントストリーミング プラットフォームであり、データパイプラインにまたがって拡張可能でフォールトトレラントなストリーミングデータを提供します。Kafka はミッションクリティカルなデータ配信を基本的要件とする多くの大企業にとって、欠かすことのできない技術コンポーネントです。
注意: 基本的な Kafka コンポーネントの知識があれば、簡単にこのチュートリアルを進めることができます。
注意: このチュートリアルを実行するには、Java ランタイム環境が必要です。
セットアップ
必要な TensorFlow-IO と Kafka のパッケージをインストールする
パッケージをインポートする
インポートした TensorFlow と TensorFlow-IO を確認する
Kafka と Zookeeper のインスタンスをダウンロードしてセットアップする
デモ用として、以下のインスタンスをローカルに設定します。
Kafka (Brokers: 127.0.0.1:9092)
Zookeeper (Node: 127.0.0.1:2181)
インスタンスをスピンアップするために、(Apache Kafka で提供されている)デフォルトの構成を使用します。
インスタンスをデーモンプロセスとして起動し、プロセスリスト内のkafka
をグレップします。3 つの Java プロセスは Zookeeper、Kafka、スキーマレジストリのインスタンスに対応します。
以下の仕様で Kafka トピックを作成します。
susy-train: パーティション =1、レプリケーション係数 =1
susy-test: パーティション =2、レプリケーション係数 =1
構成の詳細のトピックを記述します。
レプリケーション係数 1 は、データが複製されないことを示しています。これは Kafka のセットアップに単一のブローカーが存在するためです。本番システムでは、ブートストラップサーバー数が数百台のノード範囲になる場合があります。そこにレプリケーションを使用したフォールトトレランスの重要性があります。
詳細についてはドキュメントをご覧ください。
データセット
Kafka はイベントストリーミング プラットフォームなので、様々なソースからのデータを書き込むことができます。例えば以下のようなものです。
Web トラフィックのログ
天文観測データ
IoT センサーのデータ
商品レビュー、その他多数
本チュートリアルの目的に従い、SUSY データセットをダウンロードして手動で Kafka にデータを投入してみましょう。この分類問題の目標は、超対称性粒子を生成するシグナルプロセスとそうでないバックグラウンドプロセスを区別することです。
データセットを探索する
第 1 列はクラスラベル(1 はシグナル、0 はバックグラウンド)、第 2 列は 18 個の特徴(8 個の低レベル特徴と 10 個の高レベル特徴)です。最初の 8 個の特徴は、加速器内の粒子検出器で測定された運動学的特性です。その後の 10 個の特徴は、最初の 8 個の特徴の関数です。それらは 2 つのクラスを識別するために物理学者が導き出した、高レベル特徴です。
データセット全体は 500 万行で構成されています。ただし、本チュートリアルではデータセットのごく 1 部(10万行分)のみを考慮することにして、データ移動に費やす時間を減らし、その分 API 機能を理解する時間が増やせるようにしましょう。
データセットを分割する
Kafka にトレーニングデータとテストデータを格納する
Kafka にデータを格納して、トレーニングや推論の目的で、継続的なリモートデータ検索の環境をシミュレートします。
TensorFlow-IO トレーニングデータセットを定義する
Kafka から TensorFlow にデータをストリーミングするには、IODataset
クラスを利用します。このクラスはtf.data.Dataset
を継承しているため、tf.data.Dataset
の便利な機能をすべて備えています。
モデルを構築してトレーニングする
注意: トレーニングステップとオンライントレーニングを混同しないでください。それらは全く異なるパラダイムです。
データセットのごく一部しか利用していないので、トレーニング段階での精度は 78% 以下に制限されます。ただし、Kafka にデータを追加保存するとモデルの性能が向上するので、自由にお試しください。また、ここでの目標は TensorFlow-IO Kafka データセットの機能の実演なので、小さくて複雑ではないニューラルネットワークを使用していますが、探索目的でモデルの複雑さを増したり、学習戦略を変更したり、ハイパーパラメータを調整したりすることも可能です。ベースラインのアプローチについては、こちらの記事を参照してください。
テストデータを推測する
streaming.KafkaGroupIODataset
クラスを利用して、推論を高速化かつ拡張可能にしましょう。
TensorFlow-IO テストデータセットを定義する
stream_timeout
パラメーターは、新しいデータポイントがトピックにストリーミングされる特定の期間ブロックします。このため、データがトピックに断続的にストリーミングされる場合に新しいデータセットを作成する必要がありません。
このクラスはトレーニング目的で使用することができますが、注意すべき点があります。すべてのメッセージが Kafka から読み込まれ、streaming.KafkaGroupIODataset
を使用して最新のオフセットがコミットされると、コンシューマがメッセージの読み込みを最初から再開することはありません。したがって、トレーニングしながらデータが継続的に流れ込んでいる状態で、1 エポック分だけのトレーニングが可能です。データポイントをモデルが消費すると、それはもう不要になり破棄されるため、この種の機能をトレーニングの段階で使用できるケースは限定的です。
しかし、正確 1 回 (exactly-once) セマンティクスを使用して推論をロバストする場合には、この機能が役に立ちます。
テストデータの性能を評価する
推論は 'exactly-once' セマンティクスに基づくため、テストセットでの評価は 1 回しか実行できません。もう一度テストデータで推論を実行するには、新しいコンシューマーグループを使用する必要があります。
testcg
コンシューマグループのオフセットラグを追跡する
current-offset
がすべてのパーティションのlog-end-offset
と一致すると、コンシューマが Kafka トピックからすべてのメッセージの取得を完了したことを意味します。
オンライン学習
オンライン機械学習の分野は、従来の一般的な機械学習モデルのトレーニングとは少し異なります。オンライン機械学習の場合は、モデルは、新しいデータポイントが利用できるようになるとすぐに学習してパラメーターを更新し、徐々に学習/更新を続けます。また、このプロセスは、無限に続行することが期待されています。これは、データセットが一定しており、モデルがそのデータセットを n
回イテレートする従来の機械学習とは異なります。オンライン学習では、データは一度だけモデルに使用されるため、二度とトレーニングで使用されることはありません。
streaming.KafkaBatchIODataset
を使用すると、モデルをこのようにしてトレーニングできるようになりました。この機能を実演するために、SUSY データセットを使ってみましょう。
オンライン学習用の tfio トレーニングデータセット
streaming.KafkaBatchIODataset
は、API の streaming.KafkaGroupIODataset
に似ています。また、データセットがタイムアウトになるまでに新しいメッセージをブロックする時間を構成するには、stream_timeout
パラメーターを使用することが推奨されます。以下のインスタンスでは、データセットはstream_timeout
を 10000
ミリ秒として構成されています。つまり、トピックからのすべてのメッセージが消費されると、データセットはさらに 10秒間待機してから、タイムアウトして kafka クラスターから切断されるということです。タイムアウト前に新しいメッセージがストリーミングされると、それらの新たに消費されるデータポイントに関してデータの消費とモデルのトレーニングを再開します。無限にブロックするには、-1
に設定します。
online_train_ds
が生成するアイテムは、それ自体が tf.data.Dataset
です。したがって、すべての標準変換を通常どおり適用することができます。
徐々にトレーニングされたモデルは、(ユースケースに合わせて)定期的に保存することが可能で、オンラインまたはオフラインモードで、テストデータの推論に使用できます。
注意: streaming.KafkaBatchIODataset
と streaming.KafkaGroupIODataset
は実験的段階にあるため、ユーザーのフィードバックに応じて改善される予定です。
参照:
Baldi, P.、P. Sadowski、D. Whiteson「ディープラーニングを用いた高エネルギー物理学におけるエキゾチック粒子の探索」Nature Communications 5 (July 2, 2014)
SUSY データセット: https://archive.ics.uci.edu/ml/datasets/SUSY#