GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/io/tutorials/bigquery.ipynb
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

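End-to-end example for the BigQuery TensorFlow reader

Overview

This tutorial shows how to use the BigQuery TensorFlow reader to train a neural network with the Keras sequential API.

Dataset

This tutorial uses the United States Census Income Dataset provided by the UC Irvine Machine Learning Repository. The dataset contains information about people recorded in a 1994 census database, including age, education, marital status, occupation, and whether they earn more than $50,000 a year.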

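Setup

Set up your GCP project.

The following steps are required regardless of your notebook environment.

  1. Select or create a GCP project.

  2. Make sure that billing is enabled for your project.

  3. Enable the BigQuery Storage API.

  4. Enter your project ID in the cell below, then run the cell so that the Cloud SDK uses the right project for all the commands in this notebook.

Note: Jupyter runs lines prefixed with ! as shell commands, and it interpolates Python variables prefixed with $ into these commands.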
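Outside a notebook, the `!` and `$` conveniences are not available. The following is a minimal sketch of a plain-Python equivalent of `! gcloud config set project $PROJECT_ID`, assuming the `gcloud` CLI is installed and on the PATH. The helper names and the project ID are hypothetical, and the command is only built and printed here, not executed.

```python
import shlex
import subprocess


def gcloud_set_project_command(project_id):
    """Build the argument list for `gcloud config set project <project_id>`."""
    return ["gcloud", "config", "set", "project", project_id]


def run_command(command):
    """Run the command; raises CalledProcessError on a non-zero exit code."""
    return subprocess.run(command, check=True)


# Placeholder project ID (an assumption for illustration only).
command = gcloud_set_project_command("my-sample-project")

# Print the shell-quoted command line instead of running it.
print(" ".join(shlex.quote(part) for part in command))
```

Calling `run_command(command)` would execute it, which is roughly what the notebook's `!` prefix does after substituting the Python variable.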

Install the required packages and restart the runtime.

try:
  # Use the Colab's preinstalled TensorFlow 2.x
  %tensorflow_version 2.x
except:
  pass
!pip install fastavro
!pip install tensorflow-io==0.9.0
!pip install google-cloud-bigquery-storage

Authenticate.

from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Set your project ID.

PROJECT_ID = "<YOUR PROJECT>" #@param {type:"string"}
! gcloud config set project $PROJECT_ID
%env GCLOUD_PROJECT=$PROJECT_ID

Import Python libraries and define constants.

from __future__ import absolute_import, division, print_function, unicode_literals

import os
from six.moves import urllib
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError

LOCATION = 'us'

# Storage directory
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')

# Download options.
DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = '%s/%s' % (DATA_URL, TRAINING_FILE)
EVAL_URL = '%s/%s' % (DATA_URL, EVAL_FILE)

DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_training_table'
EVAL_TABLE_ID = 'census_eval_table'

CSV_SCHEMA = [
    bigquery.SchemaField("age", "FLOAT64"),
    bigquery.SchemaField("workclass", "STRING"),
    bigquery.SchemaField("fnlwgt", "FLOAT64"),
    bigquery.SchemaField("education", "STRING"),
    bigquery.SchemaField("education_num", "FLOAT64"),
    bigquery.SchemaField("marital_status", "STRING"),
    bigquery.SchemaField("occupation", "STRING"),
    bigquery.SchemaField("relationship", "STRING"),
    bigquery.SchemaField("race", "STRING"),
    bigquery.SchemaField("gender", "STRING"),
    bigquery.SchemaField("capital_gain", "FLOAT64"),
    bigquery.SchemaField("capital_loss", "FLOAT64"),
    bigquery.SchemaField("hours_per_week", "FLOAT64"),
    bigquery.SchemaField("native_country", "STRING"),
    bigquery.SchemaField("income_bracket", "STRING"),
]

UNUSED_COLUMNS = ["fnlwgt", "education_num"]

Import census data into BigQuery

Define helper methods to load data into BigQuery.

def create_bigquery_dataset_if_necessary(dataset_id):
  # Construct a full Dataset object to send to the API.
  client = bigquery.Client(project=PROJECT_ID)
  dataset = bigquery.Dataset(bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
  dataset.location = LOCATION

  try:
    dataset = client.create_dataset(dataset)  # API request
    return True
  except GoogleAPIError as err:
    if err.code != 409:  # http_client.CONFLICT
      raise
  return False

def load_data_into_bigquery(url, table_id):
  create_bigquery_dataset_if_necessary(DATASET_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  table_ref = dataset_ref.table(table_id)
  job_config = bigquery.LoadJobConfig()
  job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
  job_config.source_format = bigquery.SourceFormat.CSV
  job_config.schema = CSV_SCHEMA

  load_job = client.load_table_from_uri(
      url, table_ref, job_config=job_config
  )
  print("Starting job {}".format(load_job.job_id))

  load_job.result()  # Waits for table load to complete.
  print("Job finished.")

  destination_table = client.get_table(table_ref)
  print("Loaded {} rows.".format(destination_table.num_rows))
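The dataset helper above follows a "create if necessary" pattern: attempt the creation, treat an HTTP 409 (conflict) as "already exists", and re-raise anything else. The sketch below isolates that control flow using in-memory stand-ins. `ConflictError`, `fake_create_dataset`, and `create_if_necessary` are hypothetical names for illustration and are not part of the BigQuery client library.

```python
class ConflictError(Exception):
    """Hypothetical stand-in for an API error that carries an HTTP status code."""
    code = 409


def create_if_necessary(create_fn):
    """Return True if the resource was created, False if it already existed."""
    try:
        create_fn()
        return True
    except ConflictError as err:
        if err.code != 409:  # anything other than "already exists" is a real failure
            raise
    return False


existing = set()


def fake_create_dataset(name="census_dataset"):
    # In-memory stand-in for a create call; raises on a duplicate name.
    if name in existing:
        raise ConflictError(name)
    existing.add(name)


first = create_if_necessary(fake_create_dataset)   # creates the dataset
second = create_if_necessary(fake_create_dataset)  # hits the conflict branch
print(first, second)
```

Because the second call returns `False` instead of raising, the loader can be rerun safely for both the training and evaluation tables.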

Load census data into BigQuery.

load_data_into_bigquery(TRAINING_URL, TRAINING_TABLE_ID)
load_data_into_bigquery(EVAL_URL, EVAL_TABLE_ID)
Starting job 2ceffef8-e6e4-44bb-9e86-3d97b0501187
Job finished.
Loaded 32561 rows.
Starting job bf66f1b3-2506-408b-9009-c19f4ae9f58a
Job finished.
Loaded 16278 rows.

Confirm that the data was imported.

TODO: Replace <YOUR PROJECT> with your PROJECT_ID.

Note: --use_bqstorage_api fetches the data through the BigQuery Storage API and checks that you are authorized to use it. Make sure the API is enabled for your project. https://cloud.google.com/bigquery/docs/reference/storage/#enabling_the_api

%%bigquery --use_bqstorage_api
SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5
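Instead of editing the project placeholder by hand, the preview query can be assembled from the constants defined earlier. This is a minimal sketch that only builds the SQL string. `preview_query` is a hypothetical helper and the project ID is a placeholder.

```python
PROJECT_ID = "my-sample-project"  # placeholder; use your own project ID
DATASET_ID = "census_dataset"
TRAINING_TABLE_ID = "census_training_table"


def preview_query(project_id, dataset_id, table_id, limit=5):
    """Return a SELECT statement for the first `limit` rows of a table."""
    # Backticks quote the fully qualified table name, as in the cell above.
    table = "`{}.{}.{}`".format(project_id, dataset_id, table_id)
    return "SELECT * FROM {} LIMIT {}".format(table, int(limit))


query = preview_query(PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
print(query)
```

The resulting string could then be passed to `client.query(...)` on a `bigquery.Client`, the same call the tutorial uses later to collect categorical values.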

Load census data into a TensorFlow Dataset using the BigQuery reader

Read and transform census data from BigQuery into a TensorFlow Dataset.

from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession

def transform_row(row_dict):
  # Trim all string tensors
  trimmed_dict = { column:
                  (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor)
                  for (column,tensor) in row_dict.items()
                  }
  # Extract the label column
  income_bracket = trimmed_dict.pop('income_bracket')
  # Convert the label to 0.0/1.0
  income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'),
                 lambda: tf.constant(1.0),
                 lambda: tf.constant(0.0))
  return (trimmed_dict, income_bracket_float)

def read_bigquery(table_name):
  tensorflow_io_bigquery_client = BigQueryClient()
  read_session = tensorflow_io_bigquery_client.read_session(
      "projects/" + PROJECT_ID,
      PROJECT_ID, table_name, DATASET_ID,
      list(field.name for field in CSV_SCHEMA
           if not field.name in UNUSED_COLUMNS),
      list(dtypes.double if field.field_type == 'FLOAT64'
           else dtypes.string for field in CSV_SCHEMA
           if not field.name in UNUSED_COLUMNS),
      requested_streams=2)

  dataset = read_session.parallel_read_rows()
  transformed_ds = dataset.map(transform_row)
  return transformed_ds

BATCH_SIZE = 32

training_ds = read_bigquery(TRAINING_TABLE_ID).shuffle(10000).batch(BATCH_SIZE)
eval_ds = read_bigquery(EVAL_TABLE_ID).batch(BATCH_SIZE)
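To make the two steps in the cell above easier to follow, here is a plain-Python analogue that runs without TensorFlow or BigQuery. It shows how the selected fields and output types are derived from the schema, and what the row transformation does to a single record. `CSV_COLUMNS` and `transform_row_py` are hypothetical names, and the sample rows are made up for illustration.

```python
# Column names and BigQuery types copied from CSV_SCHEMA in the tutorial.
CSV_COLUMNS = [
    ("age", "FLOAT64"), ("workclass", "STRING"), ("fnlwgt", "FLOAT64"),
    ("education", "STRING"), ("education_num", "FLOAT64"),
    ("marital_status", "STRING"), ("occupation", "STRING"),
    ("relationship", "STRING"), ("race", "STRING"), ("gender", "STRING"),
    ("capital_gain", "FLOAT64"), ("capital_loss", "FLOAT64"),
    ("hours_per_week", "FLOAT64"), ("native_country", "STRING"),
    ("income_bracket", "STRING"),
]
UNUSED_COLUMNS = ["fnlwgt", "education_num"]

# Same filtering as in read_bigquery: drop unused columns, keep the order.
selected_fields = [name for name, _ in CSV_COLUMNS if name not in UNUSED_COLUMNS]
output_types = ["double" if field_type == "FLOAT64" else "string"
                for name, field_type in CSV_COLUMNS
                if name not in UNUSED_COLUMNS]


def transform_row_py(row):
    """Plain-Python analogue of transform_row for one row dictionary."""
    # Trim surrounding whitespace from every string value.
    trimmed = {key: (value.strip() if isinstance(value, str) else value)
               for key, value in row.items()}
    # Split off the label and encode it as 1.0 (">50K") or 0.0 (anything else).
    label = trimmed.pop("income_bracket")
    return trimmed, (1.0 if label == ">50K" else 0.0)


# Made-up sample rows with stray whitespace around the strings.
features, label = transform_row_py(
    {"age": 39.0, "workclass": " State-gov ", "income_bracket": " >50K"})
_, other_label = transform_row_py({"age": 25.0, "income_bracket": " <=50K"})
print(selected_fields)
print(features, label, other_label)
```

The real `transform_row` does the same work on tensors, which is why it uses `tf.strings.strip` and `tf.cond` rather than Python string methods and conditionals.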

Define feature columns

def get_categorical_feature_values(column):
  query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  job_config = bigquery.QueryJobConfig()
  query_job = client.query(query, job_config=job_config)
  result = query_job.to_dataframe()
  return result.values[:,0]

from tensorflow import feature_column

feature_columns = []

# numeric cols
for header in ['capital_gain', 'capital_loss', 'hours_per_week']:
  feature_columns.append(feature_column.numeric_column(header))

# categorical cols
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
               'race', 'native_country', 'education']:
  categorical_feature = feature_column.categorical_column_with_vocabulary_list(
      header, get_categorical_feature_values(header))
  categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
  feature_columns.append(categorical_feature_one_hot)

# bucketized cols
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
feature_columns.append(age_buckets)

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
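As a rough illustration of what the bucketized and indicator columns encode, the following plain-Python sketch maps an age to a bucket index and a category to a one-hot vector. It assumes left-closed buckets (a value equal to a boundary falls into the bucket that starts there) and an all-zero vector for values outside the vocabulary. The helper names and the short vocabulary are hypothetical.

```python
import bisect

# Boundaries copied from the bucketized age column above.
AGE_BOUNDARIES = [18, 25, 30, 35, 40, 45, 50, 55, 60, 65]


def bucket_index(value, boundaries):
    """Index of the bucket containing value; 10 boundaries give 11 buckets."""
    # bisect_right counts the boundaries that are <= value.
    return bisect.bisect_right(boundaries, value)


def one_hot(value, vocabulary):
    """One-hot encode value against vocabulary; unknown values give all zeros."""
    return [1.0 if value == item else 0.0 for item in vocabulary]


# Illustrative subset of a vocabulary, not the full list read from BigQuery.
WORKCLASS_VOCABULARY = ["Private", "Local-gov", "State-gov"]

age_buckets = [bucket_index(age, AGE_BOUNDARIES) for age in (17, 18, 56, 65)]
known = one_hot("Local-gov", WORKCLASS_VOCABULARY)
unknown = one_hot("Never-seen", WORKCLASS_VOCABULARY)
print(age_buckets, known, unknown)
```

In the tutorial the vocabulary comes from `get_categorical_feature_values`, so each one-hot vector is as long as the number of distinct trimmed values in the training table.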

Build and train the model

Build the model.

Dense = tf.keras.layers.Dense
model = tf.keras.Sequential(
  [
    feature_layer,
      Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
      Dense(75, activation=tf.nn.relu),
      Dense(50, activation=tf.nn.relu),
      Dense(25, activation=tf.nn.relu),
      Dense(1, activation=tf.nn.sigmoid)
  ])

# Compile Keras model
model.compile(
    loss='binary_crossentropy',
    metrics=['accuracy'])
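The model is compiled with the `binary_crossentropy` loss, which compares each sigmoid output with its 0.0/1.0 label. The sketch below computes the mean of that loss by hand for two made-up predictions. The clipping constant `epsilon` is an assumption added to avoid `log(0)`, and the exact numerical details of the Keras implementation may differ.

```python
import math


def binary_crossentropy(y_true, y_pred, epsilon=1e-7):
    """Mean binary cross-entropy over paired labels and probabilities."""
    total = 0.0
    for target, prob in zip(y_true, y_pred):
        # Clip so that log() never receives exactly 0 or 1.
        prob = min(max(prob, epsilon), 1.0 - epsilon)
        total += -(target * math.log(prob) + (1.0 - target) * math.log(1.0 - prob))
    return total / len(y_true)


# Made-up example: one positive label predicted at 0.9, one negative at 0.2.
loss = binary_crossentropy([1.0, 0.0], [0.9, 0.2])
print(round(loss, 6))
```

Confident correct predictions contribute values near zero, while confident wrong predictions are penalized heavily.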

Train the model.

model.fit(training_ds, epochs=5)
WARNING:tensorflow:Layer sequential is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2. The layer has dtype float32 because it's dtype defaults to floatx.

If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.

To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4276: IndicatorColumn._variable_shape (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4331: VocabularyListCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Epoch 1/5
1018/1018 [==============================] - 17s 17ms/step - loss: 0.5985 - accuracy: 0.8105
Epoch 2/5
1018/1018 [==============================] - 10s 10ms/step - loss: 0.3670 - accuracy: 0.8324
Epoch 3/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3487 - accuracy: 0.8393
Epoch 4/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3398 - accuracy: 0.8435
Epoch 5/5
1018/1018 [==============================] - 11s 11ms/step - loss: 0.3377 - accuracy: 0.8455
<tensorflow.python.keras.callbacks.History at 0x7f978f5b91d0>
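The step counts in the progress bars follow from the row counts reported by the load jobs and the batch size of 32. Each epoch processes every row once, and the last batch may be partial. A quick check of that arithmetic, using only numbers that appear in this notebook's output:

```python
import math

BATCH_SIZE = 32
TRAINING_ROWS = 32561  # "Loaded 32561 rows." from the training load job
EVAL_ROWS = 16278      # "Loaded 16278 rows." from the evaluation load job


def steps_per_epoch(num_rows, batch_size):
    """Number of batches needed to cover num_rows, counting a partial last batch."""
    return math.ceil(num_rows / batch_size)


training_steps = steps_per_epoch(TRAINING_ROWS, BATCH_SIZE)
eval_steps = steps_per_epoch(EVAL_ROWS, BATCH_SIZE)
print(training_steps, eval_steps)
```

The results, 1018 and 509, match the `1018/1018` shown during training and the `509/509` shown during evaluation.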

Evaluate the model

Evaluate the model.

loss, accuracy = model.evaluate(eval_ds)
print("Accuracy", accuracy)
509/509 [==============================] - 8s 15ms/step - loss: 0.3338 - accuracy: 0.8398
Accuracy 0.8398452

Evaluate a couple of random samples.

sample_x = {
    'age' : np.array([56, 36]),
    'workclass': np.array(['Local-gov', 'Private']),
    'education': np.array(['Bachelors', 'Bachelors']),
    'marital_status': np.array(['Married-civ-spouse', 'Married-civ-spouse']),
    'occupation': np.array(['Tech-support', 'Other-service']),
    'relationship': np.array(['Husband', 'Husband']),
    'race': np.array(['White', 'Black']),
    'gender': np.array(['Male', 'Male']),
    'capital_gain': np.array([0, 7298]),
    'capital_loss': np.array([0, 0]),
    'hours_per_week': np.array([40, 36]),
    'native_country': np.array(['United-States', 'United-States'])
  }

model.predict(sample_x)
array([[0.5541261], [0.6209938]], dtype=float32)
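The sigmoid outputs are probabilities that the income bracket is `>50K`. To turn them into class labels, a threshold has to be chosen. The sketch below assumes the common value of 0.5, which is not stated in the tutorial, and applies it to the two predictions printed above.

```python
# Predictions copied from the model.predict output above.
predictions = [[0.5541261], [0.6209938]]

THRESHOLD = 0.5  # assumed decision threshold; adjust to trade precision for recall


def to_income_bracket(probability, threshold=THRESHOLD):
    """Map a predicted probability to an income bracket label."""
    return ">50K" if probability > threshold else "<=50K"


labels = [to_income_bracket(row[0]) for row in predictions]
print(labels)
```

Both samples land just above the threshold, so they are classified as `>50K`, though neither prediction is especially confident.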