Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/zh-cn/datasets/beam_datasets.md
25115 views

使用 Apache Beam 生成大型数据集

有些数据集因数据量过大而无法在一台计算机上进行处理。tfds 支持使用 Apache Beam 在多台计算机上生成数据。

本文档分为两部分:

  • 面向想要生成现有 Beam 数据集的用户

  • 面向想要创建新的 Beam 数据集的开发者

生成 Beam 数据集

以下是在云端或在本地生成 Beam 数据集的不同示例。

警告:使用 tfds build CLI 生成数据集时,请确保指定要生成的数据集配置,否则将默认生成所有现有配置。例如,对于 wikipedia,请使用 tfds build wikipedia/20200301.en 而非 tfds build wikipedia

在 Google Cloud Dataflow 上生成

要使用 Google Cloud Dataflow 运行流水线并利用分布式计算的优势,请首先遵循快速入门说明

设置好环境后,您可以使用 GCS 上的数据目录并为 --beam_pipeline_options 标志指定所需的选项来运行 tfds build CLI

为了便于启动脚本,建议您使用自己的 GCP/GCS 设置和您要生成的数据集的实际值来定义以下变量:

DATASET_NAME=<dataset-name> DATASET_CONFIG=<dataset-config> GCP_PROJECT=my-project-id GCS_BUCKET=gs://my-gcs-bucket

然后,您需要创建文件来告知 Dataflow 在工作进程上安装 tfds

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

如果您使用的是 tfds-nightly,并且自上次发布以来数据集进行了更新,请确保从 tfds-nightly 回送数据。

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

最后,您可以使用以下命令启动作业:

tfds build $DATASET_NAME/$DATASET_CONFIG \ --data_dir=$GCS_BUCKET/tensorflow_datasets \ --beam_pipeline_options=\ "runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\ "staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\ "requirements_file=/tmp/beam_requirements.txt"

在本地生成

要使用默认的 Apache Beam 运行程序在本地运行您的脚本(它必须将所有数据装入内存),该命令与其他数据集相同:

tfds build my_dataset

警告:Beam 数据集可能非常庞大(数 TB 或更大),并且生成数据集会占用大量资源(在本地计算机上可能需要数周)。建议使用分布式环境生成数据集。请参阅 Apache Beam 文档以查看受支持的运行时列表。

要使用 Apache Flink 运行流水线,您可以阅读官方文档。确保您的 Beam 符合 Flink 版本兼容性要求

为了便于启动脚本,建议您使用自己的 Flink 设置和您要生成的数据集的实际值来定义以下变量:

DATASET_NAME=<dataset-name> DATASET_CONFIG=<dataset-config> FLINK_CONFIG_DIR=<flink-config-directory> FLINK_VERSION=<flink-version>

要在嵌入式 Flink 集群上运行,您可以使用以下命令启动作业:

tfds build $DATASET_NAME/$DATASET_CONFIG \ --beam_pipeline_options=\ "runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

使用自定义脚本生成

要在 Beam 上生成数据集,API 与其他数据集相同。您可以使用 DownloadConfigbeam_options(和 beam_runner)参数自定义 beam.Pipeline

# If you are running on Dataflow, Spark,..., you may have to set-up runtime # flags. Otherwise, you can leave flags empty []. flags = ['--runner=DataflowRunner', '--project=<project-name>', ...] # `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline` dl_config = tfds.download.DownloadConfig( beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags) ) data_dir = 'gs://my-gcs-bucket/tensorflow_datasets' builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir) builder.download_and_prepare(download_config=dl_config)

实现 Beam 数据集

前提条件

为了编写 Apache Beam 数据集,您应该熟悉以下概念:

说明

如果您熟悉数据集创建指南,则仅需修改 _generate_examples 函数即可添加 Beam 数据集。此函数应当返回一个 Beam 对象,而不是一个生成器:

非 Beam 数据集:

def _generate_examples(self, path): for f in path.iterdir(): yield _process_example(f)

Beam 数据集:

def _generate_examples(self, path): return ( beam.Create(path.iterdir()) | beam.Map(_process_example) )

其余所有内容都可以完全相同,包括测试。

其他注意事项:

  • 使用 tfds.core.lazy_imports 导入 Apache Beam。通过使用惰性依赖关系,用户在数据集生成后仍可以读取数据集,而不必安装 Beam。

  • 小心使用 Python 闭包。运行流水线时,beam.Mapbeam.DoFn 函数使用 pickle 序列化并发送给所有工作进程。如果必须在工作进程之间共享状态,请不要在 beam.PTransform 内使用可变对象。

  • 由于 tfds.core.DatasetBuilder 使用 pickle 序列化的方式,在数据创建期间改变 tfds.core.DatasetBuilder 将在工作进程上被忽略(例如,无法在 _split_generators 中设置 self.info.metadata['offset'] = 123 并从 beam.Map(lambda x: x + self.info.metadata['offset']) 之类的工作进程访问它)

  • 如果您需要在拆分之间共享一些流水线步骤,则可以将一个额外的 pipeline: beam.Pipeline kwarg 添加到 _split_generator 并控制完整的生成流水线。请参阅 tfds.core.GeneratorBasedBuilder_generate_examples 文档。

示例

下面是一个 Beam 数据集的示例。

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder): VERSION = tfds.core.Version('1.0.0') def _info(self): return self.dataset_info_from_configs( features=tfds.features.FeaturesDict({ 'image': tfds.features.Image(shape=(16, 16, 1)), 'label': tfds.features.ClassLabel(names=['dog', 'cat']), }), ) def _split_generators(self, dl_manager): ... return { 'train': self._generate_examples(file_dir='path/to/train_data/'), 'test': self._generate_examples(file_dir='path/to/test_data/'), } def _generate_examples(self, file_dir: str): """Generate examples as dicts.""" beam = tfds.core.lazy_imports.apache_beam def _process_example(filename): # Use filename as key return filename, { 'image': os.path.join(file_dir, filename), 'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg" } return ( beam.Create(tf.io.gfile.listdir(file_dir)) | beam.Map(_process_example) )

运行您的流水线

要运行流水线,请参阅上文。

:与非 Beam 数据集一样,不要忘记使用 --register_checksums 注册下载校验和(仅在第一次注册下载时)。

tfds build my_dataset --register_checksums

使用 TFDS 作为输入的流水线

如果要创建以 TFDS 数据集为源的 Beam 流水线,则可以使用 tfds.beam.ReadFromTFDS

builder = tfds.builder('my_dataset') _ = ( pipeline | tfds.beam.ReadFromTFDS(builder, split='train') | beam.Map(tfds.as_numpy) | ... )

它将并行处理数据集的每个分片。

注:这需要已经生成数据集。要使用 Beam 生成数据集,请参阅其他部分。