Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ko/tfx/tutorials/transform/census.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

TensorFlow Transform으로 데이터 전처리하기

TensorFlow Extended(TFX)의 특성 엔지니어링 구성 요소

이 예제 colab 노트북은 TensorFlow Transform(tf.Transform)을 사용하여 모델을 훈련하고 프로덕션에서 추론을 제공하는 데 정확히 동일한 코드를 사용하여 데이터를 사전 처리하는 방법에 대한 좀 더 고급스러운 예를 제공합니다.

TensorFlow Transform은 훈련 데이터세트에 대한 전체 전달이 필요한 특성 생성을 포함하여 TensorFlow에 대한 입력 데이터를 전처리하기 위한 라이브러리입니다. 예를 들어 TensorFlow Transform을 사용하여 다음을 수행할 수 있습니다.

  • 평균과 표준 편차를 이용하여 입력값 정규화

  • 모든 입력값에 대해 어휘를 생성하여 문자열을 정수로 변환

  • 관찰된 데이터 분포를 기반으로 부동 소수점을 버킷에 할당하여 정수로 변환

TensorFlow는 단일 예제 또는 예제 배치에 대한 조작을 기본적으로 지원합니다. tf.Transform은 이러한 기능을 확장하여 전체 훈련 데이터세트에 대한 전체 전달을 지원합니다.

tf.Transform의 출력은 훈련과 제공 모두에 사용할 수 있는 TensorFlow 그래프로 내보내집니다. 훈련과 제공 모두에 동일한 그래프를 사용하면 두 단계에 동일한 변환이 적용되므로 왜곡을 방지할 수 있습니다.

핵심 포인트: tf.Transform 및 이것이 Apache Beam과 작동하는 방식을 이해하려면 Apache Beam 자체에 대해 약간 알아야 합니다. Beam 프로그래밍 가이드가 좋은 출발점을 제공합니다.

##이 예제에서 수행하는 작업

이 예에서 우리는 인구 조사 데이터를 포함하는 널리 사용되는 데이터 세트를 처리하고 분류를 수행하도록 모델을 훈련할 것입니다. 그 과정에서 tf.Transform을 사용하여 데이터를 변환할 것입니다.

핵심 포인트: 모델러 및 개발자로서 이 데이터가 어떻게 사용되는지, 그리고 모델의 예측이 초래할 수 있는 잠재적인 이점과 피해에 대해 생각해보세요. 이와 같은 모델은 사회적 편견과 불균형을 강화시킬 수 있습니다. 기능이 해결하려는 문제와 관련이 있습니까? 아니면 편견을 유발합니까? 자세한 내용은 ML 공정성에 대해 읽어보세요.

참고: TensorFlow 모델 분석은 모델이 사회적 바이어스와 격차를 강화할 수 있는 방법을 이해하는 것을 포함하여 모델이 데이터의 다양한 세그먼트에 대해 예측을 얼마나 잘 수행하는지 이해하기 위한 강력한 도구입니다.

TensorFlow Transform 설치하기

!pip install tensorflow-transform
# This cell is only necessary because packages were installed while python was # running. It avoids the need to restart the runtime when running in Colab. import pkg_resources import importlib importlib.reload(pkg_resources)

가져오기 및 전역

먼저 필요한 항목을 가져옵니다.

import math import os import pprint import pandas as pd import matplotlib.pyplot as plt import tensorflow as tf print('TF: {}'.format(tf.__version__)) import apache_beam as beam print('Beam: {}'.format(beam.__version__)) import tensorflow_transform as tft import tensorflow_transform.beam as tft_beam print('Transform: {}'.format(tft.__version__)) from tfx_bsl.public import tfxio from tfx_bsl.coders.example_coder import RecordBatchToExamplesEncoder

다음으로 데이터 파일을 다운로드합니다.

!wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data !wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.test train_path = './adult.data' test_path = './adult.test'

열 이름 지정하기

데이터세트의 열을 참조하기 위한 몇 가지 편리한 목록을 만들 것입니다.

CATEGORICAL_FEATURE_KEYS = [ 'workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country', ] NUMERIC_FEATURE_KEYS = [ 'age', 'capital-gain', 'capital-loss', 'hours-per-week', 'education-num' ] ORDERED_CSV_COLUMNS = [ 'age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label' ] LABEL_KEY = 'label'

다음은 데이터에 대한 빠른 미리보기입니다.

pandas_train = pd.read_csv(train_path, header=None, names=ORDERED_CSV_COLUMNS) pandas_train.head(5)
one_row = dict(pandas_train.loc[0])
COLUMN_DEFAULTS = [ '' if isinstance(v, str) else 0.0 for v in dict(pandas_train.loc[1]).values()]

테스트 데이터에는 건너뛰어야 있는 1개의 헤더 행과 각 행의 끝에 후행 "."가 있습니다.

pandas_test = pd.read_csv(test_path, header=1, names=ORDERED_CSV_COLUMNS) pandas_test.head(5)
testing = os.getenv("WEB_TEST_BROWSER", False) if testing: pandas_train = pandas_train.loc[:1] pandas_test = pandas_test.loc[:1]

###특성과 스키마 정의. 입력에 있는 열의 유형을 기반으로 스키마를 정의해 보겠습니다. 무엇보다도 이를 올바르게 가져오는 데 도움이 됩니다.

RAW_DATA_FEATURE_SPEC = dict( [(name, tf.io.FixedLenFeature([], tf.string)) for name in CATEGORICAL_FEATURE_KEYS] + [(name, tf.io.FixedLenFeature([], tf.float32)) for name in NUMERIC_FEATURE_KEYS] + [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))] ) SCHEMA = tft.DatasetMetadata.from_feature_spec(RAW_DATA_FEATURE_SPEC).schema

[선택 사항] tf.train.Example proto 인코딩 및 디코딩

이 튜토리얼에서는 몇몇 위치에서 데이터세트의 예제를 tf.train.Example proto와 상호 변환해야 합니다.

아래의 숨겨진 encode_example 함수는 데이터세트의 특성 사전을 tf.train.Example로 변환합니다.

#@title def encode_example(input_features): input_features = dict(input_features) output_features = {} for key in CATEGORICAL_FEATURE_KEYS: value = input_features[key] feature = tf.train.Feature( bytes_list=tf.train.BytesList(value=[value.strip().encode()])) output_features[key] = feature for key in NUMERIC_FEATURE_KEYS: value = input_features[key] feature = tf.train.Feature( float_list=tf.train.FloatList(value=[value])) output_features[key] = feature label_value = input_features.get(LABEL_KEY, None) if label_value is not None: output_features[LABEL_KEY] = tf.train.Feature( bytes_list = tf.train.BytesList(value=[label_value.strip().encode()])) example = tf.train.Example( features = tf.train.Features(feature=output_features) ) return example

이제 데이터세트 예제를 Example protos로 변환할 수 있습니다.

tf_example = encode_example(pandas_train.loc[0]) tf_example.features.feature['age']
serialized_example_batch = tf.constant([ encode_example(pandas_train.loc[i]).SerializeToString() for i in range(3) ]) serialized_example_batch

직렬화된 예제 proto의 배치를 다시 텐서 사전으로 변환할 수도 있습니다.

decoded_tensors = tf.io.parse_example( serialized_example_batch, features=RAW_DATA_FEATURE_SPEC )

어떤 경우에는 레이블이 전달되지 않으므로 레이블이 선택 사항이 되도록 인코딩 함수가 작성됩니다.

features_dict = dict(pandas_train.loc[0]) features_dict.pop(LABEL_KEY) LABEL_KEY in features_dict

Example proto를 만들 때 레이블 키가 포함되지 않습니다.

no_label_example = encode_example(features_dict) LABEL_KEY in no_label_example.features.feature.keys()

###하이퍼파라미터 설정 및 기본 하우스키핑

훈련에 사용되는 상수 및 하이퍼파라미터.

NUM_OOV_BUCKETS = 1 EPOCH_SPLITS = 10 TRAIN_NUM_EPOCHS = 2*EPOCH_SPLITS NUM_TRAIN_INSTANCES = len(pandas_train) NUM_TEST_INSTANCES = len(pandas_test) BATCH_SIZE = 128 STEPS_PER_TRAIN_EPOCH = tf.math.ceil(NUM_TRAIN_INSTANCES/BATCH_SIZE/EPOCH_SPLITS) EVALUATION_STEPS = tf.math.ceil(NUM_TEST_INSTANCES/BATCH_SIZE) # Names of temp files TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed' TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed' EXPORTED_MODEL_DIR = 'exported_model_dir'
if testing: TRAIN_NUM_EPOCHS = 1

##tf.Transform으로 전처리하기

###tf.Transform 전처리 함수 생성. 전처리 함수는 tf.Transform의 가장 중요한 개념입니다. 전처리 함수는 데이터세트의 변환이 실제로 일어나는 곳으로, 텐서 사전을 수락하고 반환합니다. 여기서 텐서는 Tensor 또는 SparseTensor를 의미합니다. 일반적으로 전처리 함수의 핵심을 구성하는 두 가지 주요 API 호출 그룹이 있습니다.

  1. TensorFlow Ops: 일반적으로 TensorFlow 연산을 의미하는 텐서를 수락하고 반환하는 모든 함수로, 원시 데이터를 변환된 데이터로 한 번에 하나의 특성 벡터씩 변환하는 TensorFlow 연산을 그래프에 추가합니다. 이는 훈련 및 제공 기간 동안 모든 예에 대해 실행됩니다.

  2. Tensorflow 변환 분석기/맵퍼: tf.Transform에서 제공하는 모든 분석기/맵퍼입니다. 이들은 또한 텐서를 수락 및 반환하며 일반적으로 Tensorflow 연산과 Beam 계산을 조합적으로 포함하지만 TensorFlow 연산과 달리 전체 훈련 데이터세트에 대한 전체 패스가 필요한 분석 중에 Beam 파이프라인에서만 실행됩니다. Beam 계산은 훈련하기 전과 훈련하는 동안 한 번만 실행되며 일반적으로 전체 훈련 데이터세트에 대해 전체 패스를 만듭니다. 그래프에 추가되는 tf.constant 텐서가 생성됩니다. 예를 들어, tft.min은 훈련 데이터세트에 대해 텐서의 최솟값을 계산합니다.

주의: 전처리 함수를 추론 제공에 적용할 때 훈련 중에 분석기가 생성한 상수는 변경되지 않습니다. 데이터에 추세 또는 계절성 요소가 있는 경우 그에 따라 계획하세요.

다음은 이 데이터세트에 대한 preprocessing_fn로, 여러 가지 작업을 수행합니다.

  1. tft.scale_to_0_1을 사용하여 숫자 특성을 [0,1] 범위로 확장합니다.

  2. tft.compute_and_apply_vocabulary를 사용하여 각 범주 특성에 대한 어휘를 계산하고 각 입력에 대한 정수 ID를 tf.int64로 반환합니다. 이는 문자열 및 정수 범주형 입력에 모두 적용됩니다.

  3. 표준 TensorFlow 작업을 사용하여 데이터에 일부 수동 변환을 적용합니다. 여기에서 이러한 작업은 레이블에 적용되지만 특성도 변환할 수 있습니다. TensorFlow 작업은 다음과 같은 몇 가지 작업을 수행합니다.

    • 레이블에 대한 조회 테이블을 만듭니다(tf.init_scope는 함수가 처음 호출될 때만 테이블이 생성되도록 함).

    • 레이블의 텍스트를 정규화합니다.

    • 레이블을 one-hot으로 변환합니다.

def preprocessing_fn(inputs): """Preprocess input columns into transformed columns.""" # Since we are modifying some features and leaving others unchanged, we # start by setting `outputs` to a copy of `inputs. outputs = inputs.copy() # Scale numeric columns to have range [0, 1]. for key in NUMERIC_FEATURE_KEYS: outputs[key] = tft.scale_to_0_1(inputs[key]) # For all categorical columns except the label column, we generate a # vocabulary but do not modify the feature. This vocabulary is instead # used in the trainer, by means of a feature column, to convert the feature # from a string to an integer id. for key in CATEGORICAL_FEATURE_KEYS: outputs[key] = tft.compute_and_apply_vocabulary( tf.strings.strip(inputs[key]), num_oov_buckets=NUM_OOV_BUCKETS, vocab_filename=key) # For the label column we provide the mapping from string to index. table_keys = ['>50K', '<=50K'] with tf.init_scope(): initializer = tf.lookup.KeyValueTensorInitializer( keys=table_keys, values=tf.cast(tf.range(len(table_keys)), tf.int64), key_dtype=tf.string, value_dtype=tf.int64) table = tf.lookup.StaticHashTable(initializer, default_value=-1) # Remove trailing periods for test data when the data is read with tf.data. # label_str = tf.sparse.to_dense(inputs[LABEL_KEY]) label_str = inputs[LABEL_KEY] label_str = tf.strings.regex_replace(label_str, r'\.$', '') label_str = tf.strings.strip(label_str) data_labels = table.lookup(label_str) transformed_label = tf.one_hot( indices=data_labels, depth=len(table_keys), on_value=1.0, off_value=0.0) outputs[LABEL_KEY] = tf.reshape(transformed_label, [-1, len(table_keys)]) return outputs

구문

이제 모든 것을 한곳으로 모으고 Apache Beam을 사용하여 실행할 준비가 완료되었습니다.

Apache Beam은 변환을 정의하고 호출하기 위해 특수 구문을 사용합니다. 예를 들어 다음 줄을 보겠습니다.

result = pass_this | 'name this step' >> to_this_call

to_this_call 메서드는 pass_this라는 개체를 호출 및 전달하고 이 연산을 스택 추적에서 name this step이라고 합니다. to_this_call에 대한 호출의 결과는 result에서 반환됩니다. 다음과 같이 함께 연결된 파이프라인의 단계를 종종 볼 수 있습니다.

result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()

그리고 새 파이프라인으로 시작했기 때문에 다음과 같이 계속할 수 있습니다.

next_result = result | 'doing more stuff' >> another_function()

데이터 변환하기

이제 Apache Beam 파이프라인에서 데이터 변환을 시작할 준비가 되었습니다.

  1. tfxio.CsvTFXIO CSV 판독기를 사용하여 데이터를 읽습니다(파이프라인에서 텍스트 행을 처리하려면 대신 tfxio.BeamRecordCsvTFXIO 사용).

  2. 위에서 정의한 preprocessing_fn을 사용하여 데이터를 분석하고 변환합니다.

  3. 결과를 Example protos의 TFRecord로 결과를 작성합니다. 나중에 모델 훈련에 이를 사용합니다.

def transform_data(train_data_file, test_data_file, working_dir): """Transform the data and write out as a TFRecord of Example protos. Read in the data using the CSV reader, and transform it using a preprocessing pipeline that scales numeric data and converts categorical data from strings to int64 values indices, by creating a vocabulary for each category. Args: train_data_file: File containing training data test_data_file: File containing test data working_dir: Directory to write transformed data and metadata to """ # The "with" block will create a pipeline, and run that pipeline at the exit # of the block. with beam.Pipeline() as pipeline: with tft_beam.Context(temp_dir=tempfile.mkdtemp()): # Create a TFXIO to read the census data with the schema. To do this we # need to list all columns in order since the schema doesn't specify the # order of columns in the csv. # We first read CSV files and use BeamRecordCsvTFXIO whose .BeamSource() # accepts a PCollection[bytes] because we need to patch the records first # (see "FixCommasTrainData" below). Otherwise, tfxio.CsvTFXIO can be used # to both read the CSV files and parse them to TFT inputs: # csv_tfxio = tfxio.CsvTFXIO(...) # raw_data = (pipeline | 'ToRecordBatches' >> csv_tfxio.BeamSource()) train_csv_tfxio = tfxio.CsvTFXIO( file_pattern=train_data_file, telemetry_descriptors=[], column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA) # Read in raw data and convert using CSV TFXIO. raw_data = ( pipeline | 'ReadTrainCsv' >> train_csv_tfxio.BeamSource()) # Combine data and schema into a dataset tuple. Note that we already used # the schema to read the CSV data, but we also need it to interpret # raw_data. cfg = train_csv_tfxio.TensorAdapterConfig() raw_dataset = (raw_data, cfg) # The TFXIO output format is chosen for improved performance. transformed_dataset, transform_fn = ( raw_dataset | tft_beam.AnalyzeAndTransformDataset( preprocessing_fn, output_record_batches=True)) # Transformed metadata is not necessary for encoding. transformed_data, _ = transformed_dataset # Extract transformed RecordBatches, encode and write them to the given # directory. coder = RecordBatchToExamplesEncoder() _ = ( transformed_data | 'EncodeTrainData' >> beam.FlatMapTuple(lambda batch, _: coder.encode(batch)) | 'WriteTrainData' >> beam.io.WriteToTFRecord( os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE))) # Now apply transform function to test data. In this case we remove the # trailing period at the end of each line, and also ignore the header line # that is present in the test data file. test_csv_tfxio = tfxio.CsvTFXIO( file_pattern=test_data_file, skip_header_lines=1, telemetry_descriptors=[], column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA) raw_test_data = ( pipeline | 'ReadTestCsv' >> test_csv_tfxio.BeamSource()) raw_test_dataset = (raw_test_data, test_csv_tfxio.TensorAdapterConfig()) # The TFXIO output format is chosen for improved performance. transformed_test_dataset = ( (raw_test_dataset, transform_fn) | tft_beam.TransformDataset(output_record_batches=True)) # Transformed metadata is not necessary for encoding. transformed_test_data, _ = transformed_test_dataset # Extract transformed RecordBatches, encode and write them to the given # directory. _ = ( transformed_test_data | 'EncodeTestData' >> beam.FlatMapTuple(lambda batch, _: coder.encode(batch)) | 'WriteTestData' >> beam.io.WriteToTFRecord( os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE))) # Will write a SavedModel and metadata to working_dir, which can then # be read by the tft.TFTransformOutput class. _ = ( transform_fn | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))

파이프라인을 실행합니다.

import tempfile import pathlib output_dir = os.path.join(tempfile.mkdtemp(), 'keras') transform_data(train_path, test_path, output_dir)

출력 디렉터리를 tft.TFTransformOutput으로 래핑합니다.

tf_transform_output = tft.TFTransformOutput(output_dir)
tf_transform_output.transformed_feature_spec()

디렉터리를 보면 세 가지가 포함되어 있음을 알 수 있습니다.

  1. train_transformedtest_transformed 데이터 파일

  2. transform_fn 디렉터리(tf.saved_model)

  3. The transformed_metadata

다음 섹션에서는 이러한 아티팩트를 사용하여 모델을 훈련하는 방법을 보여줍니다.

!ls -l {output_dir}

##전처리된 데이터를 사용하여 tf.keras로 모델 훈련하기

훈련과 적용 모두에 동일한 코드를 사용하여 왜곡을 방지하는 데 tf.Transform이 어떻게 이용되는지 보여주기 위해 모델을 훈련할 것입니다. 모델을 훈련하고 훈련된 모델을 운영에 적합하게 준비하려면 입력 함수를 생성해야 합니다. 훈련 입력 함수와 적용 입력 함수의 주된 차이점은 훈련 데이터에는 레이블이 포함되고 운영 데이터에는 포함되지 않는다는 것입니다. 인수와 반환도 약간 다릅니다.

###훈련을 위한 입력 함수 생성

이전 섹션의 파이프라인을 실행하면 변환된 데이터가 포함된 TFRecord 파일이 생성됩니다.

다음 코드는 tf.data.experimental.make_batched_features_datasettft.TFTransformOutput.transformed_feature_spec을 사용하여 이러한 데이터 파일을 tf.data.Dataset으로 읽습니다.

def _make_training_input_fn(tf_transform_output, train_file_pattern, batch_size): """An input function reading from transformed data, converting to model input. Args: tf_transform_output: Wrapper around output of tf.Transform. transformed_examples: Base filename of examples. batch_size: Batch size. Returns: The input data for training or eval, in the form of k. """ def input_fn(): return tf.data.experimental.make_batched_features_dataset( file_pattern=train_file_pattern, batch_size=batch_size, features=tf_transform_output.transformed_feature_spec(), reader=tf.data.TFRecordDataset, label_key=LABEL_KEY, shuffle=True) return input_fn
train_file_pattern = pathlib.Path(output_dir)/f'{TRANSFORMED_TRAIN_DATA_FILEBASE}*' input_fn = _make_training_input_fn( tf_transform_output=tf_transform_output, train_file_pattern = str(train_file_pattern), batch_size = 10 )

아래에서 변환된 데이터 샘플을 볼 수 있습니다. education-numhourd-per-week와 같은 숫자 열이 [0,1] 범위의 부동 소수점으로 변환되고 문자열 열이 ID로 변환되는 방식에 주목하세요.

for example, label in input_fn().take(1): break pd.DataFrame(example)
label

모델 훈련 및 평가하기

모델 구성하기

def build_keras_model(working_dir): inputs = build_keras_inputs(working_dir) encoded_inputs = encode_inputs(inputs) stacked_inputs = tf.concat(tf.nest.flatten(encoded_inputs), axis=1) output = tf.keras.layers.Dense(100, activation='relu')(stacked_inputs) output = tf.keras.layers.Dense(50, activation='relu')(output) output = tf.keras.layers.Dense(2)(output) model = tf.keras.Model(inputs=inputs, outputs=output) return model
def build_keras_inputs(working_dir): tf_transform_output = tft.TFTransformOutput(working_dir) feature_spec = tf_transform_output.transformed_feature_spec().copy() feature_spec.pop(LABEL_KEY) # Build the `keras.Input` objects. inputs = {} for key, spec in feature_spec.items(): if isinstance(spec, tf.io.VarLenFeature): inputs[key] = tf.keras.layers.Input( shape=[None], name=key, dtype=spec.dtype, sparse=True) elif isinstance(spec, tf.io.FixedLenFeature): inputs[key] = tf.keras.layers.Input( shape=spec.shape, name=key, dtype=spec.dtype) else: raise ValueError('Spec type is not supported: ', key, spec) return inputs
def encode_inputs(inputs): encoded_inputs = {} for key in inputs: feature = tf.expand_dims(inputs[key], -1) if key in CATEGORICAL_FEATURE_KEYS: num_buckets = tf_transform_output.num_buckets_for_transformed_feature(key) encoding_layer = ( tf.keras.layers.CategoryEncoding( num_tokens=num_buckets, output_mode='binary', sparse=False)) encoded_inputs[key] = encoding_layer(feature) else: encoded_inputs[key] = feature return encoded_inputs
model = build_keras_model(output_dir) tf.keras.utils.plot_model(model,rankdir='LR', show_shapes=True)

데이터세트를 빌드합니다.

def get_dataset(working_dir, filebase): tf_transform_output = tft.TFTransformOutput(working_dir) data_path_pattern = os.path.join( working_dir, filebase + '*') input_fn = _make_training_input_fn( tf_transform_output, data_path_pattern, batch_size=BATCH_SIZE) dataset = input_fn() return dataset

모델을 훈련하고 평가합니다.

def train_and_evaluate( model, working_dir): """Train the model on training data and evaluate on test data. Args: working_dir: The location of the Transform output. num_train_instances: Number of instances in train set num_test_instances: Number of instances in test set Returns: The results from the estimator's 'evaluate' method """ train_dataset = get_dataset(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE) validation_dataset = get_dataset(working_dir, TRANSFORMED_TEST_DATA_FILEBASE) model = build_keras_model(working_dir) history = train_model(model, train_dataset, validation_dataset) metric_values = model.evaluate(validation_dataset, steps=EVALUATION_STEPS, return_dict=True) return model, history, metric_values
def train_model(model, train_dataset, validation_dataset): model.compile(optimizer='adam', loss=tf.losses.CategoricalCrossentropy(from_logits=True), metrics=['accuracy']) history = model.fit(train_dataset, validation_data=validation_dataset, epochs=TRAIN_NUM_EPOCHS, steps_per_epoch=STEPS_PER_TRAIN_EPOCH, validation_steps=EVALUATION_STEPS) return history
model, history, metric_values = train_and_evaluate(model, output_dir)
plt.plot(history.history['loss'], label='Train') plt.plot(history.history['val_loss'], label='Eval') plt.ylim(0,max(plt.ylim())) plt.legend() plt.title('Loss');

새 데이터 변환하기

이전 섹션에서 훈련 프로세스는 transform_dataset 함수에서 tft_beam.AnalyzeAndTransformDataset에 의해 생성된 변환된 데이터의 하드 카피를 사용했습니다.

새 데이터에서 작업하려면 tft_beam.WriteTransformFn에 의해 저장된 preprocessing_fn의 최종 버전을 로드해야 합니다.

TFTransformOutput.transform_features_layer 메서드는 출력 디렉터리에서 preprocessing_fn SavedModel을 로드합니다.

소스 파일에서 처리되지 않은 새 배치를 로드하는 함수는 다음과 같습니다.

def read_csv(file_name, batch_size): return tf.data.experimental.make_csv_dataset( file_pattern=file_name, batch_size=batch_size, column_names=ORDERED_CSV_COLUMNS, column_defaults=COLUMN_DEFAULTS, prefetch_buffer_size=0, ignore_errors=True)
for ex in read_csv(test_path, batch_size=5): break pd.DataFrame(ex)

tft.TransformFeaturesLayer를 로드하여 preprocessing_fn으로 이 데이터를 변환합니다.

ex2 = ex.copy() ex2.pop('fnlwgt') tft_layer = tf_transform_output.transform_features_layer() t_ex = tft_layer(ex2) label = t_ex.pop(LABEL_KEY) pd.DataFrame(t_ex)

tft_layer는 특성의 일부만 전달되는 경우에도 변환을 실행할 만큼 충분히 똑똑합니다. 예를 들어, 두 개의 특성만 전달하면 해당 특성의 변환된 버전만 다시 얻을 수 있습니다.

ex2 = pd.DataFrame(ex)[['education', 'hours-per-week']] ex2
pd.DataFrame(tft_layer(dict(ex2)))

다음은 특성 사양에 없는 특성을 삭제하고 레이블이 제공된 특성에 있는 경우 (features, label) 쌍을 반환하는 보다 강력한 버전입니다.

class Transform(tf.Module): def __init__(self, working_dir): self.working_dir = working_dir self.tf_transform_output = tft.TFTransformOutput(working_dir) self.tft_layer = tf_transform_output.transform_features_layer() @tf.function def __call__(self, features): raw_features = {} for key, val in features.items(): # Skip unused keys if key not in RAW_DATA_FEATURE_SPEC: continue raw_features[key] = val # Apply the `preprocessing_fn`. transformed_features = tft_layer(raw_features) if LABEL_KEY in transformed_features: # Pop the label and return a (features, labels) pair. data_labels = transformed_features.pop(LABEL_KEY) return (transformed_features, data_labels) else: return transformed_features
transform = Transform(output_dir)
t_ex, t_label = transform(ex)
pd.DataFrame(t_ex)

이제 Dataset.map을 사용하여 즉시 새 데이터에 해당 변환을 적용할 수 있습니다.

model.evaluate( read_csv(test_path, batch_size=5).map(transform), steps=EVALUATION_STEPS, return_dict=True )

모델 내보내기

따라서 훈련된 모델과 preporcessing_fn을 새 데이터에 적용하는 메서드가 준비되었습니다. 직렬화된 tf.train.Example proto를 입력으로 받아들이는 새 모델로 이를 결합합니다.

class ServingModel(tf.Module): def __init__(self, model, working_dir): self.model = model self.working_dir = working_dir self.transform = Transform(working_dir) @tf.function(input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string)]) def __call__(self, serialized_tf_examples): # parse the tf.train.Example feature_spec = RAW_DATA_FEATURE_SPEC.copy() feature_spec.pop(LABEL_KEY) parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec) # Apply the `preprocessing_fn` transformed_features = self.transform(parsed_features) # Run the model outputs = self.model(transformed_features) # Format the output classes_names = tf.constant([['0', '1']]) classes = tf.tile(classes_names, [tf.shape(outputs)[0], 1]) return {'classes': classes, 'scores': outputs} def export(self, output_dir): # Increment the directory number. This is required in order to make this # model servable with model_server. save_model_dir = pathlib.Path(output_dir)/'model' number_dirs = [int(p.name) for p in save_model_dir.glob('*') if p.name.isdigit()] id = max([0] + number_dirs)+1 save_model_dir = save_model_dir/str(id) # Set the signature to make it visible for serving. concrete_serving_fn = self.__call__.get_concrete_function() signatures = {'serving_default': concrete_serving_fn} # Export the model. tf.saved_model.save( self, str(save_model_dir), signatures=signatures) return save_model_dir

모델을 빌드하고 직렬화된 예제 배치에서 테스트 실행합니다.

serving_model = ServingModel(model, output_dir) serving_model(serialized_example_batch)

모델을 SavedModel로 내보냅니다.

saved_model_dir = serving_model.export(output_dir) saved_model_dir

모델을 다시 로드하고 동일한 예제 배치에서 테스트합니다.

reloaded = tf.saved_model.load(str(saved_model_dir)) run_model = reloaded.signatures['serving_default']
run_model(serialized_example_batch)

##우리가 수행한 작업. 이 예제에서는 tf.Transform을 사용하여 인구 조사 데이터의 데이터세트를 전처리하고 정리 및 변환된 데이터로 모델을 훈련했습니다. 또한 추론을 수행하기 위해 운영 환경에 훈련된 모델을 배포할 때 사용할 수 있는 입력 함수를 만들었습니다. 훈련과 추론 모두에 동일한 코드를 사용함으로써 데이터 기울이기 문제를 피할 수 있습니다. 그 과정에서 데이터 정리에 필요한 변환을 수행하기 위해 Apache Beam 변환을 생성하는 방법을 배웠습니다. 또한 이 변환 데이터를 이용해 tf.keras로 모델을 훈련하는 방법도 보았습니다. 이것은 TensorFlow Transform으로 수행할 수 있는 작업의 일부일 뿐입니다! tf.Transform을 더욱 자세히 살펴보고 이것으로 무엇을 할 수 있는지 알아보기 바랍니다.

[선택 사항] 전처리된 데이터를 사용하여 tf.estimator로 모델 훈련하기

경고: Estimator는 새 코드에 권장되지 않습니다. Estimator는 v1.Session 스타일 코드를 실행하며, 이 코드는 올바르게 작성하기가 좀 더 어렵고 특히 TF 2 코드와 결합할 경우 예기치 않게 작동할 수 있습니다. Estimator는 호환성 보장이 적용되지만 보안 취약점 외에는 수정 사항이 제공되지 않습니다. 자세한 내용은 마이그레이션 가이드를 참조하세요.

###훈련을 위한 입력 함수 생성

def _make_training_input_fn(tf_transform_output, transformed_examples, batch_size): """Creates an input function reading from transformed data. Args: tf_transform_output: Wrapper around output of tf.Transform. transformed_examples: Base filename of examples. batch_size: Batch size. Returns: The input function for training or eval. """ def input_fn(): """Input function for training and eval.""" dataset = tf.data.experimental.make_batched_features_dataset( file_pattern=transformed_examples, batch_size=batch_size, features=tf_transform_output.transformed_feature_spec(), reader=tf.data.TFRecordDataset, shuffle=True) transformed_features = tf.compat.v1.data.make_one_shot_iterator( dataset).get_next() # Extract features and label from the transformed tensors. transformed_labels = tf.where( tf.equal(transformed_features.pop(LABEL_KEY), 1)) return transformed_features, transformed_labels[:,1] return input_fn

###제공을 위한 입력 함수 생성

프로덕션에서 사용할 수 있는 입력 함수를 만들고 훈련된 모델을 제공하기에 적합하게 준비하겠습니다.

def _make_serving_input_fn(tf_transform_output): """Creates an input function reading from raw data. Args: tf_transform_output: Wrapper around output of tf.Transform. Returns: The serving input function. """ raw_feature_spec = RAW_DATA_FEATURE_SPEC.copy() # Remove label since it is not available during serving. raw_feature_spec.pop(LABEL_KEY) def serving_input_fn(): """Input function for serving.""" # Get raw features by generating the basic serving input_fn and calling it. # Here we generate an input_fn that expects a parsed Example proto to be fed # to the model at serving time. See also # tf.estimator.export.build_raw_serving_input_receiver_fn. raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn( raw_feature_spec, default_batch_size=None) serving_input_receiver = raw_input_fn() # Apply the transform function that was used to generate the materialized # data. raw_features = serving_input_receiver.features transformed_features = tf_transform_output.transform_raw_features( raw_features) return tf.estimator.export.ServingInputReceiver( transformed_features, serving_input_receiver.receiver_tensors) return serving_input_fn

###FeatureColumns에 입력 데이터 래핑. 예제 모델은 TensorFlow FeatureColumns에 데이터를 예상합니다.

def get_feature_columns(tf_transform_output): """Returns the FeatureColumns for the model. Args: tf_transform_output: A `TFTransformOutput` object. Returns: A list of FeatureColumns. """ # Wrap scalars as real valued columns. real_valued_columns = [tf.feature_column.numeric_column(key, shape=()) for key in NUMERIC_FEATURE_KEYS] # Wrap categorical columns. one_hot_columns = [ tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_identity( key=key, num_buckets=(NUM_OOV_BUCKETS + tf_transform_output.vocabulary_size_by_name( vocab_filename=key)))) for key in CATEGORICAL_FEATURE_KEYS] return real_valued_columns + one_hot_columns

###모델 학습, 평가 및 내보내기

def train_and_evaluate(working_dir, num_train_instances=NUM_TRAIN_INSTANCES, num_test_instances=NUM_TEST_INSTANCES): """Train the model on training data and evaluate on test data. Args: working_dir: Directory to read transformed data and metadata from and to write exported model to. num_train_instances: Number of instances in train set num_test_instances: Number of instances in test set Returns: The results from the estimator's 'evaluate' method """ tf_transform_output = tft.TFTransformOutput(working_dir) run_config = tf.estimator.RunConfig() estimator = tf.estimator.LinearClassifier( feature_columns=get_feature_columns(tf_transform_output), config=run_config, loss_reduction=tf.losses.Reduction.SUM) # Fit the model using the default optimizer. train_input_fn = _make_training_input_fn( tf_transform_output, os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE + '*'), batch_size=BATCH_SIZE) estimator.train( input_fn=train_input_fn, max_steps=TRAIN_NUM_EPOCHS * num_train_instances / BATCH_SIZE) # Evaluate model on test dataset. eval_input_fn = _make_training_input_fn( tf_transform_output, os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE + '*'), batch_size=1) # Export the model. serving_input_fn = _make_serving_input_fn(tf_transform_output) exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR) estimator.export_saved_model(exported_model_dir, serving_input_fn) return estimator.evaluate(input_fn=eval_input_fn, steps=num_test_instances)

###모두 결합하기. 인구 조사 데이터를 사전 처리하고, 모델을 훈련하고, 제공에 적합하게 준비하는 데 필요한 모든 요소들을 만들었습니다. 지금까지는 준비 작업만 했고, 이제 실행해볼 차례입니다!

참고: 전체 프로세스를 보려면 이 셀의 출력을 스크롤하세요. 결과는 맨 아래에 있습니다.

import tempfile temp = temp = os.path.join(tempfile.mkdtemp(),'estimator') transform_data(train_path, test_path, temp) results = train_and_evaluate(temp)
pprint.pprint(results)