Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/zh-cn/guide/tpu.ipynb
25115 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

本指南演示了如何在张量处理单元 (TPU) 和 TPU Pod 上执行基本训练,TPU Pod 是一组通过专用高速网络接口连接的 TPU 设备,带有 tf.keras 和自定义训练循环。

TPU 是 Google 定制开发的专用集成电路 (ASIC),用于加速机器学习工作负载。它们可通过 Google ColabTPU Research CloudCloud TPU 获得。

安装

在运行此 Colab 笔记本之前,请在以下路径下检查笔记本设置,确保硬件加速器为 TPU:Runtime > Change runtime type > Hardware accelerator > TPU

导入一些必要的库,包括 TensorFlow Datasets:

import tensorflow as tf import os import tensorflow_datasets as tfds

TPU 初始化

与运行用户 Python 程序的本地流程不同,TPU 通常在 Cloud TPU 工作进程上。因此,需要完成一些初始化工作才能连接到远程集群并初始化 TPU。请注意,tf.distribute.cluster_resolver.TPUClusterResolvertpu 参数是一个仅适用于 Colab 的特殊地址。如果在 Google Compute Engine (GCE) 上运行,应改为传入 Cloud TPU 的名称。

注:必须将 TPU 初始化代码放在程序的开头位置。

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='') tf.config.experimental_connect_to_cluster(resolver) # This is the TPU initialization code that has to be at the beginning. tf.tpu.experimental.initialize_tpu_system(resolver) print("All devices: ", tf.config.list_logical_devices('TPU'))

手动设备放置

初始化 TPU 后,您可以通过手动设备放置将计算放置在单个 TPU 设备上:

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) with tf.device('/TPU:0'): c = tf.matmul(a, b) print("c device: ", c.device) print(c)

分布策略

通常,您可以在多个 TPU 上以数据并行的方式运行模型。为了在多个 TPU(以及多个 GPU 或多台机器)上分布模型,TensorFlow 提供了 tf.distribute.Strategy API。您可以更换分布策略,该模型将在任何给定的 (TPU) 设备上运行。在使用 TensorFlow 进行分布式训练指南中了解详情。

使用 tf.distribute.TPUStrategy 选项实现同步分布式训练。TPU 会在多个 TPU 核心之间实现高效的全归约和其他集合运算,并将其用于 TPUStrategy

要演示这一点,请创建一个 tf.distribute.TPUStrategy 对象:

strategy = tf.distribute.TPUStrategy(resolver)

要复制计算,以便在所有 TPU 核心中运行,可以直接将其传入 strategy.run API。在下面的示例中,所有核心都会获得相同的输入 (a, b),并单独在每个核心上执行矩阵乘法运算。输出是所有副本的值。

@tf.function def matmul_fn(x, y): z = tf.matmul(x, y) return z z = strategy.run(matmul_fn, args=(a, b)) print(z)

TPU 上的分类

我们已经学习了基本概念,现在来看看具体示例。本部分会演示如何使用分布策略 tf.distribute.experimental.TPUStrategy 在 Cloud TPU 上训练 Keras 模型。

定义 Keras 模型

首先定义 Sequential Keras 模型,对 MNIST 数据集进行图像分类。这与您在 CPU 或 GPU 上进行训练时使用的定义相同。请注意,Keras 模型创建需要位于 Strategy.scope 内,这样才能在每个 TPU 设备上创建变量。代码的其他部分不必放在 Strategy 作用域内。

def create_model(): regularizer = tf.keras.regularizers.L2(1e-5) return tf.keras.Sequential( [tf.keras.layers.Conv2D(256, 3, input_shape=(28, 28, 1), activation='relu', kernel_regularizer=regularizer), tf.keras.layers.Conv2D(256, 3, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.Flatten(), tf.keras.layers.Dense(256, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=regularizer), tf.keras.layers.Dense(10, kernel_regularizer=regularizer)])

此模型将 L2 正则化项放在每层的权重上,以便下面的自定义训练循环可以显示如何从 Model.losses 中选取它们。

加载数据集

使用 Cloud TPU 时,有效使用 tf.data.Dataset API 很关键。有关数据集性能的详细信息,请参阅输入流水线性能指南

如果使用的是 TPU Nodes,则需要将 TensorFlow Dataset 读取的所有数据文件存储在 Google Cloud Storage (GCS) 存储分区中。如果使用的是 TPU VM,则可以将数据存储在任意位置。有关 TPU Nodes 和 TPU VM 的更多信息,请参阅 TPU 系统架构文档。

对于大多数用例,建议将数据转换为 TFRecord 格式,并使用 tf.data.TFRecordDataset 进行读取。有关操作方法的详细信息,请参阅 TFRecord 和 tf.Example 教程。不过,这并非硬性要求,如果愿意,您可以使用其他数据集读取器,如 tf.data.FixedLengthRecordDatasettf.data.TextLineDataset

您可以使用 tf.data.Dataset.cache 将整个小数据集加载到内存中。

无论使用哪一种数据格式,我们都强烈建议使用大文件(100MB 左右)。在这种网络化环境下,这一点尤其重要,因为打开文件的开销非常高。

如下面的代码所示,您应使用 Tensorflow Datasets tfds.load 模块获取 MNIST 训练和测试数据的副本。请注意,代码中已指定 try_gcs 来使用公共 GCS 存储分区中提供的副本。如果不这样指定,TPU 将无法访问下载的数据。

def get_dataset(batch_size, is_training=True): split = 'train' if is_training else 'test' dataset, info = tfds.load(name='mnist', split=split, with_info=True, as_supervised=True, try_gcs=True) # Normalize the input data. def scale(image, label): image = tf.cast(image, tf.float32) image /= 255.0 return image, label dataset = dataset.map(scale) # Only shuffle and repeat the dataset in training. The advantage of having an # infinite dataset for training is to avoid the potential last partial batch # in each epoch, so that you don't need to think about scaling the gradients # based on the actual batch size. if is_training: dataset = dataset.shuffle(10000) dataset = dataset.repeat() dataset = dataset.batch(batch_size) return dataset

使用 Keras 高级 API 训练模型

可以使用 Keras Model.fitModel.compile API 训练模型。在此步骤中没有特定于 TPU 的内容,可以像使用多个 GPU 和 MirroredStrategy 而不是 TPUStrategy 一样编写代码。可以在使用 Keras 进行分布式训练教程中了解详情。

with strategy.scope(): model = create_model() model.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['sparse_categorical_accuracy']) batch_size = 200 steps_per_epoch = 60000 // batch_size validation_steps = 10000 // batch_size train_dataset = get_dataset(batch_size, is_training=True) test_dataset = get_dataset(batch_size, is_training=False) model.fit(train_dataset, epochs=5, steps_per_epoch=steps_per_epoch, validation_data=test_dataset, validation_steps=validation_steps)

为了减少 Python 开销,同时最大限度提升 TPU 的性能,请将 steps_per_execution 参数传入 Keras Model.compile。在本例中,它可以将吞吐量提升约 50%:

with strategy.scope(): model = create_model() model.compile(optimizer='adam', # Anything between 2 and `steps_per_epoch` could help here. steps_per_execution = 50, loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['sparse_categorical_accuracy']) model.fit(train_dataset, epochs=5, steps_per_epoch=steps_per_epoch, validation_data=test_dataset, validation_steps=validation_steps)

使用自定义训练循环训练模型

还可以直接使用 tf.functiontf.distribute API 创建和训练模型。可以使用 Strategy.experimental_distribute_datasets_from_function API 通过给定的数据集函数分布 tf.data.Dataset。请注意,在下面的示例中,传递给 Dataset 的批次大小是每个副本的批次大小,而非全局批次大小。要了解详情,请查阅使用 tf.distribute.Strategy 进行自定义训练教程。

首先,创建模型、数据集和 tf.function

# Create the model, optimizer and metrics inside the `tf.distribute.Strategy` # scope, so that the variables can be mirrored on each device. with strategy.scope(): model = create_model() optimizer = tf.keras.optimizers.Adam() training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32) training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( 'training_accuracy', dtype=tf.float32) # Calculate per replica batch size, and distribute the `tf.data.Dataset`s # on each TPU worker. per_replica_batch_size = batch_size // strategy.num_replicas_in_sync train_dataset = strategy.experimental_distribute_datasets_from_function( lambda _: get_dataset(per_replica_batch_size, is_training=True)) @tf.function def train_step(iterator): """The step function for one training step.""" def step_fn(inputs): """The computation to run on each TPU device.""" images, labels = inputs with tf.GradientTape() as tape: logits = model(images, training=True) per_example_loss = tf.keras.losses.sparse_categorical_crossentropy( labels, logits, from_logits=True) loss = tf.nn.compute_average_loss(per_example_loss) model_losses = model.losses if model_losses: loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses)) grads = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(list(zip(grads, model.trainable_variables))) training_loss.update_state(loss * strategy.num_replicas_in_sync) training_accuracy.update_state(labels, logits) strategy.run(step_fn, args=(next(iterator),))

然后,运行训练循环:

steps_per_eval = 10000 // batch_size train_iterator = iter(train_dataset) for epoch in range(5): print('Epoch: {}/5'.format(epoch)) for step in range(steps_per_epoch): train_step(train_iterator) print('Current step: {}, training loss: {}, training accuracy: {}%'.format( optimizer.iterations.numpy(), round(float(training_loss.result()), 4), round(float(training_accuracy.result()) * 100, 2))) training_loss.reset_states() training_accuracy.reset_states()

tf.function 中利用多步法提升性能

您可以通过在 tf.function. 中运行多步以提升性能。在 tf.function 内使用 tf.range 包装 Strategy.run 调用即可实现此目的,在 TPU 工作进程上,AutoGraph 会将其转换为 tf.while_loop。可以在使用 tf.function 升性能指南中详细了解 tf.function

tf.function 中,虽然多步法的性能更高,但是与单步法相比,可谓各有利弊。在 tf.function 中运行多个步骤不够灵活,您无法以 Eager 方式运行,也不能运行任意 Python 代码。

@tf.function def train_multiple_steps(iterator, steps): """The step function for one training step.""" def step_fn(inputs): """The computation to run on each TPU device.""" images, labels = inputs with tf.GradientTape() as tape: logits = model(images, training=True) per_example_loss = tf.keras.losses.sparse_categorical_crossentropy( labels, logits, from_logits=True) loss = tf.nn.compute_average_loss(per_example_loss) model_losses = model.losses if model_losses: loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses)) grads = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(list(zip(grads, model.trainable_variables))) training_loss.update_state(loss * strategy.num_replicas_in_sync) training_accuracy.update_state(labels, logits) for _ in tf.range(steps): strategy.run(step_fn, args=(next(iterator),)) # Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get # retraced if the value changes. train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch)) print('Current step: {}, training loss: {}, training accuracy: {}%'.format( optimizer.iterations.numpy(), round(float(training_loss.result()), 4), round(float(training_accuracy.result()) * 100, 2)))

后续步骤

要详细了解 Cloud TPU 以及如何使用它们,请查看以下资源:

  • Google Cloud TPU:Google Cloud TPU 首页。

  • Google Cloud TPU 文档:Google Cloud TPU 文档,其中包括:

  • Google Cloud TPU Colab 笔记本:端到端训练示例。

  • Google Cloud TPU 性能指南:通过为应用调整 Cloud TPU 配置参数来进一步增强 Cloud TPU 性能。

  • Distributed training with TensorFlow: How to use distribution strategies—including tf.distribute.TPUStrategy—with examples showing best practices.

  • TPU 嵌入向量:TensorFlow 包括通过 tf.tpu.experimental.embedding 在 TPU 上训练嵌入向量的专门支持。此外,TensorFlow Recommenders 还具有 tfrs.layers.embedding.TPUEmbedding。嵌入向量提供高效和密集的表示,捕捉特征之间的复杂相似度和关系。TensorFlow 的 TPU 特定嵌入向量支持允许您训练大于单个 TPU 设备内存的嵌入向量,并在 TPU 上使用稀疏和不规则输入。

  • TPU Research Cloud (TRC):TRC 让研究人员能够申请访问由超过 1,000 个 Cloud TPU 设备组成的集群。