#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Avro Dataset API

Overview

The objective of the Avro Dataset API is to load Avro-formatted data natively into TensorFlow as a TensorFlow dataset. Avro is a data serialization system similar to Protocol Buffers. It is widely used in Apache Hadoop, where it can provide both a serialization format for persistent data and a wire format for communication between Hadoop nodes. Avro data is a row-oriented, compact binary data format. It relies on a schema, which is stored as a separate JSON file. For the spec of the Avro format and schema declaration, please refer to the official manual.
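
To make the schema concept concrete, here is a minimal, illustrative sketch built in Python. The record and field names are hypothetical (not from this tutorial's data); it only shows the shape of the JSON that a schema (.avsc) file contains:

import json

# Hypothetical schema: a record with a long id and a nullable float value.
example_schema = {
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "value", "type": ["float", "null"]}  # union type: float or null
    ]
}
print(json.dumps(example_schema, indent=2))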

Setup package

Install the required tensorflow-io package:

!pip install tensorflow-io

Import packages

import tensorflow as tf
import tensorflow_io as tfio

Validate tf and tfio imports

print("tensorflow-io version: {}".format(tfio.__version__)) print("tensorflow version: {}".format(tf.__version__))

Usage

Explore the dataset

For the purpose of this tutorial, let's download the sample Avro dataset.

Download a sample Avro file:

!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
!ls -l train.avro

Download the corresponding schema file of the sample Avro file:

!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
!ls -l train.avsc

In the above example, a test Avro dataset was created based on the MNIST dataset. The original MNIST dataset in TFRecord format is generated from the TF named dataset. However, the full MNIST dataset is too large for a demo, so for simplicity most of it was trimmed away and only the first few records were kept. Moreover, additional trimming was done on the image field of the original MNIST dataset, which was mapped to the features field in Avro. So the Avro file train.avro has 4 records, each of which has 3 fields: features, an array of int; label, an int or null; and dataType, an enum. To view the decoded train.avro (note that the original Avro data file is not human readable, as Avro is a compact binary format):

Install the required package to read Avro files:

!pip install avro

To read and print an Avro file in a human-readable format:

from avro.io import DatumReader
from avro.datafile import DataFileReader
import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count + 1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
                break

print_avro(avro_file='train.avro')

The schema of train.avro, represented by train.avsc, is a JSON-formatted file. To view train.avsc:

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')

Prepare the dataset

Load train.avro as a TensorFlow dataset with the Avro dataset API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(
    file_pattern=['train.avro'],
    reader_schema=schema,
    features=features,
    shuffle=False,
    batch_size=3,
    num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")

The above example converts train.avro into a TensorFlow dataset. Each element of the dataset is a dictionary whose keys are the feature names and whose values are the converted sparse or dense tensors. E.g., it converts the features, label, and dataType fields into a VarLenFeature (SparseTensor), a FixedLenFeature (DenseTensor), and a FixedLenFeature (DenseTensor), respectively. Since batch_size is 3, it coerces 3 records from train.avro into one element in the result dataset. For the first record in train.avro, whose label is null, the Avro reader replaces it with the specified default value (-100). In this example, there are 4 records in total in train.avro. Since the batch size is 3, the result dataset contains 2 elements, the last of which has a batch size of 1. The user can also drop the last batch if its size is smaller than the batch size by enabling drop_final_batch. E.g.:

dataset = tfio.experimental.columnar.make_avro_record_dataset(
    file_pattern=['train.avro'],
    reader_schema=schema,
    features=features,
    shuffle=False,
    batch_size=3,
    drop_final_batch=True,
    num_epochs=1)

for record in dataset:
    print(record)

One can also increase num_parallel_reads to expedite Avro data processing by increasing the Avro parse/read parallelism.

dataset = tfio.experimental.columnar.make_avro_record_dataset(
    file_pattern=['train.avro'],
    reader_schema=schema,
    features=features,
    shuffle=False,
    num_parallel_reads=16,
    batch_size=3,
    drop_final_batch=True,
    num_epochs=1)

for record in dataset:
    print(record)
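
Shuffling between epochs is controlled the same way. As a sketch, the shuffle_buffer_size and shuffle_seed parameters below are assumed to exist alongside the shuffle flag used above; check the API doc for the exact signature:

dataset = tfio.experimental.columnar.make_avro_record_dataset(
    file_pattern=['train.avro'],
    reader_schema=schema,
    features=features,
    shuffle=True,             # enable record shuffling
    shuffle_buffer_size=4,    # assumed parameter: number of records to buffer
    shuffle_seed=42,          # assumed parameter: for reproducible shuffling
    batch_size=3,
    num_epochs=1)

for record in dataset:
    print(record['label'])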

For detailed usage of make_avro_record_dataset, please refer to the API doc.

Train tf.keras models with an Avro dataset

Now let's walk through an end-to-end example of tf.keras model training with an Avro dataset based on the MNIST dataset.

Load train.avro as a TensorFlow dataset with the Avro dataset API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(
    file_pattern=['train.avro'],
    reader_schema=schema,
    features=features,
    shuffle=False,
    batch_size=1,
    num_epochs=1)

Define a simple Keras model:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Train the Keras model with the Avro dataset:

def extract_label(feature):
    # Pop the label out of the feature dictionary and densify the sparse features.
    label = feature.pop('label')
    return tf.sparse.to_dense(feature['features[*]']), label

model.fit(x=dataset.map(extract_label), epochs=1, steps_per_epoch=1, verbose=1)

The Avro dataset can parse and coerce any Avro data into TensorFlow tensors, including records in records, maps, arrays, branches, and enumerations. The parsing information is passed into the Avro dataset implementation as a map, where the keys encode how to parse the data and the values encode how to coerce the data into TensorFlow tensors, deciding the primitive type (e.g. bool, int, long, float, double, string) as well as the tensor type (e.g. sparse or dense). A listing of TensorFlow's parser types (see Table 1) and the coercion of primitive types (Table 2) is provided.

Table 1: the supported TensorFlow parser types:

| TensorFlow Parser Types | TensorFlow Tensors | Explanation |
| --- | --- | --- |
| tf.FixedLenFeature([], tf.int32) | dense tensor | Parse a fixed-length feature; that is, all rows have the same constant number of elements, e.g. just one element or an array that always has the same number of elements per row |
| tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) | sparse tensor | Parse a sparse feature where each row has a variable-length list of indices and values. The 'index_key' identifies the indices. The 'value_key' identifies the values. The 'dtype' is the data type. The 'size' is the expected maximum index value for each index entry |
| tfio.experimental.columnar.VarLenFeatureWithRank([], tf.int64) | sparse tensor | Parse a variable-length feature; that means each data row can have a variable number of elements, e.g. the 1st row has 5 elements and the 2nd row has 7 elements |
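
Putting Table 1 together, a features map that mixes all three parser types might look like the sketch below. The 'sparse_feature' key and its index/value field names are hypothetical (taken from Table 1's example); only 'label' and 'features[*]' exist in this tutorial's train.avro:

features = {
    # Fixed-length feature: exactly one int per record (dense tensor).
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    # Variable-length feature: a per-record int array (sparse tensor).
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    # Sparse feature with hypothetical index/value field names, as in Table 1.
    'sparse_feature': tf.io.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'],
                                          value_key='key_value',
                                          dtype=tf.int64,
                                          size=[20, 50])
}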

Table 2: the supported conversions from Avro types to TensorFlow's types:

| Avro Primitive Type | TensorFlow Primitive Type |
| --- | --- |
| boolean: a binary value | tf.bool |
| bytes: a sequence of 8-bit unsigned bytes | tf.string |
| double: double-precision 64-bit IEEE floating point number | tf.float64 |
| enum: enumeration type | tf.string, using the symbol name |
| float: single-precision 32-bit IEEE floating point number | tf.float32 |
| int: 32-bit signed integer | tf.int32 |
| long: 64-bit signed integer | tf.int64 |
| null: no value | uses the default value |
| string: unicode character sequence | tf.string |

A comprehensive set of examples of the Avro dataset API is provided within the tests.