Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ko/datasets/beam_datasets.md
25115 views

Apache Beam으로 빅 데이터세트 생성하기

일부 데이터세트는 너무 커서 단일 머신에서 처리할 수 없습니다. tfdsApache Beam을 사용하여 많은 머신에서 데이터를 생성하도록 지원합니다.

이 문서에는 두 개의 섹션이 있습니다:

  • 기존 Beam 데이터세트를 생성하려는 사용자

  • 새로운 Beam 데이터세트를 생성하려는 개발자

Beam 데이터세트 생성하기

다음은 클라우드 또는 로컬에서 Beam 데이터세트를 생성하는 서로 다른 예제입니다.

경고: tfds build CLI를 사용하여 데이터세트를 생성할 때 생성하려는 데이터세트 구성을 지정해야 합니다. 그렇지 않으면 기본적으로 모든 기존 구성이 생성됩니다. 예를 들어, 위키피디아의 경우에 tfds build wikipedia 대신 tfds build wikipedia/20200301.en를 사용합니다.

Google Cloud Dataflow에서

Google Cloud Dataflow를 사용하여 파이프라인을 실행하고 분산 계산을 이용하려면, 먼저 빠른 시작 지침을 따르세요.

환경이 설정되면 GCS의 데이터 디렉터리를 사용하고 --beam_pipeline_options 플래그에 필요한 옵션을 지정하여 tfds build CLI를 실행할 수 있습니다.

스크립트를 보다 쉽게 시작하려면 GCP/GCS 설정의 실제 값과 생성하려는 데이터세트를 사용하여 다음 변수를 정의하면 도움이 됩니다.

DATASET_NAME=<dataset-name> DATASET_CONFIG=<dataset-config> GCP_PROJECT=my-project-id GCS_BUCKET=gs://my-gcs-bucket

Dataflow가 다음 작업자에서 tfds를 설치하도록 지시하는 파일을 작성해야 합니다.

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

tfds-nightly를 사용하는 경우, 마지막 릴리스 이후 데이터세트가 업데이트된 경우를 위해 tfds-nightly에서 echo해야 합니다.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

마지막으로 아래 명령을 사용하여 작업을 실행할 수 있습니다.

python -m tensorflow_datasets.scripts.download_and_prepare \ --datasets=$DATASET_NAME/$DATASET_CONFIG \ --data_dir=$GCS_BUCKET/tensorflow_datasets \ --beam_pipeline_options=\ "runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\ "staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\ "requirements_file=/tmp/beam_requirements.txt"

로컬에서

기본 Apache Beam 실행기를 사용하여 로컬에서 스크립트를 실행할 때(메모리의 모든 데이터에 맞아야 함) 명령어는 다른 데이터세트와 동일합니다.

tfds build my_dataset

경고: Beam 데이터세트는 매우 클 수 있으며(테라바이트 이상), 상당한 양의 리소스가 생성될 수 있습니다(로컬 컴퓨터에서 몇 주가 걸릴 수 있음). 분산 환경을 사용하여 데이터세트를 생성하는 것이 좋습니다. 지원되는 런타임 목록은 Apache Beam 설명서를 참조하세요.

Apache Flink를 사용하여 파이프라인을 실행하려면 공식 문서를 읽을 수 있습니다. Beam이 Flink 버전 호환성을 준수하는지 확인하세요.

스크립트를 보다 쉽게 시작하려면 Flink 설정의 실제 값과 생성하려는 데이터세트를 사용하여 다음 변수를 정의하면 도움이 됩니다.

DATASET_NAME=<dataset-name> DATASET_CONFIG=<dataset-config> FLINK_CONFIG_DIR=<flink-config-directory> FLINK_VERSION=<flink-version>

내장된 Flink 클러스터에서 실행하려면 아래 명령을 사용하여 작업을 시작할 수 있습니다.

tfds build $DATASET_NAME/$DATASET_CONFIG \ --beam_pipeline_options=\ "runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

사용자 정의 스크립트

Beam에서 데이터세트를 생성하기 위해 API는 다른 데이터세트의 경우와 동일합니다. DownloadConfigbeam_options(및 beam_runner) 인수를 사용하여 beam.Pipeline를 사용자 지정할 수 있습니다.

# If you are running on Dataflow, Spark,..., you may have to set-up runtime # flags. Otherwise, you can leave flags empty []. flags = ['--runner=DataflowRunner', '--project=<project-name>', ...] # `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline` dl_config = tfds.download.DownloadConfig( beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags) ) data_dir = 'gs://my-gcs-bucket/tensorflow_datasets' builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir) builder.download_and_prepare(download_config=dl_config)

Beam 데이터세트 구현하기

전제 조건

Apache Beam 데이터세트를 작성하려면 다음 개념에 익숙해야 합니다.

지침

데이터세트 생성 가이드에 익숙한 경우, Beam 데이터세트를 추가하려면 _generate_examples 함수만 수정하면 됩니다. 이 함수는 생성기가 아닌 빔 객체를 반환합니다.

빔이 아닌 데이터세트:

def _generate_examples(self, path): for f in path.iterdir(): yield _process_example(f)

빔 데이터세트:

def _generate_examples(self, path): return ( beam.Create(path.iterdir()) | beam.Map(_process_example) )

나머지 모두는 테스트를 포함하여 100% 동일할 수 있습니다.

몇 가지 추가 고려 사항:

  • tfds.core.lazy_imports를 사용하여 Apache Beam을 가져옵니다. 지연 종속성(lazy dependency)을 사용하면 사용자는 Beam을 설치하지 않고도 생성된 데이터세트를 읽을 수 있습니다.

  • Python 닫힘에 주의하세요. 파이프라인을 실행할 때 beam.Mapbeam.DoFn 함수는 pickle를 사용하여 직렬화되어 모든 작업자에게 전송됩니다. 상태를 작업자 간에 공유해야 하는 경우 beam.PTransform 내에서 개체를 변경할 수 없습니다.

  • tfds.core.DatasetBuilder가 피클을 사용하여 직렬화되는 방식으로 인해 데이터 생성 중 tfds.core.DatasetBuilder 변경은 작업자에서 무시됩니다(예: _split_generators에서 self.info.metadata['offset'] = 123를 설정하고 beam.Map(lambda x: x + self.info.metadata['offset'])와 같은 작업자로부터 여기에 액세스할 수 없음).

  • 분할 사이에 일부 파이프라인 단계를 공유해야 하는 경우 _split_generator에 별도의 pipeline: beam.Pipeline kwarg를 추가하고 전체 세대의 파이프라인을 제어할 수 있습니다. tfds.core.GeneratorBasedBuilder_generate_examples 설명서를 참조하세요.

예제

다음은 Beam 데이터세트의 예입니다.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder): VERSION = tfds.core.Version('1.0.0') def _info(self): return self.dataset_info_from_configs( features=tfds.features.FeaturesDict({ 'image': tfds.features.Image(shape=(16, 16, 1)), 'label': tfds.features.ClassLabel(names=['dog', 'cat']), }), ) def _split_generators(self, dl_manager): ... return { 'train': self._generate_examples(file_dir='path/to/train_data/'), 'test': self._generate_examples(file_dir='path/to/test_data/'), } def _generate_examples(self, file_dir: str): """Generate examples as dicts.""" beam = tfds.core.lazy_imports.apache_beam def _process_example(filename): # Use filename as key return filename, { 'image': os.path.join(file_dir, filename), 'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg" } return ( beam.Create(tf.io.gfile.listdir(file_dir)) | beam.Map(_process_example) )

파이프라인 실행하기

파이프라인을 실행하려면, 위 섹션을 살펴보세요.

참고: 빔이 아닌 데이터세트와 마찬가지로 --register_checksums으로 다운로드 체크섬을 등록하는 것을 잊지 마세요(다운로드를 처음 등록할 때만).

tfds build my_dataset --register_checksums

TFDS를 입력으로 사용하는 파이프라인

TFDS 데이터세트를 소스로 사용하는 빔 파이프라인을 생성하려면 tfds.beam.ReadFromTFDS를 사용할 수 있습니다.

builder = tfds.builder('my_dataset') _ = ( pipeline | tfds.beam.ReadFromTFDS(builder, split='train') | beam.Map(tfds.as_numpy) | ... )

데이터세트의 각 샤드를 병렬로 처리합니다.

참고: 이를 위해서는 데이터세트가 이미 생성되어 있어야 합니다. 빔을 사용하여 데이터세트를 생성하려면 다른 섹션을 참조하세요.