Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ko/federated/tutorials/custom_aggregators.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

사용자 정의 집계 구현하기

이 튜토리얼에서는 tff.aggregators 모듈의 이면에 있는 디자인 원칙과 클라이언트에서 서버로 값의 사용자 지정 집계를 구현하기 위한 모범 사례를 설명합니다.

전제 조건. 이 튜토리얼에서는 배치(tff.SERVER, tff.CLIENTS), TFF가 계산을 나타내는 방식(tff.tf_computation, tff.federated_computation) 및 해당 유형 서명과 같은 Federated Core의 기본 개념에 이미 익숙하다고 가정합니다.

#@test {"skip": true} !pip install --quiet --upgrade tensorflow-federated

디자인 요약

TFF에서 "집계"는 tff.SERVER에서 동일한 유형의 집계 값을 생성하기 위한 tff.CLIENTS에서 값 집합의 이동을 나타냅니다. 즉, 각각의 개별 클라이언트 값이 있을 필요는 없습니다. 예를 들어, 페더레이션 학습에서 클라이언트 모델 업데이트는 평균을 내어 서버의 전역 모델에 적용할 집계 모델 업데이트를 얻습니다.

tff.federated_sum과 같이 이 목표를 달성하는 연산자 외에도 TFF는 집계 계산을 위한 유형 서명을 공식화하는 tff.templates.AggregationProcess(상태 저장 프로세스)를 제공하므로 단순한 합계보다 더 복잡한 형식으로 일반화할 수 있습니다.

tff.aggregators 모듈의 주요 구성 요소는 AggregationProcess 생성을 위한 팩토리{/em}로, 두 가지 측면에서 일반적으로 유용하고 교체 가능한 TFF 빌딩 블록으로 설계되었습니다.

  1. 매개변수화된 계산. 집계는 필요한 집계를 매개변수화하기 위해 tff.aggregators와 함께 작동하도록 설계된 다른 TFF 모듈에 연결할 수 있는 독립적인 빌딩 블록입니다.

예제:

learning_process = tff.learning.algorithms.build_weighted_fed_avg( ..., model_aggregator=tff.aggregators.MeanFactory())
  1. 집계 구성. 집계 빌딩 블록은 다른 집계 빌딩 블록과 함께 구성되어 더 복잡한 복합 집계를 생성할 수 있습니다.

예제:

secure_mean = tff.aggregators.MeanFactory( value_sum_factory=tff.aggregators.SecureSumFactory(...))

이 튜토리얼의 나머지 부분에서는 이 두 가지 목표를 달성하는 방법을 설명합니다.

집계 프로세스

먼저 tff.templates.AggregationProcess를 요약하고 생성을 위한 팩토리 패턴을 알아봅니다.

tff.templates.AggregationProcess는 집계를 위해 지정된 유형 서명이 있는 tff.templates.MeasuredProcess입니다. 특히, initializenext 함수에는 다음과 같은 유형 서명이 있습니다.

  • ( -> state_type@SERVER)

  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

상태(유형 state_type)는 서버에 있어야 합니다. next 함수는 클라이언트에 배치되는 집계될 상태와 값(value_type 유형)을 입력 인수로 사용합니다. *는 가중 평균의 가중치와 같은 선택적 기타 입력 인수를 의미합니다. 업데이트된 상태 개체, 서버에 배치된 동일한 유형의 집계 값 및 일부 측정값이 반환됩니다.

next 함수의 실행 사이에 전달될 상태와 next 함수의 특정 실행에 따라 정보를 보고하도록 의도된 보고된 측정이 모두 비어 있을 수 있습니다. 그럼에도 불구하고 TFF의 다른 부분이 따라야 할 명확한 계약을 가지려면 명시적으로 지정해야 합니다.

예를 들어 tff.learning의 모델 업데이트와 같은 다른 TFF 모듈은 tff.templates.AggregationProcess를 사용하여 값이 집계되는 방식을 매개변수화할 것으로 기대됩니다. 그러나 집계된 값과 유형 서명이 정확히 무엇인지는 훈련되는 모델의 기타 세부 사항과 이를 수행하는 데 사용되는 학습 알고리즘에 따라 다릅니다.

집계를 계산의 다른 측면과 독립적으로 만들기 위해 팩토리 패턴을 사용합니다. 팩토리의 create 메서드를 호출하여 집계할 객체의 관련 유형 서명을 사용할 수 있게 되면 적절한 tff.templates.AggregationProcess를 생성합니다. 따라서 집계 프로세스를 직접 처리하는 것은 이 생성을 담당하는 라이브러리 작성자에게만 필요합니다.

집계 프로세스 팩토리

가중치가 적용되지 않은 집계와 가중치가 적용되는 집계에 대해 두 가지 추상적 기본 팩토리 클래스가 있습니다. 이들 클래스의 create 메서드는 집계할 값의 유형 서명을 사용하고 이러한 값의 집계를 위해 tff.templates.AggregationProcess를 반환합니다.

tff.aggregators.UnweightedAggregationFactory에 의해 생성된 프로세스는 (1) 서버의 상태 및 (2) 지정된 유형 value_type의 값, 이 두 가지 입력 인수를 사용합니다.

구현의 한 예는 tff.aggregators.SumFactory입니다.

tff.aggregators.WeightedAggregationFactory에 의해 생성된 프로세스는 (1) 서버의 상태, (2) 지정된 유형 value_type의 값 및 (3) create 메서드를 호출할 때 팩토리의 사용자가 지정한 weight_type 유형의 가중치, 이 세 가지 입력 인수를 취합니다.

구현의 한 예는 가중 평균을 계산하는 tff.aggregators.MeanFactory입니다.

팩토리 패턴은 위에서 언급한 첫 번째 목표를 달성하는 방법입니다. 그 집계는 독립적인 빌딩 블록입니다. 예를 들어 훈련 가능한 모델 변수를 변경할 때 복잡한 집계를 반드시 변경할 필요는 없습니다. 이를 나타내는 팩토리는 tff.learning.build_federated_averaging_process와 같은 메서드에서 사용될 때 다른 유형 서명으로 호출됩니다.

구성

일반 집계 프로세스는 (a) 클라이언트에서 값의 일부 전처리, (b) 클라이언트에서 서버로 값 이동, (c) 서버에서 집계된 값의 일부 후처리를 캡슐화할 수 있음을 상기하세요. 위에서 언급한 두 번째 목표인 집계 구성은 (b) 부분이 다른 집계 팩토리에 위임될 수 있도록 집계 팩토리의 구현을 구조화하여 tff.aggregators 모듈 내부에서 실현됩니다.

단일 팩토리 클래스 내에서 필요한 모든 논리를 구현하는 대신 구현은 기본적으로 집계와 관련된 단일 측면에 중점을 둡니다. 필요한 경우 이 패턴을 통해 빌딩 블록을 한 번에 하나씩 교체할 수 있습니다.

가중치가 적용된 tff.aggregators.MeanFactory를 예로 들 수 있습니다. 그 구현은 클라이언트에서 제공된 값과 가중치를 곱한 다음 가중치 적용 값과 가중치를 독립적으로 합산한 후, 가중치 적용 값의 합계를 서버에서 가중치 합계로 나눕니다. tff.federated_sum 연산자를 직접 사용하여 합계를 구현하는 대신 합계가 tff.aggregators.SumFactory의 두 인스턴스에 위임됩니다.

이러한 구조는 두 개의 기본 합계가 합계를 다르게 실현하는 다른 팩토리로 대체되는 것을 가능하게 합니다. 예를 들어, tff.aggregators.SecureSumFactory 또는 tff.aggregators.UnweightedAggregationFactory의 사용자 정의 구현입니다. 반대로, tff.aggregators.MeanFactory는 그 자체가 값이 평균화하기 전에 클리핑되어야 하는 경우 tff.aggregators.clipping_factory와 같은 다른 팩토리의 내부 집계가 될 수 있습니다.

tff.aggregators 모듈의 기존 팩토리를 사용하는 구성 메커니즘의 권장 사용에 대해서는 이전의 학습을 위한 권장 집계 조정 튜토리얼을 참조하세요.

예제로 알아보는 모범 사례

간단한 예제 작업을 구현하여 tff.aggregators 개념을 자세히 설명하고 점진적으로 더 일반화시킬 것입니다. 또 다른 학습 방법으로 기존 팩토리의 구현을 살펴볼 수 있습니다.

import collections import tensorflow as tf import tensorflow_federated as tff

value을 합산하는 대신, 예제 작업에서는 value * 2.0을 합한 다음 이 합을 2.0으로 나눕니다. 따라서 집계 결과는 value을 직접 합산하는 것과 수학적으로 동일하며 (1) 클라이언트에서 확장, (2) 클라이언트에서 합산, (3) 서버에서 확장 취소의 세 부분으로 구성되는 것으로 생각할 수 있습니다.

참고: 이 작업은 실제로 반드시 유용하다고 할 수는 없습니다. 하지만 기본 개념을 설명하는 데는 도움이 됩니다.

위에서 설명한 설계에 따라 로직은 tff.aggregators.UnweightedAggregationFactory의 하위 클래스로 구현되어 집계할 value_type이 제공될 때 적절한 tff.templates.AggregationProcess를 생성합니다.

최소한의 구현

예제 작업의 경우 필요한 계산은 항상 동일하므로 상태를 사용할 필요가 없습니다. 따라서 이는 비어 있으며 tff.federated_value((), tff.SERVER)로 표시됩니다. 현재로서는 측정에 대해서도 마찬가지입니다.

따라서 작업의 최소 구현은 다음과 같습니다.

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory): def create(self, value_type): @tff.federated_computation() def initialize_fn(): return tff.federated_value((), tff.SERVER) @tff.federated_computation(initialize_fn.type_signature.result, tff.type_at_clients(value_type)) def next_fn(state, value): scaled_value = tff.federated_map( tff.tf_computation(lambda x: x * 2.0), value) summed_value = tff.federated_sum(scaled_value) unscaled_value = tff.federated_map( tff.tf_computation(lambda x: x / 2.0), summed_value) measurements = tff.federated_value((), tff.SERVER) return tff.templates.MeasuredProcessOutput( state=state, result=unscaled_value, measurements=measurements) return tff.templates.AggregationProcess(initialize_fn, next_fn)

모든 것이 예상대로 작동하는지 여부는 다음 코드로 확인할 수 있습니다.

client_data = [1.0, 2.0, 5.0] factory = ExampleTaskFactory() aggregation_process = factory.create(tff.TensorType(tf.float32)) print(f'Type signatures of the created aggregation process:\n' f' - initialize: {aggregation_process.initialize.type_signature}\n' f' - next: {aggregation_process.next.type_signature}\n') state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)

상태 저장 및 측정

상태 저장은 반복적으로 실행되고 각 반복마다 변경될 것으로 예상되는 계산을 나타내기 위해 TFF에서 광범위하게 사용됩니다. 예를 들어, 학습 계산 상태에는 학습 중인 모델의 가중치가 포함됩니다.

집계 계산에서 상태를 사용하는 방법을 설명하기 위해 예제 작업을 수정합니다. value2.0을 곱하는 대신 반복 인덱스(집계가 실행된 횟수)를 곱합니다.

이렇게 하려면 상태 개념을 통해 얻어지는 반복 인덱스를 추적할 방법이 필요합니다. initialize_fn에서 빈 상태를 만드는 대신 상태를 스칼라 0으로 초기화합니다. 그런 다음 next_fn에서 세 단계를 거쳐 상태를 사용할 수 있습니다. 이 세 단계는 각각 (1) 1.0씩 증가, (2) value를 곱하는 데 사용, (3) 새로 업데이트된 상태로 반환입니다.

이 작업이 완료되면 주목할 점: 그러나 위와 정확히 동일한 코드를 사용하여 모든 작업이 예상대로 작동하는지 확인할 수 있습니다. 실제로 변경된 사항이 있는지 어떻게 알 수 있을까요?

좋은 질문입니다! 여기에서 측정의 개념이 유용해집니다. 일반적으로 측정은 모니터링에 사용할 수 있는 next 함수의 단일 실행과 관련된 모든 값을 보고할 수 있습니다. 이 경우 이전 예제의 summed_value가 될 수 있습니다. 즉, "unscaling" 단계 이전의 값으로, 반복 인덱스에 따라 달라집니다. 다시 말하지만, 이것이 실제로 반드시 유용하다고 할 수는 없지만 관련 메커니즘을 보여줍니다.

따라서 작업에 대한 상태 저장 해답은 다음과 같습니다.

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory): def create(self, value_type): @tff.federated_computation() def initialize_fn(): return tff.federated_value(0.0, tff.SERVER) @tff.federated_computation(initialize_fn.type_signature.result, tff.type_at_clients(value_type)) def next_fn(state, value): new_state = tff.federated_map( tff.tf_computation(lambda x: x + 1.0), state) state_at_clients = tff.federated_broadcast(new_state) scaled_value = tff.federated_map( tff.tf_computation(lambda x, y: x * y), (value, state_at_clients)) summed_value = tff.federated_sum(scaled_value) unscaled_value = tff.federated_map( tff.tf_computation(lambda x, y: x / y), (summed_value, new_state)) return tff.templates.MeasuredProcessOutput( state=new_state, result=unscaled_value, measurements=summed_value) return tff.templates.AggregationProcess(initialize_fn, next_fn)

입력으로 next_fn에 들어오는 state는 서버에 배치됩니다. 클라이언트에서 이를 사용하려면 우선 tff.federated_broadcast 연산자를 사용하여 통신해야 합니다.

모든 작업이 예상대로 작동하는지 확인하기 위해 이제 보고된 measurements를 살펴볼 수 있습니다. 이는 동일한 client_data로 실행하더라도 각 실행 라운드마다 달라야 합니다.

client_data = [1.0, 2.0, 5.0] factory = ExampleTaskFactory() aggregation_process = factory.create(tff.TensorType(tf.float32)) print(f'Type signatures of the created aggregation process:\n' f' - initialize: {aggregation_process.initialize.type_signature}\n' f' - next: {aggregation_process.next.type_signature}\n') state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print('| Round #1') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)') output = aggregation_process.next(output.state, client_data) print('\n| Round #2') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)') output = aggregation_process.next(output.state, client_data) print('\n| Round #3') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)

구조화된 유형

페더레이션 학습에서 훈련된 모델의 모델 가중치는 일반적으로 단일 텐서가 아닌 텐서의 집합으로 표시됩니다. TFF에서 이것은 tff.StructType으로 표현되며 일반적으로 유용한 집계 팩토리는 구조화된 유형을 받아들일 수 있어야 합니다.

그러나 위의 예에서는 tff.TensorType 객체로만 작업했습니다. 이전 팩토리를 사용하여 tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))])으로 집계 프로세스를 생성하려고 하면 이상한 오류가 발생하는데, TensorFlow가 tf.Tensorlist를 곱하려고 하기 때문입니다.

문제는 텐서의 구조에 상수를 곱하는 대신 구조의 각 텐서에 상수를 곱해야 한다는 것입니다. 이 문제에 대한 일반적인 해결책은 생성된 tff.tf_computation 내부에 tf.nest 모듈을 사용하는 것입니다.

따라서 구조화된 유형과 호환되는 이전 ExampleTaskFactory 버전은 다음과 같습니다.

@tff.tf_computation() def scale(value, factor): return tf.nest.map_structure(lambda x: x * factor, value) @tff.tf_computation() def unscale(value, factor): return tf.nest.map_structure(lambda x: x / factor, value) @tff.tf_computation() def add_one(value): return value + 1.0 class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory): def create(self, value_type): @tff.federated_computation() def initialize_fn(): return tff.federated_value(0.0, tff.SERVER) @tff.federated_computation(initialize_fn.type_signature.result, tff.type_at_clients(value_type)) def next_fn(state, value): new_state = tff.federated_map(add_one, state) state_at_clients = tff.federated_broadcast(new_state) scaled_value = tff.federated_map(scale, (value, state_at_clients)) summed_value = tff.federated_sum(scaled_value) unscaled_value = tff.federated_map(unscale, (summed_value, new_state)) return tff.templates.MeasuredProcessOutput( state=new_state, result=unscaled_value, measurements=summed_value) return tff.templates.AggregationProcess(initialize_fn, next_fn)

이 예는 TFF 코드를 구성할 때 따라야 할 유용한 패턴을 잘 보여줍니다. 아주 간단한 작업을 다루는 경우가 아니면 tff.federated_computation 내부의 빌딩 블록으로 사용될 tff.tf_computation을 별도의 위치에 생성하여 코드의 가독성을 높일 수 있습니다. tff.federated_computation 내부에서 이러한 빌딩 블록은 내장 연산자를 통해서만 연결됩니다.

예상대로 작동하는지 확인하려면:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]], [[1.0, 1.0], [3.0, 0.0, -5.0]]] factory = ExampleTaskFactory() aggregation_process = factory.create( tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))])) print(f'Type signatures of the created aggregation process:\n' f' - initialize: {aggregation_process.initialize.type_signature}\n' f' - next: {aggregation_process.next.type_signature}\n') state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n' f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]

내부 집계

마지막 단계는 여러 집계 기술을 쉽게 구성할 수 있도록 선택적으로 실제 집계를 다른 팩토리에 위임할 수 있게 하는 것입니다.

이를 위해 ExampleTaskFactory의 생성자에서 선택적인 inner_factory 인수를 생성합니다. 지정하지 않으면 이전 섹션에서 직접 사용한 tff.federated_sum 연산자를 적용하는 tff.aggregators.SumFactory가 사용됩니다.

create가 호출되면 먼저 inner_factorycreate를 호출하여 동일한 value_type으로 내부 집계 프로세스를 생성할 수 있습니다.

initialize_fn이 반환하는 프로세스의 상태는 "이" 프로세스에 의해 생성된 상태와 방금 생성된 내부 프로세스의 상태, 이렇게 두 부분으로 구성됩니다.

next_fn의 구현은 실제 집계가 내부 프로세스의 next 함수에 위임된다는 점과 최종 출력이 구성되는 방식에서 차이가 있습니다. 상태는 다시 "이" 상태와 "내부" 상태로 구성되며 측정값은 OrderedDict와 유사한 방식으로 구성됩니다.

다음은 이러한 패턴을 구현한 것입니다.

@tff.tf_computation() def scale(value, factor): return tf.nest.map_structure(lambda x: x * factor, value) @tff.tf_computation() def unscale(value, factor): return tf.nest.map_structure(lambda x: x / factor, value) @tff.tf_computation() def add_one(value): return value + 1.0 class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory): def __init__(self, inner_factory=None): if inner_factory is None: inner_factory = tff.aggregators.SumFactory() self._inner_factory = inner_factory def create(self, value_type): inner_process = self._inner_factory.create(value_type) @tff.federated_computation() def initialize_fn(): my_state = tff.federated_value(0.0, tff.SERVER) inner_state = inner_process.initialize() return tff.federated_zip((my_state, inner_state)) @tff.federated_computation(initialize_fn.type_signature.result, tff.type_at_clients(value_type)) def next_fn(state, value): my_state, inner_state = state my_new_state = tff.federated_map(add_one, my_state) my_state_at_clients = tff.federated_broadcast(my_new_state) scaled_value = tff.federated_map(scale, (value, my_state_at_clients)) # Delegation to an inner factory, returning values placed at SERVER. inner_output = inner_process.next(inner_state, scaled_value) unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state)) new_state = tff.federated_zip((my_new_state, inner_output.state)) measurements = tff.federated_zip( collections.OrderedDict( scaled_value=inner_output.result, example_task=inner_output.measurements)) return tff.templates.MeasuredProcessOutput( state=new_state, result=unscaled_value, measurements=measurements) return tff.templates.AggregationProcess(initialize_fn, next_fn)

inner_process.next 함수에 위임할 때 얻어지는 반환 구조는 tff.templates.MeasuredProcessOutput이며 state, resultmeasurements의 세 필드가 동일합니다. 구성된 집계 프로세스의 전체 반환 구조를 만들 때 일반적으로 statemeasurements 필드를 함께 구성하여 반환해야 합니다. 대조적으로, result 필드는 집계되는 값에 해당하며 대신 구성된 집계를 "통과"합니다.

state 객체는 팩토리의 세부적 구현 내용으로 보아야 하므로 구성은 모든 구조가 될 수 있습니다. 그러나 measurements은 어느 시점에서 사용자에게 보고되는 값에 해당합니다. 따라서 구성에서 보고된 메트릭이 어디에서 왔는지가 명확하도록 구성된 명명과 함께 OrderedDict를 사용하는 것이 좋습니다.

또한 tff.federated_zip 연산자의 사용에 유의하세요. 생성된 프로세스에 의해 제어되는 state 객체는 tff.FederatedType이어야 합니다. 대신 initialize_fn에서 (this_state, inner_state)를 반환했다면 반환 유형 서명은 tff.FederatedType의 2-튜플을 포함하는 tff.StructType이 될 것입니다. tff.federated_zip을 사용하면 tff.FederatedType이 최상위 레벨로 "승격"됩니다. 이것은 반환될 상태 및 측정을 준비할 때 next_fn 에서 유사하게 사용됩니다.

마지막으로, 이것이 기본 내부 집계와 함께 사용되는 방법을 볼 수 있습니다.

client_data = [1.0, 2.0, 5.0] factory = ExampleTaskFactory() aggregation_process = factory.create(tff.TensorType(tf.float32)) state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print('| Round #1') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}') print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}') output = aggregation_process.next(output.state, client_data) print('\n| Round #2') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}') print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()

... 그리고 다른 내부 집계에서는 어떤지 볼 수 있습니다. 예를 들어 ExampleTaskFactory는 다음과 같습니다.

client_data = [1.0, 2.0, 5.0] # Note the inner delegation can be to any UnweightedAggregaionFactory. # In this case, each factory creates process that multiplies by the iteration # index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...). factory = ExampleTaskFactory(ExampleTaskFactory()) aggregation_process = factory.create(tff.TensorType(tf.float32)) state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print('| Round #1') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}') print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}') output = aggregation_process.next(output.state, client_data) print('\n| Round #2') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}') print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

요약

이 튜토리얼에서는 집계 팩토리로 표현되는 범용 집계 빌딩 블록을 만들기 위해 따라야 할 모범 사례를 설명했습니다. 다음 두 가지 방식으로 설계 의도를 이용해 일반화할 수 있습니다.

  1. 매개변수화된 계산. 집계는 tff.learning.algorithms.build_weighted_fed_avg와 같이 필요한 집계를 매개변수화하기 위해 tff.aggregators와 함께 작동하도록 설계된 다른 TFF 모듈에 연결할 수 있는 독립적인 빌딩 블록입니다.

  2. 집계 구성. 집계 빌딩 블록은 다른 집계 빌딩 블록과 함께 구성되어 더 복잡한 복합 집계를 생성할 수 있습니다.