Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/zh-cn/federated/tutorials/custom_aggregators.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

实现自定义聚合

在本教程中,我们将解释 tff.aggregators 模块背后的设计原理以及从客户端到服务器实现值的自定义聚合的最佳做法。

前提条件。本教程假定您已经熟悉 Federated Core 的基本概念,例如布局(tff.SERVERtff.CLIENTS)、TFF 如何表示计算(tff.tf_computationtff.federated_computation)及其类型签名。

#@test {"skip": true} !pip install --quiet --upgrade tensorflow-federated

设计概要

在 TFF 中,“聚合”是指移动 tff.CLIENTS 上的一组值以在 tff.SERVER 上生成相同类型的聚合值。也就是说,客户端的各个单独的值无需可用。例如在联合学习中,会对客户端模型更新求平均值以获得聚合模型更新,进而应用于服务器上的全局模型。

除了 tff.federated_sum 等能够实现此目标的算子外,TFF 还提供了 tff.templates.AggregationProcess有状态过程),它可以使聚合计算的类型签名具有固定形式,因此可以泛化到比简单求和更复杂的形式。

tff.aggregators 模块的主要组件是用于创建 AggregationProcess工厂,它们被设计为在以下两个方面通常实用且可替换的 TFF 构建块:

  1. 参数化计算。聚合是一种可插入到其他 TFF 模块(旨在使用 tff.aggregators 参数化其必要聚合)中的独立构建块。

示例:

learning_process = tff.learning.algorithms.build_weighted_fed_avg( ..., model_aggregator=tff.aggregators.MeanFactory())
  1. 聚合组合。聚合构建块可以与其他聚合构建块组合以创建更复杂的组合聚合。

示例:

secure_mean = tff.aggregators.MeanFactory( value_sum_factory=tff.aggregators.SecureSumFactory(...))

本教程的其余部分将解释如何实现这两个目标。

聚合过程

我们首先概述 tff.templates.AggregationProcess,然后讲解对其进行创建的工厂模式。

tff.templates.AggregationProcess 是具有为聚合指定的类型签名的 tff.templates.MeasuredProcess。特别是,initializenext 函数具有以下类型签名:

  • ( -> state_type@SERVER)

  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

状态(state_type 类型)必须位于服务器上。next 函数将状态和要在客户端聚合的值(类型为 value_type)作为输入参数。* 表示可选的其他输入参数,例如加权平均值中的权重。它会返回更新的状态对象、服务器上相同类型的聚合值以及一些测量值。

请注意,执行 next 函数之间传递的状态以及旨在根据 next 函数特定执行报告任何信息的报告测量值可能为空。然而,必须针对 TFF 的其他部分显式指定它们才能有章可循。

其他 TFF 模块(例如 tff.learning 中的模型更新),预计会使用 tff.templates.AggregationProcess 参数化值的聚合方式。但是,聚合的值究竟是什么以及它们的类型签名是什么,取决于所训练模型的其他详细信息以及为之使用的学习算法。

为了使聚合不依赖于计算的其他方面,我们使用工厂模式 – 只要待聚合对象的相关类型签名可用,我们就可以通过调用工厂的 create 方法创建适当的 tff.templates.AggregationProcess。因此,只有负责此创建的库作者才需要直接处理聚合过程。

聚合过程工厂

有两个用于非加权和加权聚合的抽象基工厂类。它们的 create 方法会接受待聚合值的类型签名,并返回 tff.templates.AggregationProcess 用于聚合这些值。

tff.aggregators.UnweightedAggregationFactory 创建的过程会接受两个输入参数:(1) 服务器端状态和 (2) 指定类型 value_type 的值。

一种实现示例为 tff.aggregators.SumFactory

tff.aggregators.WeightedAggregationFactory 创建的过程会接受三个输入参数:(1) 服务器端状态、(2) 指定类型 value_type 的值和 (3) 类型 weight_type 的权重,由工厂用户在调用其 create 方法时指定。

一种实现示例为 tff.aggregators.MeanFactory,可计算加权平均值。

工厂模式是我们实现上述第一个目标所用的方法;该聚合是一个独立构建块。例如,当更改可训练模型变量时,复杂聚合未必需要更改;当诸如 tff.learning.algorithms.build_weighted_fed_avg 等方法使用该聚合时,将以不同的类型签名调用表示该聚合的工厂。

组合

请回想一下,一般的聚合过程可以封装:(a) 在客户端对值的一些预处理、(b) 值从客户端向服务器端的移动,以及 (c) 在服务器端对聚合值的一些后处理。上述第二个目标为聚合组合,这一目标在 tff.aggregators 模块内部实现,方法是构建聚合工厂的实现,从而可将 (b) 部分委托给另一个聚合工厂。

默认情况下,实现仅聚焦于与聚合相关的某一方面,而非在单个工厂类中实现所有必要的逻辑。需要时,这种模式使我们能够一次性完成构建块更换。

一个示例为加权 tff.aggregators.MeanFactory。它的实现会将客户端的提供值和权重相乘,对加权值和权重分别求和,然后在服务器端将加权值的总和除以权重总和。并非直接使用 tff.federated_sum 算子来实现求和,而是将求和委托给 tff.aggregators.SumFactory 的两个实例。

这种结构使两项默认求和可被不同的工厂替换,从而实现各自求和。例如,tff.aggregators.SecureSumFactorytff.aggregators.UnweightedAggregationFactory 的自定义实现。相反,tff.aggregators.MeanFactory 自身可以是另一个工厂的内部聚合(例如 tff.aggregators.clipping_factory,如果要在求平均值前对值进行裁剪)。

有关使用 tff.aggregators 模块中现有工厂的组合机制的推荐用例,请参阅之前的 Tuning 推荐学习聚合教程。

最佳做法示例

我们将通过实现一个简单的示例任务来详细解释 tff.aggregators 概念,并逐步延伸到一般概念。另一种学习方法是查看现有工厂的实现。

import collections import tensorflow as tf import tensorflow_federated as tff

示例任务并非对 value 求和,而是对 value * 2.0 求和,然后将总和除以 2.0。因此,聚合结果在数学上等同于直接对 value 求和,并可认为由三个部分组成:(1) 在客户端换算 (2) 跨客户端求和 (3) 在服务器端反向换算。

注:此任务在实践中未必实用,但它有助于解释基本概念。

按照上文中解释的设计,逻辑将被实现为 tff.aggregators.UnweightedAggregationFactory 的子类,当给定 value_type 进行聚合时,它会创建适当的 tff.templates.AggregationProcess

最小实现

对于示例任务,所需计算总是相同的,因此不需要使用状态。因此状态为空,并表示为 tff.federated_value((), tff.SERVER)。目前而言,测量值也是如此。

因此,任务的最小实现如下:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory): def create(self, value_type): @tff.federated_computation() def initialize_fn(): return tff.federated_value((), tff.SERVER) @tff.federated_computation(initialize_fn.type_signature.result, tff.type_at_clients(value_type)) def next_fn(state, value): scaled_value = tff.federated_map( tff.tf_computation(lambda x: x * 2.0), value) summed_value = tff.federated_sum(scaled_value) unscaled_value = tff.federated_map( tff.tf_computation(lambda x: x / 2.0), summed_value) measurements = tff.federated_value((), tff.SERVER) return tff.templates.MeasuredProcessOutput( state=state, result=unscaled_value, measurements=measurements) return tff.templates.AggregationProcess(initialize_fn, next_fn)

可以使用以下代码验证是否一切都能按预期运行:

client_data = [1.0, 2.0, 5.0] factory = ExampleTaskFactory() aggregation_process = factory.create(tff.TensorType(tf.float32)) print(f'Type signatures of the created aggregation process:\n' f' - initialize: {aggregation_process.initialize.type_signature}\n' f' - next: {aggregation_process.next.type_signature}\n') state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print(f'Aggregation result: {output.result} (expected 8.0)')
Type signatures of the created aggregation process: - initialize: ( -> <>@SERVER) - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>) Aggregation result: 8.0 (expected 8.0)

有状态性和测量值

有状态性在 TFF 中广泛用于表示预期将以迭代方式执行并随每次迭代而变化的计算。例如,学习计算的状态包含所学习模型的权重。

为了说明如何在聚合计算中使用状态,我们对示例任务加以修改。我们没有将 value 乘以 2.0,而是将其乘以迭代索引 – 聚合已执行的次数。

为此,我们需要一种跟踪迭代索引的方式,我们基于状态的概念予以实现。在 initialize_fn 中,我们将状态初始化为标量零,而非创建空状态。然后,可以通过以下三个步骤在 next_fn 中使用状态:(1) 以 1.0 递增、(2) 用于乘以 value,以及 (3) 作为新的更新状态返回。

完成此操作后,您可能会注意到:可以使用与上面完全相同的代码来验证是否所有内容都能按预期运行。那么我如何得知某些内容确实有所变化?

好问题!这就是测量值概念的用武之地。通常,测量值可以报告与 next 函数单次执行相关的任何值,可用于监控。在这种情况下,它可以是上一示例中的 summed_value。即“反向换算”步骤之前的值,它应取决于迭代索引。重申一下,这在实践中未必实用,但可以说明相关机制。

因此,有状态的任务答案如下所示:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory): def create(self, value_type): @tff.federated_computation() def initialize_fn(): return tff.federated_value(0.0, tff.SERVER) @tff.federated_computation(initialize_fn.type_signature.result, tff.type_at_clients(value_type)) def next_fn(state, value): new_state = tff.federated_map( tff.tf_computation(lambda x: x + 1.0), state) state_at_clients = tff.federated_broadcast(new_state) scaled_value = tff.federated_map( tff.tf_computation(lambda x, y: x * y), (value, state_at_clients)) summed_value = tff.federated_sum(scaled_value) unscaled_value = tff.federated_map( tff.tf_computation(lambda x, y: x / y), (summed_value, new_state)) return tff.templates.MeasuredProcessOutput( state=new_state, result=unscaled_value, measurements=summed_value) return tff.templates.AggregationProcess(initialize_fn, next_fn)

请注意,作为输入进入 next_fnstate 位于服务器端。为了在客户端使用它,首先需要进行通信,将使用 tff.federated_broadcast 算子实现。

为了验证是否所有内容都能按预期运行,我们现在可以查看报告的 measurements,即使使用相同的 client_data 运行,每轮执行也应有所不同。

client_data = [1.0, 2.0, 5.0] factory = ExampleTaskFactory() aggregation_process = factory.create(tff.TensorType(tf.float32)) print(f'Type signatures of the created aggregation process:\n' f' - initialize: {aggregation_process.initialize.type_signature}\n' f' - next: {aggregation_process.next.type_signature}\n') state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print('| Round #1') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 1)') output = aggregation_process.next(output.state, client_data) print('\n| Round #2') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 2)') output = aggregation_process.next(output.state, client_data) print('\n| Round #3') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| Aggregation measurements: {output.measurements} (expected 8.0 * 3)')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>) | Round #1 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 8.0 (expected 8.0 * 1) | Round #2 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 16.0 (expected 8.0 * 2) | Round #3 | Aggregation result: 8.0 (expected 8.0) | Aggregation measurements: 24.0 (expected 8.0 * 3)

结构化类型

在联合学习中训练的模型的模型权重通常表示为张量的集合,而非单个张量。在 TFF 中,这表示为 tff.StructType,并且通常实用的聚合工厂需要能够接受结构化类型。

然而,在上面的示例中,我们仅使用了一个 tff.TensorType 对象。如果我们尝试使用之前的工厂创建包含 tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) 的聚合过程,我们会得到一个奇怪的错误,因为 TensorFlow 会尝试将 tf.Tensorlist 相乘。

问题在于,我们需要将结构中的每个张量乘以一个常量,而非把张量的结构乘以一个常量。该问题的常规解决方案是在创建的 tff.tf_computation 内使用 tf.nest 模块。

因此,与结构化类型兼容的先前 ExampleTaskFactory 版本如下所示:

@tff.tf_computation() def scale(value, factor): return tf.nest.map_structure(lambda x: x * factor, value) @tff.tf_computation() def unscale(value, factor): return tf.nest.map_structure(lambda x: x / factor, value) @tff.tf_computation() def add_one(value): return value + 1.0 class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory): def create(self, value_type): @tff.federated_computation() def initialize_fn(): return tff.federated_value(0.0, tff.SERVER) @tff.federated_computation(initialize_fn.type_signature.result, tff.type_at_clients(value_type)) def next_fn(state, value): new_state = tff.federated_map(add_one, state) state_at_clients = tff.federated_broadcast(new_state) scaled_value = tff.federated_map(scale, (value, state_at_clients)) summed_value = tff.federated_sum(scaled_value) unscaled_value = tff.federated_map(unscale, (summed_value, new_state)) return tff.templates.MeasuredProcessOutput( state=new_state, result=unscaled_value, measurements=summed_value) return tff.templates.AggregationProcess(initialize_fn, next_fn)

此示例着重展示了在构建 TFF 代码时实用的模式。处理稍复杂的运算时,如果在单独的位置创建将用作 tff.federated_computation 内的构建块的 tff.tf_computation,则代码会变得更加清晰。在 tff.federated_computation 内部,这些构建块仅使用内部算子连接。

要验证它能否按预期运行,请运行以下代码:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]], [[1.0, 1.0], [3.0, 0.0, -5.0]]] factory = ExampleTaskFactory() aggregation_process = factory.create( tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))])) print(f'Type signatures of the created aggregation process:\n' f' - initialize: {aggregation_process.initialize.type_signature}\n' f' - next: {aggregation_process.next.type_signature}\n') state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n' f' Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process: - initialize: ( -> float32@SERVER) - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>) Aggregation result: [[2. 3.], [6. 4. 0.]] Expected: [[2. 3.], [6. 4. 0.]]

内部聚合

最后一步是选择性地将实际聚合委托给其他工厂,以便轻松组合不同的聚合技术。

这可通过在 ExampleTaskFactory 的构造函数中创建一个可选的 inner_factory 参数来实现。如果未指定,则使用 tff.aggregators.SumFactory,它会应用上一部分中直接使用的 tff.federated_sum 算子。

调用 create 时,我们可以先调用 inner_factorycreate 来创建具有相同 value_type 的内部聚合过程。

initialize_fn 返回的过程的状态由两部分组成:由“此”过程创建的状态,以及刚刚创建的内部过程的状态。

next_fn 的实现的不同之处在于实际聚合被委托给内部过程的 next 函数,以及最终输出的组合方式。状态还是由“此”状态和“内部”状态组成,并且测量值会以与 OrderedDict 类似的方式组合。

以下是这种模式的实现。

@tff.tf_computation() def scale(value, factor): return tf.nest.map_structure(lambda x: x * factor, value) @tff.tf_computation() def unscale(value, factor): return tf.nest.map_structure(lambda x: x / factor, value) @tff.tf_computation() def add_one(value): return value + 1.0 class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory): def __init__(self, inner_factory=None): if inner_factory is None: inner_factory = tff.aggregators.SumFactory() self._inner_factory = inner_factory def create(self, value_type): inner_process = self._inner_factory.create(value_type) @tff.federated_computation() def initialize_fn(): my_state = tff.federated_value(0.0, tff.SERVER) inner_state = inner_process.initialize() return tff.federated_zip((my_state, inner_state)) @tff.federated_computation(initialize_fn.type_signature.result, tff.type_at_clients(value_type)) def next_fn(state, value): my_state, inner_state = state my_new_state = tff.federated_map(add_one, my_state) my_state_at_clients = tff.federated_broadcast(my_new_state) scaled_value = tff.federated_map(scale, (value, my_state_at_clients)) # Delegation to an inner factory, returning values placed at SERVER. inner_output = inner_process.next(inner_state, scaled_value) unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state)) new_state = tff.federated_zip((my_new_state, inner_output.state)) measurements = tff.federated_zip( collections.OrderedDict( scaled_value=inner_output.result, example_task=inner_output.measurements)) return tff.templates.MeasuredProcessOutput( state=new_state, result=unscaled_value, measurements=measurements) return tff.templates.AggregationProcess(initialize_fn, next_fn)

委托给 inner_process.next 函数时,我们得到的返回结构为 tff.templates.MeasuredProcessOutput,它具有相同的三个字段 – stateresultmeasurements。创建组合聚合过程的整体返回结构时,statemeasurements 字段通常应组合在一起并一起返回。相反,result 字段对应于聚合的值,并“流经”组合的聚合。

state 对象应被视为工厂的实现细节,因此组合可以是任何结构。然而,measurements 对应于在某一时刻向用户报告的值。因此,我们建议使用带有组合命名的 OrderedDict,这样可以清楚地知道组合中报告的指标来自何处。

还需注意 tff.federated_zip 算子的使用。创建的过程所控制的 state 对象应为 tff.FederatedType。如果我们在 initialize_fn 中返回 (this_state, inner_state),其返回类型签名将为包含 tff.FederatedType 的二元组的 tff.StructType。使用 tff.federated_zip 会将 tff.FederatedType“提升”到顶层。在准备要返回的状态和测量值时,next_fn 中也采用了类似用法。

最后,我们可以看到如何将其与默认的内部聚合一起使用:

client_data = [1.0, 2.0, 5.0] factory = ExampleTaskFactory() aggregation_process = factory.create(tff.TensorType(tf.float32)) state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print('| Round #1') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}') print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}') output = aggregation_process.next(output.state, client_data) print('\n| Round #2') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}') print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: () | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: ()

… 以及与其他内部聚合一起使用。例如 ExampleTaskFactory

client_data = [1.0, 2.0, 5.0] # Note the inner delegation can be to any UnweightedAggregaionFactory. # In this case, each factory creates process that multiplies by the iteration # index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...). factory = ExampleTaskFactory(ExampleTaskFactory()) aggregation_process = factory.create(tff.TensorType(tf.float32)) state = aggregation_process.initialize() output = aggregation_process.next(state, client_data) print('| Round #1') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}') print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}') output = aggregation_process.next(output.state, client_data) print('\n| Round #2') print(f'| Aggregation result: {output.result} (expected 8.0)') print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}') print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 8.0 | measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())]) | Round #2 | Aggregation result: 8.0 (expected 8.0) | measurements['scaled_value']: 16.0 | measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

总结

在本教程中,我们解释了创建通用聚合构建块(表示为聚合工厂)要遵循的最佳做法。源于设计意图,通用性以如下两种方式实现:

  1. 参数化计算。聚合是一种可插入到其他 TFF 模块(旨在使用 tff.aggregators 参数化其必要聚合)中的独立构建块,例如 tff.learning.algorithms.build_weighted_fed_avg

  2. 聚合组合。聚合构建块可以与其他聚合构建块组合以创建更复杂的组合聚合。