Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/pt-br/tfx/tutorials/transform/census.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Pré-processamento de dados com o TensorFlow Transform

O componente de engenharia de características do TensorFlow Extended (TFX)

Este exemplo de notebook colab fornece um exemplo um pouco mais avançado de como o TensorFlow Transform (tf.Transform) pode ser usado para pré-processar dados usando exatamente o mesmo código para treinar um modelo e servir inferências em produção.

O TensorFlow Transform é uma biblioteca para pré-processamento de dados de entrada para TensorFlow, incluindo a criação de características que exigem um passo completo do dataset de treinamento. Por exemplo, usando o TensorFlow Transform você poderia:

  • Normalizar um valor de entrada usando a média e o desvio padrão

  • Converter strings em inteiros gerando um vocabulário sobre todos os valores de entrada

  • Converter números de ponto flutuante em números inteiros atribuindo-os a intervalos, com base na distribuição de dados observada

O TensorFlow tem suporte integrado para manipulações em um único exemplo ou num lote de exemplos. O tf.Transform estende esses recursos para oferecer suporte a passos completos em todo o dataset de treinamento.

A saída do tf.Transform é exportada como um grafo do TensorFlow que você pode usar para treinamento e serviço. Usar o mesmo grafo para treinamento e serviço pode evitar desvios, uma vez que as mesmas transformações são aplicadas em ambos os estágios.

Ponto chave: para entender o tf.Transform e como ele funciona com o Apache Beam, você precisará saber um pouco sobre o próprio Apache Beam. O Guia de programação do Beam é um ótimo lugar para começar.

##O que estamos fazendo neste exemplo

Neste exemplo, processaremos um dataset amplamente utilizado contendo dados de censo e treinaremos um modelo para fazer a classificação. Ao longo do caminho estaremos transformando os dados usando o tf.Transform.

Ponto-chave: como modelador e desenvolvedor, pense em como esses dados são usados ​​e nos possíveis benefícios e danos que as previsões de um modelo podem causar. Um modelo como este poderia reforçar um viés social e disparidades. Uma determinada característica é relevante para o problema que você quer resolver ou será que ela vai introduzir um viés? Para mais informações, leia sobre Equidade em ML.

Observação: o TensorFlow Model Analysis é uma ferramenta poderosa para entender o quão bem seu modelo faz previsões para vários segmentos de seus dados, incluindo a compreensão de como seu modelo pode reforçar preconceitos e disparidades sociais.

Instale o TensorFlow Transform

!pip install tensorflow-transform
# This cell is only necessary because packages were installed while python was # running. It avoids the need to restart the runtime when running in Colab. import pkg_resources import importlib importlib.reload(pkg_resources)

Importações e variáveis globais

Primeiro importe o que precisamos.

import math import os import pprint import pandas as pd import matplotlib.pyplot as plt import tensorflow as tf print('TF: {}'.format(tf.__version__)) import apache_beam as beam print('Beam: {}'.format(beam.__version__)) import tensorflow_transform as tft import tensorflow_transform.beam as tft_beam print('Transform: {}'.format(tft.__version__)) from tfx_bsl.public import tfxio from tfx_bsl.coders.example_coder import RecordBatchToExamplesEncoder

Em seguida, baixe os arquivos de dados:

!wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data !wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.test train_path = './adult.data' test_path = './adult.test'

Dê nomes às colunas

Criaremos algumas listas úteis para referenciar as colunas em nosso dataset.

CATEGORICAL_FEATURE_KEYS = [ 'workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country', ] NUMERIC_FEATURE_KEYS = [ 'age', 'capital-gain', 'capital-loss', 'hours-per-week', 'education-num' ] ORDERED_CSV_COLUMNS = [ 'age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label' ] LABEL_KEY = 'label'

Aqui está uma rápida visualização dos dados:

pandas_train = pd.read_csv(train_path, header=None, names=ORDERED_CSV_COLUMNS) pandas_train.head(5)
one_row = dict(pandas_train.loc[0])
COLUMN_DEFAULTS = [ '' if isinstance(v, str) else 0.0 for v in dict(pandas_train.loc[1]).values()]

Os dados de teste têm 1 linha de cabeçalho que precisa ser ignorada e um "." no final de cada linha.

pandas_test = pd.read_csv(test_path, header=1, names=ORDERED_CSV_COLUMNS) pandas_test.head(5)
testing = os.getenv("WEB_TEST_BROWSER", False) if testing: pandas_train = pandas_train.loc[:1] pandas_test = pandas_test.loc[:1]

###Defina nossas características e esquema Vamos definir um esquema com base nos tipos de colunas em nossa entrada. Entre outras coisas, isso ajudará a importá-los corretamente.

RAW_DATA_FEATURE_SPEC = dict( [(name, tf.io.FixedLenFeature([], tf.string)) for name in CATEGORICAL_FEATURE_KEYS] + [(name, tf.io.FixedLenFeature([], tf.float32)) for name in NUMERIC_FEATURE_KEYS] + [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))] ) SCHEMA = tft.DatasetMetadata.from_feature_spec(RAW_DATA_FEATURE_SPEC).schema

[Opcional] Codifique e decodifique protos do tf.train.Example

Este tutorial precisa converter exemplos do dataset de e para protos do tf.train.Example em alguns lugares.

A função oculta encode_example abaixo converte um dicionário de recursos do dataset num tf.train.Example.

#@title def encode_example(input_features): input_features = dict(input_features) output_features = {} for key in CATEGORICAL_FEATURE_KEYS: value = input_features[key] feature = tf.train.Feature( bytes_list=tf.train.BytesList(value=[value.strip().encode()])) output_features[key] = feature for key in NUMERIC_FEATURE_KEYS: value = input_features[key] feature = tf.train.Feature( float_list=tf.train.FloatList(value=[value])) output_features[key] = feature label_value = input_features.get(LABEL_KEY, None) if label_value is not None: output_features[LABEL_KEY] = tf.train.Feature( bytes_list = tf.train.BytesList(value=[label_value.strip().encode()])) example = tf.train.Example( features = tf.train.Features(feature=output_features) ) return example

Agora você pode converter exemplos de datasets em protos Example:

tf_example = encode_example(pandas_train.loc[0]) tf_example.features.feature['age']
serialized_example_batch = tf.constant([ encode_example(pandas_train.loc[i]).SerializeToString() for i in range(3) ]) serialized_example_batch

Você também pode converter lotes de protos Example serializados de volta para um dicionário de tensores:

decoded_tensors = tf.io.parse_example( serialized_example_batch, features=RAW_DATA_FEATURE_SPEC )

Em alguns casos o rótulo não será passado, então a função encode é escrita de forma que o rótulo seja opcional:

features_dict = dict(pandas_train.loc[0]) features_dict.pop(LABEL_KEY) LABEL_KEY in features_dict

Ao criar um proto Example, ele simplesmente não conterá a chave do rótulo.

no_label_example = encode_example(features_dict) LABEL_KEY in no_label_example.features.feature.keys()

###Configuração de hiperparâmetros e manutenção básica

Constantes e hiperparâmetros usados ​​para treinamento.

NUM_OOV_BUCKETS = 1 EPOCH_SPLITS = 10 TRAIN_NUM_EPOCHS = 2*EPOCH_SPLITS NUM_TRAIN_INSTANCES = len(pandas_train) NUM_TEST_INSTANCES = len(pandas_test) BATCH_SIZE = 128 STEPS_PER_TRAIN_EPOCH = tf.math.ceil(NUM_TRAIN_INSTANCES/BATCH_SIZE/EPOCH_SPLITS) EVALUATION_STEPS = tf.math.ceil(NUM_TEST_INSTANCES/BATCH_SIZE) # Names of temp files TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed' TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed' EXPORTED_MODEL_DIR = 'exported_model_dir'
if testing: TRAIN_NUM_EPOCHS = 1

##Pré-processamento com o tf.Transform

###Crie uma preprocessing_fn do tf.Transform A função de pré-processamento é o conceito mais importante do tf.Transform. Uma função de pré-processamento é onde a transformação do dataset realmente acontece. Ela recebe e retorna um dicionário de tensores, onde um tensor significa um Tensor ou SparseTensor. Existem dois grupos principais de chamadas de API que normalmente formam o coração de uma função de pré-processamento:

  1. Ops do TensorFlow: qualquer função que aceita e retorna tensores, o que geralmente correspondem às ops do TensorFlow. Elas adicionam operações do TensorFlow ao grafo que transforma dados brutos em dados transformados, um vetor de características por vez. Eles serão executados para cada exemplo, tanto durante o treinamento quanto durante o serviço.

  2. Analisadores/mapeadores do Tensorflow Transform: qualquer um dos analisadores/mapeadores fornecidos pelo tf.Transform. Eles também recebem e retornam tensores e normalmente contêm uma combinação de ops do Tensorflow e computações do Beam, mas, ao contrário das ops do TensorFlow, eles são executados apenas no pipeline do Beam durante a análise, exigindo uma passagem completa (full pass) por todo o dataset de treinamento. As computações do Beam rodam apenas uma vez (antes do treinamento, durante a análise) e normalmente fazem uma passagem completa por todo o dataset de treinamento. Criam tensores tf.constant, que são adicionados ao grafo. Por exemplo, tft.min calcula o mínimo de um tensor no dataset de treinamento.

Atenção: Quando você aplica sua função de pré-processamento para servir inferências, as constantes que foram criadas pelos analisadores durante o treinamento não mudam. Se seus dados tiverem componentes de tendência ou sazonalidade, planeje levando isso em consideração.

Aqui está um preprocessing_fn para este dataset. Ele faz várias coisas:

  1. Usando tft.scale_to_0_1, ele redimensiona as caracteristicas numéricas para o intervalo [0,1].

  2. Usando tft.compute_and_apply_vocabulary, ele calcula um vocabulário para cada uma das características categóricas e retorna os IDs inteiros para cada entrada como tf.int64. Isso se aplica a entradas categóricas de string e inteiro.

  3. Ele aplica algumas transformações manuais aos dados usando operações padrão do TensorFlow. Aqui, essas operações são aplicadas ao rótulo, mas também podem transformar as características. As operações do TensorFlow fazem várias coisas:

    • Eles criam uma tabela de pesquisa para o rótulo (o tf.init_scope garante que a tabela seja criada apenas na primeira vez que a função for chamada).

    • Eles normalizam o texto do rótulo.

    • Eles convertem o rótulo em um one-hot.

def preprocessing_fn(inputs): """Preprocess input columns into transformed columns.""" # Since we are modifying some features and leaving others unchanged, we # start by setting `outputs` to a copy of `inputs. outputs = inputs.copy() # Scale numeric columns to have range [0, 1]. for key in NUMERIC_FEATURE_KEYS: outputs[key] = tft.scale_to_0_1(inputs[key]) # For all categorical columns except the label column, we generate a # vocabulary but do not modify the feature. This vocabulary is instead # used in the trainer, by means of a feature column, to convert the feature # from a string to an integer id. for key in CATEGORICAL_FEATURE_KEYS: outputs[key] = tft.compute_and_apply_vocabulary( tf.strings.strip(inputs[key]), num_oov_buckets=NUM_OOV_BUCKETS, vocab_filename=key) # For the label column we provide the mapping from string to index. table_keys = ['>50K', '<=50K'] with tf.init_scope(): initializer = tf.lookup.KeyValueTensorInitializer( keys=table_keys, values=tf.cast(tf.range(len(table_keys)), tf.int64), key_dtype=tf.string, value_dtype=tf.int64) table = tf.lookup.StaticHashTable(initializer, default_value=-1) # Remove trailing periods for test data when the data is read with tf.data. # label_str = tf.sparse.to_dense(inputs[LABEL_KEY]) label_str = inputs[LABEL_KEY] label_str = tf.strings.regex_replace(label_str, r'\.$', '') label_str = tf.strings.strip(label_str) data_labels = table.lookup(label_str) transformed_label = tf.one_hot( indices=data_labels, depth=len(table_keys), on_value=1.0, off_value=0.0) outputs[LABEL_KEY] = tf.reshape(transformed_label, [-1, len(table_keys)]) return outputs

Sintaxe

Você está quase pronto para juntar tudo e usar o Apache Beam para executá-lo.

O Apache Beam usa uma sintaxe especial para definir e invocar transformações. Por exemplo, nesta linha:

result = pass_this | 'name this step' >> to_this_call

O método to_this_call está sendo invocado e passado ao objeto chamado pass_this, e esta operação será chamada de name this step num rastreamento de pilha. O resultado da chamada para to_this_call é retornado em result. Freqüentemente, você verá estágios de um pipeline encadeados assim:

result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()

e como isso começou com um novo pipeline, você pode continuar assim:

next_result = result | 'doing more stuff' >> another_function()

Transforme os dados

Agora estamos prontos para começar a transformar nossos dados num pipeline Apache Beam.

  1. Leia os dados usando o leitor CSV tfxio.CsvTFXIO (para processar linhas de texto em um pipeline, use tfxio.BeamRecordCsvTFXIO).

  2. Analise e transforme os dados usando preprocessing_fn definida acima.

  3. Escreva o resultado como um TFRecord de protos Example, que usaremos para treinar um modelo posteriormente

def transform_data(train_data_file, test_data_file, working_dir): """Transform the data and write out as a TFRecord of Example protos. Read in the data using the CSV reader, and transform it using a preprocessing pipeline that scales numeric data and converts categorical data from strings to int64 values indices, by creating a vocabulary for each category. Args: train_data_file: File containing training data test_data_file: File containing test data working_dir: Directory to write transformed data and metadata to """ # The "with" block will create a pipeline, and run that pipeline at the exit # of the block. with beam.Pipeline() as pipeline: with tft_beam.Context(temp_dir=tempfile.mkdtemp()): # Create a TFXIO to read the census data with the schema. To do this we # need to list all columns in order since the schema doesn't specify the # order of columns in the csv. # We first read CSV files and use BeamRecordCsvTFXIO whose .BeamSource() # accepts a PCollection[bytes] because we need to patch the records first # (see "FixCommasTrainData" below). Otherwise, tfxio.CsvTFXIO can be used # to both read the CSV files and parse them to TFT inputs: # csv_tfxio = tfxio.CsvTFXIO(...) # raw_data = (pipeline | 'ToRecordBatches' >> csv_tfxio.BeamSource()) train_csv_tfxio = tfxio.CsvTFXIO( file_pattern=train_data_file, telemetry_descriptors=[], column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA) # Read in raw data and convert using CSV TFXIO. raw_data = ( pipeline | 'ReadTrainCsv' >> train_csv_tfxio.BeamSource()) # Combine data and schema into a dataset tuple. Note that we already used # the schema to read the CSV data, but we also need it to interpret # raw_data. cfg = train_csv_tfxio.TensorAdapterConfig() raw_dataset = (raw_data, cfg) # The TFXIO output format is chosen for improved performance. transformed_dataset, transform_fn = ( raw_dataset | tft_beam.AnalyzeAndTransformDataset( preprocessing_fn, output_record_batches=True)) # Transformed metadata is not necessary for encoding. transformed_data, _ = transformed_dataset # Extract transformed RecordBatches, encode and write them to the given # directory. coder = RecordBatchToExamplesEncoder() _ = ( transformed_data | 'EncodeTrainData' >> beam.FlatMapTuple(lambda batch, _: coder.encode(batch)) | 'WriteTrainData' >> beam.io.WriteToTFRecord( os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE))) # Now apply transform function to test data. In this case we remove the # trailing period at the end of each line, and also ignore the header line # that is present in the test data file. test_csv_tfxio = tfxio.CsvTFXIO( file_pattern=test_data_file, skip_header_lines=1, telemetry_descriptors=[], column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA) raw_test_data = ( pipeline | 'ReadTestCsv' >> test_csv_tfxio.BeamSource()) raw_test_dataset = (raw_test_data, test_csv_tfxio.TensorAdapterConfig()) # The TFXIO output format is chosen for improved performance. transformed_test_dataset = ( (raw_test_dataset, transform_fn) | tft_beam.TransformDataset(output_record_batches=True)) # Transformed metadata is not necessary for encoding. transformed_test_data, _ = transformed_test_dataset # Extract transformed RecordBatches, encode and write them to the given # directory. _ = ( transformed_test_data | 'EncodeTestData' >> beam.FlatMapTuple(lambda batch, _: coder.encode(batch)) | 'WriteTestData' >> beam.io.WriteToTFRecord( os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE))) # Will write a SavedModel and metadata to working_dir, which can then # be read by the tft.TFTransformOutput class. _ = ( transform_fn | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))

Execute o pipeline:

import tempfile import pathlib output_dir = os.path.join(tempfile.mkdtemp(), 'keras') transform_data(train_path, test_path, output_dir)

Encapsule o diretório de saída em um tft.TFTransformOutput:

tf_transform_output = tft.TFTransformOutput(output_dir)
tf_transform_output.transformed_feature_spec()

Se você olhar no diretório, verá que ele contém três coisas:

  1. Os arquivos de dados train_transformed e test_transformed

  2. O diretório transform_fn (um tf.saved_model)

  3. O transformed_metadata

As seções a seguir mostram como usar esses artefatos para treinar um modelo.

!ls -l {output_dir}

##Usando nossos dados pré-processados ​​para treinar um modelo usando tf.keras

Para mostrar como tf.Transform nos permite usar o mesmo código para treinamento e serviço e, assim, evitar desvios, vamos treinar um modelo. Para treinar nosso modelo e prepará-lo para produção, precisamos criar funções de entrada. A principal diferença entre nossa função de entrada de treinamento e nossa função de entrada de serviço é que os dados de treinamento contêm os rótulos, e os dados de produção não. Os argumentos e retornos também são um pouco diferentes.

###Crie uma função de entrada para treinamento

A execução do pipeline na seção anterior criou arquivos TFRecord contendo os dados transformados.

O código a seguir usa tf.data.experimental.make_batched_features_dataset e tft.TFTransformOutput.transformed_feature_spec para ler esses arquivos de dados como tf.data.Dataset:

def _make_training_input_fn(tf_transform_output, train_file_pattern, batch_size): """An input function reading from transformed data, converting to model input. Args: tf_transform_output: Wrapper around output of tf.Transform. transformed_examples: Base filename of examples. batch_size: Batch size. Returns: The input data for training or eval, in the form of k. """ def input_fn(): return tf.data.experimental.make_batched_features_dataset( file_pattern=train_file_pattern, batch_size=batch_size, features=tf_transform_output.transformed_feature_spec(), reader=tf.data.TFRecordDataset, label_key=LABEL_KEY, shuffle=True) return input_fn
train_file_pattern = pathlib.Path(output_dir)/f'{TRANSFORMED_TRAIN_DATA_FILEBASE}*' input_fn = _make_training_input_fn( tf_transform_output=tf_transform_output, train_file_pattern = str(train_file_pattern), batch_size = 10 )

Abaixo você pode ver uma amostra transformada dos dados. Observe como as colunas numéricas, como education-num e hourd-per-week são convertidas em números flutuantes com um intervalo de [0,1], e as colunas de string foram convertidas em IDs:

for example, label in input_fn().take(1): break pd.DataFrame(example)
label

Treine e avalie o modelo

Crie o modelo

def build_keras_model(working_dir): inputs = build_keras_inputs(working_dir) encoded_inputs = encode_inputs(inputs) stacked_inputs = tf.concat(tf.nest.flatten(encoded_inputs), axis=1) output = tf.keras.layers.Dense(100, activation='relu')(stacked_inputs) output = tf.keras.layers.Dense(50, activation='relu')(output) output = tf.keras.layers.Dense(2)(output) model = tf.keras.Model(inputs=inputs, outputs=output) return model
def build_keras_inputs(working_dir): tf_transform_output = tft.TFTransformOutput(working_dir) feature_spec = tf_transform_output.transformed_feature_spec().copy() feature_spec.pop(LABEL_KEY) # Build the `keras.Input` objects. inputs = {} for key, spec in feature_spec.items(): if isinstance(spec, tf.io.VarLenFeature): inputs[key] = tf.keras.layers.Input( shape=[None], name=key, dtype=spec.dtype, sparse=True) elif isinstance(spec, tf.io.FixedLenFeature): inputs[key] = tf.keras.layers.Input( shape=spec.shape, name=key, dtype=spec.dtype) else: raise ValueError('Spec type is not supported: ', key, spec) return inputs
def encode_inputs(inputs): encoded_inputs = {} for key in inputs: feature = tf.expand_dims(inputs[key], -1) if key in CATEGORICAL_FEATURE_KEYS: num_buckets = tf_transform_output.num_buckets_for_transformed_feature(key) encoding_layer = ( tf.keras.layers.CategoryEncoding( num_tokens=num_buckets, output_mode='binary', sparse=False)) encoded_inputs[key] = encoding_layer(feature) else: encoded_inputs[key] = feature return encoded_inputs
model = build_keras_model(output_dir) tf.keras.utils.plot_model(model,rankdir='LR', show_shapes=True)

Construa os datasets

def get_dataset(working_dir, filebase): tf_transform_output = tft.TFTransformOutput(working_dir) data_path_pattern = os.path.join( working_dir, filebase + '*') input_fn = _make_training_input_fn( tf_transform_output, data_path_pattern, batch_size=BATCH_SIZE) dataset = input_fn() return dataset

Treine e avalie o modelo:

def train_and_evaluate( model, working_dir): """Train the model on training data and evaluate on test data. Args: working_dir: The location of the Transform output. num_train_instances: Number of instances in train set num_test_instances: Number of instances in test set Returns: The results from the estimator's 'evaluate' method """ train_dataset = get_dataset(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE) validation_dataset = get_dataset(working_dir, TRANSFORMED_TEST_DATA_FILEBASE) model = build_keras_model(working_dir) history = train_model(model, train_dataset, validation_dataset) metric_values = model.evaluate(validation_dataset, steps=EVALUATION_STEPS, return_dict=True) return model, history, metric_values
def train_model(model, train_dataset, validation_dataset): model.compile(optimizer='adam', loss=tf.losses.CategoricalCrossentropy(from_logits=True), metrics=['accuracy']) history = model.fit(train_dataset, validation_data=validation_dataset, epochs=TRAIN_NUM_EPOCHS, steps_per_epoch=STEPS_PER_TRAIN_EPOCH, validation_steps=EVALUATION_STEPS) return history
model, history, metric_values = train_and_evaluate(model, output_dir)
plt.plot(history.history['loss'], label='Train') plt.plot(history.history['val_loss'], label='Eval') plt.ylim(0,max(plt.ylim())) plt.legend() plt.title('Loss');

Transforme os novos dados

Na seção anterior, o processo de treinamento usou cópias impressas dos dados transformados que foram gerados por tft_beam.AnalyzeAndTransformDataset na função transform_dataset.

Para operar com novos dados, você precisará carregar a versão final do preprocessing_fn que foi salvo por tft_beam.WriteTransformFn.

O método TFTransformOutput.transform_features_layer carrega o SavedModel do preprocessing_fn do diretório de saída.

Aqui está uma função para carregar lotes novos e não processados ​​de um arquivo-fonte:

def read_csv(file_name, batch_size): return tf.data.experimental.make_csv_dataset( file_pattern=file_name, batch_size=batch_size, column_names=ORDERED_CSV_COLUMNS, column_defaults=COLUMN_DEFAULTS, prefetch_buffer_size=0, ignore_errors=True)
for ex in read_csv(test_path, batch_size=5): break pd.DataFrame(ex)

Carregue o tft.TransformFeaturesLayer para transformar esses dados com a preprocessing_fn:

ex2 = ex.copy() ex2.pop('fnlwgt') tft_layer = tf_transform_output.transform_features_layer() t_ex = tft_layer(ex2) label = t_ex.pop(LABEL_KEY) pd.DataFrame(t_ex)

A tft_layer é inteligente o suficiente para executar a transformação se apenas um subconjunto de características for passado. Por exemplo, se você passar apenas duas características, receberá de volta apenas as versões transformadas dessas características:

ex2 = pd.DataFrame(ex)[['education', 'hours-per-week']] ex2
pd.DataFrame(tft_layer(dict(ex2)))

Aqui está uma versão mais robusta que elimina características que não estão nas especificações de características e retorna um par (features, label) se o rótulo estiver nas características fornecidas:

class Transform(tf.Module): def __init__(self, working_dir): self.working_dir = working_dir self.tf_transform_output = tft.TFTransformOutput(working_dir) self.tft_layer = tf_transform_output.transform_features_layer() @tf.function def __call__(self, features): raw_features = {} for key, val in features.items(): # Skip unused keys if key not in RAW_DATA_FEATURE_SPEC: continue raw_features[key] = val # Apply the `preprocessing_fn`. transformed_features = tft_layer(raw_features) if LABEL_KEY in transformed_features: # Pop the label and return a (features, labels) pair. data_labels = transformed_features.pop(LABEL_KEY) return (transformed_features, data_labels) else: return transformed_features
transform = Transform(output_dir)
t_ex, t_label = transform(ex)
pd.DataFrame(t_ex)

Agora você pode usar o Dataset.map para aplicar essa transformação dinamicamente a novos dados:

model.evaluate( read_csv(test_path, batch_size=5).map(transform), steps=EVALUATION_STEPS, return_dict=True )

Exporte o modelo

Portanto, você tem um modelo treinado e um método para aplicar preprocessing_fn a novos dados. Monte-os em um novo modelo que aceite protos tf.train.Example serializados como entrada.

class ServingModel(tf.Module): def __init__(self, model, working_dir): self.model = model self.working_dir = working_dir self.transform = Transform(working_dir) @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)]) def __call__(self, serialized_tf_examples): # parse the tf.train.Example feature_spec = RAW_DATA_FEATURE_SPEC.copy() feature_spec.pop(LABEL_KEY) parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec) # Apply the `preprocessing_fn` transformed_features = self.transform(parsed_features) # Run the model outputs = self.model(transformed_features) # Format the output classes_names = tf.constant([['0', '1']]) classes = tf.tile(classes_names, [tf.shape(outputs)[0], 1]) return {'classes': classes, 'scores': outputs} def export(self, output_dir): # Increment the directory number. This is required in order to make this # model servable with model_server. save_model_dir = pathlib.Path(output_dir)/'model' number_dirs = [int(p.name) for p in save_model_dir.glob('*') if p.name.isdigit()] id = max([0] + number_dirs)+1 save_model_dir = save_model_dir/str(id) # Set the signature to make it visible for serving. concrete_serving_fn = self.__call__.get_concrete_function() signatures = {'serving_default': concrete_serving_fn} # Export the model. tf.saved_model.save( self, str(save_model_dir), signatures=signatures) return save_model_dir

Crie o modelo e teste-o no lote de exemplos serializados:

serving_model = ServingModel(model, output_dir) serving_model(serialized_example_batch)

Exporte o modelo como um SavedModel:

saved_model_dir = serving_model.export(output_dir) saved_model_dir

Recarregue o modelo e teste-o no mesmo lote de exemplos:

reloaded = tf.saved_model.load(str(saved_model_dir)) run_model = reloaded.signatures['serving_default']
run_model(serialized_example_batch)

##O que fizemos Neste exemplo, usamos tf.Transform para pré-processar um dataset do censo e treinar um modelo com os dados limpos e transformados. Também criamos uma função de entrada que poderíamos usar ao implantar nosso modelo treinado em um ambiente de produção para realizar inferências. Ao usar o mesmo código para treinamento e inferência, evitamos problemas de desvio de dados. Ao longo do caminho, aprendemos como criar uma transformação do Apache Beam para realizar a transformação necessária para limpar os dados. Também vimos como usar esses dados transformados para treinar um modelo usando tf.keras. Esta é apenas uma pequena parte do que o TensorFlow Transform é capaz! Nós encorajamos você a mergulhar no tf.Transform e descobrir o que ele pode fazer por você.

[Opcional] Usando nossos dados pré-processados ​​para treinar um modelo com tf.estimator

Aviso: os Estimators não são recomendados para novos códigos. Os Estimators executam código v1.Session, que é mais difícil de escrever corretamente e pode se comportar de forma inesperada, ainda mais quando usado em conjunto com código do TF 2. Os Estimators são abarcados pelas garantias de compatibilidade, mas não recebem mais correções, exceto para vulnerabilidades de segurança. Confira mais detalhes no guia de migração.

###Crie uma função de entrada para treinamento

def _make_training_input_fn(tf_transform_output, transformed_examples, batch_size): """Creates an input function reading from transformed data. Args: tf_transform_output: Wrapper around output of tf.Transform. transformed_examples: Base filename of examples. batch_size: Batch size. Returns: The input function for training or eval. """ def input_fn(): """Input function for training and eval.""" dataset = tf.data.experimental.make_batched_features_dataset( file_pattern=transformed_examples, batch_size=batch_size, features=tf_transform_output.transformed_feature_spec(), reader=tf.data.TFRecordDataset, shuffle=True) transformed_features = tf.compat.v1.data.make_one_shot_iterator( dataset).get_next() # Extract features and label from the transformed tensors. transformed_labels = tf.where( tf.equal(transformed_features.pop(LABEL_KEY), 1)) return transformed_features, transformed_labels[:,1] return input_fn

###Crie uma função de entrada para servir

Vamos criar uma função de entrada que possamos usar em produção e preparar nosso modelo treinado para servir.

def _make_serving_input_fn(tf_transform_output): """Creates an input function reading from raw data. Args: tf_transform_output: Wrapper around output of tf.Transform. Returns: The serving input function. """ raw_feature_spec = RAW_DATA_FEATURE_SPEC.copy() # Remove label since it is not available during serving. raw_feature_spec.pop(LABEL_KEY) def serving_input_fn(): """Input function for serving.""" # Get raw features by generating the basic serving input_fn and calling it. # Here we generate an input_fn that expects a parsed Example proto to be fed # to the model at serving time. See also # tf.estimator.export.build_raw_serving_input_receiver_fn. raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn( raw_feature_spec, default_batch_size=None) serving_input_receiver = raw_input_fn() # Apply the transform function that was used to generate the materialized # data. raw_features = serving_input_receiver.features transformed_features = tf_transform_output.transform_raw_features( raw_features) return tf.estimator.export.ServingInputReceiver( transformed_features, serving_input_receiver.receiver_tensors) return serving_input_fn

###Encapsule os dados de entrada num FeatureColumns Nosso modelo esperará nossos dados em TensorFlow FeatureColumns.

def get_feature_columns(tf_transform_output): """Returns the FeatureColumns for the model. Args: tf_transform_output: A `TFTransformOutput` object. Returns: A list of FeatureColumns. """ # Wrap scalars as real valued columns. real_valued_columns = [tf.feature_column.numeric_column(key, shape=()) for key in NUMERIC_FEATURE_KEYS] # Wrap categorical columns. one_hot_columns = [ tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_identity( key=key, num_buckets=(NUM_OOV_BUCKETS + tf_transform_output.vocabulary_size_by_name( vocab_filename=key)))) for key in CATEGORICAL_FEATURE_KEYS] return real_valued_columns + one_hot_columns

###Treine, avalie e exporte o modelo

def train_and_evaluate(working_dir, num_train_instances=NUM_TRAIN_INSTANCES, num_test_instances=NUM_TEST_INSTANCES): """Train the model on training data and evaluate on test data. Args: working_dir: Directory to read transformed data and metadata from and to write exported model to. num_train_instances: Number of instances in train set num_test_instances: Number of instances in test set Returns: The results from the estimator's 'evaluate' method """ tf_transform_output = tft.TFTransformOutput(working_dir) run_config = tf.estimator.RunConfig() estimator = tf.estimator.LinearClassifier( feature_columns=get_feature_columns(tf_transform_output), config=run_config, loss_reduction=tf.losses.Reduction.SUM) # Fit the model using the default optimizer. train_input_fn = _make_training_input_fn( tf_transform_output, os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE + '*'), batch_size=BATCH_SIZE) estimator.train( input_fn=train_input_fn, max_steps=TRAIN_NUM_EPOCHS * num_train_instances / BATCH_SIZE) # Evaluate model on test dataset. eval_input_fn = _make_training_input_fn( tf_transform_output, os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE + '*'), batch_size=1) # Export the model. serving_input_fn = _make_serving_input_fn(tf_transform_output) exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR) estimator.export_saved_model(exported_model_dir, serving_input_fn) return estimator.evaluate(input_fn=eval_input_fn, steps=num_test_instances)

###Junte tudo Criamos tudo o que precisamos para pré-processar os dados do censo, treinar um modelo e prepará-lo para servir. Até agora estamos apenas preparando as coisas. É hora de começar a trabalhar!

Observação: role a saída desta célula para ver todo o processo. Os resultados estarão na parte inferior.

import tempfile temp = temp = os.path.join(tempfile.mkdtemp(),'estimator') transform_data(train_path, test_path, temp) results = train_and_evaluate(temp)
pprint.pprint(results)