Path: blob/master/site/ja/datasets/beam_datasets.md
38482 views
Apache Beam でビッグデータセットを生成する
データセットによっては、1 台のマシンで処理するには大きすぎるものがあります。tfdsは、Apache Beam を使用することによって、多くのマシンにまたがったデータ生成のサポートをします。
このドキュメントには、2 つのセクションがあります。
既存の Beam のデータセットを生成するユーザー向け
新規の Beam のデータセットを作成する開発者向け
Beam のデータセットを生成する
クラウドまたはローカルで Beam のデータセットを生成するさまざまな例を以下に紹介します。
警告: tfds build CLI でデータセットを生成する際には、生成するデータセットの構成を必ず指定してください。指定しない場合、すべての既存の構成が生成されてしまいます。たとえばウィキペディアの場合は、tfds build wikipedia の代わりに tfds build wikipedia/20200301.en を使用します。
Google Cloud Dataflow で生成する
Google Cloud Dataflow を使用してパイプラインを実行し、分散計算を活用するには、まずクイックスタートの手順に従います。
環境をセットアップしたら、tfds build CLI を実行できます。これには、GCS のデータディレクトリを使用し、必要なオプションを --beam_pipeline_options フラグに指定します。
スクリプトの起動を容易にするためには、GCP/GCS セットアップの実際の値と生成するデータセットを使用して、以下の変数を定義すると便利です。
次に、ワーカーに tfds をインストールするよう Dataflow に指示をするファイルを作成する必要があります。
tfds-nightly を使用している場合には、データセットが前回のリリースから更新されている場合に備え、tfds-nightly からエコーするようにします。
最後に、以下のコマンドを使用してジョブを起動します。
ローカルで生成する
デフォルトの Apache Beam ランナーを使用してローカルでスクリプトを実行する場合(すべてのデータがメモリに収まる必要があります)のコマンドは、他のデータセットと同じです。
警告: Beam のデータセットは巨大な(テラバイト以上)場合があり、生成には相当量のリソースを必要とします(ローカルコンピュータでは数週間かかることもあります)。データセットの生成には分散環境の使用を推奨しています。サポートされているランタイムのリストについては Apache Beam ドキュメントを参照してください。
Apache Flink を使用する
Apache Flink を使用してパイプラインを実行する場合は、公式ドキュメントをお読みください。Beam が Flink のバージョン互換性に準拠していることを確認してください。
スクリプトの起動を容易にするには、Flink セットアップと生成するデータセットの実際の値を使って、以下の変数を定義すると便利です。
組み込みの Flink クラスタで実行するには、以下のコマンドを使用してジョブを起動できます。
カスタムスクリプト内で生成する
Beam でデータセットを生成する場合、API は他のデータセットの場合と同じですが、beam.Pipeline を、DownloadConfig の beam_options(および beam_runner)引数を使ってカスタマイズできます。
Beam のデータセットを実装する
前提条件
Apache Beam のデータセットを書き込むにあたり、以下の概念を理解しておく必要があります。
tfdsデータセット作成ガイドをよく読みましょう。ほとんどの内容は Beam データセットにも適用されます。Beam プログラミングガイドで Apache Beam の概要を把握しましょう。
Cloud Dataflow を使用してデータセットを生成する場合は、Google Cloud ドキュメント と Apache Beam 依存性ガイドをお読みください。
手順
データセット作成ガイドを理解しているのであれば、Beam データセットの追加には、_generate_examples 関数を変更することだけが必要です。この関数はジェネレータではなく beam オブジェクトを返します。
Beam 以外のデータセット:
Beam データセット:
その他すべては、テストも含め、まったく同じになります。
その他の考慮事項 :
Apache Beam のインポートには、
tfds.core.lazy_importsを使用します。遅延依存関係を使用すると、ユーザーは Beam をインストールしなくても、生成された後のデータセットを読むことができます。Python のクロージャには注意してください。パイプラインを実行する際、
beam.Mapとbeam.DoFn関数は、pickleを使ってシリアル化され、すべてのワーカーに送信されます。ワーカー間で状態を共有する必要がある場合は、beam.PTransform内でオブジェクトをミュータブルにしないでください。tfds.core.DatasetBuilderが pickle でシリアル化される方法により、データ作成中、ワーカーでのtfds.core.DatasetBuilderのミュート化は無視されます(_split_generatorsでself.info.metadata['offset'] = 123を設定し、beam.Map(lambda x: x + self.info.metadata['offset'])のようにしてワーカーからそれにアクセスすることはできません)。Split 間で一部のパイプラインステップを共有する櫃夜ぐあある場合は、追加の
pipeline: beam.Pipelinekwarg を_split_generatorに追加して、生成パイプライン全体を制御することができます。tfds.core.GeneratorBasedBuilderの_generate_examplesドキュメントをご覧ください。
例
Beam データセットの例を以下に示します。
パイプラインの実行
パイプラインの実行には、上記のセクションをご覧ください。
注意: Beam 以外のデータセットと同様に、--register_checksums でダウンロードチェックサムを必ず登録してください(ダウンロードを初めて登録する場合のみ)。
TFDS を入力として使用するパイプライン
TFDS データセットをソースとして取る Beam パイプラインを作成する場合は、tfds.beam.ReadFromTFDS を使用できます。
データセットの各シャードを並行して処理します。
注意: これには、データベースがすでに生成されていることが必要です。Beam を使ってデータセットを生成するには、ほかのセクションをご覧ください。