GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/zh-cn/io/tutorials/bigquery.ipynb
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

End-to-end example for the BigQuery TensorFlow reader

Overview

This tutorial shows how to use the BigQuery TensorFlow reader to train a neural network built with the Keras Sequential API.

Dataset

This tutorial uses the United States Census Income Dataset provided by the UC Irvine Machine Learning Repository. The dataset contains demographic information about people drawn from the 1994 census database, including age, education, marital status, and occupation, along with whether their annual income exceeds $50,000.
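
To get a feel for the raw data before loading it into BigQuery, you can preview the CSV directly with pandas. A minimal sketch, assuming only that the file has no header row (which is why a schema is supplied to the load job later) and that the columns follow the CSV_SCHEMA defined further down in this notebook:

import pandas as pd

# Column names mirror CSV_SCHEMA defined later in this notebook.
COLUMNS = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
           'marital_status', 'occupation', 'relationship', 'race', 'gender',
           'capital_gain', 'capital_loss', 'hours_per_week', 'native_country',
           'income_bracket']
preview = pd.read_csv(
    'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data/adult.data.csv',
    names=COLUMNS, header=None)
print(preview.head())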

Setup

Set up your GCP project

Whatever your notebook environment, you must complete the following steps.

  1. Select or create a GCP project.

  2. Make sure that billing is enabled for your project.

  3. Enable the BigQuery Storage API (see the sketch after this list).

  4. Enter your project ID in the code cell below. Then run the cell to make sure the Cloud SDK uses the correct project for all the commands in this notebook.
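
If the gcloud CLI is available in your environment, step 3 can also be performed from a cell. A minimal sketch, assuming your account has permission to enable services on the project:

!gcloud services enable bigquerystorage.googleapis.com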

Note: Jupyter runs lines prefixed with ! as shell commands, and it interpolates Python variables prefixed with $ into these commands.

Install the required packages, then restart the runtime

try:
  # Use the Colab's preinstalled TensorFlow 2.x
  %tensorflow_version 2.x
except:
  pass
!pip install fastavro
!pip install tensorflow-io==0.9.0
!pip install google-cloud-bigquery-storage

Authenticate

from google.colab import auth

auth.authenticate_user()
print('Authenticated')

Set your project ID

PROJECT_ID = "<YOUR PROJECT>"  #@param {type:"string"}
! gcloud config set project $PROJECT_ID
%env GCLOUD_PROJECT=$PROJECT_ID

Import Python libraries and define constants

from __future__ import absolute_import, division, print_function, unicode_literals

import os
from six.moves import urllib
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError

LOCATION = 'us'

# Storage directory
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')

# Download options.
DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = '%s/%s' % (DATA_URL, TRAINING_FILE)
EVAL_URL = '%s/%s' % (DATA_URL, EVAL_FILE)

DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_training_table'
EVAL_TABLE_ID = 'census_eval_table'

CSV_SCHEMA = [
    bigquery.SchemaField("age", "FLOAT64"),
    bigquery.SchemaField("workclass", "STRING"),
    bigquery.SchemaField("fnlwgt", "FLOAT64"),
    bigquery.SchemaField("education", "STRING"),
    bigquery.SchemaField("education_num", "FLOAT64"),
    bigquery.SchemaField("marital_status", "STRING"),
    bigquery.SchemaField("occupation", "STRING"),
    bigquery.SchemaField("relationship", "STRING"),
    bigquery.SchemaField("race", "STRING"),
    bigquery.SchemaField("gender", "STRING"),
    bigquery.SchemaField("capital_gain", "FLOAT64"),
    bigquery.SchemaField("capital_loss", "FLOAT64"),
    bigquery.SchemaField("hours_per_week", "FLOAT64"),
    bigquery.SchemaField("native_country", "STRING"),
    bigquery.SchemaField("income_bracket", "STRING"),
]

UNUSED_COLUMNS = ["fnlwgt", "education_num"]

Import census data into BigQuery

Define helper methods for loading the data into BigQuery

def create_bigquery_dataset_if_necessary(dataset_id):
  # Construct a full Dataset object to send to the API.
  client = bigquery.Client(project=PROJECT_ID)
  dataset = bigquery.Dataset(bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
  dataset.location = LOCATION

  try:
    dataset = client.create_dataset(dataset)  # API request
    return True
  except GoogleAPIError as err:
    if err.code != 409:  # http_client.CONFLICT
      raise
  return False
def load_data_into_bigquery(url, table_id):
  create_bigquery_dataset_if_necessary(DATASET_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  table_ref = dataset_ref.table(table_id)
  job_config = bigquery.LoadJobConfig()
  job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
  job_config.source_format = bigquery.SourceFormat.CSV
  job_config.schema = CSV_SCHEMA

  load_job = client.load_table_from_uri(
      url, table_ref, job_config=job_config
  )
  print("Starting job {}".format(load_job.job_id))

  load_job.result()  # Waits for table load to complete.
  print("Job finished.")

  destination_table = client.get_table(table_ref)
  print("Loaded {} rows.".format(destination_table.num_rows))

Load the census data into BigQuery.

load_data_into_bigquery(TRAINING_URL, TRAINING_TABLE_ID)
load_data_into_bigquery(EVAL_URL, EVAL_TABLE_ID)
Starting job 2ceffef8-e6e4-44bb-9e86-3d97b0501187
Job finished.
Loaded 32561 rows.
Starting job bf66f1b3-2506-408b-9009-c19f4ae9f58a
Job finished.
Loaded 16278 rows.

Confirm that the data was imported

TODO: Replace <YOUR PROJECT> with your PROJECT_ID

Note: --use_bqstorage_api fetches the data using the BigQuery Storage API; make sure you are authorized to use it, and confirm that the API is enabled for your project: https://cloud.google.com/bigquery/docs/reference/storage/#enabling_the_api

%%bigquery --use_bqstorage_api
SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5
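
If the %%bigquery cell magic is not available in your environment, the same check can be done with the BigQuery Python client already imported above. A minimal sketch (an alternative, not part of the original notebook):

# Fetch five rows from the training table with the plain Python client.
client = bigquery.Client(project=PROJECT_ID)
query = 'SELECT * FROM `{}.{}.{}` LIMIT 5'.format(
    PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
print(client.query(query).to_dataframe())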

Load the census data into a TensorFlow Dataset using the BigQuery reader

Read the census data from BigQuery and convert it into a TensorFlow Dataset

from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession

def transform_row(row_dict):
  # Trim all string tensors
  trimmed_dict = {
      column: (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor)
      for (column, tensor) in row_dict.items()
  }
  # Extract feature column
  income_bracket = trimmed_dict.pop('income_bracket')
  # Convert feature column to 0.0/1.0
  income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'),
                                 lambda: tf.constant(1.0),
                                 lambda: tf.constant(0.0))
  return (trimmed_dict, income_bracket_float)

def read_bigquery(table_name):
  tensorflow_io_bigquery_client = BigQueryClient()
  read_session = tensorflow_io_bigquery_client.read_session(
      "projects/" + PROJECT_ID,
      PROJECT_ID, table_name, DATASET_ID,
      list(field.name for field in CSV_SCHEMA
           if not field.name in UNUSED_COLUMNS),
      list(dtypes.double if field.field_type == 'FLOAT64'
           else dtypes.string for field in CSV_SCHEMA
           if not field.name in UNUSED_COLUMNS),
      requested_streams=2)

  dataset = read_session.parallel_read_rows()
  transformed_ds = dataset.map(transform_row)
  return transformed_ds
BATCH_SIZE = 32

training_ds = read_bigquery(TRAINING_TABLE_ID).shuffle(10000).batch(BATCH_SIZE)
eval_ds = read_bigquery(EVAL_TABLE_ID).batch(BATCH_SIZE)
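
Before training, it can help to pull a single batch and check that the dataset yields the expected (features, label) structure. An optional sanity check, not part of the original tutorial:

# Inspect one batch: a dict of feature tensors plus a float label tensor.
for features, label in training_ds.take(1):
  for name, tensor in features.items():
    print('{:20s}: {} {}'.format(name, tensor.dtype.name, tensor.shape))
  print('label               : {} {}'.format(label.dtype.name, label.shape))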

Define feature columns

def get_categorical_feature_values(column):
  query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  job_config = bigquery.QueryJobConfig()
  query_job = client.query(query, job_config=job_config)
  result = query_job.to_dataframe()
  return result.values[:,0]
from tensorflow import feature_column

feature_columns = []

# numeric cols
for header in ['capital_gain', 'capital_loss', 'hours_per_week']:
  feature_columns.append(feature_column.numeric_column(header))

# categorical cols
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
               'race', 'native_country', 'education']:
  categorical_feature = feature_column.categorical_column_with_vocabulary_list(
      header, get_categorical_feature_values(header))
  categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
  feature_columns.append(categorical_feature_one_hot)

# bucketized cols
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
feature_columns.append(age_buckets)

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
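
As a quick check of the layer defined above, DenseFeatures can be applied directly to one batch of raw features: it concatenates the numeric, one-hot, and bucketized columns into a single dense float tensor. A small sketch using the training dataset (not part of the original notebook):

# Apply the feature layer to one batch and look at the resulting width.
example_batch = next(iter(training_ds))[0]
print(feature_layer(example_batch).shape)  # (batch_size, total_feature_width)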

Build and train the model

Build the model

Dense = tf.keras.layers.Dense
model = tf.keras.Sequential(
    [
        feature_layer,
        Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
        Dense(75, activation=tf.nn.relu),
        Dense(50, activation=tf.nn.relu),
        Dense(25, activation=tf.nn.relu),
        Dense(1, activation=tf.nn.sigmoid)
    ])

# Compile Keras model
model.compile(
    loss='binary_crossentropy',
    metrics=['accuracy'])
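
Note that compile() above is called without an explicit optimizer, so tf.keras falls back to its default, 'rmsprop'. If you prefer to make that choice visible, an equivalent call:

# Same as the compile call above, with the default optimizer spelled out.
model.compile(
    optimizer='rmsprop',
    loss='binary_crossentropy',
    metrics=['accuracy'])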

Train the model

model.fit(training_ds, epochs=5)
WARNING:tensorflow:Layer sequential is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2. The layer has dtype float32 because it's dtype defaults to floatx. If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2. To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4276: IndicatorColumn._variable_shape (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4331: VocabularyListCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Epoch 1/5
1018/1018 [==============================] - 17s 17ms/step - loss: 0.5985 - accuracy: 0.8105
Epoch 2/5
1018/1018 [==============================] - 10s 10ms/step - loss: 0.3670 - accuracy: 0.8324
Epoch 3/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3487 - accuracy: 0.8393
Epoch 4/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3398 - accuracy: 0.8435
Epoch 5/5
1018/1018 [==============================] - 11s 11ms/step - loss: 0.3377 - accuracy: 0.8455
<tensorflow.python.keras.callbacks.History at 0x7f978f5b91d0>

Evaluate the model

Evaluate the model

loss, accuracy = model.evaluate(eval_ds)
print("Accuracy", accuracy)
509/509 [==============================] - 8s 15ms/step - loss: 0.3338 - accuracy: 0.8398
Accuracy 0.8398452

Evaluate a couple of random samples

sample_x = {
    'age': np.array([56, 36]),
    'workclass': np.array(['Local-gov', 'Private']),
    'education': np.array(['Bachelors', 'Bachelors']),
    'marital_status': np.array(['Married-civ-spouse', 'Married-civ-spouse']),
    'occupation': np.array(['Tech-support', 'Other-service']),
    'relationship': np.array(['Husband', 'Husband']),
    'race': np.array(['White', 'Black']),
    'gender': np.array(['Male', 'Male']),
    'capital_gain': np.array([0, 7298]),
    'capital_loss': np.array([0, 0]),
    'hours_per_week': np.array([40, 36]),
    'native_country': np.array(['United-States', 'United-States'])
}

model.predict(sample_x)
array([[0.5541261], [0.6209938]], dtype=float32)
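
The model outputs the sigmoid probability that each sample earns more than $50K. To turn these probabilities into hard class labels, a common follow-up (not part of the original notebook) is to threshold at 0.5:

# Convert predicted probabilities into 0/1 income-bracket labels.
predictions = model.predict(sample_x)
print((predictions > 0.5).astype(int).flatten())  # [1 1] for the probabilities above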