Path: blob/master/site/ja/datasets/beam_datasets.md
25115 views
Apache Beam でビッグデータセットを生成する
データセットによっては、1 台のマシンで処理するには大きすぎるものがあります。tfds
は、Apache Beam を使用することによって、多くのマシンにまたがったデータ生成のサポートをします。
このドキュメントには、2 つのセクションがあります。
既存の Beam のデータセットを生成するユーザー向け
新規の Beam のデータセットを作成する開発者向け
Beam のデータセットを生成する
クラウドまたはローカルで Beam のデータセットを生成するさまざまな例を以下に紹介します。
警告: tfds build
CLI でデータセットを生成する際には、生成するデータセットの構成を必ず指定してください。指定しない場合、すべての既存の構成が生成されてしまいます。たとえばウィキペディアの場合は、tfds build wikipedia
の代わりに tfds build wikipedia/20200301.en
を使用します。
Google Cloud Dataflow で生成する
Google Cloud Dataflow を使用してパイプラインを実行し、分散計算を活用するには、まずクイックスタートの手順に従います。
環境をセットアップしたら、tfds build
CLI を実行できます。これには、GCS のデータディレクトリを使用し、必要なオプションを --beam_pipeline_options
フラグに指定します。
スクリプトの起動を容易にするためには、GCP/GCS セットアップの実際の値と生成するデータセットを使用して、以下の変数を定義すると便利です。
次に、ワーカーに tfds
をインストールするよう Dataflow に指示をするファイルを作成する必要があります。
tfds-nightly
を使用している場合には、データセットが前回のリリースから更新されている場合に備え、tfds-nightly
からエコーするようにします。
最後に、以下のコマンドを使用してジョブを起動します。
ローカルで生成する
デフォルトの Apache Beam ランナーを使用してローカルでスクリプトを実行する場合(すべてのデータがメモリに収まる必要があります)のコマンドは、他のデータセットと同じです。
警告: Beam のデータセットは巨大な(テラバイト以上)場合があり、生成には相当量のリソースを必要とします(ローカルコンピュータでは数週間かかることもあります)。データセットの生成には分散環境の使用を推奨しています。サポートされているランタイムのリストについては Apache Beam ドキュメントを参照してください。
Apache Flink を使用する
Apache Flink を使用してパイプラインを実行する場合は、公式ドキュメントをお読みください。Beam が Flink のバージョン互換性に準拠していることを確認してください。
スクリプトの起動を容易にするには、Flink セットアップと生成するデータセットの実際の値を使って、以下の変数を定義すると便利です。
組み込みの Flink クラスタで実行するには、以下のコマンドを使用してジョブを起動できます。
カスタムスクリプト内で生成する
Beam でデータセットを生成する場合、API は他のデータセットの場合と同じですが、beam.Pipeline
を、DownloadConfig
の beam_options
(および beam_runner
)引数を使ってカスタマイズできます。
Beam のデータセットを実装する
前提条件
Apache Beam のデータセットを書き込むにあたり、以下の概念を理解しておく必要があります。
tfds
データセット作成ガイドをよく読みましょう。ほとんどの内容は Beam データセットにも適用されます。Beam プログラミングガイドで Apache Beam の概要を把握しましょう。
Cloud Dataflow を使用してデータセットを生成する場合は、Google Cloud ドキュメント と Apache Beam 依存性ガイドをお読みください。
手順
データセット作成ガイドを理解しているのであれば、Beam データセットの追加には、_generate_examples
関数を変更することだけが必要です。この関数はジェネレータではなく beam オブジェクトを返します。
Beam 以外のデータセット:
Beam データセット:
その他すべては、テストも含め、まったく同じになります。
その他の考慮事項 :
Apache Beam のインポートには、
tfds.core.lazy_imports
を使用します。遅延依存関係を使用すると、ユーザーは Beam をインストールしなくても、生成された後のデータセットを読むことができます。Python のクロージャには注意してください。パイプラインを実行する際、
beam.Map
とbeam.DoFn
関数は、pickle
を使ってシリアル化され、すべてのワーカーに送信されます。ワーカー間で状態を共有する必要がある場合は、beam.PTransform
内でオブジェクトをミュータブルにしないでください。tfds.core.DatasetBuilder
が pickle でシリアル化される方法により、データ作成中、ワーカーでのtfds.core.DatasetBuilder
のミュート化は無視されます(_split_generators
でself.info.metadata['offset'] = 123
を設定し、beam.Map(lambda x: x + self.info.metadata['offset'])
のようにしてワーカーからそれにアクセスすることはできません)。Split 間で一部のパイプラインステップを共有する櫃夜ぐあある場合は、追加の
pipeline: beam.Pipeline
kwarg を_split_generator
に追加して、生成パイプライン全体を制御することができます。tfds.core.GeneratorBasedBuilder
の_generate_examples
ドキュメントをご覧ください。
例
Beam データセットの例を以下に示します。
パイプラインの実行
パイプラインの実行には、上記のセクションをご覧ください。
注意: Beam 以外のデータセットと同様に、--register_checksums
でダウンロードチェックサムを必ず登録してください(ダウンロードを初めて登録する場合のみ)。
TFDS を入力として使用するパイプライン
TFDS データセットをソースとして取る Beam パイプラインを作成する場合は、tfds.beam.ReadFromTFDS
を使用できます。
データセットの各シャードを並行して処理します。
注意: これには、データベースがすでに生成されていることが必要です。Beam を使ってデータセットを生成するには、ほかのセクションをご覧ください。