Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/es-419/tfx/tutorials/transform/census.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Cómo preprocesar datos con TensorFlow Transform

El componente de ingeniería de características de TensorFlow Extended (TFX)

Este bloc de notas de Colab de ejemplo proporciona un ejemplo un poco más avanzado de cómo se puede usar TensorFlow Transform (tf.Transform) para preprocesar datos de manera que se use exactamente el mismo código tanto para entrenar un modelo como para realizar inferencias en producción.

TensorFlow Transform es una biblioteca que permite preprocesar datos de entrada para TensorFlow, lo que incluye la creación de características que requieren un paso completo sobre el conjunto de datos de entrenamiento. Por ejemplo, mediante el uso de TensorFlow Transform podría hacer lo siguiente:

  • Normalizar un valor de entrada a través del uso de la media y la desviación estándar

  • Convertir cadenas a números enteros mediante la generación de un vocabulario sobre todos los valores de entrada

  • Convertir valores flotantes en enteros al asignarlos a cubos, en función de la distribución de datos observada

TensorFlow tiene soporte integrado para manipulaciones en un solo ejemplo o un lote de ejemplos. tf.Transform amplía estas capacidades para permitir pasos completos sobre todo el conjunto de datos de entrenamiento.

La salida de tf.Transform se exporta como un grafo de TensorFlow que se puede usar tanto para entrenamiento como para servicio. Usar el mismo grafo tanto para el entrenamiento como para el servicio sirve para evitar sesgos, ya que se aplican las mismas transformaciones en ambas etapas.

Punto clave: Para comprender tf.Transform y cómo funciona con Apache Beam, tendrá que saber un poco sobre Apache Beam. La Guía de programación de Beam es un excelente punto de partida.

##Qué estamos haciendo en este ejemplo

En este ejemplo, procesaremos un conjunto de datos ampliamente utilizado que contiene datos del censo y entrenaremos un modelo para ejecutar la clasificación. En este proceso, transformaremos los datos con ayuda de tf.Transform.

Punto clave: Como modelador y desarrollador, piense en cómo se usan estos datos y en los posibles beneficios y daños que pueden causar las predicciones de un modelo. Un modelo como este podría reforzar los sesgos y las disparidades sociales. ¿Es una característica relevante para el problema que desea resolver o introducirá un sesgo? Para obtener más información, lea sobre la equidad del aprendizaje automático.

Nota: TensorFlow Model Analysis es una herramienta poderosa que nos ayuda a comprender qué tan bien predice un modelo para varios segmentos de sus datos, lo que implica entender cómo el modelo puede reforzar los sesgos y las disparidades sociales.

Instale TensorFlow Transform

!pip install tensorflow-transform
# This cell is only necessary because packages were installed while python was # running. It avoids the need to restart the runtime when running in Colab. import pkg_resources import importlib importlib.reload(pkg_resources)

Importaciones y globales

En primer lugar, importe lo que necesita.

import math import os import pprint import pandas as pd import matplotlib.pyplot as plt import tensorflow as tf print('TF: {}'.format(tf.__version__)) import apache_beam as beam print('Beam: {}'.format(beam.__version__)) import tensorflow_transform as tft import tensorflow_transform.beam as tft_beam print('Transform: {}'.format(tft.__version__)) from tfx_bsl.public import tfxio from tfx_bsl.coders.example_coder import RecordBatchToExamplesEncoder

Luego, descargue los archivos de datos:

!wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data !wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.test train_path = './adult.data' test_path = './adult.test'

Nombre las columnas

Crearemos algunas listas útiles para hacer referencia a las columnas de nuestro conjunto de datos.

CATEGORICAL_FEATURE_KEYS = [ 'workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country', ] NUMERIC_FEATURE_KEYS = [ 'age', 'capital-gain', 'capital-loss', 'hours-per-week', 'education-num' ] ORDERED_CSV_COLUMNS = [ 'age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label' ] LABEL_KEY = 'label'

Esta es una vista previa rápida de los datos:

pandas_train = pd.read_csv(train_path, header=None, names=ORDERED_CSV_COLUMNS) pandas_train.head(5)
one_row = dict(pandas_train.loc[0])
COLUMN_DEFAULTS = [ '' if isinstance(v, str) else 0.0 for v in dict(pandas_train.loc[1]).values()]

Los datos de prueba tienen 1 línea de encabezado que debe omitirse y un "." al final de cada línea.

pandas_test = pd.read_csv(test_path, header=1, names=ORDERED_CSV_COLUMNS) pandas_test.head(5)
testing = os.getenv("WEB_TEST_BROWSER", False) if testing: pandas_train = pandas_train.loc[:1] pandas_test = pandas_test.loc[:1]

###Definamos nuestras características y nuestro esquema Definamos un esquema basado en los tipos de columnas en nuestra entrada. Entre otras cosas, esto nos ayudará a importarlos correctamente.

RAW_DATA_FEATURE_SPEC = dict( [(name, tf.io.FixedLenFeature([], tf.string)) for name in CATEGORICAL_FEATURE_KEYS] + [(name, tf.io.FixedLenFeature([], tf.float32)) for name in NUMERIC_FEATURE_KEYS] + [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))] ) SCHEMA = tft.DatasetMetadata.from_feature_spec(RAW_DATA_FEATURE_SPEC).schema

[Opcional] Codifique y decodifique protocolos tf.train.Example

En algunas partes de este tutorial debemos convertir ejemplos del conjunto de datos hacia y desde tf.train.Example.

La función oculta encode_example a continuación convierte un diccionario de características del conjunto de datos en un tf.train.Example.

#@title def encode_example(input_features): input_features = dict(input_features) output_features = {} for key in CATEGORICAL_FEATURE_KEYS: value = input_features[key] feature = tf.train.Feature( bytes_list=tf.train.BytesList(value=[value.strip().encode()])) output_features[key] = feature for key in NUMERIC_FEATURE_KEYS: value = input_features[key] feature = tf.train.Feature( float_list=tf.train.FloatList(value=[value])) output_features[key] = feature label_value = input_features.get(LABEL_KEY, None) if label_value is not None: output_features[LABEL_KEY] = tf.train.Feature( bytes_list = tf.train.BytesList(value=[label_value.strip().encode()])) example = tf.train.Example( features = tf.train.Features(feature=output_features) ) return example

Ahora puede convertir ejemplos de conjuntos de datos en protocolos Example:

tf_example = encode_example(pandas_train.loc[0]) tf_example.features.feature['age']
serialized_example_batch = tf.constant([ encode_example(pandas_train.loc[i]).SerializeToString() for i in range(3) ]) serialized_example_batch

También puede convertir lotes de protocolos de ejemplo serializados nuevamente en un diccionario de tensores:

decoded_tensors = tf.io.parse_example( serialized_example_batch, features=RAW_DATA_FEATURE_SPEC )

En algunos casos, la etiqueta no se pasará, por lo que la función de codificación se escribe para que la etiqueta sea opcional:

features_dict = dict(pandas_train.loc[0]) features_dict.pop(LABEL_KEY) LABEL_KEY in features_dict

Al crear un protocolo Example, simplemente no contendrá la clave de etiqueta.

no_label_example = encode_example(features_dict) LABEL_KEY in no_label_example.features.feature.keys()

###Configuración de hiperparámetros y limpieza básica

Constantes e hiperparámetros que se usan para el entrenamiento.

NUM_OOV_BUCKETS = 1 EPOCH_SPLITS = 10 TRAIN_NUM_EPOCHS = 2*EPOCH_SPLITS NUM_TRAIN_INSTANCES = len(pandas_train) NUM_TEST_INSTANCES = len(pandas_test) BATCH_SIZE = 128 STEPS_PER_TRAIN_EPOCH = tf.math.ceil(NUM_TRAIN_INSTANCES/BATCH_SIZE/EPOCH_SPLITS) EVALUATION_STEPS = tf.math.ceil(NUM_TEST_INSTANCES/BATCH_SIZE) # Names of temp files TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed' TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed' EXPORTED_MODEL_DIR = 'exported_model_dir'
if testing: TRAIN_NUM_EPOCHS = 1

##Cómo preprocesar con tf.Transform

Cree una preprocessing_fn con tf.Transform La función de preprocesamiento es el concepto más importante de tf.Transform. Una función de preprocesamiento es la que realmente ejecuta la transformación del conjunto de datos. Acepta y devuelve un diccionario de tensores, donde tensor significa Tensor o SparseTensor. Hay dos grupos principales de llamadas API que normalmente constituyen el núcleo de una función de preprocesamiento:

  1. TensorFlow Ops: cualquier función que acepte y devuelva tensores, lo que generalmente significa operaciones de TensorFlow. Estas agregan operaciones de TensorFlow al grafo que transforma datos sin procesar en datos transformados, un vector de características a la vez. Estas se ejecutarán para cada ejemplo, tanto durante el entrenamiento como durante el servicio.

  2. Tensorflow Transform Analyzers/Mappers: cualquiera de los analizadores/asignadores que ofrece tf.Transform. Estos también aceptan y devuelven tensores y, por lo general, contienen una combinación de operaciones de Tensorflow y cálculos de Beam, pero a diferencia de las operaciones de TensorFlow, solo se ejecutan en la canalización de Beam durante el análisis, lo que requiere un paso completo por todo el conjunto de datos de entrenamiento. El cálculo de Beam se ejecuta solo una vez (antes del entrenamiento, durante el análisis) y, por lo general, hace un recorrido completo por todo el conjunto de datos de entrenamiento. Se encargan de crear tensores tf.constant, que se agregan a su grafo. Por ejemplo, tft.min calcula el mínimo de un tensor sobre el conjunto de datos de entrenamiento.

Atención: Cuando aplique su función de preprocesamiento para servir inferencias, las constantes creadas por los analizadores durante el entrenamiento no cambiarán. Si sus datos tienen componentes de tendencia o estacionalidad, planifique en consecuencia.

Esta es una preprocessing_fn para este conjunto de datos. Hace varias cosas:

  1. Usa tft.scale_to_0_1 para escalar las características numéricas al rango [0,1].

  2. Usa tft.compute_and_apply_vocabulary para calcular un vocabulario para cada una de las características categóricas y devuelve los ID enteros para cada entrada como tf.int64. Esto se aplica tanto a entradas categóricas de cadenas como a enteras.

  3. Aplica algunas transformaciones manuales a los datos mediante operaciones estándar de TensorFlow. Aquí estas operaciones se aplican a la etiqueta, pero también podrían transformar las características. Las operaciones de TensorFlow hacen varias cosas:

    • Crean una tabla de búsqueda para la etiqueta (tf.init_scope garantiza que la tabla solo se cree la primera vez que se llama a la función).

    • Normalizan el texto de la etiqueta.

    • Convierten la etiqueta en única.

def preprocessing_fn(inputs): """Preprocess input columns into transformed columns.""" # Since we are modifying some features and leaving others unchanged, we # start by setting `outputs` to a copy of `inputs. outputs = inputs.copy() # Scale numeric columns to have range [0, 1]. for key in NUMERIC_FEATURE_KEYS: outputs[key] = tft.scale_to_0_1(inputs[key]) # For all categorical columns except the label column, we generate a # vocabulary but do not modify the feature. This vocabulary is instead # used in the trainer, by means of a feature column, to convert the feature # from a string to an integer id. for key in CATEGORICAL_FEATURE_KEYS: outputs[key] = tft.compute_and_apply_vocabulary( tf.strings.strip(inputs[key]), num_oov_buckets=NUM_OOV_BUCKETS, vocab_filename=key) # For the label column we provide the mapping from string to index. table_keys = ['>50K', '<=50K'] with tf.init_scope(): initializer = tf.lookup.KeyValueTensorInitializer( keys=table_keys, values=tf.cast(tf.range(len(table_keys)), tf.int64), key_dtype=tf.string, value_dtype=tf.int64) table = tf.lookup.StaticHashTable(initializer, default_value=-1) # Remove trailing periods for test data when the data is read with tf.data. # label_str = tf.sparse.to_dense(inputs[LABEL_KEY]) label_str = inputs[LABEL_KEY] label_str = tf.strings.regex_replace(label_str, r'\.$', '') label_str = tf.strings.strip(label_str) data_labels = table.lookup(label_str) transformed_label = tf.one_hot( indices=data_labels, depth=len(table_keys), on_value=1.0, off_value=0.0) outputs[LABEL_KEY] = tf.reshape(transformed_label, [-1, len(table_keys)]) return outputs

Sintaxis

Está casi listo para armar todo y usar Apache Beam para ejecutarlo.

Apache Beam usa una sintaxis especial para definir e invocar transformaciones. Por ejemplo, en esta línea:

result = pass_this | 'name this step' >> to_this_call

El método to_this_call se invoca y se pasa el objeto llamado pass_this, y esta operación se conocerá como name this step en un seguimiento de pila . El resultado de la llamada a to_this_call se devuelve en result. A menudo verá etapas de una canalización encadenadas de esta manera:

result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()

y como comenzó con una nueva canalización, puede continuar así:

next_result = result | 'doing more stuff' >> another_function()

Transforme los datos

Ahora estamos listos para comenzar a transformar nuestros datos en una canalización de Apache Beam.

  1. Lea los datos con ayuda del lector CSV tfxio.CsvTFXIO (para procesar líneas de texto en una canalización, es mejor usar tfxio.BeamRecordCsvTFXIO).

  2. Analice y transforme los datos con la preprocessing_fn definida anteriormente.

  3. Escriba el resultado como un TFRecord de protocolos Example, que usaremos para entrenar un modelo más adelante.

def transform_data(train_data_file, test_data_file, working_dir): """Transform the data and write out as a TFRecord of Example protos. Read in the data using the CSV reader, and transform it using a preprocessing pipeline that scales numeric data and converts categorical data from strings to int64 values indices, by creating a vocabulary for each category. Args: train_data_file: File containing training data test_data_file: File containing test data working_dir: Directory to write transformed data and metadata to """ # The "with" block will create a pipeline, and run that pipeline at the exit # of the block. with beam.Pipeline() as pipeline: with tft_beam.Context(temp_dir=tempfile.mkdtemp()): # Create a TFXIO to read the census data with the schema. To do this we # need to list all columns in order since the schema doesn't specify the # order of columns in the csv. # We first read CSV files and use BeamRecordCsvTFXIO whose .BeamSource() # accepts a PCollection[bytes] because we need to patch the records first # (see "FixCommasTrainData" below). Otherwise, tfxio.CsvTFXIO can be used # to both read the CSV files and parse them to TFT inputs: # csv_tfxio = tfxio.CsvTFXIO(...) # raw_data = (pipeline | 'ToRecordBatches' >> csv_tfxio.BeamSource()) train_csv_tfxio = tfxio.CsvTFXIO( file_pattern=train_data_file, telemetry_descriptors=[], column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA) # Read in raw data and convert using CSV TFXIO. raw_data = ( pipeline | 'ReadTrainCsv' >> train_csv_tfxio.BeamSource()) # Combine data and schema into a dataset tuple. Note that we already used # the schema to read the CSV data, but we also need it to interpret # raw_data. cfg = train_csv_tfxio.TensorAdapterConfig() raw_dataset = (raw_data, cfg) # The TFXIO output format is chosen for improved performance. transformed_dataset, transform_fn = ( raw_dataset | tft_beam.AnalyzeAndTransformDataset( preprocessing_fn, output_record_batches=True)) # Transformed metadata is not necessary for encoding. transformed_data, _ = transformed_dataset # Extract transformed RecordBatches, encode and write them to the given # directory. coder = RecordBatchToExamplesEncoder() _ = ( transformed_data | 'EncodeTrainData' >> beam.FlatMapTuple(lambda batch, _: coder.encode(batch)) | 'WriteTrainData' >> beam.io.WriteToTFRecord( os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE))) # Now apply transform function to test data. In this case we remove the # trailing period at the end of each line, and also ignore the header line # that is present in the test data file. test_csv_tfxio = tfxio.CsvTFXIO( file_pattern=test_data_file, skip_header_lines=1, telemetry_descriptors=[], column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA) raw_test_data = ( pipeline | 'ReadTestCsv' >> test_csv_tfxio.BeamSource()) raw_test_dataset = (raw_test_data, test_csv_tfxio.TensorAdapterConfig()) # The TFXIO output format is chosen for improved performance. transformed_test_dataset = ( (raw_test_dataset, transform_fn) | tft_beam.TransformDataset(output_record_batches=True)) # Transformed metadata is not necessary for encoding. transformed_test_data, _ = transformed_test_dataset # Extract transformed RecordBatches, encode and write them to the given # directory. _ = ( transformed_test_data | 'EncodeTestData' >> beam.FlatMapTuple(lambda batch, _: coder.encode(batch)) | 'WriteTestData' >> beam.io.WriteToTFRecord( os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE))) # Will write a SavedModel and metadata to working_dir, which can then # be read by the tft.TFTransformOutput class. _ = ( transform_fn | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))

Ejecute la canalización:

import tempfile import pathlib output_dir = os.path.join(tempfile.mkdtemp(), 'keras') transform_data(train_path, test_path, output_dir)

Envuelva el directorio de salida como tft.TFTransformOutput:

tf_transform_output = tft.TFTransformOutput(output_dir)
tf_transform_output.transformed_feature_spec()

Si mira el directorio, verá que contiene tres cosas:

  1. Los archivos de datos train_transformed y test_transformed

  2. El directorio transform_fn (un tf.saved_model)

  3. Los transformed_metadata

En las siguientes secciones se muestra cómo usar estos artefactos para entrenar un modelo.

!ls -l {output_dir}

##Cómo usar nuestros datos preprocesados ​​para entrenar un modelo con tf.keras

Para mostrar cómo tf.Transform nos permite usar el mismo código tanto para el entrenamiento como para el servicio, y así evitar el sesgo, entrenaremos un modelo. Para entrenar nuestro modelo y prepararlo para la producción, tenemos que crear funciones de entrada. La principal diferencia entre nuestra función de entrada de entrenamiento y nuestra función de entrada de servicio es que los datos de entrenamiento contienen las etiquetas y los datos de producción no. Los argumentos y las devoluciones también son algo diferentes.

###Cree una función de entrada para entrenamiento

Al ejecutar la canalización en la sección anterior se crearon archivos TFRecord que contienen los datos transformados.

El siguiente código usa tf.data.experimental.make_batched_features_dataset y tft.TFTransformOutput.transformed_feature_spec para leer estos archivos de datos como tf.data.Dataset:

def _make_training_input_fn(tf_transform_output, train_file_pattern, batch_size): """An input function reading from transformed data, converting to model input. Args: tf_transform_output: Wrapper around output of tf.Transform. transformed_examples: Base filename of examples. batch_size: Batch size. Returns: The input data for training or eval, in the form of k. """ def input_fn(): return tf.data.experimental.make_batched_features_dataset( file_pattern=train_file_pattern, batch_size=batch_size, features=tf_transform_output.transformed_feature_spec(), reader=tf.data.TFRecordDataset, label_key=LABEL_KEY, shuffle=True) return input_fn
train_file_pattern = pathlib.Path(output_dir)/f'{TRANSFORMED_TRAIN_DATA_FILEBASE}*' input_fn = _make_training_input_fn( tf_transform_output=tf_transform_output, train_file_pattern = str(train_file_pattern), batch_size = 10 )

A continuación, se puede ver una muestra transformada de los datos. Observe cómo las columnas numéricas education-num y hourd-per-week se convierten en flotantes con un rango de [0,1], y las columnas de cadena se han convertido en ID:

for example, label in input_fn().take(1): break pd.DataFrame(example)
label

Entrene y evalúe el modelo

Compile el modelo

def build_keras_model(working_dir): inputs = build_keras_inputs(working_dir) encoded_inputs = encode_inputs(inputs) stacked_inputs = tf.concat(tf.nest.flatten(encoded_inputs), axis=1) output = tf.keras.layers.Dense(100, activation='relu')(stacked_inputs) output = tf.keras.layers.Dense(50, activation='relu')(output) output = tf.keras.layers.Dense(2)(output) model = tf.keras.Model(inputs=inputs, outputs=output) return model
def build_keras_inputs(working_dir): tf_transform_output = tft.TFTransformOutput(working_dir) feature_spec = tf_transform_output.transformed_feature_spec().copy() feature_spec.pop(LABEL_KEY) # Build the `keras.Input` objects. inputs = {} for key, spec in feature_spec.items(): if isinstance(spec, tf.io.VarLenFeature): inputs[key] = tf.keras.layers.Input( shape=[None], name=key, dtype=spec.dtype, sparse=True) elif isinstance(spec, tf.io.FixedLenFeature): inputs[key] = tf.keras.layers.Input( shape=spec.shape, name=key, dtype=spec.dtype) else: raise ValueError('Spec type is not supported: ', key, spec) return inputs
def encode_inputs(inputs): encoded_inputs = {} for key in inputs: feature = tf.expand_dims(inputs[key], -1) if key in CATEGORICAL_FEATURE_KEYS: num_buckets = tf_transform_output.num_buckets_for_transformed_feature(key) encoding_layer = ( tf.keras.layers.CategoryEncoding( num_tokens=num_buckets, output_mode='binary', sparse=False)) encoded_inputs[key] = encoding_layer(feature) else: encoded_inputs[key] = feature return encoded_inputs
model = build_keras_model(output_dir) tf.keras.utils.plot_model(model,rankdir='LR', show_shapes=True)

Compile los conjuntos de datos

def get_dataset(working_dir, filebase): tf_transform_output = tft.TFTransformOutput(working_dir) data_path_pattern = os.path.join( working_dir, filebase + '*') input_fn = _make_training_input_fn( tf_transform_output, data_path_pattern, batch_size=BATCH_SIZE) dataset = input_fn() return dataset

Entrene y evalúe el modelo:

def train_and_evaluate( model, working_dir): """Train the model on training data and evaluate on test data. Args: working_dir: The location of the Transform output. num_train_instances: Number of instances in train set num_test_instances: Number of instances in test set Returns: The results from the estimator's 'evaluate' method """ train_dataset = get_dataset(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE) validation_dataset = get_dataset(working_dir, TRANSFORMED_TEST_DATA_FILEBASE) model = build_keras_model(working_dir) history = train_model(model, train_dataset, validation_dataset) metric_values = model.evaluate(validation_dataset, steps=EVALUATION_STEPS, return_dict=True) return model, history, metric_values
def train_model(model, train_dataset, validation_dataset): model.compile(optimizer='adam', loss=tf.losses.CategoricalCrossentropy(from_logits=True), metrics=['accuracy']) history = model.fit(train_dataset, validation_data=validation_dataset, epochs=TRAIN_NUM_EPOCHS, steps_per_epoch=STEPS_PER_TRAIN_EPOCH, validation_steps=EVALUATION_STEPS) return history
model, history, metric_values = train_and_evaluate(model, output_dir)
plt.plot(history.history['loss'], label='Train') plt.plot(history.history['val_loss'], label='Eval') plt.ylim(0,max(plt.ylim())) plt.legend() plt.title('Loss');

Transforme nuevos datos

En la sección anterior, el proceso de entrenamiento usó las copias impresas de los datos transformados que fueron generados por tft_beam.AnalyzeAndTransformDataset en la función transform_dataset.

Para operar con datos nuevos, deberá cargar la versión final de preprocessing_fn que guardó tft_beam.WriteTransformFn.

El método TFTransformOutput.transform_features_layer carga el SavedModel de la preprocessing_fn desde el directorio de salida.

Esta es una función para cargar lotes nuevos sin procesar desde un archivo fuente:

def read_csv(file_name, batch_size): return tf.data.experimental.make_csv_dataset( file_pattern=file_name, batch_size=batch_size, column_names=ORDERED_CSV_COLUMNS, column_defaults=COLUMN_DEFAULTS, prefetch_buffer_size=0, ignore_errors=True)
for ex in read_csv(test_path, batch_size=5): break pd.DataFrame(ex)

Cargue tft.TransformFeaturesLayer para transformar estos datos con preprocessing_fn:

ex2 = ex.copy() ex2.pop('fnlwgt') tft_layer = tf_transform_output.transform_features_layer() t_ex = tft_layer(ex2) label = t_ex.pop(LABEL_KEY) pd.DataFrame(t_ex)

tft_layer es lo suficientemente inteligente como para ejecutar la transformación si solo se pasa un subconjunto de características. Por ejemplo, si solo pasa dos características, obtendrá solo las versiones transformadas de esas características:

ex2 = pd.DataFrame(ex)[['education', 'hours-per-week']] ex2
pd.DataFrame(tft_layer(dict(ex2)))

Esta es una versión más sólida que elimina características que no están en las especificaciones de características y devuelve un par (features, label) si la etiqueta está en las características proporcionadas:

class Transform(tf.Module): def __init__(self, working_dir): self.working_dir = working_dir self.tf_transform_output = tft.TFTransformOutput(working_dir) self.tft_layer = tf_transform_output.transform_features_layer() @tf.function def __call__(self, features): raw_features = {} for key, val in features.items(): # Skip unused keys if key not in RAW_DATA_FEATURE_SPEC: continue raw_features[key] = val # Apply the `preprocessing_fn`. transformed_features = tft_layer(raw_features) if LABEL_KEY in transformed_features: # Pop the label and return a (features, labels) pair. data_labels = transformed_features.pop(LABEL_KEY) return (transformed_features, data_labels) else: return transformed_features
transform = Transform(output_dir)
t_ex, t_label = transform(ex)
pd.DataFrame(t_ex)

Ahora puede usar Dataset.map para aplicar esa transformación, sobre la marcha, a nuevos datos:

model.evaluate( read_csv(test_path, batch_size=5).map(transform), steps=EVALUATION_STEPS, return_dict=True )

Exporte el modelo

Entonces tiene un modelo entrenado y un método para aplicar preprocessing_fn a nuevos datos. Ensámblelos en un nuevo modelo que acepte protocolos tf.train.Example serializados como entrada.

class ServingModel(tf.Module): def __init__(self, model, working_dir): self.model = model self.working_dir = working_dir self.transform = Transform(working_dir) @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)]) def __call__(self, serialized_tf_examples): # parse the tf.train.Example feature_spec = RAW_DATA_FEATURE_SPEC.copy() feature_spec.pop(LABEL_KEY) parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec) # Apply the `preprocessing_fn` transformed_features = self.transform(parsed_features) # Run the model outputs = self.model(transformed_features) # Format the output classes_names = tf.constant([['0', '1']]) classes = tf.tile(classes_names, [tf.shape(outputs)[0], 1]) return {'classes': classes, 'scores': outputs} def export(self, output_dir): # Increment the directory number. This is required in order to make this # model servable with model_server. save_model_dir = pathlib.Path(output_dir)/'model' number_dirs = [int(p.name) for p in save_model_dir.glob('*') if p.name.isdigit()] id = max([0] + number_dirs)+1 save_model_dir = save_model_dir/str(id) # Set the signature to make it visible for serving. concrete_serving_fn = self.__call__.get_concrete_function() signatures = {'serving_default': concrete_serving_fn} # Export the model. tf.saved_model.save( self, str(save_model_dir), signatures=signatures) return save_model_dir

Compile el modelo y pruébelo en el lote de ejemplos serializados:

serving_model = ServingModel(model, output_dir) serving_model(serialized_example_batch)

Exporte el modelo como SavedModel:

saved_model_dir = serving_model.export(output_dir) saved_model_dir

Vuelva a cargar el modelo y pruébelo en el mismo lote de ejemplos:

reloaded = tf.saved_model.load(str(saved_model_dir)) run_model = reloaded.signatures['serving_default']
run_model(serialized_example_batch)

##Qué hicimos En este ejemplo usamos tf.Transform para preprocesar un conjunto de datos del censo y entrenar un modelo con los datos limpios y transformados. También creamos una función de entrada que podríamos usar cuando implementemos nuestro modelo entrenado en un entorno de producción para ejecutar inferencias. Al usar el mismo código tanto para el entrenamiento como para la inferencia, evitamos cualquier problema con el sesgo de los datos. En el proceso, aprendimos cómo crear una transformación Apache Beam para ejecutar la transformación que necesitábamos para limpiar los datos. También vimos cómo usar estos datos transformados para entrenar un modelo con ayuda de tf.keras. ¡Esto es solo una pequeña parte de lo que TensorFlow Transform puede hacer! Le recomendamos que se interiorice en tf.Transform y descubra lo que puede hacer por usted.

[Opcional] Cómo usar nuestros datos preprocesados ​​para entrenar un modelo con tf.estimator

Advertencia: Los estimadores no se recomiendan para código nuevo. Los estimadores ejecutan el código de estilo v1.Session que es más difícil de escribir correctamente y que puede tener un comportamiento inesperado; particularmente, cuando se combina con código TF 2. Los estimadores están incluidos dentro de nuestras garantías de compatibilidad, pero no se les harán correcciones a menos que se trate de vulneraciones a la seguridad. Para más detalles, consulte la Guía de migración.

###Cree una función de entrada para entrenamiento

def _make_training_input_fn(tf_transform_output, transformed_examples, batch_size): """Creates an input function reading from transformed data. Args: tf_transform_output: Wrapper around output of tf.Transform. transformed_examples: Base filename of examples. batch_size: Batch size. Returns: The input function for training or eval. """ def input_fn(): """Input function for training and eval.""" dataset = tf.data.experimental.make_batched_features_dataset( file_pattern=transformed_examples, batch_size=batch_size, features=tf_transform_output.transformed_feature_spec(), reader=tf.data.TFRecordDataset, shuffle=True) transformed_features = tf.compat.v1.data.make_one_shot_iterator( dataset).get_next() # Extract features and label from the transformed tensors. transformed_labels = tf.where( tf.equal(transformed_features.pop(LABEL_KEY), 1)) return transformed_features, transformed_labels[:,1] return input_fn

###Cree una función de entrada para servicio

Creemos una función de entrada que podamos usar en producción y preparemos nuestro modelo entrenado para su servicio.

def _make_serving_input_fn(tf_transform_output): """Creates an input function reading from raw data. Args: tf_transform_output: Wrapper around output of tf.Transform. Returns: The serving input function. """ raw_feature_spec = RAW_DATA_FEATURE_SPEC.copy() # Remove label since it is not available during serving. raw_feature_spec.pop(LABEL_KEY) def serving_input_fn(): """Input function for serving.""" # Get raw features by generating the basic serving input_fn and calling it. # Here we generate an input_fn that expects a parsed Example proto to be fed # to the model at serving time. See also # tf.estimator.export.build_raw_serving_input_receiver_fn. raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn( raw_feature_spec, default_batch_size=None) serving_input_receiver = raw_input_fn() # Apply the transform function that was used to generate the materialized # data. raw_features = serving_input_receiver.features transformed_features = tf_transform_output.transform_raw_features( raw_features) return tf.estimator.export.ServingInputReceiver( transformed_features, serving_input_receiver.receiver_tensors) return serving_input_fn

###Envuelva los datos de entrada en FeatureColumns. Nuestro modelo esperará nuestros datos en TensorFlow FeatureColumns.

def get_feature_columns(tf_transform_output): """Returns the FeatureColumns for the model. Args: tf_transform_output: A `TFTransformOutput` object. Returns: A list of FeatureColumns. """ # Wrap scalars as real valued columns. real_valued_columns = [tf.feature_column.numeric_column(key, shape=()) for key in NUMERIC_FEATURE_KEYS] # Wrap categorical columns. one_hot_columns = [ tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_identity( key=key, num_buckets=(NUM_OOV_BUCKETS + tf_transform_output.vocabulary_size_by_name( vocab_filename=key)))) for key in CATEGORICAL_FEATURE_KEYS] return real_valued_columns + one_hot_columns

###Entrene, evalúe y exporte el modelo

def train_and_evaluate(working_dir, num_train_instances=NUM_TRAIN_INSTANCES, num_test_instances=NUM_TEST_INSTANCES): """Train the model on training data and evaluate on test data. Args: working_dir: Directory to read transformed data and metadata from and to write exported model to. num_train_instances: Number of instances in train set num_test_instances: Number of instances in test set Returns: The results from the estimator's 'evaluate' method """ tf_transform_output = tft.TFTransformOutput(working_dir) run_config = tf.estimator.RunConfig() estimator = tf.estimator.LinearClassifier( feature_columns=get_feature_columns(tf_transform_output), config=run_config, loss_reduction=tf.losses.Reduction.SUM) # Fit the model using the default optimizer. train_input_fn = _make_training_input_fn( tf_transform_output, os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE + '*'), batch_size=BATCH_SIZE) estimator.train( input_fn=train_input_fn, max_steps=TRAIN_NUM_EPOCHS * num_train_instances / BATCH_SIZE) # Evaluate model on test dataset. eval_input_fn = _make_training_input_fn( tf_transform_output, os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE + '*'), batch_size=1) # Export the model. serving_input_fn = _make_serving_input_fn(tf_transform_output) exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR) estimator.export_saved_model(exported_model_dir, serving_input_fn) return estimator.evaluate(input_fn=eval_input_fn, steps=num_test_instances)

###Unimos todo Hemos creado todo lo que necesitamos para preprocesar los datos de nuestro censo, entrenar un modelo y prepararlo para su servicio. Hasta ahora solo hemos estado preparando las cosas. ¡Es hora de empezar a ejecutar!

Nota: Desplácese por el resultado de esta celda para ver todo el proceso. Los resultados estarán en la parte inferior.

import tempfile temp = temp = os.path.join(tempfile.mkdtemp(),'estimator') transform_data(train_path, test_path, temp) results = train_and_evaluate(temp)
pprint.pprint(results)