Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ko/tutorials/distribute/parameter_server_training.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

ParameterServerStrategyλ₯Ό μ‚¬μš©ν•œ λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨

κ°œμš”

λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μ€ μ—¬λŸ¬ λ¨Έμ‹ μ—μ„œ λͺ¨λΈ ν›ˆλ ¨μ„ ν™•μž₯ν•˜λŠ” 일반적인 데이터 병렬 λ°©λ²•μž…λ‹ˆλ‹€.

λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨ ν΄λŸ¬μŠ€ν„°λŠ” μž‘μ—…μžμ™€ λ§€κ°œλ³€μˆ˜ μ„œλ²„λ‘œ κ΅¬μ„±λ©λ‹ˆλ‹€. λ³€μˆ˜λŠ” λ§€κ°œλ³€μˆ˜ μ„œλ²„μ—μ„œ μƒμ„±λ˜λ©° 각 λ‹¨κ³„μ—μ„œ μž‘μ—…μžκ°€ 읽고 μ—…λ°μ΄νŠΈν•©λ‹ˆλ‹€. 기본적으둜 μž‘μ—…μžλŠ” μ΄λŸ¬ν•œ λ³€μˆ˜λ₯Ό μ„œλ‘œ λ™κΈ°ν™”ν•˜μ§€ μ•Šκ³  λ…λ¦½μ μœΌλ‘œ 읽고 μ—…λ°μ΄νŠΈν•©λ‹ˆλ‹€. 이 λ•Œλ¬Έμ— λ•Œλ‘œ λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν˜•νƒœμ˜ ν›ˆλ ¨μ„ 비동기 ν›ˆλ ¨μ΄λΌκ³  λΆ€λ¦…λ‹ˆλ‹€.

TensorFlow 2μ—μ„œ λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μ€ tf.distribute.ParameterServerStrategy 클래슀λ₯Ό 기반으둜 ν•˜λŠ”λ°, 이 ν΄λž˜μŠ€λŠ” ν›ˆλ ¨ 단계λ₯Ό μ΅œλŒ€ 수천 개의 μž‘μ—…μž(λ§€κ°œλ³€μˆ˜ μ„œλ²„μ™€ ν•¨κ»˜)둜 ν™•μž₯λ˜λŠ” ν΄λŸ¬μŠ€ν„°μ— λ°°ν¬ν•©λ‹ˆλ‹€.

μ§€μ›λ˜λŠ” ν›ˆλ ¨ 방법

μ§€μ›λ˜λŠ” ν›ˆλ ¨ λ°©λ²•μ—λŠ” 크게 두 κ°€μ§€κ°€ μžˆμŠ΅λ‹ˆλ‹€.

μž‘μ—… 및 νƒœμŠ€ν¬κ°€ μžˆλŠ” ν΄λŸ¬μŠ€ν„°

μ„ νƒν•œ API(Model.fit λ˜λŠ” μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프)에 관계없이 TensorFlow 2의 λΆ„μ‚° ν›ˆλ ¨μ—λŠ” μ—¬λŸ¬ 'jobs'이 μžˆλŠ” 'cluster'κ°€ ν¬ν•¨λ˜λ©° 각 μž‘μ—…μ—λŠ” ν•˜λ‚˜ μ΄μƒμ˜ 'tasks'이 μžˆμ„ 수 μžˆμŠ΅λ‹ˆλ‹€.

λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μ„ μ‚¬μš©ν•  λ•Œ λ‹€μŒμ΄ μžˆλŠ” 것이 μ’‹μŠ΅λ‹ˆλ‹€.

  • ν•˜λ‚˜μ˜ 코디넀이터 μž‘μ—…(μž‘μ—… 이름이 chief)

  • μ—¬λŸ¬ μž‘μ—…μž μž‘μ—…(μž‘μ—… 이름 worker)

  • 닀쀑 λ§€κ°œλ³€μˆ˜ μ„œλ²„ μž‘μ—…(μž‘μ—… 이름 ps)

μ½”λ””λ„€μ΄ν„°λŠ” λ¦¬μ†ŒμŠ€λ₯Ό μƒμ„±ν•˜κ³ , ν›ˆλ ¨ μž‘μ—…μ„ μ „λ‹¬ν•˜κ³ , 체크포인트λ₯Ό μž‘μ„±ν•˜κ³ , μž‘μ—… μ‹€νŒ¨λ₯Ό μ²˜λ¦¬ν•©λ‹ˆλ‹€. μž‘μ—…μžμ™€ λ§€κ°œλ³€μˆ˜ μ„œλ²„λŠ” μ½”λ””λ„€μ΄ν„°μ˜ μš”μ²­μ— μˆ˜μ‹  λŒ€κΈ°ν•˜λŠ” tf.distribute.Server μΈμŠ€ν„΄μŠ€λ₯Ό μ‹€ν–‰ν•©λ‹ˆλ‹€.

Model.fit APIλ₯Ό μ‚¬μš©ν•œ λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨

Model.fit APIλ₯Ό μ‚¬μš©ν•œ λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μ„ μœ„ν•΄μ„œλŠ” 코디넀이터가 tf.distribute.ParameterServerStrategy 객체λ₯Ό μ‚¬μš©ν•΄μ•Ό ν•©λ‹ˆλ‹€. μ „λž΅μ΄ μ—†κ±°λ‚˜ λ‹€λ₯Έ μ „λž΅μ΄ μžˆλŠ” Model.fit μ‚¬μš©κ³Ό μœ μ‚¬ν•˜κ²Œ μ›Œν¬ν”Œλ‘œμ—λŠ” λͺ¨λΈ 생성과 컴파일, 콜백 μ€€λΉ„, 및 Model.fit 호좜이 ν¬ν•¨λ©λ‹ˆλ‹€.

μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프λ₯Ό μ‚¬μš©ν•œ λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨

μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ λ£¨ν”„μ—μ„œ tf.distribute.coordinator.ClusterCoordinator ν΄λž˜μŠ€λŠ” 코디넀이터에 μ‚¬μš©λ˜λŠ” 핡심 ꡬ성 μš”μ†Œμž…λ‹ˆλ‹€.

  • ClusterCoordinator ν΄λž˜μŠ€λŠ” tf.distribute.Strategy 객체와 ν•¨κ»˜ μž‘λ™ν•΄μ•Ό ν•©λ‹ˆλ‹€.

  • 이 tf.distribute.Strategy κ°μ²΄λŠ” ν΄λŸ¬μŠ€ν„°μ˜ 정보λ₯Ό μ œκ³΅ν•˜λŠ” 데 ν•„μš”ν•˜λ©° tf.distribute.Strategyλ₯Ό μ‚¬μš©ν•œ μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨μ— μ„€λͺ…λœ 바와 같이 ν›ˆλ ¨ 단계λ₯Ό μ •μ˜ν•˜λŠ” 데 μ‚¬μš©λ©λ‹ˆλ‹€.

  • 그런 λ‹€μŒ ClusterCoordinator κ°μ²΄λŠ” μ΄λŸ¬ν•œ ν›ˆλ ¨ λ‹¨κ³„μ˜ 싀행을 원격 μž‘μ—…μžμ—κ²Œ μ „λ‹¬ν•©λ‹ˆλ‹€.

  • λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μ„ μœ„ν•΄ ClusterCoordinatorλŠ” tf.distribute.ParameterServerStrategy와 ν•¨κ»˜ μž‘λ™ν•΄μ•Ό ν•©λ‹ˆλ‹€.

ClusterCoordinator 객체가 μ œκ³΅ν•˜λŠ” κ°€μž₯ μ€‘μš”ν•œ APIλŠ” scheduleμž…λ‹ˆλ‹€.

  • schedule APIλŠ” tf.function을 λŒ€κΈ°μ—΄μ— λ„£κ³  λ―Έλž˜μ™€ 같은 RemoteValueλ₯Ό μ¦‰μ‹œ λ°˜ν™˜ν•©λ‹ˆλ‹€.

  • λŒ€κΈ° 쀑인 ν•¨μˆ˜λŠ” λ°±κ·ΈλΌμš΄λ“œ μŠ€λ ˆλ“œμ˜ 원격 μž‘μ—…μžμ—κ²Œ μ „λ‹¬λ˜κ³  ν•΄λ‹Ή RemoteValueλŠ” λΉ„λ™κΈ°μ‹μœΌλ‘œ μ±„μ›Œμ§‘λ‹ˆλ‹€.

  • scheduleμ—λŠ” μž‘μ—…μž 할당이 ν•„μš”ν•˜μ§€ μ•ŠμœΌλ―€λ‘œ μ „λ‹¬λœ tf.function은 μ‚¬μš© κ°€λŠ₯ν•œ λͺ¨λ“  μž‘μ—…μžμ—μ„œ μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  • μ‹€ν–‰λœ μž‘μ—…μžκ°€ μ™„λ£Œλ˜κΈ° 전에 μ‚¬μš©ν•  수 μ—†κ²Œ 되면 μ‚¬μš© κ°€λŠ₯ν•œ λ‹€λ₯Έ μž‘μ—…μžμ—μ„œ ν•¨μˆ˜κ°€ μž¬μ‹œλ„λ©λ‹ˆλ‹€.

  • 이것과 ν•¨κ»˜ ν•¨μˆ˜ 싀행이 μ›μžμ μ΄μ§€ μ•Šλ‹€λŠ” 사싀 λ•Œλ¬Έμ— 단일 ν•¨μˆ˜ 호좜이 두 번 이상 싀행될 수 μžˆμŠ΅λ‹ˆλ‹€.

원격 ν•¨μˆ˜λ₯Ό μ „λ‹¬ν•˜λŠ” 외에도 ClusterCoordinatorλŠ” λͺ¨λ“  μž‘μ—…μžμ— λŒ€ν•œ λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μƒμ„±ν•˜κ³  μž‘μ—…μžκ°€ μž₯μ• μ—μ„œ 볡ꡬ될 λ•Œ μ΄λŸ¬ν•œ λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μž¬κ΅¬μΆ•ν•˜λŠ” 도움을 μ€λ‹ˆλ‹€.

νŠœν† λ¦¬μ–Ό μ„€μ •

이 νŠœν† λ¦¬μ–Όμ€ Model.fit 및 μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프 경둜둜 λΆ„κΈ°λ˜λ©° ν•„μš”μ— λ§žλŠ” 경둜λ₯Ό 선택할 수 μžˆμŠ΅λ‹ˆλ‹€. "Xλ₯Ό μ΄μš©ν•œ ν›ˆλ ¨" μ΄μ™Έμ˜ μ„Ήμ…˜μ€ 두 경둜 λͺ¨λ‘μ— μ μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

!pip install portpicker
#@title import multiprocessing import os import random import portpicker import tensorflow as tf

ν΄λŸ¬μŠ€ν„° μ„€μ •

μœ„μ—μ„œ μ–ΈκΈ‰ν–ˆλ“―μ΄ λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨ ν΄λŸ¬μŠ€ν„°μ—λŠ” ν›ˆλ ¨ ν”„λ‘œκ·Έλž¨μ„ μ‹€ν–‰ν•˜λŠ” 코디넀이터 μž‘μ—…, TensorFlow μ„œλ²„ tf.distribute.Serverλ₯Ό μ‹€ν–‰ν•˜λŠ” ν•˜λ‚˜ μ΄μƒμ˜ μž‘μ—…μž 및 λ§€κ°œλ³€μˆ˜ μ„œλ²„ μž‘μ—…, 그리고 μ‚¬μ΄λ“œμΉ΄ 평가λ₯Ό μ‹€ν–‰ν•˜λŠ” μΆ”κ°€ 평가 μž‘μ—…μ΄ ν•„μš”ν•©λ‹ˆλ‹€(μ•„λž˜ μ‚¬μ΄λ“œμΉ΄ 평가 μ„Ήμ…˜ μ°Έμ‘°). μ„€μ • μš”κ΅¬ 사항은 λ‹€μŒκ³Ό κ°™μŠ΅λ‹ˆλ‹€.

  • 코디넀이터 μž‘μ—…μ€ ν‰κ°€μžλ₯Ό μ œμ™Έν•œ λ‹€λ₯Έ λͺ¨λ“  TensorFlow μ„œλ²„μ˜ μ£Όμ†Œμ™€ 포트λ₯Ό μ•Œμ•„μ•Ό ν•©λ‹ˆλ‹€.

  • μž‘μ—…μžμ™€ λ§€κ°œλ³€μˆ˜ μ„œλ²„λŠ” μˆ˜μ‹  λŒ€κΈ°ν•΄μ•Ό ν•˜λŠ” 포트λ₯Ό μ•Œμ•„μ•Ό ν•©λ‹ˆλ‹€. λ‹¨μˆœν•˜κ²Œ ν•˜κΈ° μœ„ν•΄ 일반적으둜 μ΄λŸ¬ν•œ μž‘μ—…μ—μ„œ TensorFlow μ„œλ²„λ₯Ό 생성할 λ•Œ 전체 ν΄λŸ¬μŠ€ν„° 정보λ₯Ό 전달할 수 μžˆμŠ΅λ‹ˆλ‹€.

  • ν‰κ°€μž μž‘μ—…μ€ ν›ˆλ ¨ ν΄λŸ¬μŠ€ν„°μ˜ 섀정을 μ•Œ ν•„μš”κ°€ μ—†μŠ΅λ‹ˆλ‹€. μ•Œκ³  μžˆλ‹€λ©΄ ν›ˆλ ¨ ν΄λŸ¬μŠ€ν„°μ— 연결을 μ‹œλ„ν•΄μ„œλŠ” μ•ˆ λ©λ‹ˆλ‹€.

  • μž‘μ—…μž 및 λ§€κ°œλ³€μˆ˜ μ„œλ²„λŠ” 각각 "worker" 및 "ps"의 μž‘μ—… μœ ν˜•μ„ κ°€μ Έμ•Ό ν•©λ‹ˆλ‹€. μ½”λ””λ„€μ΄ν„°λŠ” λ ˆκ±°μ‹œ 문제둜 인해 μž‘μ—… μœ ν˜•μœΌλ‘œ "chief"λ₯Ό μ‚¬μš©ν•΄μ•Ό ν•©λ‹ˆλ‹€.

이 νŠœν† λ¦¬μ–Όμ—μ„œλŠ” 전체 λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μ„ Colabμ—μ„œ μ‹€ν–‰ν•  수 μžˆλ„λ‘ in-process ν΄λŸ¬μŠ€ν„°λ₯Ό μƒμ„±ν•©λ‹ˆλ‹€. 이후 μ„Ήμ…˜μ—μ„œ μ‹€μ œ ν΄λŸ¬μŠ€ν„°λ₯Ό μ„€μ •ν•˜λŠ” 방법을 λ°°μ›λ‹ˆλ‹€.

In-process ν΄λŸ¬μŠ€ν„°

λ¨Όμ € λͺ‡ 개의 TensorFlow μ„œλ²„λ₯Ό λ§Œλ“€κ³  λ‚˜μ€‘μ— μ—°κ²°ν•  κ²ƒμž…λ‹ˆλ‹€. 이것은 이 νŠœν† λ¦¬μ–Όμ—μ„œ μ‹œμ—°μ„ ν•˜κΈ° μœ„ν•œ 것이며 μ‹€μ œ ν›ˆλ ¨μ—μ„œ μ„œλ²„λŠ” "worker" 및 "ps" λ¨Έμ‹ μ—μ„œ μ‹œμž‘λ©λ‹ˆλ‹€.

def create_in_process_cluster(num_workers, num_ps): """Creates and starts local servers and returns the cluster_resolver.""" worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)] ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)] cluster_dict = {} cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports] if num_ps > 0: cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports] cluster_spec = tf.train.ClusterSpec(cluster_dict) # Workers need some inter_ops threads to work properly. worker_config = tf.compat.v1.ConfigProto() if multiprocessing.cpu_count() < num_workers + 1: worker_config.inter_op_parallelism_threads = num_workers + 1 for i in range(num_workers): tf.distribute.Server( cluster_spec, job_name="worker", task_index=i, config=worker_config, protocol="grpc") for i in range(num_ps): tf.distribute.Server( cluster_spec, job_name="ps", task_index=i, protocol="grpc") cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver( cluster_spec, rpc_layer="grpc") return cluster_resolver # Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" NUM_WORKERS = 3 NUM_PS = 2 cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

In-process ν΄λŸ¬μŠ€ν„° 섀정은 μ—¬κΈ°μ—μ„œμ™€ 같은 λ‹¨μœ„ ν…ŒμŠ€νŠΈμ— 자주 μ‚¬μš©λ©λ‹ˆλ‹€.

둜컬 ν…ŒμŠ€νŠΈλ₯Ό μœ„ν•œ 또 λ‹€λ₯Έ μ˜΅μ…˜μ€ 둜컬 λ¨Έμ‹ μ—μ„œ ν”„λ‘œμ„ΈμŠ€λ₯Ό μ‹œμž‘ν•˜λŠ” κ²ƒμž…λ‹ˆλ‹€. 이 μ ‘κ·Ό λ°©μ‹μ˜ 예λ₯Ό 보렀면 Kerasλ₯Ό μ‚¬μš©ν•œ 닀쀑 μž‘μ—…μž ν›ˆλ ¨μ„ ν™•μΈν•˜μ„Έμš”.

ParameterServerStrategy μΈμŠ€ν„΄μŠ€ν™”

ν›ˆλ ¨ μ½”λ“œλ₯Ό μ‚΄νŽ΄λ³΄κΈ° 전에 tf.distribute.ParameterServerStrategy 객체λ₯Ό μΈμŠ€ν„΄μŠ€ν™”ν•˜κ² μŠ΅λ‹ˆλ‹€. 이 μž‘μ—…μ€ Model.fit λ˜λŠ” μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프λ₯Ό μ§„ν–‰ν•˜λŠ”μ§€ 여뢀에 관계없이 ν•„μš”ν•©λ‹ˆλ‹€. variable_partitioner μΈμˆ˜λŠ” λ³€μˆ˜ 샀딩 μ„Ήμ…˜μ—μ„œ μ„€λͺ…ν•©λ‹ˆλ‹€.

variable_partitioner = ( tf.distribute.experimental.partitioners.MinSizePartitioner( min_shard_bytes=(256 << 10), max_shards=NUM_PS)) strategy = tf.distribute.ParameterServerStrategy( cluster_resolver, variable_partitioner=variable_partitioner)

ν›ˆλ ¨μ— GPUλ₯Ό μ‚¬μš©ν•˜λ €λ©΄ 각 μž‘μ—…μžμ— ν‘œμ‹œλ˜λŠ” GPUλ₯Ό ν• λ‹Ήν•©λ‹ˆλ‹€. ParameterServerStrategyλŠ” 각 μž‘μ—…μžμ—μ„œ μ‚¬μš© κ°€λŠ₯ν•œ λͺ¨λ“  GPUλ₯Ό μ‚¬μš©ν•˜λ©° λͺ¨λ“  μž‘μ—…μžκ°€ λ™μΌν•œ 수의 GPUλ₯Ό μ‚¬μš©ν•  수 μžˆμ–΄μ•Ό ν•œλ‹€λŠ” μ œν•œμ΄ μžˆμŠ΅λ‹ˆλ‹€.

κ°€λ³€ 샀딩

λ³€μˆ˜ 샀딩은 λ³€μˆ˜λ₯Ό μƒ€λ“œλΌκ³  ν•˜λŠ” μ—¬λŸ¬ 개의 μž‘μ€ λ³€μˆ˜λ‘œ λ‚˜λˆ„λŠ” 것을 λ§ν•©λ‹ˆλ‹€. λ³€μˆ˜ 샀딩은 μ΄λŸ¬ν•œ μƒ€λ“œμ— μ•‘μ„ΈμŠ€ν•  λ•Œ λ„€νŠΈμ›Œν¬ λΆ€ν•˜λ₯Ό λΆ„μ‚°ν•˜λŠ” 데 μœ μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€. 예λ₯Ό λ“€μ–΄ 단일 λ¨Έμ‹ μ˜ λ©”λͺ¨λ¦¬μ— λ§žμ§€ μ•Šμ„ 수 μžˆλŠ” 맀우 큰 μž„λ² λ”©μ„ μ‚¬μš©ν•  λ•Œ 일반 λ³€μˆ˜μ˜ 계산과 μ €μž₯을 μ—¬λŸ¬ λ§€κ°œλ³€μˆ˜ μ„œλ²„μ— λΆ„μ‚°ν•˜λŠ” 데도 μœ μš©ν•©λ‹ˆλ‹€.

λ³€μˆ˜ 샀딩을 μ‚¬μš©ν•˜λ €λ©΄ ParameterServerStrategy 객체λ₯Ό 생성할 λ•Œ variable_partitionerλ₯Ό 전달할 수 μžˆμŠ΅λ‹ˆλ‹€. variable_partitionerλŠ” λ³€μˆ˜κ°€ 생성될 λ•Œλ§ˆλ‹€ 호좜되며 λ³€μˆ˜μ˜ 각 차원을 따라 μƒ€λ“œ 수λ₯Ό λ°˜ν™˜ν•  κ²ƒμœΌλ‘œ μ˜ˆμƒλ©λ‹ˆλ‹€. tf.distribute.experimental.partitioners.MinSizePartitioner와 같은 λͺ‡ κ°€μ§€ κΈ°λ³Έ variable_partitionerκ°€ μ œκ³΅λ©λ‹ˆλ‹€. tf.distribute.experimental.partitioners.MinSizePartitioner와 같은 크기 기반 νŒŒν‹°μ…”λ„ˆλ₯Ό μ‚¬μš©ν•˜μ—¬ λͺ¨λΈ ν›ˆλ ¨ 속도에 뢀정적인 영ν–₯을 쀄 수 μžˆλŠ” μž‘μ€ λ³€μˆ˜μ˜ 뢄할을 λ°©μ§€ν•˜λŠ” 것이 μ’‹μŠ΅λ‹ˆλ‹€.

variable_partitionerκ°€ μ „λ‹¬λ˜κ³  Strategy.scope λ°”λ‘œ μ•„λž˜μ— λ³€μˆ˜λ₯Ό μƒμ„±ν•˜λ©΄ λ³€μˆ˜λŠ” μƒ€λ“œ λͺ©λ‘μ— λŒ€ν•œ μ•‘μ„ΈμŠ€λ₯Ό μ œκ³΅ν•˜λŠ” variables 속성이 μžˆλŠ” μ»¨ν…Œμ΄λ„ˆ μœ ν˜•μ΄ λ©λ‹ˆλ‹€. λŒ€λΆ€λΆ„μ˜ 경우 이 μ»¨ν…Œμ΄λ„ˆλŠ” λͺ¨λ“  μƒ€λ“œλ₯Ό μ—°κ²°ν•˜μ—¬ μžλ™μœΌλ‘œ ν…μ„œλ‘œ λ³€ν™˜λ©λ‹ˆλ‹€. 결과적으둜 일반 λ³€μˆ˜λ‘œ μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€. λ°˜λ©΄μ— tf.nn.embedding_lookupκ³Ό 같은 일뢀 TensorFlow λ©”μ„œλ“œλŠ” 이 μ»¨ν…Œμ΄λ„ˆ μœ ν˜•μ— λŒ€ν•œ 효율적인 κ΅¬ν˜„μ„ μ œκ³΅ν•˜λ©° μ΄λŸ¬ν•œ λ©”μ„œλ“œμ—μ„œλŠ” μžλ™ 연결이 λ°©μ§€λ©λ‹ˆλ‹€.

μžμ„Έν•œ λ‚΄μš©μ€ tf.distribute.ParameterServerStrategy의 API λ¬Έμ„œλ₯Ό μ°Έμ‘°ν•˜μ„Έμš”.

Model.fit으둜 ν›ˆλ ¨ν•˜κΈ°

KerasλŠ” μž¬μ •μ˜ κ°€λŠ₯ν•œ train_step의 μœ μ—°μ„±, 그리고 TensorBoard에 λŒ€ν•œ 체크포인트 μ €μž₯ λ˜λŠ” μš”μ•½ μ €μž₯κ³Ό 같은 κΈ°λŠ₯을 μ œκ³΅ν•˜λŠ” 콜백과 ν•¨κ»˜ λ§‰ν›„μ—μ„œ ν›ˆλ ¨ 루프λ₯Ό μ²˜λ¦¬ν•˜λŠ” Model.fit을 톡해 μ‚¬μš©ν•˜κΈ° μ‰¬μš΄ ν›ˆλ ¨ APIλ₯Ό μ œκ³΅ν•©λ‹ˆλ‹€. Model.fit을 μ‚¬μš©ν•˜λ©΄ κ°„λ‹¨νžˆ μ „λž΅ 객체만 κ΅μ²΄ν•˜μ—¬ λ‹€λ₯Έ μ „λž΅μ—μ„œ λ™μΌν•œ ν›ˆλ ¨ μ½”λ“œλ₯Ό μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

μž…λ ₯ 데이터

Model.fit을 ν¬ν•¨ν•œ tf.distribute.ParameterServerStrategy은 tf.data.Dataset, tf.distribute.DistributedDataset λ˜λŠ” tf.keras.utils.experimental.DatasetCreator ν˜•μ‹μœΌλ‘œ μž…λ ₯ 데이터λ₯Ό 받을 수 있으며 Dataset은 μ‚¬μš© νŽΈλ¦¬μ„±μ„ μœ„ν•΄ ꢌμž₯λ˜λŠ” μ˜΅μ…˜μž…λ‹ˆλ‹€. κ·ΈλŸ¬λ‚˜ Dataset을 μ‚¬μš©ν•˜μ—¬ λ©”λͺ¨λ¦¬ λ¬Έμ œκ°€ λ°œμƒν•˜λ©΄ 호좜 κ°€λŠ₯ν•œ dataset_fn μΈμˆ˜μ™€ ν•¨κ»˜ DatasetCreatorλ₯Ό μ‚¬μš©ν•΄μ•Ό ν•  μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€(μžμ„Έν•œ λ‚΄μš©μ€ tf.keras.utils.experimental.DatasetCreator API μ„€λͺ…μ„œ μ°Έμ‘°).

λ°μ΄ν„°μ„ΈνŠΈλ₯Ό tf.data.Dataset으둜 λ³€ν™˜ν•˜λŠ” 경우 μ•„λž˜ μ½”λ“œ μ˜ˆμ œμ™€ 같이 Dataset.shuffle 및 Dataset.repeatλ₯Ό μ‚¬μš©ν•΄μ•Ό ν•©λ‹ˆλ‹€.

  • λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μ΄ ν¬ν•¨λœ Model.fit은 λ‹€λ₯΄κ²Œ μ„žμΈ 경우λ₯Ό μ œμ™Έν•˜κ³  각 μž‘μ—…μžκ°€ λ™μΌν•œ λ°μ΄ν„°μ„ΈνŠΈλ₯Ό λ°›λŠ”λ‹€κ³  κ°€μ •ν•©λ‹ˆλ‹€. λ”°λΌμ„œ Dataset.shuffle을 ν˜ΈμΆœν•˜μ—¬ 데이터에 λŒ€ν•΄ 더 κ· μΌν•œ λ°˜λ³΅μ„ 보μž₯ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  • μž‘μ—…μžλŠ” λ™κΈ°ν™”ν•˜μ§€ μ•ŠκΈ° λ•Œλ¬Έμ— μ„œλ‘œ λ‹€λ₯Έ μ‹œκ°„μ— λ°μ΄ν„°μ„ΈνŠΈ 처리λ₯Ό μ™„λ£Œν•  수 μžˆμŠ΅λ‹ˆλ‹€. λ”°λΌμ„œ λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μœΌλ‘œ epochλ₯Ό μ •μ˜ν•˜λŠ” κ°€μž₯ μ‰¬μš΄ 방법은 Dataset.repeat(인수 없이 호좜될 λ•Œ λ°μ΄ν„°μ„ΈνŠΈλ₯Ό λ¬΄ν•œνžˆ λ°˜λ³΅ν•¨)λ₯Ό μ‚¬μš©ν•˜κ³  Model.fit ν˜ΈμΆœμ—μ„œ steps_per_epoch 인수λ₯Ό μ§€μ •ν•˜λŠ” κ²ƒμž…λ‹ˆλ‹€.

shuffle 및 repeat에 λŒ€ν•œ μžμ„Έν•œ λ‚΄μš©μ€ tf.data κ°€μ΄λ“œμ˜ "ν›ˆλ ¨ μ›Œν¬ν”Œλ‘œ" μ„Ήμ…˜μ„ μ°Έμ‘°ν•˜μ„Έμš”.

global_batch_size = 64 x = tf.random.uniform((10, 10)) y = tf.random.uniform((10,)) dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat() dataset = dataset.batch(global_batch_size) dataset = dataset.prefetch(2)

λŒ€μ‹  tf.keras.utils.experimental.DatasetCreatorλ₯Ό μ‚¬μš©ν•˜μ—¬ λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μƒμ„±ν•˜λ©΄ 각 μž‘μ—…μž λ¨Έμ‹ μ˜ μž…λ ₯ μž₯치(일반적으둜 CPU)μ—μ„œ dataset_fn의 μ½”λ“œκ°€ ν˜ΈμΆœλ©λ‹ˆλ‹€.

λͺ¨λΈ ꡬ성 및 컴파일

이제 데λͺ¨ λͺ©μ μ˜ κ°„λ‹¨ν•œ tf.keras.models.Sequential λͺ¨λΈμΈ tf.keras.Model을 λ§Œλ“  λ‹€μŒ μ˜΅ν‹°λ§ˆμ΄μ €μ™€ 같은 ꡬ성 μš”μ†Œ 및 steps_per_executionκ³Ό 같은 기타 λ§€κ°œλ³€μˆ˜λ₯Ό λ„μž…ν•˜κΈ° μœ„ν•œ Model.compile 호좜이 μ΄λ£¨μ–΄μ§‘λ‹ˆλ‹€.

with strategy.scope(): model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)]) model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)

콜백 및 ν›ˆλ ¨

μ‹€μ œ ν›ˆλ ¨μ„ μœ„ν•΄ Keras Model.fit을 ν˜ΈμΆœν•˜κΈ° 전에 λ‹€μŒκ³Ό 같은 일반적인 μž‘μ—…μ— ν•„μš”ν•œ μ½œλ°±μ„ μ€€λΉ„ν•©λ‹ˆλ‹€.

  • tf.keras.callbacks.ModelCheckpoint: λ§€ epoch 후와 같이 νŠΉμ • λΉˆλ„λ‘œ λͺ¨λΈμ„ μ €μž₯ν•©λ‹ˆλ‹€.

  • tf.keras.callbacks.BackupAndRestore: ν΄λŸ¬μŠ€ν„°μ— μ‚¬μš© λΆˆκ°€λŠ₯ν•œ 상황(쀑단 λ˜λŠ” 선점 λ“±)이 λ°œμƒν•˜λŠ” 경우 λͺ¨λΈκ³Ό ν˜„μž¬ epoch 번호λ₯Ό λ°±μ—…ν•˜μ—¬ 내결함성을 μ œκ³΅ν•©λ‹ˆλ‹€. 그런 λ‹€μŒ μž‘μ—… μ‹€νŒ¨ ν›„ λ‹€μ‹œ μ‹œμž‘ν•  λ•Œ ν›ˆλ ¨ μƒνƒœλ₯Ό λ³΅μ›ν•˜κ³  μ€‘λ‹¨λœ epoch의 μ‹œμž‘ λΆ€λΆ„λΆ€ν„° ν›ˆλ ¨μ„ 계속할 수 μžˆμŠ΅λ‹ˆλ‹€.

  • tf.keras.callbacks.TensorBoard: μš”μ•½ νŒŒμΌμ— TensorBoard λ„κ΅¬μ—μ„œ μ‹œκ°ν™”ν•  수 μžˆλŠ” λͺ¨λΈ 둜그λ₯Ό 주기적으둜 μž‘μ„±ν•©λ‹ˆλ‹€.

μ°Έκ³ : μ„±λŠ₯ κ³ λ € μ‚¬ν•­μœΌλ‘œ 인해 μ‚¬μš©μž μ •μ˜ μ½œλ°±μ€ ParameterServerStrategy와 ν•¨κ»˜ μ‚¬μš©λ  λ•Œ 일괄 처리 μˆ˜μ€€ μ½œλ°±μ„ μž¬μ •μ˜ν•  수 μ—†μŠ΅λ‹ˆλ‹€. μ‚¬μš©μž μ •μ˜ μ½œλ°±μ„ μˆ˜μ •ν•˜μ—¬ epoch μˆ˜μ€€ 호좜이 λ˜λ„λ‘ ν•˜κ³  steps_per_epochλ₯Ό μ μ ˆν•œ κ°’μœΌλ‘œ μ‘°μ •ν•©λ‹ˆλ‹€. λ˜ν•œ steps_per_epochλŠ” ParameterServerStrategy와 ν•¨κ»˜ μ‚¬μš©ν•  λ•Œ Model.fit에 λŒ€ν•œ ν•„μˆ˜ μΈμˆ˜μž…λ‹ˆλ‹€.

working_dir = "/tmp/my_working_dir" log_dir = os.path.join(working_dir, "log") ckpt_filepath = os.path.join(working_dir, "ckpt") backup_dir = os.path.join(working_dir, "backup") callbacks = [ tf.keras.callbacks.TensorBoard(log_dir=log_dir), tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath), tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir), ] model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)

ClusterCoordinator둜 직접 μ‚¬μš©(선택 사항)

Model.fit ν›ˆλ ¨ 경둜λ₯Ό μ„ νƒν•˜λ”λΌλ„ tf.distribute.coordinator.ClusterCoordinator 객체λ₯Ό μ„ νƒμ μœΌλ‘œ μΈμŠ€ν„΄μŠ€ν™”ν•˜μ—¬ μž‘μ—…μžμ—μ„œ μ‹€ν–‰ν•˜λ €λŠ” λ‹€λ₯Έ ν•¨μˆ˜λ₯Ό μ˜ˆμ•½ν•  수 μžˆμŠ΅λ‹ˆλ‹€. μžμ„Έν•œ λ‚΄μš©κ³Ό μ˜ˆμ œλŠ” μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프λ₯Ό μ‚¬μš©ν•œ ν›ˆλ ¨ μ„Ήμ…˜μ„ μ°Έμ‘°ν•˜μ„Έμš”.

μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프λ₯Ό μ‚¬μš©ν•œ ν›ˆλ ¨

tf.distribute.Strategy와 ν•¨κ»˜ μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프λ₯Ό μ‚¬μš©ν•˜λ©΄ ν›ˆλ ¨ 루프λ₯Ό μ •μ˜ν•˜λŠ” 큰 μœ μ—°μ„±μ΄ μƒκΉλ‹ˆλ‹€. μœ„μ—μ„œ μ •μ˜ν•œ ParameterServerStrategy(strategy)둜 tf.distribute.coordinator.ClusterCoordinatorλ₯Ό μ‚¬μš©ν•˜μ—¬ ν›ˆλ ¨ 단계 싀행을 원격 μž‘μ—…μžμ—κ²Œ μ „λ‹¬ν•©λ‹ˆλ‹€.

그런 λ‹€μŒ λ‹€λ₯Έ tf.distribute.Strategyλ₯Ό μ‚¬μš©ν•˜μ—¬ ν›ˆλ ¨ λ£¨ν”„μ—μ„œ μˆ˜ν–‰ν•œ κ²ƒμ²˜λŸΌ λͺ¨λΈμ„ μƒμ„±ν•˜κ³ , λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μ •μ˜ν•˜κ³ , 단계 ν•¨μˆ˜λ₯Ό μ •μ˜ν•©λ‹ˆλ‹€. tf.distribute.Strategyλ₯Ό μ‚¬μš©ν•œ μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ νŠœν† λ¦¬μ–Όμ—μ„œ μžμ„Έν•œ λ‚΄μš©μ„ 찾을 수 μžˆμŠ΅λ‹ˆλ‹€.

효율적인 λ°μ΄ν„°μ„ΈνŠΈ 미리 κ°€μ Έμ˜€κΈ°λ₯Ό μœ„ν•΄ μ•„λž˜ 원격 μž‘μ—…μžμ—κ²Œ ν›ˆλ ¨ 단계 μ „λ‹¬ν•˜κΈ° μ„Ήμ…˜μ— μ–ΈκΈ‰λœ ꢌμž₯ λΆ„μ‚° λ°μ΄ν„°μ„ΈνŠΈ 생성 APIλ₯Ό μ‚¬μš©ν•˜μ„Έμš”. λ˜ν•œ μž‘μ—…μžμ—κ²Œ ν• λ‹Ήλœ GPUλ₯Ό μ΅œλŒ€ν•œ ν™œμš©ν•˜λ €λ©΄ worker_fn λ‚΄μ—μ„œ Strategy.run을 ν˜ΈμΆœν•΄μ•Ό ν•©λ‹ˆλ‹€. λ‚˜λ¨Έμ§€ λ‹¨κ³„λŠ” GPUκ°€ μžˆκ±°λ‚˜ μ—†λŠ” ν›ˆλ ¨μ— λŒ€ν•΄ λ™μΌν•©λ‹ˆλ‹€.

λ‹€μŒ λ‹¨κ³„μ—μ„œ μ΄λŸ¬ν•œ ꡬ성 μš”μ†Œλ₯Ό λ§Œλ“€μ–΄ λ³΄κ² μŠ΅λ‹ˆλ‹€.

데이터 μ„€μ •ν•˜κΈ°

λ¨Όμ € λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μƒμ„±ν•˜λŠ” ν•¨μˆ˜λ₯Ό μž‘μ„±ν•©λ‹ˆλ‹€.

Keras μ „μ²˜λ¦¬ λ ˆμ΄μ–΄ λ˜λŠ” Tensorflow Transform λ ˆμ΄μ–΄λ₯Ό μ‚¬μš©ν•˜μ—¬ 데이터λ₯Ό μ „μ²˜λ¦¬ν•˜λ €λ©΄ λ‹€λ₯Έ Keras λ ˆμ΄μ–΄μ˜ κ²½μš°μ™€ λ§ˆμ°¬κ°€μ§€λ‘œ dataset_fn 외뢀와 Strategy.scope μ•„λž˜μ— μ΄λŸ¬ν•œ λ ˆμ΄μ–΄λ₯Ό μƒμ„±ν•©λ‹ˆλ‹€. μ΄λŠ” dataset_fn이 tf.function으둜 λž˜ν•‘λœ λ‹€μŒ 각 μž‘μ—…μžμ—μ„œ μ‹€ν–‰λ˜μ–΄ 데이터 νŒŒμ΄ν”„λΌμΈμ„ μƒμ„±ν•˜κΈ° λ•Œλ¬Έμž…λ‹ˆλ‹€.

μœ„μ˜ 절차λ₯Ό λ”°λ₯΄μ§€ μ•ŠμœΌλ©΄ λ ˆμ΄μ–΄λ₯Ό 생성할 λ•Œ tf.functionμ—μ„œ μ½”λ””λ„€μ΄ν„°λ‘œ ν•΄μ œλ˜λŠ” Tensorflow μƒνƒœκ°€ 생성될 수 μžˆμŠ΅λ‹ˆλ‹€. λ”°λΌμ„œ μž‘μ—…μžμ—μ„œ 여기에 μ•‘μ„ΈμŠ€ν•˜λ©΄ 코디넀이터와 μž‘μ—…μž 간에 반볡적인 RPC 호좜이 λ°œμƒν•˜κ³  μƒλ‹Ήν•œ 속도 μ €ν•˜κ°€ λ°œμƒν•©λ‹ˆλ‹€.

Strategy.scope μ•„λž˜μ— λ ˆμ΄μ–΄λ₯Ό λ°°μΉ˜ν•˜λ©΄ λŒ€μ‹  λͺ¨λ“  μž‘μ—…μžμ—μ„œ λ ˆμ΄μ–΄κ°€ μƒμ„±λ©λ‹ˆλ‹€. 그러면 tf.data.Dataset.map을 톡해 dataset_fn 내에 λ³€ν™˜μ„ μ μš©ν•©λ‹ˆλ‹€. λΆ„μ‚° μž…λ ₯을 μ‚¬μš©ν•œ 데이터 μ „μ²˜λ¦¬μ— λŒ€ν•œ μžμ„Έν•œ λ‚΄μš©μ€ λΆ„μ‚° μž…λ ₯ νŠœν† λ¦¬μ–Όμ˜ 데이터 μ „μ²˜λ¦¬λ₯Ό μ°Έμ‘°ν•˜μ„Έμš”.

feature_vocab = [ "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman" ] label_vocab = ["yes", "no"] with strategy.scope(): feature_lookup_layer = tf.keras.layers.StringLookup( vocabulary=feature_vocab, mask_token=None) label_lookup_layer = tf.keras.layers.StringLookup( vocabulary=label_vocab, num_oov_indices=0, mask_token=None) raw_feature_input = tf.keras.layers.Input( shape=(3,), dtype=tf.string, name="feature") feature_id_input = feature_lookup_layer(raw_feature_input) feature_preprocess_stage = tf.keras.Model( {"features": raw_feature_input}, feature_id_input) raw_label_input = tf.keras.layers.Input( shape=(1,), dtype=tf.string, name="label") label_id_input = label_lookup_layer(raw_label_input) label_preprocess_stage = tf.keras.Model( {"label": raw_label_input}, label_id_input)

λ°μ΄ν„°μ„ΈνŠΈμ—μ„œ μž₯λ‚œκ° 예제 생성:

def feature_and_label_gen(num_examples=200): examples = {"features": [], "label": []} for _ in range(num_examples): features = random.sample(feature_vocab, 3) label = ["yes"] if "avenger" in features else ["no"] examples["features"].append(features) examples["label"].append(label) return examples examples = feature_and_label_gen()

그런 λ‹€μŒ dataset_fn에 λž˜ν•‘λœ ν›ˆλ ¨ λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μƒμ„±ν•©λ‹ˆλ‹€.

def dataset_fn(_): raw_dataset = tf.data.Dataset.from_tensor_slices(examples) train_dataset = raw_dataset.map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(200).batch(32).repeat() return train_dataset

λͺ¨λΈ κ΅¬μΆ•ν•˜κΈ°

λ‹€μŒμœΌλ‘œ λͺ¨λΈ 및 기타 객체λ₯Ό λ§Œλ“­λ‹ˆλ‹€. Strategy.scope μ•„λž˜μ— λͺ¨λ“  λ³€μˆ˜λ₯Ό 생성해야 ν•©λ‹ˆλ‹€.

# These variables created under the `Strategy.scope` will be placed on parameter # servers in a round-robin fashion. with strategy.scope(): # Create the model. The input needs to be compatible with Keras processing layers. model_input = tf.keras.layers.Input( shape=(3,), dtype=tf.int64, name="model_input") emb_layer = tf.keras.layers.Embedding( input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384) emb_output = tf.reduce_mean(emb_layer(model_input), axis=1) dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output) model = tf.keras.Model({"features": model_input}, dense_output) optimizer = tf.keras.optimizers.legacy.RMSprop(learning_rate=0.1) accuracy = tf.keras.metrics.Accuracy()

FixedShardsPartitioner의 μ‚¬μš©μœΌλ‘œ λͺ¨λ“  λ³€μˆ˜κ°€ 두 개의 μƒ€λ“œλ‘œ λΆ„ν• λ˜κ³  각 μƒ€λ“œκ°€ λ‹€λ₯Έ λ§€κ°œλ³€μˆ˜ μ„œλ²„μ— ν• λ‹Ήλ˜μ—ˆλŠ”μ§€ ν™•μΈν•˜κ² μŠ΅λ‹ˆλ‹€.

assert len(emb_layer.weights) == 2 assert emb_layer.weights[0].shape == (4, 16384) assert emb_layer.weights[1].shape == (4, 16384) print(emb_layer.weights[0].device) print(emb_layer.weights[1].device)

ν›ˆλ ¨ 단계 μ •μ˜ν•˜κΈ°

μ…‹μ§Έ, tf.function에 λž˜ν•‘λœ ν›ˆλ ¨ 단계λ₯Ό μƒμ„±ν•©λ‹ˆλ‹€.

@tf.function def step_fn(iterator): def replica_fn(batch_data, labels): with tf.GradientTape() as tape: pred = model(batch_data, training=True) per_example_loss = tf.keras.losses.BinaryCrossentropy( reduction=tf.keras.losses.Reduction.NONE)(labels, pred) loss = tf.nn.compute_average_loss(per_example_loss) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) accuracy.update_state(labels, actual_pred) return loss batch_data, labels = next(iterator) losses = strategy.run(replica_fn, args=(batch_data, labels)) return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

μœ„μ˜ ν›ˆλ ¨ 단계 ν•¨μˆ˜μ—μ„œ step_fnμ—μ„œ Strategy.run 및 Strategy.reduceλ₯Ό ν˜ΈμΆœν•˜λ©΄ μž‘μ—…μžλ‹Ή μ—¬λŸ¬ GPUλ₯Ό 지원할 수 μžˆμŠ΅λ‹ˆλ‹€. μž‘μ—…μžμ—κ²Œ GPUκ°€ ν• λ‹Ήλœ 경우 Strategy.run은 λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μ—¬λŸ¬ λ³΅μ œλ³Έμ— λ°°ν¬ν•©λ‹ˆλ‹€.

원격 μž‘μ—…μžμ—κ²Œ ꡐ윑 단계 μ „λ‹¬ν•˜κΈ°

λͺ¨λ“  계산이 ParameterServerStrategy에 μ˜ν•΄ μ •μ˜λœ ν›„ tf.distribute.coordinator.ClusterCoordinator 클래슀λ₯Ό μ‚¬μš©ν•˜μ—¬ λ¦¬μ†ŒμŠ€λ₯Ό μƒμ„±ν•˜κ³  ν›ˆλ ¨ 단계λ₯Ό 원격 μž‘μ—…μžμ—κ²Œ λ°°ν¬ν•©λ‹ˆλ‹€.

λ¨Όμ € ClusterCoordinator 객체λ₯Ό λ§Œλ“€κ³  μ „λž΅ 객체λ₯Ό 전달해 λ³΄κ² μŠ΅λ‹ˆλ‹€.

coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)

그런 λ‹€μŒ λͺ¨λ“  μž‘μ—…μžμ—κ²Œ λ°μ΄ν„°μ„ΈνŠΈλ₯Ό λ³΅μ œν•˜λŠ” ClusterCoordinator.create_per_worker_dataset APIλ₯Ό μ‚¬μš©ν•˜μ—¬ μž‘μ—…μžλ³„ λ°μ΄ν„°μ„ΈνŠΈμ™€ 반볡자λ₯Ό λ§Œλ“­λ‹ˆλ‹€. GPU둜 미리 κ°€μ Έμ˜€κΈ°κ°€ μ›ν™œν•˜κ³  효율적으둜 μˆ˜ν–‰λ˜λ„λ‘ μ•„λž˜ per_worker_dataset_fnμ—μ„œ dataset_fn을 strategy.distribute_datasets_from_function으둜 λž˜ν•‘ν•˜λŠ” 것이 μ’‹μŠ΅λ‹ˆλ‹€.

@tf.function def per_worker_dataset_fn(): return strategy.distribute_datasets_from_function(dataset_fn) per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn) per_worker_iterator = iter(per_worker_dataset)

λ§ˆμ§€λ§‰ λ‹¨κ³„λŠ” ClusterCoordinator.schedule을 μ‚¬μš©ν•˜μ—¬ 원격 μž‘μ—…μžμ—κ²Œ 계산을 λ°°ν¬ν•˜λŠ” κ²ƒμž…λ‹ˆλ‹€.

  • schedule λ©”μ„œλ“œλŠ” tf.function을 λŒ€κΈ°μ—΄μ— λ„£κ³  λ―Έλž˜μ™€ 같은 RemoteValueλ₯Ό μ¦‰μ‹œ λ°˜ν™˜ν•©λ‹ˆλ‹€. λŒ€κΈ°μ—΄μ— 놓인 ν•¨μˆ˜λŠ” λ°±κ·ΈλΌμš΄λ“œ μŠ€λ ˆλ“œμ˜ 원격 μž‘μ—…μžμ—κ²Œ μ „λ‹¬λ˜κ³  RemoteValueλŠ” λΉ„λ™κΈ°μ‹μœΌλ‘œ μ±„μ›Œμ§‘λ‹ˆλ‹€.

  • join λ©”μ„œλ“œ(ClusterCoordinator.join)λŠ” μ˜ˆμ•½λœ λͺ¨λ“  ν•¨μˆ˜κ°€ 싀행될 λ•ŒκΉŒμ§€ λŒ€κΈ°ν•˜λŠ” 데 μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

num_epochs = 4 steps_per_epoch = 5 for i in range(num_epochs): accuracy.reset_states() for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) # Wait at epoch boundaries. coordinator.join() print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

λ‹€μŒμ€ RemoteValue의 κ²°κ³Όλ₯Ό κ°€μ Έμ˜€λŠ” λ°©λ²•μž…λ‹ˆλ‹€.

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,)) print("Final loss is %f" % loss.fetch())

λ˜λŠ” λͺ¨λ“  단계λ₯Ό μ‹œμž‘ν•˜κ³  μ™„λ£Œλ₯Ό κΈ°λ‹€λ¦¬λŠ” λ™μ•ˆ μž‘μ—…μ„ μˆ˜ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

for _ in range(total_steps): coordinator.schedule(step_fn, args=(per_worker_iterator,)) while not coordinator.done(): time.sleep(10) # Do something like logging metrics or writing checkpoints.

이 νŠΉμ • μ˜ˆμ œμ— λŒ€ν•œ 전체 ν›ˆλ ¨ 및 제곡 μ›Œν¬ν”Œλ‘œλŠ” 이 ν…ŒμŠ€νŠΈλ₯Ό ν™•μΈν•˜μ„Έμš”.

λ°μ΄ν„°μ„ΈνŠΈ 생성에 λŒ€ν•œ μΆ”κ°€ 사항

μœ„ μ½”λ“œμ˜ λ°μ΄ν„°μ„ΈνŠΈλŠ” ClusterCoordinator.create_per_worker_dataset APIλ₯Ό μ‚¬μš©ν•˜μ—¬ μƒμ„±λ©λ‹ˆλ‹€. μž‘μ—…μžλ‹Ή ν•˜λ‚˜μ˜ λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μƒμ„±ν•˜κ³  μ»¨ν…Œμ΄λ„ˆ 객체λ₯Ό λ°˜ν™˜ν•©λ‹ˆλ‹€. μ—¬κΈ°μ„œ iter λ©”μ„œλ“œλ₯Ό ν˜ΈμΆœν•˜μ—¬ μž‘μ—…μžλ³„ 반볡자λ₯Ό λ§Œλ“€ 수 μžˆμŠ΅λ‹ˆλ‹€. μž‘μ—…μžλ³„ λ°˜λ³΅μžλŠ” μž‘μ—…μžλ‹Ή ν•˜λ‚˜μ˜ 반볡자λ₯Ό ν¬ν•¨ν•˜κ³  μž‘μ—…μžμ˜ ν•΄λ‹Ή μŠ¬λΌμ΄μŠ€λŠ” νŠΉμ • μž‘μ—…μžμ—μ„œ ν•¨μˆ˜κ°€ μ‹€ν–‰λ˜κΈ° 전에 ClusterCoordinator.schedule λ©”μ„œλ“œμ— μ „λ‹¬λœ ν•¨μˆ˜μ˜ μž…λ ₯ μΈμˆ˜μ—μ„œ λŒ€μ²΄λ©λ‹ˆλ‹€.

ClusterCoordinator.schedule λ©”μ„œλ“œλŠ” μž‘μ—…μžκ°€ λ™λ“±ν•˜λ‹€κ³  κ°€μ •ν•˜λ―€λ‘œ μ„œλ‘œ λ‹€λ₯Έ μž‘μ—…μžμ˜ λ°μ΄ν„°μ„ΈνŠΈκ°€ λ™μΌν•˜λ‹€κ³  κ°€μ •ν•©λ‹ˆλ‹€(λ‹€λ₯΄κ²Œ μ„žμΌ 수 μžˆλ‹€λŠ” 점은 μ œμ™Έ). 이 λ•Œλ¬Έμ— λ°μ΄ν„°μ„ΈνŠΈλ₯Ό λ°˜λ³΅ν•˜κ³ , λ°μ΄ν„°μ„ΈνŠΈμ—μ„œ OutOfRangeErrorλ₯Ό μˆ˜μ‹ ν•˜λŠ” λŒ€μ‹  μœ ν•œν•œ 수의 단계λ₯Ό μ˜ˆμ•½ν•˜λŠ” 것이 μ’‹μŠ΅λ‹ˆλ‹€.

또 λ‹€λ₯Έ μ€‘μš”ν•œ 점은 tf.data λ°μ΄ν„°μ„ΈνŠΈκ°€ μž‘μ—… 경계λ₯Ό λ„˜μ–΄ μ•”μ‹œμ  직렬화 및 역직렬화λ₯Ό μ§€μ›ν•˜μ§€ μ•ŠλŠ”λ‹€λŠ” κ²ƒμž…λ‹ˆλ‹€. λ”°λΌμ„œ ClusterCoordinator.create_per_worker_dataset에 μ „λ‹¬λœ ν•¨μˆ˜ 내뢀에 전체 λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μƒμ„±ν•˜λŠ” 것이 μ€‘μš”ν•©λ‹ˆλ‹€. create_per_worker_dataset APIλŠ” tf.data.Dataset λ˜λŠ” tf.distribute.DistributedDataset을 직접 μž…λ ₯으둜 μ‚¬μš©ν•  μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€.

평가

tf.distribute.ParameterServerStrategy ν›ˆλ ¨μœΌλ‘œ 평가λ₯Ό μˆ˜ν–‰ν•˜λŠ” 두 κ°€μ§€ μ£Όμš” μ ‘κ·Ό 방식은 인라인 평가와 μ‚¬μ΄λ“œμΉ΄ ν‰κ°€μž…λ‹ˆλ‹€. 각각은 μ•„λž˜μ™€ 같이 μž₯단점이 μžˆμŠ΅λ‹ˆλ‹€. νŠΉλ³„ν•œ μ„ ν˜Έλ„κ°€ μ—†λ‹€λ©΄ 인라인 평가 방법을 ꢌμž₯ν•©λ‹ˆλ‹€.

인라인 평가

이 λ°©λ²•μ—μ„œ μ½”λ””λ„€μ΄ν„°λŠ” ν›ˆλ ¨κ³Ό 평가λ₯Ό λ²ˆκ°ˆμ•„ κ°€λ©° μˆ˜ν–‰ν•˜λ―€λ‘œ 인라인 평가라고 ν•©λ‹ˆλ‹€.

인라인 ν‰κ°€μ—λŠ” λ‹€μŒκ³Ό 같은 λͺ‡ κ°€μ§€ 이점이 μžˆμŠ΅λ‹ˆλ‹€.

  • 단일 μž‘μ—…μœΌλ‘œ λ³΄μœ ν•  수 μ—†λŠ” λŒ€κ·œλͺ¨ 평가 λͺ¨λΈ 및 평가 λ°μ΄ν„°μ„ΈνŠΈλ₯Ό 지원할 수 μžˆμŠ΅λ‹ˆλ‹€.

  • 평가 κ²°κ³ΌλŠ” 예λ₯Ό λ“€μ–΄ ν›ˆλ ¨μ„ 쑰기에 쀑단할지 여뢀와 같이 λ‹€μŒ epochλ₯Ό ν›ˆλ ¨ν•˜κΈ° μœ„ν•œ 결정을 λ‚΄λ¦¬λŠ” 데 μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

인라인 평가λ₯Ό κ΅¬ν˜„ν•˜λŠ” λ°©λ²•μ—λŠ” 직접 평가와 λΆ„μ‚° ν‰κ°€μ˜ 두 κ°€μ§€κ°€ μžˆμŠ΅λ‹ˆλ‹€.

  • 직접 평가: μ†Œκ·œλͺ¨ λͺ¨λΈ 및 평가 λ°μ΄ν„°μ„ΈνŠΈμ˜ 경우 μ½”λ””λ„€μ΄ν„°λŠ” μ½”λ””λ„€μ΄ν„°μ˜ 평가 λ°μ΄ν„°μ„ΈνŠΈλ₯Ό μ‚¬μš©ν•˜μ—¬ λΆ„μ‚° λͺ¨λΈμ—μ„œ 직접 평가λ₯Ό μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

eval_dataset = tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).batch(8) eval_accuracy = tf.keras.metrics.Accuracy() for batch_data, labels in eval_dataset: pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) print("Evaluation accuracy: %f" % eval_accuracy.result())
  • λΆ„μ‚° 평가: μ½”λ””λ„€μ΄ν„°μ—μ„œ 직접 μ‹€ν–‰ν•  수 μ—†λŠ” λŒ€κ·œλͺ¨ λͺ¨λΈ λ˜λŠ” λ°μ΄ν„°μ„ΈνŠΈμ˜ 경우, 코디넀이터 μž‘μ—…μœΌλ‘œ ClusterCoordinator.schedule/ClusterCoordinator.join λ©”μ„œλ“œλ₯Ό 톡해 μž‘μ—…μžμ—κ²Œ 평가 μž‘μ—…μ„ 배포할 수 μžˆμŠ΅λ‹ˆλ‹€.

with strategy.scope(): # Define the eval metric on parameter servers. eval_accuracy = tf.keras.metrics.Accuracy() @tf.function def eval_step(iterator): def replica_fn(batch_data, labels): pred = model(batch_data, training=False) actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64) eval_accuracy.update_state(labels, actual_pred) batch_data, labels = next(iterator) strategy.run(replica_fn, args=(batch_data, labels)) def eval_dataset_fn(): return tf.data.Dataset.from_tensor_slices( feature_and_label_gen(num_examples=16)).map( lambda x: ( {"features": feature_preprocess_stage(x["features"])}, label_preprocess_stage(x["label"]) )).shuffle(16).repeat().batch(8) per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn) per_worker_eval_iterator = iter(per_worker_eval_dataset) eval_steps_per_epoch = 2 for _ in range(eval_steps_per_epoch): coordinator.schedule(eval_step, args=(per_worker_eval_iterator,)) coordinator.join() print("Evaluation accuracy: %f" % eval_accuracy.result())

μ°Έκ³ : tf.distribute.coordinator.ClusterCoordinator의 schedule 및 join λ©”μ„œλ“œλŠ” λ°©λ¬Έ 보μž₯ λ˜λŠ” μ •ν™•νžˆ ν•œ 번의 의미 체계λ₯Ό μ§€μ›ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. 즉, λ°μ΄ν„°μ„ΈνŠΈμ˜ λͺ¨λ“  평가 μ˜ˆμ œκ°€ μ •ν™•νžˆ ν•œ 번 ν‰κ°€λœλ‹€λŠ” 보μž₯은 μ—†μŠ΅λ‹ˆλ‹€. μΌλΆ€λŠ” λ°©λ¬Έλ˜μ§€ μ•Šμ„ 수 있고 μΌλΆ€λŠ” μ—¬λŸ¬ 번 평가될 수 μžˆμŠ΅λ‹ˆλ‹€. tf.data μ„œλΉ„μŠ€ APIλŠ” ParameterServerStrategyλ₯Ό μ‚¬μš©ν•  λ•Œ 평가λ₯Ό μœ„ν•΄ μ •ν™•νžˆ ν•œ 번 방문을 μ œκ³΅ν•˜λŠ” 데 μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€(tf.data.experimental.service API λ¬Έμ„œμ˜ 동적 샀딩 μ„Ήμ…˜ μ°Έμ‘°).

μ‚¬μ΄λ“œμΉ΄ 평가

tf.distribute.ParameterServerStrategy ν›ˆλ ¨μ—μ„œ 평가 루프λ₯Ό μ •μ˜ν•˜κ³  μ‹€ν–‰ν•˜λŠ” 또 λ‹€λ₯Έ 방법은 체크포인트λ₯Ό 반볡적으둜 읽고 μ΅œμ‹  μ²΄ν¬ν¬μΈνŠΈμ—μ„œ 평가λ₯Ό μ‹€ν–‰ν•˜λŠ” μ „μš© ν‰κ°€μž μž‘μ—…μ„ μƒμ„±ν•˜λŠ” μ‚¬μ΄λ“œμΉ΄ ν‰κ°€μž…λ‹ˆλ‹€(μ²΄ν¬ν¬μΈνŠΈμ— λŒ€ν•œ μžμ„Έν•œ λ‚΄μš©μ€ 이 κ°€μ΄λ“œ μ°Έμ‘°). μˆ˜μ„ 및 μž‘μ—…μž μž‘μ—…μ€ 평가에 μ‹œκ°„μ„ 듀이지 μ•ŠμœΌλ―€λ‘œ κ³ μ •λœ 반볡 νšŸμˆ˜μ— λŒ€ν•΄ 전체 ν›ˆλ ¨ μ‹œκ°„μ€ λ‹€λ₯Έ 평가 방법을 μ‚¬μš©ν•˜λŠ” 것보닀 μ§§μŠ΅λ‹ˆλ‹€. κ·ΈλŸ¬λ‚˜ 평가λ₯Ό νŠΈλ¦¬κ±°ν•˜λ €λ©΄ 좔가적인 ν‰κ°€μž μž‘μ—…κ³Ό 주기적인 체크포인트 μ ˆμ°¨κ°€ ν•„μš”ν•©λ‹ˆλ‹€.

μ‚¬μ΄λ“œμΉ΄ 평가λ₯Ό μœ„ν•œ 평가 루프λ₯Ό μž‘μ„±ν•  λ•Œ 두 κ°€μ§€ μ˜΅μ…˜μ΄ μžˆμŠ΅λ‹ˆλ‹€.

  1. tf.keras.utils.SidecarEvaluator APIλ₯Ό μ‚¬μš©ν•©λ‹ˆλ‹€.

  2. μ‚¬μš©μž μ •μ˜ 평가 루프λ₯Ό λ§Œλ“­λ‹ˆλ‹€.

μ˜΅μ…˜ 1에 λŒ€ν•œ μžμ„Έν•œ λ‚΄μš©μ€ tf.keras.utils.SidecarEvaluator API λ¬Έμ„œλ₯Ό μ°Έμ‘°ν•˜μ„Έμš”.

μ‚¬μ΄λ“œμΉ΄ ν‰κ°€λŠ” 단일 μž‘μ—…μ—μ„œλ§Œ μ§€μ›λ©λ‹ˆλ‹€. 이것은 λ‹€μŒμ„ μ˜λ―Έν•©λ‹ˆλ‹€.

  • 각 μ˜ˆμ œλŠ” ν™•μ‹€ν•˜κ²Œ ν•œ 번만 ν‰κ°€λ©λ‹ˆλ‹€. ν‰κ°€μžκ°€ μ„ μ λ˜κ±°λ‚˜ λ‹€μ‹œ μ‹œμž‘λ˜λŠ” 경우 졜근 μ²΄ν¬ν¬μΈνŠΈμ—μ„œ 평가 루프λ₯Ό λ‹€μ‹œ μ‹œμž‘ν•˜κ³  λ‹€μ‹œ μ‹œμž‘ν•˜κΈ° 전에 이루어진 평가 μ§„ν–‰ 뢀뢄은 νκΈ°λ©λ‹ˆλ‹€.

  • κ·ΈλŸ¬λ‚˜ 단일 μž‘μ—…μ— λŒ€ν•΄ 평가λ₯Ό μ‹€ν–‰ν•˜λ©΄ 전체 평가에 μ‹œκ°„μ΄ 였래 걸릴 수 μžˆμŠ΅λ‹ˆλ‹€.

  • λͺ¨λΈμ˜ 크기가 λ„ˆλ¬΄ μ»€μ„œ ν‰κ°€μžμ˜ λ©”λͺ¨λ¦¬μ— λ§žμ§€ μ•ŠλŠ” 경우 단일 μ‚¬μ΄λ“œμΉ΄ 평가가 μ μš©λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

또 λ‹€λ₯Έ 주의 사항은 tf.keras.utils.SidecarEvaluator κ΅¬ν˜„κ³Ό μ•„λž˜μ˜ μ‚¬μš©μž μ •μ˜ 평가 루프가 항상 μ‚¬μš© κ°€λŠ₯ν•œ μ΅œμ‹  체크포인트λ₯Ό μ„ νƒν•˜κ³  평가 epoch λ™μ•ˆ ν›ˆλ ¨ ν΄λŸ¬μŠ€ν„°μ—μ„œ μ—¬λŸ¬ 체크포인트λ₯Ό 생성할 수 있기 λ•Œλ¬Έμ— 일뢀 체크포인트λ₯Ό κ±΄λ„ˆλ›Έ 수 μžˆλ‹€λŠ” κ²ƒμž…λ‹ˆλ‹€. λͺ¨λ“  체크포인트λ₯Ό ν‰κ°€ν•˜λŠ” μ‚¬μš©μž μ •μ˜ 평가 루프λ₯Ό μž‘μ„±ν•  수 μžˆμ§€λ§Œ 이 νŠœν† λ¦¬μ–Όμ—μ„œλŠ” 닀루지 μ•ŠμŠ΅λ‹ˆλ‹€. λ°˜λ©΄μ— μ²΄ν¬ν¬μΈνŠΈκ°€ 평가λ₯Ό μ‹€ν–‰ν•˜λŠ” 데 κ±Έλ¦¬λŠ” μ‹œκ°„λ³΄λ‹€ 덜 자주 μƒμ„±λ˜λ©΄ 유휴 μƒνƒœλ‘œ μžˆμ„ 수 μžˆμŠ΅λ‹ˆλ‹€.

μ‚¬μš©μž μ •μ˜ 평가 λ£¨ν”„λŠ” 평가할 체크포인트λ₯Ό μ„ νƒν•˜κ±°λ‚˜ 평가와 ν•¨κ»˜ μ‹€ν–‰ν•  μΆ”κ°€ 논리λ₯Ό μ œκ³΅ν•˜λŠ” λ“± μ„ΈλΆ€ 사항에 λŒ€ν•œ 더 λ§Žμ€ ν†΅μ œλ ₯을 μ œκ³΅ν•©λ‹ˆλ‹€. λ‹€μŒμ€ κ°€λŠ₯ν•œ μ‚¬μš©μž μ •μ˜ μ‚¬μ΄λ“œμΉ΄ 평가 λ£¨ν”„μž…λ‹ˆλ‹€.

checkpoint_dir = ... eval_model = ... eval_data = ... checkpoint = tf.train.Checkpoint(model=eval_model) for latest_checkpoint in tf.train.checkpoints_iterator( checkpoint_dir): try: checkpoint.restore(latest_checkpoint).expect_partial() except (tf.errors.OpError,) as e: # checkpoint may be deleted by training when it is about to read it. continue # Optionally add callbacks to write summaries. eval_model.evaluate(eval_data) # Evaluation finishes when it has evaluated the last epoch. if latest_checkpoint.endswith('-{}'.format(train_epochs)): break

μ‹€μ œ μƒν™©μ—μ„œ ν΄λŸ¬μŠ€ν„°

μ°Έκ³ : 이 μ„Ήμ…˜μ€ 이 νŽ˜μ΄μ§€μ˜ νŠœν† λ¦¬μ–Ό μ½”λ“œλ₯Ό μ‹€ν–‰ν•˜λŠ” 데 ν•„μš”ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

μ‹€μ œ ν”„λ‘œλ•μ…˜ ν™˜κ²½μ—μ„œλŠ” μ„œλ‘œ λ‹€λ₯Έ λ¨Έμ‹ μ˜ μ„œλ‘œ λ‹€λ₯Έ ν”„λ‘œμ„ΈμŠ€μ—μ„œ λͺ¨λ“  μž‘μ—…μ„ μ‹€ν–‰ν•©λ‹ˆλ‹€. 각 μž‘μ—…μ— λŒ€ν•œ ν΄λŸ¬μŠ€ν„° 정보λ₯Ό κ΅¬μ„±ν•˜λŠ” κ°€μž₯ κ°„λ‹¨ν•œ 방법은 "TF_CONFIG" ν™˜κ²½ λ³€μˆ˜λ₯Ό μ„€μ •ν•˜κ³  tf.distribute.cluster_resolver.TFConfigClusterResolverλ₯Ό μ‚¬μš©ν•˜μ—¬ "TF_CONFIG"λ₯Ό ꡬ문 λΆ„μ„ν•˜λŠ” κ²ƒμž…λ‹ˆλ‹€.

"TF_CONFIG" ν™˜κ²½ λ³€μˆ˜μ— λŒ€ν•œ 일반적인 μ„€λͺ…은 λΆ„μ‚° ν›ˆλ ¨ κ°€μ΄λ“œμ˜ "TF_CONFIG ν™˜κ²½ λ³€μˆ˜ μ„€μ •"을 μ°Έμ‘°ν•˜μ„Έμš”.

Kubernetes λ˜λŠ” 기타 ꡬ성 ν…œν”Œλ¦Ώμ„ μ‚¬μš©ν•˜μ—¬ ν›ˆλ ¨ μž‘μ—…μ„ μ‹œμž‘ν•˜λŠ” 경우 μ΄λŸ¬ν•œ ν…œν”Œλ¦Ώμ΄ 이미 β€œTF_CONFIG"λ₯Ό μ„€μ •ν–ˆμ„ κ°€λŠ₯성이 λ†’μŠ΅λ‹ˆλ‹€.

"TF_CONFIG" ν™˜κ²½ λ³€μˆ˜ μ„€μ •ν•˜κΈ°

3개의 μž‘μ—…μžμ™€ 2개의 λ§€κ°œλ³€μˆ˜ μ„œλ²„κ°€ μžˆλ‹€κ³  κ°€μ •ν•©λ‹ˆλ‹€. 그러면 μž‘μ—…μž 1의 "TF_CONFIG"λŠ” λ‹€μŒκ³Ό 같을 수 μžˆμŠ΅λ‹ˆλ‹€.

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "worker": ["host1:port", "host2:port", "host3:port"], "ps": ["host4:port", "host5:port"], "chief": ["host6:port"] }, "task": {"type": "worker", "index": 1} })

ν‰κ°€μžμ˜ "TF_CONFIG"λŠ” λ‹€μŒκ³Ό 같을 수 μžˆμŠ΅λ‹ˆλ‹€.

os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "evaluator": ["host7:port"] }, "task": {"type": "evaluator", "index": 0} })

μœ„μ˜ ν‰κ°€μžμ— λŒ€ν•œ "TF_CONFIG" λ¬Έμžμ—΄ 쀑 "cluster" 뢀뢄은 선택 μ‚¬ν•­μž…λ‹ˆλ‹€.

λͺ¨λ“  μž‘μ—…μ— λ™μΌν•œ λ°”μ΄λ„ˆλ¦¬λ₯Ό μ‚¬μš©ν•˜λŠ” 경우

단일 λ°”μ΄λ„ˆλ¦¬λ₯Ό μ‚¬μš©ν•˜μ—¬ μ΄λŸ¬ν•œ λͺ¨λ“  μž‘μ—…μ„ μ‹€ν–‰ν•˜λ €λ©΄ μ²˜μŒμ— ν”„λ‘œκ·Έλž¨μ΄ μ—¬λŸ¬ μ—­ν• λ‘œ λΆ„κΈ°λ˜λ„λ‘ ν•΄μ•Ό ν•©λ‹ˆλ‹€.

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver() if cluster_resolver.task_type in ("worker", "ps"): # Start a TensorFlow server and wait. elif cluster_resolver.task_type == "evaluator": # Run sidecar evaluation else: # Run the coordinator.

TensorFlow μ„œλ²„λ₯Ό μ‹œμž‘ν•˜κ³  λŒ€κΈ°ν•˜λŠ” λ‹€μŒ μ½”λ“œλŠ” "worker" 및 "ps" 역할에 μœ μš©ν•©λ‹ˆλ‹€.

# Set the environment variable to allow reporting worker and ps failure to the # coordinator. This is a workaround and won't be necessary in the future. os.environ["GRPC_FAIL_FAST"] = "use_caller" server = tf.distribute.Server( cluster_resolver.cluster_spec(), job_name=cluster_resolver.task_type, task_index=cluster_resolver.task_id, protocol=cluster_resolver.rpc_layer or "grpc", start=True) server.join()

μž‘μ—… 였λ₯˜ μ²˜λ¦¬ν•˜κΈ°

μž‘μ—…μž 였λ₯˜

tf.distribute.coordinator.ClusterCoordinator μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프와 Model.fit μ ‘κ·Ό 방식은 λͺ¨λ‘ μž‘μ—…μž 였λ₯˜μ— λŒ€ν•œ 내결함성을 기본적으둜 μ œκ³΅ν•©λ‹ˆλ‹€. μž‘μ—…μž 볡ꡬ μ‹œ ClusterCoordinatorλŠ” μž‘μ—…μžμ— λŒ€ν•œ λ°μ΄ν„°μ„ΈνŠΈ μž¬μƒμ„±μ„ ν˜ΈμΆœν•©λ‹ˆλ‹€.

λ§€κ°œλ³€μˆ˜ μ„œλ²„ λ˜λŠ” 코디넀이터 였λ₯˜

κ·ΈλŸ¬λ‚˜ μ½”λ””λ„€μ΄ν„°λŠ” λ§€κ°œλ³€μˆ˜ μ„œλ²„ 였λ₯˜λ₯Ό λ°œκ²¬ν•˜λ©΄ μ¦‰μ‹œ UnavailableError λ˜λŠ” AbortedErrorλ₯Ό λ°œμƒμ‹œν‚΅λ‹ˆλ‹€. 이 경우 코디넀이터λ₯Ό λ‹€μ‹œ μ‹œμž‘ν•  수 μžˆμŠ΅λ‹ˆλ‹€. 코디넀이터 μžμ²΄λ„ μ‚¬μš©ν•  수 μ—†κ²Œ 될 수 μžˆμŠ΅λ‹ˆλ‹€. λ”°λΌμ„œ ν›ˆλ ¨ μ§„ν–‰ 상황을 μžƒμ§€ μ•ŠκΈ° μœ„ν•΄ νŠΉμ • 도ꡬλ₯Ό μ‚¬μš©ν•˜λŠ” 것이 μ’‹μŠ΅λ‹ˆλ‹€.

  • Model.fit의 경우 μ§„ν–‰λ₯  μ €μž₯ 및 볡원을 μžλ™μœΌλ‘œ μ²˜λ¦¬ν•˜λŠ” BackupAndRestore μ½œλ°±μ„ μ‚¬μš©ν•΄μ•Ό ν•©λ‹ˆλ‹€. μ˜ˆμ œλŠ” μœ„μ˜ 콜백 및 ν›ˆλ ¨ μ„Ήμ…˜μ„ μ°Έμ‘°ν•˜μ„Έμš”.

  • μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ λ£¨ν”„μ˜ 경우 ν›ˆλ ¨μ΄ μ‹œμž‘λ˜κΈ° 전에 주기적으둜 λͺ¨λΈ λ³€μˆ˜λ₯Ό μ²΄ν¬ν¬μΈνŠΈν•˜κ³  μ²΄ν¬ν¬μΈνŠΈμ—μ„œ λͺ¨λΈ λ³€μˆ˜λ₯Ό λ‘œλ“œν•΄μ•Ό ν•©λ‹ˆλ‹€(μžˆλŠ” 경우). μ˜΅ν‹°λ§ˆμ΄μ €κ°€ 체크포인트된 경우 ν›ˆλ ¨ μ§„ν–‰ 상황은 optimizer.iterationsμ—μ„œ λŒ€λž΅μ μœΌλ‘œ μΆ”λ‘ ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

checkpoint_manager = tf.train.CheckpointManager( tf.train.Checkpoint(model=model, optimizer=optimizer), checkpoint_dir, max_to_keep=3) if checkpoint_manager.latest_checkpoint: checkpoint = checkpoint_manager.checkpoint checkpoint.restore( checkpoint_manager.latest_checkpoint).assert_existing_objects_matched() global_steps = int(optimizer.iterations.numpy()) starting_epoch = global_steps // steps_per_epoch for _ in range(starting_epoch, num_epochs): for _ in range(steps_per_epoch): coordinator.schedule(step_fn, args=(per_worker_iterator,)) coordinator.join() checkpoint_manager.save()

RemoteValue κ°€μ Έμ˜€κΈ°

ν•¨μˆ˜κ°€ μ„±κ³΅μ μœΌλ‘œ μ‹€ν–‰λ˜λ©΄ RemoteValue κ°€μ Έμ˜€κΈ°μ˜ 성곡이 보μž₯λ©λ‹ˆλ‹€. ν˜„μž¬λŠ” ν•¨μˆ˜κ°€ μ‹€ν–‰λœ ν›„ λ°˜ν™˜ 값이 μ¦‰μ‹œ 코디넀이터에 λ³΅μ‚¬λ˜κΈ° λ•Œλ¬Έμž…λ‹ˆλ‹€. λ³΅μ‚¬ν•˜λŠ” λ™μ•ˆ μž‘μ—…μž 였λ₯˜κ°€ λ°œμƒν•˜λ©΄ μ‚¬μš© κ°€λŠ₯ν•œ λ‹€λ₯Έ μž‘μ—…μžμ—μ„œ ν•¨μˆ˜κ°€ μž¬μ‹œλ„λ©λ‹ˆλ‹€. λ”°λΌμ„œ μ„±λŠ₯을 μ΅œμ ν™”ν•˜λ €λ©΄ λ°˜ν™˜ κ°’ 없이 ν•¨μˆ˜λ₯Ό μ˜ˆμ•½ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

였λ₯˜ 보고

코디넀이터가 λ§€κ°œλ³€μˆ˜ μ„œλ²„μ˜ UnavailableError와 같은 였λ₯˜ λ˜λŠ” tf.debugging.check_numerics의 InvalidArgument와 같은 기타 μ• ν”Œλ¦¬μΌ€μ΄μ…˜ 였λ₯˜λ₯Ό ν™•μΈν•˜λ©΄ 였λ₯˜λ₯Ό λ°œμƒμ‹œν‚€κΈ° 전에 보λ₯˜ 쀑 및 λŒ€κΈ°μ—΄μ— μžˆλŠ” λͺ¨λ“  ν•¨μˆ˜λ₯Ό μ·¨μ†Œν•©λ‹ˆλ‹€. ν•΄λ‹Ή RemoteValueλ₯Ό κ°€μ Έμ˜€λ©΄ CancelledErrorκ°€ λ°œμƒν•©λ‹ˆλ‹€.

였λ₯˜κ°€ λ°œμƒν•œ ν›„ μ½”λ””λ„€μ΄ν„°λŠ” λ™μΌν•œ 였λ₯˜ λ˜λŠ” μ·¨μ†Œλœ ν•¨μˆ˜μ˜ 였λ₯˜λ₯Ό λ°œμƒμ‹œν‚€μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

μ„±λŠ₯ ν–₯상

tf.distribute.ParameterServerStrategy 및 tf.distribute.coordinator.ClusterCoordinator둜 ν›ˆλ ¨ν•  λ•Œ μ„±λŠ₯ λ¬Έμ œκ°€ λ°œμƒν•  수 μžˆλŠ” λͺ‡ κ°€μ§€ κ°€λŠ₯ν•œ μ΄μœ κ°€ μžˆμŠ΅λ‹ˆλ‹€.

ν•œ κ°€μ§€ 일반적인 μ΄μœ λŠ” λ§€κ°œλ³€μˆ˜ μ„œλ²„μ˜ λ‘œλ“œ κ· ν˜•μ΄ λ§žμ§€ μ•Šκ³  λ‘œλ“œκ°€ μ‹¬ν•œ 일뢀 λ§€κ°œλ³€μˆ˜ μ„œλ²„κ°€ μš©λŸ‰μ— λ„λ‹¬ν–ˆκΈ° λ•Œλ¬Έμž…λ‹ˆλ‹€. λ˜ν•œ μ—¬λŸ¬ κ·Όλ³Έ 원인이 μžˆμ„ 수 μžˆμŠ΅λ‹ˆλ‹€. λ‹€μŒκ³Ό 같은 λͺ‡ κ°€μ§€ κ°„λ‹¨ν•œ λ°©λ²•μœΌλ‘œ 이 문제λ₯Ό μ™„ν™”ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  1. ParameterServerStrategyλ₯Ό ꡬ성할 λ•Œ variable_partitionerλ₯Ό μ§€μ •ν•˜μ—¬ 큰 λͺ¨λΈ λ³€μˆ˜λ₯Ό λΆ„ν• ν•©λ‹ˆλ‹€.

  2. κ°€λŠ₯ν•˜λ©΄ λͺ¨λ“  λ§€κ°œλ³€μˆ˜ μ„œλ²„μ— ν•„μš”ν•œ ν•«μŠ€νŒŸ λ³€μˆ˜λ₯Ό 단일 λ‹¨κ³„λ‘œ μƒμ„±ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. 예λ₯Ό λ“€μ–΄, μ˜΅ν‹°λ§ˆμ΄μ €μ—μ„œ μΌμ •ν•œ ν•™μŠ΅λ₯  λ˜λŠ” ν•˜μœ„ 클래슀 tf.keras.optimizers.schedules.LearningRateSchedule을 μ‚¬μš©ν•©λ‹ˆλ‹€. ν•™μŠ΅λ₯ μ΄ νŠΉμ • λ§€κ°œλ³€μˆ˜ μ„œλ²„μ— 배치되고 각 λ‹¨κ³„μ—μ„œ λ‹€λ₯Έ λͺ¨λ“  λ§€κ°œλ³€μˆ˜ μ„œλ²„μ—μ„œ μš”μ²­ν•˜λŠ” λ³€μˆ˜κ°€ λ˜λŠ” 것이 κΈ°λ³Έ λ™μž‘μ΄κΈ° λ•Œλ¬Έμž…λ‹ˆλ‹€.

  3. Keras μ „μ²˜λ¦¬ λ ˆμ΄μ–΄μ— μ „λ‹¬ν•˜κΈ° 전에 큰 μ–΄νœ˜λ₯Ό μ„žμŠ΅λ‹ˆλ‹€.

μ„±λŠ₯ 문제의 또 λ‹€λ₯Έ κ°€λŠ₯ν•œ μ΄μœ λŠ” 코디넀이터에 μžˆμŠ΅λ‹ˆλ‹€. schedule/join의 κ΅¬ν˜„μ€ Python κΈ°λ°˜μ΄λ―€λ‘œ μŠ€λ ˆλ”© μ˜€λ²„ν—€λ“œκ°€ μžˆμ„ 수 μžˆμŠ΅λ‹ˆλ‹€. λ˜ν•œ 코디넀이터와 μž‘μ—…μž κ°„μ˜ λŒ€κΈ° μ‹œκ°„μ΄ 클 수 μžˆμŠ΅λ‹ˆλ‹€. μ΄λŸ¬ν•œ 경우라면 λ‹€μŒκ³Ό 같이 ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  • Model.fit의 경우 Model.compile에 제곡된 steps_per_execution 인수λ₯Ό 1보닀 큰 κ°’μœΌλ‘œ μ„€μ •ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

  • μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ λ£¨ν”„μ˜ 경우 μ—¬λŸ¬ 단계λ₯Ό 단일 tf.function으둜 묢을 수 μžˆμŠ΅λ‹ˆλ‹€.

steps_per_invocation = 10 @tf.function def step_fn(iterator): for _ in range(steps_per_invocation): features, labels = next(iterator) def replica_fn(features, labels): ... strategy.run(replica_fn, args=(features, labels))

λΌμ΄λΈŒλŸ¬λ¦¬κ°€ λ”μš± μ΅œμ ν™”λ¨μ— 따라 μ•žμœΌλ‘œ λŒ€λΆ€λΆ„μ˜ μ‚¬μš©μžκ°€ μˆ˜λ™μœΌλ‘œ 단계λ₯Ό 묢을 ν•„μš”κ°€ μ—†κ²Œ 되기λ₯Ό λ°”λžλ‹ˆλ‹€.

λ˜ν•œ μ„±λŠ₯ ν–₯상을 μœ„ν•œ μ•½κ°„μ˜ μš”λ Ήμ€ μœ„μ˜ μž‘μ—… 였λ₯˜ μ²˜λ¦¬ν•˜κΈ° μ„Ήμ…˜μ—μ„œ μ„€λͺ…ν•œ λŒ€λ‘œ λ°˜ν™˜ κ°’ 없이 ν•¨μˆ˜λ₯Ό μ˜ˆμ•½ν•˜λŠ” κ²ƒμž…λ‹ˆλ‹€.

μ•Œλ €μ§„ μ œν•œ 사항

μ•Œλ €μ§„ λŒ€λΆ€λΆ„μ˜ μ œν•œ 사항은 이미 μœ„ μ„Ήμ…˜μ—μ„œ λ‹€λ£¨μ—ˆμŠ΅λ‹ˆλ‹€. 이 μ„Ήμ…˜μ—μ„œλŠ” μš”μ•½μ„ μ œκ³΅ν•©λ‹ˆλ‹€.

ParameterServerStrategy 일반

  • 내결함성이 μ œλŒ€λ‘œ μž‘λ™ν•˜λ €λ©΄ 코디넀이터λ₯Ό ν¬ν•¨ν•œ λͺ¨λ“  μž‘μ—…μ— os.environment["grpc_fail_fast"]="use_caller"κ°€ ν•„μš”ν•©λ‹ˆλ‹€.

  • 동기 λ§€κ°œλ³€μˆ˜ μ„œλ²„ ν›ˆλ ¨μ€ μ§€μ›λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

  • 졜적의 μ„±λŠ₯을 μ–»μœΌλ €λ©΄ 일반적으둜 μ—¬λŸ¬ 단계λ₯Ό 단일 ν•¨μˆ˜λ‘œ μ••μΆ•ν•΄μ•Ό ν•©λ‹ˆλ‹€.

  • μƒ€λ”©λœ λ³€μˆ˜λ₯Ό ν¬ν•¨ν•˜λŠ” tf.saved_model.loadλ₯Ό 톡해 saved_model을 λ‘œλ“œν•˜λŠ” 것은 μ§€μ›λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. TensorFlow Serving을 μ‚¬μš©ν•˜μ—¬ μ΄λŸ¬ν•œ stored_model을 λ‘œλ“œν•˜λŠ” 것은 μž‘λ™ν•  κ²ƒμœΌλ‘œ μ˜ˆμƒλ©λ‹ˆλ‹€(μžμ„Έν•œ λ‚΄μš©μ€ 제곡 νŠœν† λ¦¬μ–Ό μ°Έμ‘°).

  • 코디넀이터 μž‘μ—…μ„ λ‹€μ‹œ μ‹œμž‘ν•˜μ§€ μ•Šκ³  λ§€κ°œλ³€μˆ˜ μ„œλ²„ 였λ₯˜μ—μ„œ λ³΅κ΅¬ν•˜λŠ” 것은 μ§€μ›λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

  • tf.keras.layers.IntegerLookup, tf.keras.layers.StringLookup 및 tf.keras.layers.TextVectorizationκ³Ό 같은 일뢀 Keras μ „μ²˜λ¦¬ λ ˆμ΄μ–΄μ—μ„œ 일반적으둜 μ‚¬μš©λ˜λŠ” tf.lookup.StaticHashTable의 생성은 Strategy.scope μ•„λž˜μ— λ°°μΉ˜ν•΄μ•Ό ν•©λ‹ˆλ‹€. κ·Έλ ‡μ§€ μ•ŠμœΌλ©΄ λ¦¬μ†ŒμŠ€κ°€ 코디넀이터에 배치되고 μž‘μ—…μžμ—μ„œ μ½”λ””λ„€μ΄ν„°λ‘œμ˜ 쑰회 RPCκ°€ μ„±λŠ₯에 영ν–₯을 λ―ΈμΉ©λ‹ˆλ‹€.

Model.fit 특이 사항

  • steps_per_epoch μΈμˆ˜λŠ” Model.fit에 ν•„μš”ν•©λ‹ˆλ‹€. Epochμ—μ„œ μ μ ˆν•œ 간격을 μ œκ³΅ν•˜λŠ” 값을 선택할 수 μžˆμŠ΅λ‹ˆλ‹€.

  • ParameterServerStrategyλŠ” μ„±λŠ₯μƒμ˜ 이유둜 배치 μˆ˜μ€€ 호좜이 μžˆλŠ” μ‚¬μš©μž μ •μ˜ μ½œλ°±μ„ μ§€μ›ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. μ΄λŸ¬ν•œ ν˜ΈμΆœμ„ μ μ ˆν•˜κ²Œ μ„ νƒλœ steps_per_epochλ₯Ό μ΄μš©ν•΄ epoch μˆ˜μ€€ 호좜둜 λ³€ν™˜ν•˜μ—¬ steps_per_epoch 단계 μˆ˜λ§ˆλ‹€ ν˜ΈμΆœλ˜λ„λ‘ ν•΄μ•Ό ν•©λ‹ˆλ‹€. λ‚΄μž₯ μ½œλ°±μ€ 영ν–₯을 λ°›μ§€ μ•ŠμŠ΅λ‹ˆλ‹€(ν•΄λ‹Ή 배치 μˆ˜μ€€ 호좜이 μ„±λŠ₯을 λ°œνœ˜ν•˜λ„λ‘ μˆ˜μ •λ˜μ—ˆμŒ). ParameterServerStrategy에 λŒ€ν•œ 배치 μˆ˜μ€€ ν˜ΈμΆœμ„ 지원할 κ³„νšμ— μžˆμŠ΅λ‹ˆλ‹€.

  • 같은 이유둜, λ‹€λ₯Έ μ „λž΅κ³Ό 달리 μ§„ν–‰λ₯  ν‘œμ‹œμ€„κ³Ό λ©”νŠΈλ¦­μ€ epoch κ²½κ³„μ—μ„œλ§Œ κΈ°λ‘λ©λ‹ˆλ‹€.

  • run_eagerlyλŠ” μ§€μ›λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

μ‚¬μš©μž μ •μ˜ ν›ˆλ ¨ 루프 특이 사항

  • ClusterCoordinator.schedule은 λ°μ΄ν„°μ„ΈνŠΈμ— λŒ€ν•œ λ°©λ¬Έ 보μž₯을 μ§€μ›ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

  • ClusterCoordinator.create_per_worker_dataset이 μ½œλŸ¬λΈ”μ„ μž…λ ₯으둜 μ‚¬μš©ν•˜λŠ” 경우 μ „λ‹¬λœ ν•¨μˆ˜ λ‚΄μ—μ„œ 전체 λ°μ΄ν„°μ„ΈνŠΈλ₯Ό 생성해야 ν•©λ‹ˆλ‹€.

  • tf.data.OptionsλŠ” ClusterCoordinator.create_per_worker_dataset에 μ˜ν•΄ μƒμ„±λœ λ°μ΄ν„°μ„ΈνŠΈμ—μ„œ λ¬΄μ‹œλ©λ‹ˆλ‹€.