Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ko/federated/tutorials/loading_remote_data.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

TFF에서 원격 데이터 로드


참고: 이 Colab은 tensorflow_federated pip 패키지의 최신 릴리즈 버전에서 동작하는 것으로 확인되었지만 Tensorflow Federated 프로젝트는 아직 시험판 개발 중이며 main에서 동작하지 않을 수 있습니다.

페더레이션 학습의 실제 응용에서 원시 훈련 데이터는 일반적으로 많은 장치 또는 데이터 사일로에 분산되므로 사용하기 전에 특별한 사전 처리와 로드가 필요합니다.

이 튜토리얼에서는 TFF의 DataBackendDataExecutor 인터페이스를 사용하여 원격 위치에 저장된 예제를 로드하고 페더레이션 학습을 사용하여 모델을 훈련하는 데 이를 사용하는 방법에 대해 설명합니다. 로컬에 저장된 훈련 데이터세트를 사용하여 데이터 로드 API의 사용을 시연하고 데이터세트가 개별 원격 클라이언트에 분할된 것처럼 예제 샘플링을 시뮬레이션합니다. 이 튜토리얼을 사용 사례에 맞게 조정할 때는 해당 데이터세트를 자신의 분산 데이터로 교체하기만 하면 됩니다.

페더레이션 학습 또는 TFF를 처음 접하는 경우, 입문서로 이미지 분류를 위한 페더레이션 학습을 읽어보세요.

시작하기 전에

시작하기 전에 다음을 실행하여 환경이 올바르게 설정되었는지 확인하세요. 자세한 내용은 설치 안내서를 참조하세요.

#@title Set up open-source environment #@test {"skip": true} !pip install --quiet --upgrade tensorflow-federated !pip install --quiet --upgrade nest-asyncio import nest_asyncio nest_asyncio.apply()
#@title Import packages import collections import random from typing import Any, List, Optional, Sequence import numpy as np import tensorflow as tf import tensorflow_federated as tff np.random.seed(0)

입력 데이터 준비하기

우선, 기본 제공 리포지토리에서 EMNIST 데이터세트의 TFF 페더레이션 버전을 로드하겠습니다.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

그리고 EMNIST 데이터세트의 원시 예제를 변환하는 전처리 기능을 구성합니다.

NUM_EPOCHS = 5 SHUFFLE_BUFFER = 100 def preprocess(dataset): def map_fn(element): # Rename the features from `pixels` and `label`, to `x` and `y` for use with # Keras. return collections.OrderedDict( # Transform each `28x28` image into a `784`-element array. x=tf.reshape(element['pixels'], [-1, 784]), y=tf.reshape(element['label'], [-1, 1])) # Shuffle the individual examples and `repeat` over several epochs. return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).map(map_fn)

이것이 작동하는지 확인해 보겠습니다.

# The local dataset corresponding to a single client as tf.data.Dataset. example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) preprocessed_example_dataset = preprocess(example_dataset) print(preprocessed_example_dataset)
<MapDataset element_spec=OrderedDict([('x', TensorSpec(shape=(1, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(1, 1), dtype=tf.int32, name=None))])>

다음으로, EMNIST 데이터세트의 클라이언트에서 로컬 예제를 로드하고 사전 처리하는 DataBackend 구현을 구성합니다. 이는 페더레이션 학습 중에 훈련 가능한 예제를 가져오는 데 중요합니다.

클라이언트 데이터를 가져오는 방법 정의하기

TFF 작업자에게 로컬 데이터를 로드하고 변환하는 방법을 알려주려면 DataBackend의 인스턴스가 필요합니다.

TFF 작업자는 에지 머신에서 실행되고 단일 또는 다중 논리 클라이언트에 대한 작업을 수행하는 프로세스입니다. 이 예에서 훈련에 사용할 EMNIST 데이터세트는 이미 논리적 클라이언트에 의해 분할되었으며 모든 작업자는 동일한 로컬 환경에서 실행됩니다. 따라서 DataBackend는 모든 클라이언트에 해당하는 데이터를 참조할 수 있습니다. 그러나 실험적이지 않은 환경에서는 TFF 작업자가 개별 원격 시스템에 분산되어 각각 고유한 클라이언트 집합에 매핑되며 DataBackend가 로컬 컨텍스트에 따라 데이터 참조를 올바르게 확인할 수 있는지 확인해야 합니다.

# A `DataBackend` is a programmatic construct that resolves symbolic references, # represented as application-specific URIs, to materialized examples that # TFF operations can process. class MyDataBackend(tff.framework.DataBackend): async def materialize(self, data, type_spec): # In this example, the URI contains the Id of a client. client_id = int(data.uri[-1]) # The client Id is used to retrieve the corresponding local data. client_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[client_id]) # We process the client dataset before returning so its compatible with our # model definitions. return preprocess(client_dataset)

런타임 환경 설정하기

TFF 계산은 ExecutionContext에 의해 호출되며 TFF 계산에 정의된 데이터 URI가 런타임에 이해되도록 하려면 방금 만든 DataBackend에 대한 포인터를 포함하는 사용자 정의 컨텍스트를 TFF 작업자에 대해 정의해야 URI가 제대로 해석될 수 있습니다.

def ex_fn(device: tf.config.LogicalDevice) -> tff.framework.DataExecutor: # A `DataBackend` object is wrapped by a `DataExecutor`, which queries the # backend when a TFF worker encounters an operation requires fetching local # data. return tff.framework.DataExecutor( tff.framework.EagerTFExecutor(device), data_backend=MyDataBackend()) # In a distributed setting, this needs to run in the TFF worker as a service # connecting to some port. The top-level controller feeding TFF computations # would then connect to this port. factory = tff.framework.local_executor_factory(leaf_executor_fn=ex_fn) ctx = tff.framework.ExecutionContext(executor_fn=factory) tff.framework.set_default_context(ctx)

모델 훈련하기

이제 페더레이션 학습을 사용하여 모델을 훈련할 준비가 되었습니다. Keras 모델을 정의하겠습니다.

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ]) def model_fn(): keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=preprocessed_example_dataset.element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

다음과 같이 도우미 함수 tff.learning.algorithms.build_weighted_fed_avg를 호출하여 모델의 TFF 래핑된 정의를 Federated Averaging 알고리즘에 전달할 수 있습니다.

iterative_process = tff.learning.algorithms.build_weighted_fed_avg( model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02), server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0)) state = iterative_process.initialize()

initialize 계산은 Federated Averaging 프로세스의 초기 상태를 반환합니다.

훈련 라운드를 실행하려면 다음과 같이 URI 참조 샘플을 수집하여 데이터 샘플을 구성해야 합니다.

NUM_CLIENTS = 10 element_type = tff.types.StructWithPythonType( preprocessed_example_dataset.element_spec, container_type=collections.OrderedDict) dataset_type = tff.types.SequenceType(element_type) round_data_uris = [f'uri://{i}' for i in range(NUM_CLIENTS)] round_train_data = tff.framework.CreateDataDescriptor( arg_uris=round_data_uris, arg_type=dataset_type)

이제 한 라운드의 훈련을 할 수 있습니다.

result = iterative_process.next(state, round_train_data) state = result.state metrics = result.metrics print('round 1, metrics={}'.format(metrics))
round 1, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.11234568), ('loss', 11.965633), ('num_examples', 4860), ('num_batches', 4860)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())])

여러 라운드에 걸쳐 훈련하기

클라이언트를 선택하고 로컬 데이터를 검색하기 위한 입력을 조합하기 위해 FederatedDataSource 컨테이너를 정의할 수 있습니다. 이를 통해 여러 훈련 라운드를 반복하기가 편리해지고 여러 훈련 작업에서 재사용할 수 있습니다.

class MyFederatedDataSourceIterator(tff.program.FederatedDataSourceIterator): def __init__(self, client_ids: Sequence[str], federated_type: tff.FederatedType): self._client_ids = client_ids self._federated_type = federated_type @property def federated_type(self) -> tff.FederatedType: return self._federated_type def select(self, num_clients: Optional[int] = None) -> Any: client_ids_sample = random.sample(self._client_ids, num_clients) data_uris = [f'uri://{i}' for i in client_ids_sample] return tff.framework.CreateDataDescriptor( arg_uris=data_uris, arg_type=self._federated_type) class MyFederatedDataSource(tff.program.FederatedDataSource): def __init__(self, client_ids: Sequence[str], federated_type: tff.FederatedType): self._client_ids = client_ids self._federated_type = federated_type self._capabilities = [tff.program.Capability.RANDOM_UNIFORM] @property def federated_type(self) -> tff.FederatedType: return self._federated_type @property def capabilities(self) -> List[tff.program.Capability]: return self._capabilities def iterator(self) -> tff.program.FederatedDataSourceIterator: return MyFederatedDataSourceIterator(self._client_ids, self._federated_type) train_data_source = MyFederatedDataSource( client_ids=emnist_train.client_ids, federated_type=dataset_type) train_data_iterator = train_data_source.iterator()

이제 다음과 같이 페더레이션 학습 훈련 루프를 실행할 수 있습니다.

NUM_ROUNDS = 10 for round_num in range(2, NUM_ROUNDS + 1): round_train_data = train_data_iterator.select(NUM_CLIENTS) result = iterative_process.next(state, round_train_data) state = result.state metrics = result.metrics print('round {:2d}, metrics={}'.format(round_num, metrics))
round 2, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.12357217), ('loss', 9.161968), ('num_examples', 4815), ('num_batches', 4815)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())]) round 3, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.20563674), ('loss', 7.0862083), ('num_examples', 4790), ('num_batches', 4790)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())]) round 4, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.30241227), ('loss', 5.6945825), ('num_examples', 4560), ('num_batches', 4560)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())]) round 5, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.3867347), ('loss', 4.7210026), ('num_examples', 4900), ('num_batches', 4900)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())]) round 6, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.42311886), ('loss', 4.205554), ('num_examples', 4585), ('num_batches', 4585)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())]) round 7, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.4501548), ('loss', 4.1297464), ('num_examples', 4845), ('num_batches', 4845)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())]) round 8, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.56590474), ('loss', 2.8927681), ('num_examples', 5250), ('num_batches', 5250)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())]) round 9, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.59917355), ('loss', 2.7431731), ('num_examples', 4840), ('num_batches', 4840)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())]) round 10, metrics=OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.5717234), ('loss', 2.9738288), ('num_examples', 4845), ('num_batches', 4845)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', ())])

결론

이것으로 튜토리얼을 마칩니다. 제공되는 다른 튜토리얼을 살펴보고 TFF 프레임워크의 다른 많은 기능에 대해 알아보기 바랍니다.