Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/datasets/beam_datasets.md
25115 views

Apache Beam でビッグデータセットを生成する

データセットによっては、1 台のマシンで処理するには大きすぎるものがあります。tfdsは、Apache Beam を使用することによって、多くのマシンにまたがったデータ生成のサポートをします。

このドキュメントには、2 つのセクションがあります。

  • 既存の Beam のデータセットを生成するユーザー向け

  • 新規の Beam のデータセットを作成する開発者向け

Beam のデータセットを生成する

クラウドまたはローカルで Beam のデータセットを生成するさまざまな例を以下に紹介します。

警告: tfds build CLI でデータセットを生成する際には、生成するデータセットの構成を必ず指定してください。指定しない場合、すべての既存の構成が生成されてしまいます。たとえばウィキペディアの場合は、tfds build wikipedia の代わりに tfds build wikipedia/20200301.en を使用します。

Google Cloud Dataflow で生成する

Google Cloud Dataflow を使用してパイプラインを実行し、分散計算を活用するには、まずクイックスタートの手順に従います。

環境をセットアップしたら、tfds build CLI を実行できます。これには、GCS のデータディレクトリを使用し、必要なオプション--beam_pipeline_options フラグに指定します。

スクリプトの起動を容易にするためには、GCP/GCS セットアップの実際の値と生成するデータセットを使用して、以下の変数を定義すると便利です。

DATASET_NAME=<dataset-name> DATASET_CONFIG=<dataset-config> GCP_PROJECT=my-project-id GCS_BUCKET=gs://my-gcs-bucket

次に、ワーカーに tfds をインストールするよう Dataflow に指示をするファイルを作成する必要があります。

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

tfds-nightly を使用している場合には、データセットが前回のリリースから更新されている場合に備え、tfds-nightly からエコーするようにします。

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

最後に、以下のコマンドを使用してジョブを起動します。

python -m tensorflow_datasets.scripts.download_and_prepare \ --datasets=$DATASET_NAME/$DATASET_CONFIG \ --data_dir=$GCS_BUCKET/tensorflow_datasets \ --beam_pipeline_options=\ "runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\ "staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\ "requirements_file=/tmp/beam_requirements.txt"

ローカルで生成する

デフォルトの Apache Beam ランナーを使用してローカルでスクリプトを実行する場合(すべてのデータがメモリに収まる必要があります)のコマンドは、他のデータセットと同じです。

tfds build my_dataset

警告: Beam のデータセットは巨大な(テラバイト以上)場合があり、生成には相当量のリソースを必要とします(ローカルコンピュータでは数週間かかることもあります)。データセットの生成には分散環境の使用を推奨しています。サポートされているランタイムのリストについては Apache Beam ドキュメントを参照してください。

Apache Flink を使用してパイプラインを実行する場合は、公式ドキュメントをお読みください。Beam が Flink のバージョン互換性に準拠していることを確認してください。

スクリプトの起動を容易にするには、Flink セットアップと生成するデータセットの実際の値を使って、以下の変数を定義すると便利です。

DATASET_NAME=<dataset-name> DATASET_CONFIG=<dataset-config> FLINK_CONFIG_DIR=<flink-config-directory> FLINK_VERSION=<flink-version>

組み込みの Flink クラスタで実行するには、以下のコマンドを使用してジョブを起動できます。

tfds build $DATASET_NAME/$DATASET_CONFIG \ --beam_pipeline_options=\ "runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

カスタムスクリプト内で生成する

Beam でデータセットを生成する場合、API は他のデータセットの場合と同じですが、beam.Pipeline を、DownloadConfigbeam_options(および beam_runner)引数を使ってカスタマイズできます。

# If you are running on Dataflow, Spark,..., you may have to set-up runtime # flags. Otherwise, you can leave flags empty []. flags = ['--runner=DataflowRunner', '--project=<project-name>', ...] # `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline` dl_config = tfds.download.DownloadConfig( beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags) ) data_dir = 'gs://my-gcs-bucket/tensorflow_datasets' builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir) builder.download_and_prepare(download_config=dl_config)

Beam のデータセットを実装する

前提条件

Apache Beam のデータセットを書き込むにあたり、以下の概念を理解しておく必要があります。

手順

データセット作成ガイドを理解しているのであれば、Beam データセットの追加には、_generate_examples 関数を変更することだけが必要です。この関数はジェネレータではなく beam オブジェクトを返します。

Beam 以外のデータセット:

def _generate_examples(self, path): for f in path.iterdir(): yield _process_example(f)

Beam データセット:

def _generate_examples(self, path): return ( beam.Create(path.iterdir()) | beam.Map(_process_example) )

その他すべては、テストも含め、まったく同じになります。

その他の考慮事項 :

  • Apache Beam のインポートには、tfds.core.lazy_importsを使用します。遅延依存関係を使用すると、ユーザーは Beam をインストールしなくても、生成された後のデータセットを読むことができます。

  • Python のクロージャには注意してください。パイプラインを実行する際、beam.Mapbeam.DoFn 関数は、pickle を使ってシリアル化され、すべてのワーカーに送信されます。ワーカー間で状態を共有する必要がある場合は、beam.PTransform 内でオブジェクトをミュータブルにしないでください。

  • tfds.core.DatasetBuilder が pickle でシリアル化される方法により、データ作成中、ワーカーでの tfds.core.DatasetBuilder のミュート化は無視されます(_split_generatorsself.info.metadata['offset'] = 123 を設定し、beam.Map(lambda x: x + self.info.metadata['offset']) のようにしてワーカーからそれにアクセスすることはできません)。

  • Split 間で一部のパイプラインステップを共有する櫃夜ぐあある場合は、追加の pipeline: beam.Pipeline kwarg を _split_generator に追加して、生成パイプライン全体を制御することができます。tfds.core.GeneratorBasedBuilder_generate_examples ドキュメントをご覧ください。

Beam データセットの例を以下に示します。

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder): VERSION = tfds.core.Version('1.0.0') def _info(self): return self.dataset_info_from_configs( features=tfds.features.FeaturesDict({ 'image': tfds.features.Image(shape=(16, 16, 1)), 'label': tfds.features.ClassLabel(names=['dog', 'cat']), }), ) def _split_generators(self, dl_manager): ... return { 'train': self._generate_examples(file_dir='path/to/train_data/'), 'test': self._generate_examples(file_dir='path/to/test_data/'), } def _generate_examples(self, file_dir: str): """Generate examples as dicts.""" beam = tfds.core.lazy_imports.apache_beam def _process_example(filename): # Use filename as key return filename, { 'image': os.path.join(file_dir, filename), 'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg" } return ( beam.Create(tf.io.gfile.listdir(file_dir)) | beam.Map(_process_example) )

パイプラインの実行

パイプラインの実行には、上記のセクションをご覧ください。

注意: Beam 以外のデータセットと同様に、--register_checksums でダウンロードチェックサムを必ず登録してください(ダウンロードを初めて登録する場合のみ)。

tfds build my_dataset --register_checksums

TFDS を入力として使用するパイプライン

TFDS データセットをソースとして取る Beam パイプラインを作成する場合は、tfds.beam.ReadFromTFDS を使用できます。

builder = tfds.builder('my_dataset') _ = ( pipeline | tfds.beam.ReadFromTFDS(builder, split='train') | beam.Map(tfds.as_numpy) | ... )

データセットの各シャードを並行して処理します。

注意: これには、データベースがすでに生成されていることが必要です。Beam を使ってデータセットを生成するには、ほかのセクションをご覧ください。