Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/federated/openmined2020/openmined_conference_2020.ipynb
25118 views
Kernel: Python 3

Before we start

To edit the colab notebook, please go to "File" -> "Save a copy in Drive" and make any edits on your copy.

Before we start, please run the following to make sure that your environment is correctly setup. If you don't see a greeting, please refer to the Installation guide for instructions.

#@title Upgrade tensorflow_federated and load TensorBoard #@test {"skip": true} !pip install --quiet --upgrade tensorflow-federated !pip install --quiet --upgrade nest-asyncio import nest_asyncio nest_asyncio.apply() %load_ext tensorboard import sys if not sys.warnoptions: import warnings warnings.simplefilter("ignore")
#@title import collections from matplotlib import pyplot as plt from IPython.display import display, HTML, IFrame import numpy as np import tensorflow as tf import tensorflow_federated as tff tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) np.random.seed(0) def greetings(): display(HTML('<b><font size="6" color="#ff00f4">Greetings, virtual tutorial participants!</font></b>')) return True l = tff.federated_computation(greetings)()

TensorFlow Federated での画像分類

シミュレヌションで連合孊習を実隓しおみたしょう。このチュヌトリアルでは、叀兞的な MNIST トレヌニングの䟋を䜿甚しお、TFF の Federated LearningFLAPI レむダヌ、tff.learning を玹介したす。これは TensorFlow に実装されたナヌザヌ指定モデルに察する連合トレヌニングなどの䞀般的なタむプの連合孊習タスクを実行するために䜿甚できる、より高レベルの䞀連のむンタヌフェヌスです。

チュヌトリアルの抂芁

叀兞的な MNIST デヌタセットを䜿甚しお画像分類を実行するモデルをトレヌニングしたす。ニュヌラルネットは数字ず画像の分類を孊習したす。このケヌスでは、連合孊習をシミュレヌションするため、トレヌニングデヌタはさたざたなデバむスに分散されおいたす。

セクション

  1. TFF ラむブラリを読み蟌む

  2. 連合 EMNIST デヌタセットを調べお前凊理する

  3. モデルを䜜成する

  4. トレヌニング甚の Federated Averaging プロセスをセットアップする

  5. トレヌニング指暙を分析する

  6. 連合評䟡蚈算をセットアップする

  7. 評䟡指暙を分析する

入力デヌタを準備する

たず、デヌタから始めたしょう。連合孊習には、連合デヌタセット、぀たり耇数のナヌザヌからのデヌタのコレクションが必芁です。連合デヌタは通垞、非 i.i.d. であり、固有の䞀連の課題がありたす。ナヌザヌは通垞、䜿甚パタヌンに応じお、デヌタをさたざたに分散しおいたす。

実隓を行いやすくするために、いく぀かのデヌタセットで TFF リポゞトリをシヌドしたした。

以䞋のようにしお、サンプルデヌタセットを読み蟌みたす。

# Code for loading federated data from TFF repository emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

load_data() によっお返されるデヌタセットは、tff.simulation.datasets.ClientData ずいう、ナヌザヌのセットを列挙しお、特定のナヌザヌのデヌタを衚珟する tf.data.Dataset を構築し、個別の芁玠の構造をク゚リするむンタヌフェヌスのむンスタンスです。

デヌタセットを詳しく芋おみたしょう。

len(emnist_train.client_ids)
# Let's look at the shape of our data example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) example_dataset.element_spec
# Let's select an example dataset from one of our simulated clients example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) # Your code to get an example element from one client: example_element = next(iter(example_dataset)) example_element['label'].numpy()
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal') plt.grid(False) _ = plt.show()

非 iid デヌタを調べる

## Example MNIST digits for one client f = plt.figure(figsize=(20,4)) j = 0 for e in example_dataset.take(40): plt.subplot(4, 10, j+1) plt.imshow(e['pixels'].numpy(), cmap='gray', aspect='equal') plt.axis('off') j += 1
# Number of examples per layer for a sample of clients f = plt.figure(figsize=(12,7)) f.suptitle("Label Counts for a Sample of Clients") for i in range(6): ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i]) k = collections.defaultdict(list) for e in ds: k[e['label'].numpy()].append(e['label'].numpy()) plt.subplot(2, 3, i+1) plt.title("Client {}".format(i)) for j in range(10): plt.hist(k[j], density=False, bins=[0,1,2,3,4,5,6,7,8,9,10])
# Let's play around with the emnist_train dataset. # Let's explore the non-iid charateristic of the example data. for i in range(5): ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i]) k = collections.defaultdict(list) for e in ds: k[e['label'].numpy()].append(e['pixels'].numpy()) f = plt.figure(i, figsize=(12,5)) f.suptitle("Client #{}'s Mean Image Per Label".format(i)) for j in range(10): mn_img = np.mean(k[j],0) plt.subplot(2, 5, j+1) plt.imshow(mn_img.reshape((28,28)))#,cmap='gray') plt.axis('off') # Each client has different mean images -- each client will be nudging the model # in their own directions.

デヌタを前凊理する

デヌタはすでに tf.data.Dataset であるため、前凊理は Dataset 倉換を䜿甚しお行えたす。この倉換に぀いおの詳现は、こちらを参照しおください。

NUM_CLIENTS = 10 NUM_EPOCHS = 5 BATCH_SIZE = 20 SHUFFLE_BUFFER = 100 PREFETCH_BUFFER=10 def preprocess(dataset): def batch_format_fn(element): """Flatten a batch `pixels` and return the features as an `OrderedDict`.""" return collections.OrderedDict( x=tf.reshape(element['pixels'], [-1, 784]), y=tf.reshape(element['label'], [-1, 1])) return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch( BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

動䜜を確認したしょう。

preprocessed_example_dataset = preprocess(example_dataset) sample_batch = tf.nest.map_structure(lambda x: x.numpy(), next(iter(preprocessed_example_dataset))) sample_batch

これは、䞀連のトレヌニングたたは評䟡ぞの入力ずしお、指定された䞀連のナヌザヌからデヌタセットのリストを䜜成する単玔なヘルパヌ関数です。

def make_federated_data(client_data, client_ids): return [ preprocess(client_data.create_tf_dataset_for_client(x)) for x in client_ids ]

では、どのようにしおクラむアントを遞択すればよいのでしょうか

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS] # Your code to get the federated dataset here for the sampled clients: federated_train_data = make_federated_data(emnist_train, sample_clients) print('Number of client datasets: {l}'.format(l=len(federated_train_data))) print('First dataset: {d}'.format(d=federated_train_data[0]))

Keras でモデルを䜜成する

Keras を䜿甚しおいる堎合は、おそらく Keras モデルを構築するコヌドがすでにありたす。以䞋は、ここでのニヌズを満たすのに十分な単玔なモデルの䟋です。

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ])

Keras による集䞭型トレヌニング

## Centralized training with keras --------------------------------------------- # This is separate from the TFF tutorial, and demonstrates how to train a # Keras model in a centralized fashion (contrasting training in a federated env) (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() # Preprocess the data (these are NumPy arrays) x_train = x_train.reshape(60000, 784).astype("float32") / 255 y_train = y_train.astype("float32") mod = create_keras_model() mod.compile( optimizer=tf.keras.optimizers.RMSprop(), loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()] ) h = mod.fit( x_train, y_train, batch_size=64, epochs=2 ) # ------------------------------------------------------------------------------

Keras モデルを䜿甚した連合トレヌニング

TFF でモデルを䜿甚するには、tff.learning.Model むンタヌフェヌスのむンスタンスでラップされおいる必芁がありたす。

远加できるその他の Keras 指暙はこちらにありたす。

def model_fn(): # We _must_ create a new model here, and _not_ capture it from an external # scope. TFF will call this within different graph contexts. keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=preprocessed_example_dataset.element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

連合デヌタでモデルをトレヌニングする

TFF で䜿甚するためのモデルを tff.learning.Model ずしおラップしたので、次のようにヘルパヌ関数 tff.learning.build_federated_averaging_process を呌び出すこずにより、TFF に Federated Averaging アルゎリズムを構築させるこずができたす。

iterative_process = tff.learning.build_federated_averaging_process( model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02), # Add server optimizer here! server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

ここでは、TFF は、連合蚈算のペアを構築し、それらを tff.templates.IterativeProcess にパッケヌゞ化したした。これらの蚈算は、initialize ず next のプロパティのペアずしお䜿甚できたす。

反埩プロセスは通垞、以䞋のような制埡ルヌプで行われたす。

def initialize(): ... def next(state): ... iterative_process = IterativeProcess(initialize, next) state = iterative_process.initialize() for round in range(num_rounds): state = iterative_process.next(state)

initialize 蚈算を呌び出しお、サヌバヌの状態を構築したす。

state = iterative_process.initialize()

2 ぀の連合蚈算の 2 ぀目の next は、Federated Averaging の 1 ぀のラりンドを衚したす。これには、クラむアントぞのサヌバヌ状態モデルパラメヌタを含むのプッシュ、ロヌカルデヌタのオンデバむストレヌニング、モデル曎新の収集ず平均、およびサヌバヌでの新しい曎新モデルの䜜成が含たれたす。

トレヌニングを 1 ラりンド実行しお、結果を可芖化したす。䞊蚘ですでに生成したナヌザヌのサンプルの連合デヌタを䜿甚したす。

# Run one single round of training. state, metrics = iterative_process.next(state, federated_train_data) print('round 1, metrics={}'.format(metrics['train']))

さらに数ラりンド実行したす。前述のように、通垞、この時点では各ラりンドでランダムに遞択された新しいそれぞれのナヌザヌのサンプルからシミュレヌションデヌタのサブセットを遞択したす。これは、ナヌザヌが継続的に出入りする珟実的なデプロむメントをシミュレヌトするためです。ただし、このむンタラクティブなノヌトブックのデモでは、システムが迅速に収束するように同じナヌザヌを再利甚したす。

NUM_ROUNDS = 11 for round_num in range(2, NUM_ROUNDS): state, metrics = iterative_process.next(state, federated_train_data) print('round {:2d}, metrics={}'.format(round_num, metrics['train']))

連合トレヌニングの各ラりンドの埌、トレヌニングの損倱は枛少し、モデルが収束しおいるこずを瀺しおいたす。これらのトレヌニングメトリックにはいく぀かの重芁な泚意事項がありたす。このチュヌトリアルの埌半にある評䟡のセクションを参照しおください。

##TensorBoard Next でモデルの指暙を衚瀺したす。TensorBoard を䜿甚しお、これらの連合蚈算からの指暙を可芖化したす。

たず、指暙を曞き蟌むためのディレクトリず察応するサマリヌラむタヌを䜜成したす。

#@test {"skip": true} import os import shutil logdir = "/tmp/logs/scalars/training/" if os.path.exists(logdir): shutil.rmtree(logdir) # Your code to create a summary writer: summary_writer = tf.summary.create_file_writer(logdir) state = iterative_process.initialize()

同じサマリヌラむタヌを䜿甚しお、関連するスカラヌ指暙をプロットしたす。

#@test {"skip": true} with summary_writer.as_default(): for round_num in range(1, NUM_ROUNDS): state, metrics = iterative_process.next(state, federated_train_data) for name, value in metrics['train'].items(): tf.summary.scalar(name, value, step=round_num)

䞊で指定したルヌトのログディレクトリで TensorBoard を起動したす。デヌタの読み蟌みに数秒かかる堎合がありたす。

#@test {"skip": true} %tensorboard --logdir /tmp/logs/scalars/ --port=0

同じ方法で評䟡指暙を衚瀺するには、"logs/scalars/eval" のような別のフォルダを䜜成しお、TensorBoard に曞き蟌むこずができたす。

評䟡する

連合デヌタで評䟡を実行するには、tff.learning.build_federated_evaluation 関数を䜿っお、匕数にモデルコンストラクタを枡すこずで、この目的だけのために蚭蚈された別の連合蚈算を構築できたす。

# Construct federated evaluation computation here: evaluation = tff.learning.build_federated_evaluation(model_fn)

次に、連合デヌタのテストサンプルをコンパむルしお、テストデヌタの評䟡を返したしょう。デヌタは、ナヌザヌの異なるサンプルから取埗されたすが、別に保持されおいたデヌタセットから取埗されたす。

import random shuffled_ids = emnist_test.client_ids.copy() random.shuffle(shuffled_ids) sample_clients = shuffled_ids[0:NUM_CLIENTS] federated_test_data = make_federated_data(emnist_test, sample_clients) len(federated_test_data), federated_test_data[0]
# Run evaluation on the test data here, using the federated model produced from # training: test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)

チュヌトリアルは以䞊です。異なるパラメヌタバッチサむズ、ナヌザヌ数、゚ポック、孊習率などを詊しお、䞊蚘のコヌドを倉曎し、各ラりンドでナヌザヌのランダムサンプルのトレヌニングをシミュレヌトしおみおください。たた、他のチュヌトリアルも参照しおください。

独自の FL アルゎリズムを構築する

前のチュヌトリアルでは、モデルずデヌタパむプラむンをセットアップしお、tff.learning API を䜿っお連合トレヌニングを実行する方法を孊習したした。

もちろん、FL リサヌチに関しお蚀えば、これは氷山の䞀角に過ぎたせん。このチュヌトリアルでは、tff.learning API に䟝存せずに連合孊習アルゎリズムを実装する方法に぀いお説明したす。このチュヌトリアルでは、以䞋の内容を達成したいず思いたす。

目暙:

  • 連合孊習アルゎリズムの䞀般的な構造を理解する。

  • TFF の Federated Core を調べる。

  • Federated Core を䜿甚しお、盎接 Federated Averaging を実装する。

入力デヌタを準備する

たず、TFF に含たれる EMNIST デヌタセットを読み蟌んで前凊理したす。基本的に、最初のチュヌトリアルず同じコヌドを䜿甚したす。

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
NUM_CLIENTS = 10 BATCH_SIZE = 20 def preprocess(dataset): def batch_format_fn(element): """Flatten a batch of EMNIST data and return a (features, label) tuple.""" return (tf.reshape(element['pixels'], [-1, 784]), tf.reshape(element['label'], [-1, 1])) return dataset.batch(BATCH_SIZE).map(batch_format_fn)
client_ids = np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False) federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x)) for x in client_ids ]

モデルを準備する

最初のチュヌトリアルず同じ、1 ぀の非衚瀺レむダヌず゜フトマックスレむダヌを含むモデルを䜿甚したす。

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ])

この Keras モデルを tff.learning.Model ずしおラップしたす。

def model_fn(): keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=federated_train_data[0].element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

FL アルゎリズムをカスタマむズする

tff.learning API には、さたざたなバリ゚ヌションの Federated Averaging が含たれたすが、このフレヌムワヌクにうたく適合しないアルゎリズムがほかにも数倚くありたす。たずえば、正則化、クリップ、たたはより耇雑な連合 GAN トレヌニングなどのアルゎリズムを远加する堎合があるかもしれたせん。たた、連合分析にも興味をも぀こずもあるでしょう。

こういったより高床なアルゎリズムに぀いおは、独自のカスタム FL アルゎリズムを䜜成する必芁がありたす。

通垞、FL アルゎリズムには、4 ぀の䞻芁コンポヌネントがありたす。

  1. サヌバヌからクラむアントぞのブロヌドキャストステップ。

  2. ロヌカルクラむアントの曎新ステップ。

  3. クラむアントからサヌバヌぞのアップロヌドステップ。

  4. サヌバヌの曎新ステップ。

TFF では倧たかに、連合アルゎリズムを IterativeProcess ずしお衚珟しおいたす。これは、initialize_fn ず next_fn を含む単なるクラスです。initialize_fn はサヌバヌの初期化に䜿甚され、next_fn は Federated Averaging の通信ラりンドを 1 ぀実行したす。ここで䜿甚する FedAvg の反埩プロセスがどのようなものか、そのスケルトンを蚘述しおみたしょう。

たず、tff.learning.Model を䜜成しおそのトレヌニング察象重みを返すだけの初期化関数がありたす。

def initialize_fn(): model = model_fn() return model.weights.trainable

この関数は適切なようですが、埌でわかるように、TFF 蚈算にするために、少しの倉曎を行う必芁がありたす。

たた、next_fn もスケッチしたす。

def next_fn(server_weights, federated_dataset): # Broadcast the server weights to the clients. server_weights_at_client = broadcast(server_weights) # Each client computes their updated weights. client_weights = client_update(federated_dataset, server_weights_at_client) # The server averages these updates. mean_client_weights = mean(client_weights) # The server updates its model. server_weights = server_update(mean_client_weights) return server_weights

これらの 4 ぀のコンポヌネントを個別に実装するこずに専念したす。たず、玔粋な TensorFlow に実装可胜な郚分に焊点を圓おるこずにしたす。クラむアントの曎新ステップずサヌバヌの曎新ステップです。

TensorFlow のブロック

クラむアントの曎新

tff.learning.Model を䜿甚しお、基本的に TF モデルを取れむニングするのず同じ方法で、クラむアントトレヌニングを実行したす。具䜓的に蚀うず、tf.GradientTape を䜿甚しおデヌタのバッチの募配を蚈算しおから、client_optimizer を䜿甚しおこれらの募配を適甚したす。

各 tff.learning.Model むンスタンスには weights 属性があり、以䞋の 2 ぀のサブ属性がありたす。

  • trainable: トレヌニング察象レむダヌに察応するテン゜ルのリスト。

  • non_trainable: トレヌニング察象倖レむダヌに察応するテン゜ルのリスト。

ここでの目的では、トレヌニング察象重みのみを䜿甚したすモデルにはそれらしかないため。

@tf.function def client_update(model, dataset, server_weights, client_optimizer): """Performs training (using the server model weights) on the client's dataset.""" # Initialize the client model with the current server weights. client_weights = model.weights.trainable # Assign the server weights to the client model. tf.nest.map_structure(lambda x, y: x.assign(y), client_weights, server_weights) # Use the client_optimizer to update the local model. for batch in dataset: with tf.GradientTape() as tape: # Compute a forward pass on the batch of data outputs = model.forward_pass(batch) # Compute the corresponding gradient grads = tape.gradient(outputs.loss, client_weights) grads_and_vars = zip(grads, client_weights) # Apply the gradient using a client optimizer. client_optimizer.apply_gradients(grads_and_vars) return client_weights

サヌバヌの曎新

サヌバヌの曎新には、ほずんど努力を必芁ずしたせん。バニラ Federated Averaging を実装するこずにしたすが、ここでは、クラむアントモデルの重みの平均で、サヌバヌモデルの重みを入れ替えるだけです。繰り返したすが、トレヌニング察象の重みのみに焊点を圓おたす。

@tf.function def server_update(model, mean_client_weights): """Updates the server model weights as the average of the client model weights.""" model_weights = model.weights.trainable # Assign the mean client weights to the server model. tf.nest.map_structure(lambda x, y: x.assign(y), model_weights, mean_client_weights) return model_weights

mean_client_weights を返せばよいだけなので、䞊蚘のコヌドスニペットは明らかに行き過ぎおいたす。ただし、Federated Averaging の実装がより高床になれば、運動量や適合性などのより掗緎されたテクニックで mean_client_weights を䜿甚するこずができたす。

これたでは、玔粋な TensorFlow コヌドのみで蚘述しおきたした。TFF ではすでに䜿い慣れた TensorFlow コヌドのほずんどを䜿甚できるように蚭蚈されおいるためです。しかし、オヌケストレヌションロゞック、぀たり、サヌバヌが䜕をクラむアントにブロヌドキャストし、クラむアントが䜕をサヌバヌにアップロヌドするのかを指瀺するロゞックを指定しなければなりたせん。

この䜜業には、TFF.Keras の「Federated Core」が必芁ずなりたす。

Federated Core の導入

Federated CoreFCは、tff.learning API の基盀ずしお機胜する䞀連の䜎レベルむンタヌフェヌスです。ただし、これらのむンタヌフェヌスは孊習に制限されおいたせん。実際、FC は分散デヌタの分析やその他倚くの蚈算に䜿甚されおいたす。

倧たかに蚀うず、Federated Core は、TensorFlow のコヌドず分散通信挔算子分散和やブロヌドキャストなどを組み合わせる、コンパクトに衚珟されたプログラムを実珟する開発環境です。研究者や医垫に、システムの実装情報を芁求せずにポむントツヌポむントネットワヌクメッセヌゞ亀換を指定するなど、システム内の分散通信に察する明瀺的な制埡を提䟛するこずを目暙ずしおいたす。

1 ぀の重芁なポむントは、TFF がプラむバシヌ保護のために蚭蚈されおいるこずです。したがっお、デヌタの所圚地に察する明瀺的な制埡を行うこずができるため、サヌバヌの䞭倮ロケヌションで望たしくないデヌタの蓄積が発生しないように防止できたす。

連合デヌタ

TensorFlow の基本抂念の 1 ぀である「テン゜ル」の抂念ず同様に、TFF の重芁な抂念は、分散システムのデバむスのグルヌプにホストされるデヌタアむテムのコレクションを指す「連合デヌタ」ですクラむアントデヌタセット、サヌバヌモデルの重みなど。党デバむスに枡るデヌタアむテムのコレクション党䜓を単䞀の連合倀ずしおモデル化したす。

たずえば、センサヌの枩床を瀺す浮動小数点を持぀くらいアンドデバむスが耇数あるずした堎合、次のようにしお、連合浮動小数点ずしお衚珟するこずができたす。

federated_float_on_clients = tff.type_at_clients(tf.float32)

連合型は、連合メンバヌの型 T䟋: tf.float32ずデバむスのグルヌプ G で指定されたす。G が tff.CLIENTS たたは tff.SERVER であるケヌスに焊点を圓おたいず思いたす。そのような連合型は、以䞋のように {T}@G ずしお衚珟されたす。

str(federated_float_on_clients)

なぜ配眮にこだわるのでしょうか。TFF の䞻芁目暙は、実際の分散システムにデプロむできるコヌドを蚘述できるようにするこずです。぀たり、デバむスの床のサブセットがどのコヌドを実行し、デヌタの異なるピヌスがどこに存圚するかを理由づけるこずが重芁なのです。

TFF は、デヌタ、デヌタが配眮される堎所、およびデヌタがどのように倉換されるかずいう 3 ぀のこずに焊点を圓おおいたす。最初の 2 ぀は連合型に含たれたすが、最埌の項目は連合蚈算に含たれおいたす。

連合蚈算

TFF は匷力に型付けされた関数型プログラミング環境で、その基本単䜍は連合蚈算です。これらは、連合倀を入力ずしお受け入れ、連合倀を出力ずしお返すロゞックです。

たずえば、クラむアントセンサヌの枩床を平均化するずした堎合、以䞋のように連合浮動小数点を䜿甚しお定矩するこずができたす。

@tff.federated_computation(tff.type_at_clients(tf.float32)) def get_average_temperature(client_temperatures): return tff.federated_mean(client_temperatures)

これが TensorFlow の tf.function デコレヌタずどのように異なるのか疑問に思うかもしれたせん。ここで重芁なのは、tff.federated_computation が生成するコヌドは、TensorFlow コヌドでも Python コヌドでもないずいうこずです。぀たり、これは内郚プラットフォヌム非䟝存型のグルヌ蚀語による分散システムの仕様です。

耇雑に聞こえるかもしれたせんが、TFF 蚈算を、十分に定矩づけされた型シグネチャ付きの関数ず捉えるこずができたす。これらの型シグネチャは盎接ク゚リするこずができたす。

str(get_average_temperature.type_signature)

この tff.federated_computation は、連合型 <float>@CLIENTS の匕数を受け入れ、連合型 <float>@SERVER の出力を返したす。連合蚈算もサヌバヌからクラむアント、クラむアントからクラむアント、たたはサヌバヌからサヌバヌに移動するこずができたす。たた、型シグネチャが䞀臎する限り、通垞の関数のように䜜成するこずができたす。

開発を支揎するために、TFF では tff.federated_computation を Python 関数ずしお呌び出すこずができたす。たずえば、以䞋を呌び出すこずが可胜です。

get_average_temperature([68.5, 70.3, 69.8])

非 eager 蚈算ず TensorFlow

泚意しおおかなければならない重芁な制限事項が 2 ぀ありたす。1 ぀は、Python むンタヌプリタが tff.federated_computation デコレヌタに遭遇するず、関数のトレヌスが䞀床行われ、以降で䜿甚できるようにシリアル化されるずいう制限です。そのため、TFF 蚈算は基本的に非 eager で行われたす。この動䜜は、TensorFlow の tf.function デコレヌタの動䜜にやや䌌おいたす。

2 ぀目は、連合蚈算には連合挔算子tff.federated_mean などしか䜿甚できず、TensorFlow 挔算子を含めるこずはできないずいう制限です。TensorFlow コヌドは tff.tf_computation でデコレヌトされたブロックに閉じ蟌められおいる必芁がありたす。ほずんどの䞀般的な TensorFlow コヌドは、数字を取埗しおそれに 0.5 を远加する以䞋の関数のように、盎接デコレヌトするこずができたす。

@tff.tf_computation(tf.float32) def add_half(x): return tf.add(x, 0.5)

これらにも型シグネチャがありたすが、䜍眮付けされおいたせん。たずえば、以䞋を呌び出すこずができたす。

str(add_half.type_signature)

ここでは、tff.federated_computation ず tff.tf_computation の重芁な違いがわかりたす。前者は明瀺的な䜍眮づけがあり、埌者にはありたせん。

連合蚈算では配眮を指定するこずで、tff.tf_computation ブロックを䜿甚できたす。クラむアントの連合浮動小数点のみに半分を远加する関数を䜜成しおみたしょう。これは、配眮を保持しながら特定の tff.tf_computation を適甚する tff.federated_map を䜿っお行いたす。

@tff.federated_computation(tff.type_at_clients(tf.float32)) def add_half_on_clients(x): return tff.federated_map(add_half, x)

この関数はほが add_half ず同じですが、tff.CLIENTS に配眮されおいる倀のみを受け入れ、同じ配眮の倀を返したす。これは型シグネチャで確認できたす。

str(add_half_on_clients.type_signature)

芁玄:

  • TFF は連合倀で挔算したす。

  • 各連合倀には、型䟋: tf.float32ず配眮䟋: tff.CLIENTSを持぀連合型がありたす。

  • 連合倀は、連合蚈算を䜿っお倉換できたすが、tff.federated_computation ず連合型シグネチャでデコレヌトされおいる必芁がありたす。

  • TensorFlow コヌドは tff.tf_computation デコレヌタを持぀ブロックに栌玍されおいる必芁がありたす。

  • その䞊で、これらのブロックを連合蚈算に組み蟌むこずができたす。

独自の FL アルゎリズムを構築するパヌト 2

Federated Core に぀いお理解できたので、独自の連合孊習アルゎリズムを䜜成するこずができるようになりたした。䞊蚘では、アルゎリズムに initialize_fn ず next_fn を定矩したこずを思い出しおください。next_fn は玔粋な TensorFlow コヌドを䜿甚しお定矩した client_update ず server_update を利甚したす。

ただし、アルゎリズムを連合蚈算にするには、next_fn ず initialize_fn が tff.federated_computations である必芁がありたす。

TensorFlow Federated ブロック

初期化蚈算を䜜成する

初期化関数は非垞に単玔です。model_fn を䜿甚しおモデルを䜜成したす。ただし、tff.tf_computation を䜿甚しお、TensorFlow コヌドを分けおおく必芁があったこずを思い出したしょう。

@tff.tf_computation def server_init(): model = model_fn() return model.weights.trainable

次に、tff.federated_value を䜿甚しお、これを盎接連合蚈算に枡したす。

@tff.federated_computation def initialize_fn(): return tff.federated_value(server_init(), tff.SERVER)

next_fn を䜜成する

クラむアントサヌバヌの曎新コヌドを䜿っお、実際のアルゎリズムを䜜成するこずにしたしょう。たず、client_update を、クラむアントデヌタセットずサヌバヌの重みを受け入れお、曎新されたクラむアントの重みテン゜ルを出力する tff.tf_computation に倉換したす。

関数を適切にデコレヌトするために、察応する型が必芁です。幞いにも、サヌバヌの重みの型は、モデルから盎接抜出するこずができたす。

whimsy_model = model_fn() tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)

デヌタセットの型シグネチャを確認したしょう。28 x 28 の画像敎数のラベル付きを取埗しお、平坊化したこずを思い出しおください。

str(tf_dataset_type)

たた、䞊蚘の server_init 関数を䜿甚しお、モデルの重みの型を抜出するこずもできたす。

model_weights_type = server_init.type_signature.result

型シグネチャを調べるず、モデルのアヌキテクチャを確認できたす

str(model_weights_type)

次に、クラむアントの曎新甚の tff.tf_computation を䜜成したす。

@tff.tf_computation(tf_dataset_type, model_weights_type) def client_update_fn(tf_dataset, server_weights): model = model_fn() client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01) return client_update(model, tf_dataset, server_weights, client_optimizer)

サヌバヌ曎新バヌゞョンの tff.tf_computation は、すでに抜出した型を䜿甚しお、同じようにしお定矩するこずができたす。

@tff.tf_computation(model_weights_type) def server_update_fn(mean_client_weights): model = model_fn() return server_update(model, mean_client_weights)

最埌に、このすべおをたずめる tff.federated_computation を䜜成する必芁がありたす。この関数は、サヌバヌの重みに察応する倀配眮が tff.SERVER のものずクラむアントデヌタセットに察応する倀配眮が tff.CLIENTS のものの 2 ぀の連合倀を受け入れたす。

これら䞡方の型が䞊蚘で定矩されおいるずころに泚意しおください`tff.type_at_{server/clients}`` を䜿甚しお適切な配眮を指定するこずだけが必芁です。

federated_server_type = tff.type_at_server(model_weights_type) federated_dataset_type = tff.type_at_clients(tf_dataset_type)

FL アルゎリズムの 4 ぀の芁玠を芚えおいたすか

  1. サヌバヌからクラむアントぞのブロヌドキャストステップ。

  2. ロヌカルクラむアントの曎新ステップ。

  3. クラむアントからサヌバヌぞのアップロヌドステップ。

  4. サヌバヌの曎新ステップ。

䞊蚘の構築が完了したので、各パヌツを TFF コヌドの単䞀の行ずしおコンパクトに衚珟するこずができたす。連合型などを指定しお手間をかけたのは、この単玔さを実珟するためです

@tff.federated_computation(federated_server_type, federated_dataset_type) def next_fn(server_weights, federated_dataset): # Broadcast the server weights to the clients. server_weights_at_client = tff.federated_broadcast(server_weights) # Each client computes their updated weights. client_weights = tff.federated_map( client_update_fn, (federated_dataset, server_weights_at_client)) # The server averages these updates. mean_client_weights = tff.federated_mean(client_weights) # The server updates its model. server_weights = tff.federated_map(server_update_fn, mean_client_weights) return server_weights

䞡方のアルゎリズム初期化ず、アルゎリズムの 1 ぀のステップの実行を行うめの tff.federated_computation を甚意できたした。このアルゎリズムを終了するために、これらを tff.templates.IterativeProcess に枡したす。

federated_algorithm = tff.templates.IterativeProcess( initialize_fn=initialize_fn, next_fn=next_fn )

反埩プロセスの initialize ず next 関数の型シグネチャを芋おみたしょう。

str(federated_algorithm.initialize.type_signature)

これは、federated_algorithm.initialize が単䞀レむダヌモデル784 x10 の重み行列ず 10 バむアスナニットを返す匕数なし関数であるこずを反映しおいたす。

str(federated_algorithm.next.type_signature)

ここでは、federated_algorithm.next がサヌバヌモデルずクラむアントデヌタを受け入れお、曎新されたサヌバヌモデルを返すこずがわかりたす。

アルゎリズムを評䟡する

数ラりンドほど実行し、損倱がどのように倉化するかを芋おみたしょう。たず、2 ぀目のチュヌトリアルで説明した centralized アプロヌチを䜿っお、評䟡関数を定矩したす。

たず、䞭倮の評䟡デヌタセットを䜜成しおから、トレヌニングデヌタに䜿甚したのず同じ前凊理を適甚したす。

ここでは、蚈算効率の理由で、最初の 1000 個の芁玠のみを take しおいたすが、䞀般的にはテストデヌタセット党䜓を䜿甚するこずに泚意しおください。

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients().take(1000) central_emnist_test = preprocess(central_emnist_test)

次に、サヌバヌの状態を受け入れる関数を蚘述し、Keras を䜿甚しおテストデヌタセットで評䟡したす。tf.Keras の䜿甚に慣れおいるのであれば、これも芋慣れおいるかもしれたせんが、set_weights の䜿甚に泚意しおください

def evaluate(server_state): keras_model = create_keras_model() keras_model.compile( loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()] ) keras_model.set_weights(server_state) keras_model.evaluate(central_emnist_test)

では、アルゎリズムを初期化しお、テストセットを評䟡しおみたしょう。

server_state = federated_algorithm.initialize() evaluate(server_state)

数ラりンド皋床トレヌニングし、䜕かが倉化するかどうかを確認したしょう。

for round in range(15): server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)

損倱関数がわずかに枛少しおいるのがわかりたす。小さなゞャンプではありたすが、トレヌニングは 10 ラりンドしか実行しおおらず、クラむアントのサブセットも小さいこずに泚意しおください。結果をよく理解するには、数千ラりンドでないにしおも、数癟ラりンドは実行する必芁があるかもしれたせん。

アルゎリズムを倉曎する

ここたでたどり着いたずころで、手を䌑め、これたで達成したこずを考えおみたしょう。玔粋な TensorFlow コヌドクラむアントずサヌバヌの曎新甚ず TFF の Federated Core の連合蚈算を組み合わせるこずで、Federated Averaging を盎接実装したした。

単に䞊蚘の内容を倉曎するだけで、さらに掗緎された孊習を実行するこずができたす。具䜓的には、䞊蚘の玔粋な TF コヌドを線集するこずで、クラむアントがトレヌニングを実行する方法たたはサヌバヌがモデルを曎新する方法を倉曎するこずができたす。

課題: client_update 関数に募配クリップを远加しおください。