GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/zh-cn/tfx/tutorials/transform/census.ipynb
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Preprocess data with TensorFlow Transform

The feature engineering component of TensorFlow Extended (TFX)

This example Colab notebook provides a somewhat more advanced example of how TensorFlow Transform (tf.Transform) can be used to preprocess data, using exactly the same code both for training a model and for serving inferences in production.

TensorFlow Transform is a library for preprocessing input data for TensorFlow, including creating features that require a full pass over the training dataset. With TensorFlow Transform you can:

  • Normalize an input value by using the mean and standard deviation

  • Convert strings to integers by generating a vocabulary over all of the input values

  • Convert floats to integers by assigning them to buckets, based on the observed data distribution

TensorFlow has built-in support for manipulations on a single example or a batch of examples. tf.Transform extends these capabilities to support full passes over the entire training dataset.
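For instance, a minimal preprocessing_fn built from the three operations listed above might look like the sketch below. The feature names 'x', 's', and 'y' are hypothetical and are not columns of the census dataset; the real preprocessing_fn for this tutorial appears later.

import tensorflow_transform as tft

def example_preprocessing_fn(inputs):
  # Normalize a numeric input using its mean and standard deviation.
  x_normalized = tft.scale_to_z_score(inputs['x'])
  # Convert strings to integer ids via a vocabulary computed over all values.
  s_ids = tft.compute_and_apply_vocabulary(inputs['s'])
  # Assign floats to buckets based on the observed data distribution.
  y_bucketized = tft.bucketize(inputs['y'], num_buckets=4)
  return {
      'x_normalized': x_normalized,
      's_ids': s_ids,
      'y_bucketized': y_bucketized,
  }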

The output of tf.Transform is exported as a TensorFlow graph that you can use for both training and serving. Using the same graph for both training and serving prevents skew, since the same transformations are applied in both stages.

Key Point: In order to understand tf.Transform and how it works with Apache Beam, you'll need to know a little bit about Apache Beam itself. The Beam Programming Guide is a great place to start.

## What we're doing in this example

In this example we'll be processing a widely used dataset containing census data, and training a model to do classification. Along the way we'll be transforming the data using tf.Transform.

Key Point: As a modeler and developer, think about how this data is used and the potential benefits and harms a model's predictions can cause. A model like this could reinforce societal biases and disparities. Is a feature relevant to the problem you want to solve, or will it introduce bias? For more information, read about ML fairness.

Note: TensorFlow Model Analysis is a powerful tool for understanding how well your model predicts for various segments of your data, including understanding how your model may reinforce societal biases and disparities.

Install TensorFlow Transform

!pip install tensorflow-transform
# This cell is only necessary because packages were installed while python was
# running. It avoids the need to restart the runtime when running in Colab.
import pkg_resources
import importlib

importlib.reload(pkg_resources)

Imports and globals

First import the things we need.

import math
import os
import pprint

import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
print('TF: {}'.format(tf.__version__))

import apache_beam as beam
print('Beam: {}'.format(beam.__version__))

import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
print('Transform: {}'.format(tft.__version__))

from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamplesEncoder

Next, download the data files:

!wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data
!wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.test

train_path = './adult.data'
test_path = './adult.test'

Name the columns

We'll create some handy lists for referencing the columns in our dataset.

CATEGORICAL_FEATURE_KEYS = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native-country',
]

NUMERIC_FEATURE_KEYS = [
    'age',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
    'education-num',
]

ORDERED_CSV_COLUMNS = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num',
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label'
]

LABEL_KEY = 'label'

Here's a quick preview of the data:

pandas_train = pd.read_csv(train_path, header=None, names=ORDERED_CSV_COLUMNS)

pandas_train.head(5)
one_row = dict(pandas_train.loc[0])
COLUMN_DEFAULTS = [
    '' if isinstance(v, str) else 0.0
    for v in dict(pandas_train.loc[1]).values()]

The test data has 1 header line that needs to be skipped, and a trailing "." at the end of each line.

pandas_test = pd.read_csv(test_path, header=1, names=ORDERED_CSV_COLUMNS)

pandas_test.head(5)
testing = os.getenv("WEB_TEST_BROWSER", False)
if testing:
  pandas_train = pandas_train.loc[:1]
  pandas_test = pandas_test.loc[:1]

### Define features and schema
Let's define a schema based on the types of the columns in our input. Among other things, this will help with importing them correctly.

RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.io.FixedLenFeature([], tf.string))
     for name in CATEGORICAL_FEATURE_KEYS] +
    [(name, tf.io.FixedLenFeature([], tf.float32))
     for name in NUMERIC_FEATURE_KEYS] +
    [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))]
)

SCHEMA = tft.DatasetMetadata.from_feature_spec(RAW_DATA_FEATURE_SPEC).schema

[Optional] Encode and decode tf.train.Example protos

This tutorial needs to convert examples from the dataset to and from tf.train.Example protos in a few places.

The hidden encode_example function below converts a dictionary of features from the dataset into a tf.train.Example.

#@title
def encode_example(input_features):
  input_features = dict(input_features)
  output_features = {}

  for key in CATEGORICAL_FEATURE_KEYS:
    value = input_features[key]
    feature = tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[value.strip().encode()]))
    output_features[key] = feature

  for key in NUMERIC_FEATURE_KEYS:
    value = input_features[key]
    feature = tf.train.Feature(
        float_list=tf.train.FloatList(value=[value]))
    output_features[key] = feature

  label_value = input_features.get(LABEL_KEY, None)
  if label_value is not None:
    output_features[LABEL_KEY] = tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[label_value.strip().encode()]))

  example = tf.train.Example(
      features=tf.train.Features(feature=output_features)
  )
  return example

Now you can convert dataset examples into Example protos:

tf_example = encode_example(pandas_train.loc[0])
tf_example.features.feature['age']
serialized_example_batch = tf.constant([
    encode_example(pandas_train.loc[i]).SerializeToString()
    for i in range(3)
])

serialized_example_batch

You can also convert batches of serialized Example protos back into a dictionary of tensors:

decoded_tensors = tf.io.parse_example(
    serialized_example_batch,
    features=RAW_DATA_FEATURE_SPEC
)

In some cases the label will not be passed in, so the encode function is written so that the label is optional:

features_dict = dict(pandas_train.loc[0])
features_dict.pop(LABEL_KEY)

LABEL_KEY in features_dict

When creating an Example proto, it will simply not contain the label key.

no_label_example = encode_example(features_dict)

LABEL_KEY in no_label_example.features.feature.keys()

### Set hyperparameters and basic housekeeping

Constants and hyperparameters used for training.

NUM_OOV_BUCKETS = 1
EPOCH_SPLITS = 10
TRAIN_NUM_EPOCHS = 2 * EPOCH_SPLITS
NUM_TRAIN_INSTANCES = len(pandas_train)
NUM_TEST_INSTANCES = len(pandas_test)

BATCH_SIZE = 128

STEPS_PER_TRAIN_EPOCH = tf.math.ceil(NUM_TRAIN_INSTANCES / BATCH_SIZE / EPOCH_SPLITS)
EVALUATION_STEPS = tf.math.ceil(NUM_TEST_INSTANCES / BATCH_SIZE)

# Names of temp files
TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed'
TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed'
EXPORTED_MODEL_DIR = 'exported_model_dir'
if testing:
  TRAIN_NUM_EPOCHS = 1

## Preprocessing with tf.Transform

### Create a tf.Transform preprocessing_fn
The preprocessing function is the most important concept of tf.Transform. A preprocessing function is where the transformation of the dataset really happens. It accepts and returns a dictionary of tensors, where a tensor means a Tensor or SparseTensor. There are two main groups of API calls that typically form the heart of a preprocessing function:

  1. TensorFlow Ops: Any function that accepts and returns tensors, which usually means TensorFlow ops. These add TensorFlow operations to the graph that transform raw data into transformed data one feature vector at a time. They run for every example, during both training and serving.

  2. TensorFlow Transform Analyzers/Mappers: Any of the analyzers/mappers provided by tf.Transform. These also accept and return tensors, and typically contain a combination of TensorFlow ops and Beam computation. Unlike TensorFlow ops, however, they only run in the Beam pipeline during analysis, which requires a full pass over the entire training dataset. The Beam computation runs only once (before training, during analysis) and typically makes one full pass over the entire training dataset. It creates tf.constant tensors that are added to your graph. For example, tft.min computes the minimum of a tensor over the training dataset.
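To make the distinction concrete, here is a small sketch (the feature name 'x' is hypothetical): tf.sqrt runs per example during both training and serving, while tft.mean is computed once over the training dataset during analysis and embedded in the graph as a constant.

import tensorflow as tf
import tensorflow_transform as tft

def example_preprocessing_fn(inputs):
  x = inputs['x']
  x_sqrt = tf.sqrt(x)           # instance-level TensorFlow op
  x_centered = x - tft.mean(x)  # full-pass analyzer combined with an instance-level op
  return {'x_sqrt': x_sqrt, 'x_centered': x_centered}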

Caution: When you apply your preprocessing function to serving inferences, the constants that were created by analyzers during training do not change. If your data has trend or seasonality components, plan accordingly.
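As an illustration of that caution, suppose (hypothetically) that a feature's minimum and maximum over the training data were 0.0 and 100.0. tft.scale_to_0_1 bakes those statistics into the exported graph as constants, so a larger value seen at serving time is still scaled with the training-time statistics:

serving_value = 150.0
scaled = (serving_value - 0.0) / (100.0 - 0.0)
print(scaled)  # 1.5 -- outside [0, 1], because the analyzer constants never update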

Here is a preprocessing_fn for this dataset. It does several things:

  1. Using tft.scale_to_0_1, it scales the numeric features to the [0,1] range.

  2. Using tft.compute_and_apply_vocabulary, it computes a vocabulary for each of the categorical features and returns the integer ID for each input as a tf.int64. This applies to both string and integer categorical inputs.

  3. It applies some manual transformations to the data using standard TensorFlow operations. Here these operations are applied to the label, but they could transform features as well. The TensorFlow operations do several things:

    • Build a lookup table for the label (the tf.init_scope ensures that the table is only created the first time the function is called).

    • Normalize the text of the label.

    • Convert the label to a one-hot.

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs`.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(inputs[key])

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[key] = tft.compute_and_apply_vocabulary(
        tf.strings.strip(inputs[key]),
        num_oov_buckets=NUM_OOV_BUCKETS,
        vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  table_keys = ['>50K', '<=50K']
  with tf.init_scope():
    initializer = tf.lookup.KeyValueTensorInitializer(
        keys=table_keys,
        values=tf.cast(tf.range(len(table_keys)), tf.int64),
        key_dtype=tf.string,
        value_dtype=tf.int64)
    table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  # Remove trailing periods for test data when the data is read with tf.data.
  # label_str = tf.sparse.to_dense(inputs[LABEL_KEY])
  label_str = inputs[LABEL_KEY]
  label_str = tf.strings.regex_replace(label_str, r'\.$', '')
  label_str = tf.strings.strip(label_str)
  data_labels = table.lookup(label_str)
  transformed_label = tf.one_hot(
      indices=data_labels, depth=len(table_keys), on_value=1.0, off_value=0.0)
  outputs[LABEL_KEY] = tf.reshape(transformed_label, [-1, len(table_keys)])

  return outputs

Syntax

You're almost ready to put everything together and use Apache Beam to run it.

Apache Beam uses a special syntax to define and invoke transforms. For example, in this line:

result = pass_this | 'name this step' >> to_this_call

The method to_this_call is invoked and passed the object called pass_this, and in a stack trace this operation will be referred to as name this step. The result of the call to to_this_call is returned in result. You will often see stages of a pipeline chained together like this:

result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()

Since this started with a new pipeline, you can continue like this:

next_result = result | 'doing more stuff' >> another_function()
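As a concrete, standalone illustration of this syntax (not part of the census workflow), a tiny runnable pipeline could look like this:

import apache_beam as beam

# A minimal pipeline: each `|` applies a named transform to the previous result.
with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | 'CreateNumbers' >> beam.Create([1, 2, 3])
      | 'SquareThem' >> beam.Map(lambda x: x * x)
      | 'PrintEach' >> beam.Map(print))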

Transform the data

Now we're ready to start transforming our data in an Apache Beam pipeline.

  1. Read in the data using the tfxio.CsvTFXIO CSV reader (to process lines of text in a pipeline, use tfxio.BeamRecordCsvTFXIO instead).

  2. Analyze and transform the data using the preprocessing_fn defined above.

  3. Write out the result as a TFRecord of Example protos, which we will use for training a model later.

def transform_data(train_data_file, test_data_file, working_dir):
  """Transform the data and write out as a TFRecord of Example protos.

  Read in the data using the CSV reader, and transform it using a
  preprocessing pipeline that scales numeric data and converts categorical
  data from strings to int64 values indices, by creating a vocabulary for
  each category.

  Args:
    train_data_file: File containing training data
    test_data_file: File containing test data
    working_dir: Directory to write transformed data and metadata to
  """

  # The "with" block will create a pipeline, and run that pipeline at the exit
  # of the block.
  with beam.Pipeline() as pipeline:
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
      # Create a TFXIO to read the census data with the schema. To do this we
      # need to list all columns in order since the schema doesn't specify the
      # order of columns in the csv.
      # We first read CSV files and use BeamRecordCsvTFXIO whose .BeamSource()
      # accepts a PCollection[bytes] because we need to patch the records first
      # (see "FixCommasTrainData" below). Otherwise, tfxio.CsvTFXIO can be used
      # to both read the CSV files and parse them to TFT inputs:
      # csv_tfxio = tfxio.CsvTFXIO(...)
      # raw_data = (pipeline | 'ToRecordBatches' >> csv_tfxio.BeamSource())
      train_csv_tfxio = tfxio.CsvTFXIO(
          file_pattern=train_data_file,
          telemetry_descriptors=[],
          column_names=ORDERED_CSV_COLUMNS,
          schema=SCHEMA)

      # Read in raw data and convert using CSV TFXIO.
      raw_data = (
          pipeline
          | 'ReadTrainCsv' >> train_csv_tfxio.BeamSource())

      # Combine data and schema into a dataset tuple.  Note that we already
      # used the schema to read the CSV data, but we also need it to interpret
      # raw_data.
      cfg = train_csv_tfxio.TensorAdapterConfig()
      raw_dataset = (raw_data, cfg)

      # The TFXIO output format is chosen for improved performance.
      transformed_dataset, transform_fn = (
          raw_dataset | tft_beam.AnalyzeAndTransformDataset(
              preprocessing_fn, output_record_batches=True))

      # Transformed metadata is not necessary for encoding.
      transformed_data, _ = transformed_dataset

      # Extract transformed RecordBatches, encode and write them to the given
      # directory.
      coder = RecordBatchToExamplesEncoder()
      _ = (
          transformed_data
          | 'EncodeTrainData' >>
          beam.FlatMapTuple(lambda batch, _: coder.encode(batch))
          | 'WriteTrainData' >> beam.io.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE)))

      # Now apply transform function to test data.  In this case we remove the
      # trailing period at the end of each line, and also ignore the header
      # line that is present in the test data file.
      test_csv_tfxio = tfxio.CsvTFXIO(
          file_pattern=test_data_file,
          skip_header_lines=1,
          telemetry_descriptors=[],
          column_names=ORDERED_CSV_COLUMNS,
          schema=SCHEMA)

      raw_test_data = (
          pipeline
          | 'ReadTestCsv' >> test_csv_tfxio.BeamSource())

      raw_test_dataset = (raw_test_data, test_csv_tfxio.TensorAdapterConfig())

      # The TFXIO output format is chosen for improved performance.
      transformed_test_dataset = (
          (raw_test_dataset, transform_fn)
          | tft_beam.TransformDataset(output_record_batches=True))

      # Transformed metadata is not necessary for encoding.
      transformed_test_data, _ = transformed_test_dataset

      # Extract transformed RecordBatches, encode and write them to the given
      # directory.
      _ = (
          transformed_test_data
          | 'EncodeTestData' >>
          beam.FlatMapTuple(lambda batch, _: coder.encode(batch))
          | 'WriteTestData' >> beam.io.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE)))

      # Will write a SavedModel and metadata to working_dir, which can then
      # be read by the tft.TFTransformOutput class.
      _ = (
          transform_fn
          | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))

Run the pipeline:

import tempfile
import pathlib

output_dir = os.path.join(tempfile.mkdtemp(), 'keras')

transform_data(train_path, test_path, output_dir)

Wrap up the output directory as a tft.TFTransformOutput

tf_transform_output = tft.TFTransformOutput(output_dir)
tf_transform_output.transformed_feature_spec()

If you look in the directory, you'll see that it contains three things:

  1. The train_transformed and test_transformed data files

  2. The transform_fn directory (a tf.saved_model)

  3. The transformed_metadata

The following sections show how to use these artifacts to train a model.

!ls -l {output_dir}
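As a quick check of those artifacts, you can also look up the vocabulary files that tft.compute_and_apply_vocabulary wrote for the categorical features ('education' below is just one of them):

# Each categorical feature's vocabulary was written to a file named after the
# `vocab_filename` passed to `tft.compute_and_apply_vocabulary`.
print(tf_transform_output.vocabulary_file_by_name('education'))
print(tf_transform_output.vocabulary_size_by_name('education'))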

## Train a model with tf.keras using the preprocessed data

To show how tf.Transform enables us to use the same code for both training and serving, and thus prevent skew, we're going to train a model. To train our model and prepare the trained model for production, we need to create input functions. The main difference between our training input function and our serving input function is that training data contains the labels, while production data does not. The arguments and return values are also somewhat different.

### Create an input function for training

Running the pipeline in the previous section created TFRecord files containing the transformed data.

The following code uses tf.data.experimental.make_batched_features_dataset and tft.TFTransformOutput.transformed_feature_spec to read these data files as a tf.data.Dataset:

def _make_training_input_fn(tf_transform_output, train_file_pattern,
                            batch_size):
  """An input function reading from transformed data, converting to model input.

  Args:
    tf_transform_output: Wrapper around output of tf.Transform.
    train_file_pattern: File pattern of the transformed examples.
    batch_size: Batch size.

  Returns:
    The input data for training or eval, in the form of k.
  """
  def input_fn():
    return tf.data.experimental.make_batched_features_dataset(
        file_pattern=train_file_pattern,
        batch_size=batch_size,
        features=tf_transform_output.transformed_feature_spec(),
        reader=tf.data.TFRecordDataset,
        label_key=LABEL_KEY,
        shuffle=True)

  return input_fn
train_file_pattern = pathlib.Path(output_dir)/f'{TRANSFORMED_TRAIN_DATA_FILEBASE}*'

input_fn = _make_training_input_fn(
    tf_transform_output=tf_transform_output,
    train_file_pattern=str(train_file_pattern),
    batch_size=10
)

Below you can see a sample of the transformed data. Note how numeric columns like education-num and hours-per-week have been converted to floats in the [0,1] range, and string columns have been converted to IDs:

for example, label in input_fn().take(1):
  break

pd.DataFrame(example)
label

Train and evaluate the model

Build the model

def build_keras_model(working_dir):
  inputs = build_keras_inputs(working_dir)

  encoded_inputs = encode_inputs(inputs)

  stacked_inputs = tf.concat(tf.nest.flatten(encoded_inputs), axis=1)
  output = tf.keras.layers.Dense(100, activation='relu')(stacked_inputs)
  output = tf.keras.layers.Dense(50, activation='relu')(output)
  output = tf.keras.layers.Dense(2)(output)

  model = tf.keras.Model(inputs=inputs, outputs=output)

  return model
def build_keras_inputs(working_dir):
  tf_transform_output = tft.TFTransformOutput(working_dir)

  feature_spec = tf_transform_output.transformed_feature_spec().copy()
  feature_spec.pop(LABEL_KEY)

  # Build the `keras.Input` objects.
  inputs = {}
  for key, spec in feature_spec.items():
    if isinstance(spec, tf.io.VarLenFeature):
      inputs[key] = tf.keras.layers.Input(
          shape=[None], name=key, dtype=spec.dtype, sparse=True)
    elif isinstance(spec, tf.io.FixedLenFeature):
      inputs[key] = tf.keras.layers.Input(
          shape=spec.shape, name=key, dtype=spec.dtype)
    else:
      raise ValueError('Spec type is not supported: ', key, spec)

  return inputs
def encode_inputs(inputs):
  encoded_inputs = {}
  for key in inputs:
    feature = tf.expand_dims(inputs[key], -1)
    if key in CATEGORICAL_FEATURE_KEYS:
      num_buckets = tf_transform_output.num_buckets_for_transformed_feature(key)
      encoding_layer = (
          tf.keras.layers.CategoryEncoding(
              num_tokens=num_buckets, output_mode='binary', sparse=False))
      encoded_inputs[key] = encoding_layer(feature)
    else:
      encoded_inputs[key] = feature

  return encoded_inputs
model = build_keras_model(output_dir)

tf.keras.utils.plot_model(model, rankdir='LR', show_shapes=True)

Build the datasets

def get_dataset(working_dir, filebase):
  tf_transform_output = tft.TFTransformOutput(working_dir)

  data_path_pattern = os.path.join(
      working_dir,
      filebase + '*')

  input_fn = _make_training_input_fn(
      tf_transform_output,
      data_path_pattern,
      batch_size=BATCH_SIZE)

  dataset = input_fn()

  return dataset

Train and evaluate the model:

def train_and_evaluate(
    model,
    working_dir):
  """Train the model on training data and evaluate on test data.

  Args:
    working_dir: The location of the Transform output.
    num_train_instances: Number of instances in train set
    num_test_instances: Number of instances in test set

  Returns:
    The results from the estimator's 'evaluate' method
  """
  train_dataset = get_dataset(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE)
  validation_dataset = get_dataset(working_dir, TRANSFORMED_TEST_DATA_FILEBASE)

  model = build_keras_model(working_dir)

  history = train_model(model, train_dataset, validation_dataset)

  metric_values = model.evaluate(validation_dataset,
                                 steps=EVALUATION_STEPS,
                                 return_dict=True)
  return model, history, metric_values
def train_model(model, train_dataset, validation_dataset):
  model.compile(optimizer='adam',
                loss=tf.losses.CategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

  history = model.fit(train_dataset, validation_data=validation_dataset,
                      epochs=TRAIN_NUM_EPOCHS,
                      steps_per_epoch=STEPS_PER_TRAIN_EPOCH,
                      validation_steps=EVALUATION_STEPS)
  return history
model, history, metric_values = train_and_evaluate(model, output_dir)
plt.plot(history.history['loss'], label='Train')
plt.plot(history.history['val_loss'], label='Eval')
plt.ylim(0, max(plt.ylim()))
plt.legend()
plt.title('Loss');

Transform new data

In the previous section, the training process used the hard-copies of the transformed data generated by tft_beam.AnalyzeAndTransformDataset in the transform_data function.

To operate on new data, you need to load the final version of the preprocessing_fn that was saved by tft_beam.WriteTransformFn.

The TFTransformOutput.transform_features_layer method loads the preprocessing_fn SavedModel from the output directory.

Here's a function to load new, unprocessed batches from the source files:

def read_csv(file_name, batch_size):
  return tf.data.experimental.make_csv_dataset(
      file_pattern=file_name,
      batch_size=batch_size,
      column_names=ORDERED_CSV_COLUMNS,
      column_defaults=COLUMN_DEFAULTS,
      prefetch_buffer_size=0,
      ignore_errors=True)
for ex in read_csv(test_path, batch_size=5):
  break

pd.DataFrame(ex)

Load the tft.TransformFeaturesLayer to transform this data with the preprocessing_fn:

ex2 = ex.copy()
ex2.pop('fnlwgt')

tft_layer = tf_transform_output.transform_features_layer()
t_ex = tft_layer(ex2)

label = t_ex.pop(LABEL_KEY)
pd.DataFrame(t_ex)

The tft_layer is smart enough to still execute the transformation if only a subset of features are passed in. For example, if you only pass in two features, you'll get back just the transformed versions of those features:

ex2 = pd.DataFrame(ex)[['education', 'hours-per-week']]
ex2
pd.DataFrame(tft_layer(dict(ex2)))

Here's a more robust version that drops features that are not in the feature spec, and returns a (features, label) pair if the label is among the provided features:

class Transform(tf.Module):
  def __init__(self, working_dir):
    self.working_dir = working_dir
    self.tf_transform_output = tft.TFTransformOutput(working_dir)
    self.tft_layer = self.tf_transform_output.transform_features_layer()

  @tf.function
  def __call__(self, features):
    raw_features = {}

    for key, val in features.items():
      # Skip unused keys
      if key not in RAW_DATA_FEATURE_SPEC:
        continue

      raw_features[key] = val

    # Apply the `preprocessing_fn`.
    transformed_features = self.tft_layer(raw_features)

    if LABEL_KEY in transformed_features:
      # Pop the label and return a (features, labels) pair.
      data_labels = transformed_features.pop(LABEL_KEY)
      return (transformed_features, data_labels)
    else:
      return transformed_features
transform = Transform(output_dir)
t_ex, t_label = transform(ex)
pd.DataFrame(t_ex)

Now you can use Dataset.map to apply that transformation, on the fly, to new data:

model.evaluate(
    read_csv(test_path, batch_size=5).map(transform),
    steps=EVALUATION_STEPS,
    return_dict=True
)

Export the model

You now have a trained model, and a method for applying the preprocessing_fn to new data. Assemble them into a new model that accepts serialized tf.train.Example protos as input.

class ServingModel(tf.Module):
  def __init__(self, model, working_dir):
    self.model = model
    self.working_dir = working_dir
    self.transform = Transform(working_dir)

  @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)])
  def __call__(self, serialized_tf_examples):
    # Parse the tf.train.Example.
    feature_spec = RAW_DATA_FEATURE_SPEC.copy()
    feature_spec.pop(LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    # Apply the `preprocessing_fn`.
    transformed_features = self.transform(parsed_features)

    # Run the model.
    outputs = self.model(transformed_features)

    # Format the output.
    classes_names = tf.constant([['0', '1']])
    classes = tf.tile(classes_names, [tf.shape(outputs)[0], 1])
    return {'classes': classes, 'scores': outputs}

  def export(self, output_dir):
    # Increment the directory number. This is required in order to make this
    # model servable with model_server.
    save_model_dir = pathlib.Path(output_dir)/'model'
    number_dirs = [int(p.name) for p in save_model_dir.glob('*')
                   if p.name.isdigit()]
    id = max([0] + number_dirs) + 1
    save_model_dir = save_model_dir/str(id)

    # Set the signature to make it visible for serving.
    concrete_serving_fn = self.__call__.get_concrete_function()
    signatures = {'serving_default': concrete_serving_fn}

    # Export the model.
    tf.saved_model.save(
        self,
        str(save_model_dir),
        signatures=signatures)

    return save_model_dir

Build the model and give it a test run on the batch of serialized examples:

serving_model = ServingModel(model, output_dir)

serving_model(serialized_example_batch)

Export the model as a SavedModel:

saved_model_dir = serving_model.export(output_dir)
saved_model_dir

Reload the model and test it on the same batch of examples:

reloaded = tf.saved_model.load(str(saved_model_dir))
run_model = reloaded.signatures['serving_default']
run_model(serialized_example_batch)

## What we did

In this example we used tf.Transform to preprocess a dataset of census data, and train a model with the cleaned and transformed data. We also created an input function that we could use when we deploy our trained model in a production environment to perform inference. By using the same code for both training and inference we avoid any issues with data skew. Along the way we learned about creating an Apache Beam transform to perform the transformation needed for cleaning the data, and saw how to use this transformed data to train a model with tf.keras. This is just a small piece of what TensorFlow Transform can do! We encourage you to dive into tf.Transform and discover what it can do for you.

[Optional] Using our preprocessed data to train a model with tf.estimator

Warning: Estimators are not recommended for new code. Estimators run v1.Session-style code which is more difficult to write correctly, and can behave unexpectedly, especially when combined with TF 2 code. Estimators do fall under our compatibility guarantees, but will receive no fixes other than for security vulnerabilities. See the migration guide for details.

### Create an input function for training

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  """Creates an input function reading from transformed data.

  Args:
    tf_transform_output: Wrapper around output of tf.Transform.
    transformed_examples: Base filename of examples.
    batch_size: Batch size.

  Returns:
    The input function for training or eval.
  """
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=transformed_examples,
        batch_size=batch_size,
        features=tf_transform_output.transformed_feature_spec(),
        reader=tf.data.TFRecordDataset,
        shuffle=True)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()

    # Extract features and label from the transformed tensors.
    transformed_labels = tf.where(
        tf.equal(transformed_features.pop(LABEL_KEY), 1))

    return transformed_features, transformed_labels[:, 1]

  return input_fn

### Create an input function for serving

Let's create an input function that we could use in production, and prepare our trained model for serving.

def _make_serving_input_fn(tf_transform_output):
  """Creates an input function reading from raw data.

  Args:
    tf_transform_output: Wrapper around output of tf.Transform.

  Returns:
    The serving input function.
  """
  raw_feature_spec = RAW_DATA_FEATURE_SPEC.copy()
  # Remove label since it is not available during serving.
  raw_feature_spec.pop(LABEL_KEY)

  def serving_input_fn():
    """Input function for serving."""
    # Get raw features by generating the basic serving input_fn and calling it.
    # Here we generate an input_fn that expects a parsed Example proto to be
    # fed to the model at serving time.  See also
    # tf.estimator.export.build_raw_serving_input_receiver_fn.
    raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        raw_feature_spec, default_batch_size=None)
    serving_input_receiver = raw_input_fn()

    # Apply the transform function that was used to generate the materialized
    # data.
    raw_features = serving_input_receiver.features
    transformed_features = tf_transform_output.transform_raw_features(
        raw_features)

    return tf.estimator.export.ServingInputReceiver(
        transformed_features, serving_input_receiver.receiver_tensors)

  return serving_input_fn

### Wrap our input data in FeatureColumns
The model expects our data in TensorFlow FeatureColumns.

def get_feature_columns(tf_transform_output):
  """Returns the FeatureColumns for the model.

  Args:
    tf_transform_output: A `TFTransformOutput` object.

  Returns:
    A list of FeatureColumns.
  """
  # Wrap scalars as real valued columns.
  real_valued_columns = [tf.feature_column.numeric_column(key, shape=())
                         for key in NUMERIC_FEATURE_KEYS]

  # Wrap categorical columns.
  one_hot_columns = [
      tf.feature_column.indicator_column(
          tf.feature_column.categorical_column_with_identity(
              key=key,
              num_buckets=(NUM_OOV_BUCKETS +
                           tf_transform_output.vocabulary_size_by_name(
                               vocab_filename=key))))
      for key in CATEGORICAL_FEATURE_KEYS]

  return real_valued_columns + one_hot_columns

## Train, evaluate, and export the model

def train_and_evaluate(working_dir, num_train_instances=NUM_TRAIN_INSTANCES,
                       num_test_instances=NUM_TEST_INSTANCES):
  """Train the model on training data and evaluate on test data.

  Args:
    working_dir: Directory to read transformed data and metadata from and to
        write exported model to.
    num_train_instances: Number of instances in train set
    num_test_instances: Number of instances in test set

  Returns:
    The results from the estimator's 'evaluate' method
  """
  tf_transform_output = tft.TFTransformOutput(working_dir)

  run_config = tf.estimator.RunConfig()

  estimator = tf.estimator.LinearClassifier(
      feature_columns=get_feature_columns(tf_transform_output),
      config=run_config,
      loss_reduction=tf.losses.Reduction.SUM)

  # Fit the model using the default optimizer.
  train_input_fn = _make_training_input_fn(
      tf_transform_output,
      os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE + '*'),
      batch_size=BATCH_SIZE)
  estimator.train(
      input_fn=train_input_fn,
      max_steps=TRAIN_NUM_EPOCHS * num_train_instances / BATCH_SIZE)

  # Evaluate model on test dataset.
  eval_input_fn = _make_training_input_fn(
      tf_transform_output,
      os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE + '*'),
      batch_size=1)

  # Export the model.
  serving_input_fn = _make_serving_input_fn(tf_transform_output)
  exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR)
  estimator.export_saved_model(exported_model_dir, serving_input_fn)

  return estimator.evaluate(input_fn=eval_input_fn, steps=num_test_instances)

## Put it all together
We've created everything we need to preprocess our census data, train a model, and prepare it for serving. So far we've just been getting things ready. It's time to start running!

Note: Scroll the output of this cell to see the whole process. The results are at the bottom.

import tempfile

temp = os.path.join(tempfile.mkdtemp(), 'estimator')

transform_data(train_path, test_path, temp)

results = train_and_evaluate(temp)
pprint.pprint(results)