Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/io/tutorials/avro.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Avro Dataset API

概要

Avro Dataset API は、Avro フォーマットデータを TensorFlow Dataset として TensorFlow にネイティブに読み込むことを目的としています。Avro は Protocol Buffers に類似するデータシリアル化システムです。永続データのシリアル化フォーマットと Hadoop ノード間の通信用ワイヤフォーマットの両方を提供できる Apache Hadoop で広く使用されています。Avro データは行指向の圧縮されたバイナリデータフォーマットです。個別の JSON ファイルとして保存されるスキーマに依存しています。Avro フォーマットとスキーマ宣言の仕様については、公式マニュアルをご覧ください。

パッケージのセットアップ

必要な tensorflow-io パッケージをインストールする

!pip install tensorflow-io

パッケージをインポートする

import tensorflow as tf import tensorflow_io as tfio

インポートした TensorFlow と TensorFlow-IO を確認する

print("tensorflow-io version: {}".format(tfio.__version__)) print("tensorflow version: {}".format(tf.__version__))

使い方

データセットを探索する

このチュートリアルの目的により、Avro サンプルデータセットをダウンロードしましょう。

Avro サンプルファイルをダウンロードします。

!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro !ls -l train.avro

Avro サンプルファイルに対応するスキーマファイルをダウンロードします。

!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc !ls -l train.avsc

上記の例では、テスト用の Avro データセットは MNIST データセットに基づいて作成されています。TFRecord の元の MNIST データセットは TF 名前付きデータセットから生成されていますが、MNIST データセットはデモデータセットとしては大きすぎるため、単純化するために、ほとんどをトリミングし、最初のいくつかのレコードだけが保持されています。また、元の MNIST データセットの image フィールドに追加のトリミングを行い、Avro の features フィールドにマッピングしています。そのため、avro ファイル train.avro には 4 つのレコードがあり、それぞれに 3 つのフィールドがあります。features フィールドは int の配列、label フィールドは int または null、dataType フィールドは enum です。デコードされた train.avro を表示するには以下のようにします(Avro は圧縮フォーマットであるため、元の Avro データファイルは人間が読み取ることはできません)。

Avro ファイルを読み取るために必要なパッケージをインストールします。

!pip install avro

以下のようにして、Avro ファイルを読み取って、人間が読み取れるフォーマットで出力します。

from avro.io import DatumReader from avro.datafile import DataFileReader import json def print_avro(avro_file, max_record_num=None): if max_record_num is not None and max_record_num <= 0: return with open(avro_file, 'rb') as avro_handler: reader = DataFileReader(avro_handler, DatumReader()) record_count = 0 for record in reader: record_count = record_count+1 print(record) if max_record_num is not None and record_count == max_record_num: break print_avro(avro_file='train.avro')

まは、train.avsc で表現される train.avro のスキーマは JSON 形式のファイルです。train.avsc を表示するには、以下のようにします。

def print_schema(avro_schema_file): with open(avro_schema_file, 'r') as handle: parsed = json.load(handle) print(json.dumps(parsed, indent=4, sort_keys=True)) print_schema('train.avsc')

データセットを準備する

Avro Dataset API を使って、TensorFlow データセットとして train.avro を読み込みます。

features = { 'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32), 'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100), 'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string) } schema = tf.io.gfile.GFile('train.avsc').read() dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'], reader_schema=schema, features=features, shuffle=False, batch_size=3, num_epochs=1) for record in dataset: print(record['features[*]']) print(record['label']) print(record['dataType']) print("--------------------")

上記の例は、train.avro を TensorFlow Dataset に変換します。データセットの各要素は特徴量名をキーとするディクショナリで、値はスパースまたは密なテンソルに変換されます。たとえば featureslabel、および dataType フィールドはそれぞれ VarLenFeature(SparseTensor)、FixedLenFeature(DenseTensor)、および FixedLenFeature(DenseTensor) に変換されます。batch_size は 3 であるため、train.avro の 3 つのレコードが結果データセットの 1 つの要素に変換されます。ラベルが null の train.avro の最初のレコードについては、Avro リーダーによってしていされたデフォルト値(-100)に置換されます。この例では、train.avro に合計 4 件のレコードがあります。バッチサイズが 3 であるため、結果データセットには 3 つの要素が含まれ、その最後のバッチサイズは 1 となります。ただし、ユーザーは drop_final_batch を有効にすることで、サイズがバッチサイズよりも小さい場合は最後のバッチをドロップするように設定することもできます。以下に例を示します。

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'], reader_schema=schema, features=features, shuffle=False, batch_size=3, drop_final_batch=True, num_epochs=1) for record in dataset: print(record)

num_parallel_reads を増やして Avro の解析/読み取り並行性を高めることで、Avro データの処理を高速化することも可能です。

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'], reader_schema=schema, features=features, shuffle=False, num_parallel_reads=16, batch_size=3, drop_final_batch=True, num_epochs=1) for record in dataset: print(record)

make_avro_record_dataset の詳細な使用方法については、API ドキュメントをご覧ください。

Avro データセットで tf.keras モデルをトレーニングする

では、MNIST データセットに基づく Avro データセットを使用して、tf.keras モデルトレーニングのエンドツーエンドの例を見てみましょう。

Avro Dataset API を使って、TensorFlow データセットとして train.avro を読み込みます。

features = { 'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32), 'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100), } schema = tf.io.gfile.GFile('train.avsc').read() dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'], reader_schema=schema, features=features, shuffle=False, batch_size=1, num_epochs=1)

単純な Keras モデルを定義します。

def build_and_compile_cnn_model(): model = tf.keras.Sequential() model.compile(optimizer='sgd', loss='mse') return model model = build_and_compile_cnn_model()

Avro データセットで Keras モデルをトレーニングする

def extract_label(feature): label = feature.pop('label') return tf.sparse.to_dense(feature['features[*]']), label model.fit(x=dataset.map(extract_label), epochs=1, steps_per_epoch=1, verbose=1)

Avro データセットは、Avro データを解析し、TensorFlow テンソルに変換することができます。これにはレコード、マップ、ブランチ、列挙のレコードも含まれます。解析情報は、マップとして Avro データセット実装に渡されます。このマップのキーはデータの解析方法をエンコードし、値は、データをどのように TensorFlow テンソルに変換する方法をエンコードし、プリミティブ型(ブール、int、ロング、浮動小数点、ダブル、文字列など)とテンソル型(スパースまたは密など)を決定します。以下に、TensorFlow のパーサータイプ(表 1)とプリミティブ型の変換(表 2)を示します。

表 1. サポートされている TensorFlow パーサータイプ:

TensorFlow パーサータイプTensorFlow テンソル説明
tf.FixedLenFeature([], tf.int32)密なテンソル固定長の特徴量を解析します。つまり、すべての行の要素数は一定で、たとえば 1 つの要素または配列で、各行に同じ数の要素が必ず含まれます。
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50])スパーステンソル各行にインデックスと値の可変長リストがある疎な特徴量を解析します。'index_key' はインデックスを、'value_key' は値を識別します。'dtype' はデータ型です。'size' は各インデックスエントリの期待される最大インデックス値です。
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64)スパーステンソル可変長の特徴量を解析します。つまり、各データ行の要素数が異なる可能性があります。たとえば、最初の行には 5 つの要素があり、2 番目の行には 7 つの要素があります。

表 2. サポートされている、Avro 型から TensorFlow の型への変換 
:

Avro プリミティブ型TensorFlow プリミティブ型
boolean: バイナリ値tf.bool
bytes: 8 ビット符号なしバイトのシーケンスtf.string
double: 倍精度 64 ビット IEEE 浮動小数点数tf.float64
enum: 列挙型シンボル名を使った tf.string
float: 単精度 32 ビット IEEE 浮動小数点数tf.float32
int: 32 ビット符号付き整数tf.int32
long: 64 ビット符号付整数tf.int64
null: 値無しデフォルト値を使用
string: unicode 文字のシーケンスtf.string

Avro Dataset API の一連の包括的な例は、tests 内にあります。