Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/federated/openmined2020/openmined_conference_2020.ipynb
39480 views
Kernel: Python 3

Before we start

To edit the colab notebook, please go to "File" -> "Save a copy in Drive" and make any edits on your copy.

Before we start, please run the following to make sure that your environment is correctly setup. If you don't see a greeting, please refer to the Installation guide for instructions.

#@title Upgrade tensorflow_federated and load TensorBoard #@test {"skip": true} !pip install --quiet --upgrade tensorflow-federated !pip install --quiet --upgrade nest-asyncio import nest_asyncio nest_asyncio.apply() %load_ext tensorboard import sys if not sys.warnoptions: import warnings warnings.simplefilter("ignore")
#@title import collections from matplotlib import pyplot as plt from IPython.display import display, HTML, IFrame import numpy as np import tensorflow as tf import tensorflow_federated as tff tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) np.random.seed(0) def greetings(): display(HTML('<b><font size="6" color="#ff00f4">Greetings, virtual tutorial participants!</font></b>')) return True l = tff.federated_computation(greetings)()

TensorFlow Federated での画像分類

シミュレーションで連合学習を実験してみましょう。このチュートリアルでは、古典的な MNIST トレーニングの例を使用して、TFF の Federated Learning(FL)API レイヤー、tff.learning を紹介します。これは TensorFlow に実装されたユーザー指定モデルに対する連合トレーニングなどの一般的なタイプの連合学習タスクを実行するために使用できる、より高レベルの一連のインターフェースです。

チュートリアルの概要

古典的な MNIST データセットを使用して画像分類を実行するモデルをトレーニングします。ニューラルネットは数字と画像の分類を学習します。このケースでは、連合学習をシミュレーションするため、トレーニングデータはさまざまなデバイスに分散されています。

セクション

  1. TFF ライブラリを読み込む

  2. 連合 EMNIST データセットを調べて前処理する

  3. モデルを作成する

  4. トレーニング用の Federated Averaging プロセスをセットアップする

  5. トレーニング指標を分析する

  6. 連合評価計算をセットアップする

  7. 評価指標を分析する

入力データを準備する

まず、データから始めましょう。連合学習には、連合データセット、つまり複数のユーザーからのデータのコレクションが必要です。連合データは通常、非 i.i.d. であり、固有の一連の課題があります。ユーザーは通常、使用パターンに応じて、データをさまざまに分散しています。

実験を行いやすくするために、いくつかのデータセットで TFF リポジトリをシードしました。

以下のようにして、サンプルデータセットを読み込みます。

# Code for loading federated data from TFF repository emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

load_data() によって返されるデータセットは、tff.simulation.datasets.ClientData という、ユーザーのセットを列挙して、特定のユーザーのデータを表現する tf.data.Dataset を構築し、個別の要素の構造をクエリするインターフェースのインスタンスです。

データセットを詳しく見てみましょう。

len(emnist_train.client_ids)
# Let's look at the shape of our data example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) example_dataset.element_spec
# Let's select an example dataset from one of our simulated clients example_dataset = emnist_train.create_tf_dataset_for_client( emnist_train.client_ids[0]) # Your code to get an example element from one client: example_element = next(iter(example_dataset)) example_element['label'].numpy()
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal') plt.grid(False) _ = plt.show()

非 iid データを調べる

## Example MNIST digits for one client f = plt.figure(figsize=(20,4)) j = 0 for e in example_dataset.take(40): plt.subplot(4, 10, j+1) plt.imshow(e['pixels'].numpy(), cmap='gray', aspect='equal') plt.axis('off') j += 1
# Number of examples per layer for a sample of clients f = plt.figure(figsize=(12,7)) f.suptitle("Label Counts for a Sample of Clients") for i in range(6): ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i]) k = collections.defaultdict(list) for e in ds: k[e['label'].numpy()].append(e['label'].numpy()) plt.subplot(2, 3, i+1) plt.title("Client {}".format(i)) for j in range(10): plt.hist(k[j], density=False, bins=[0,1,2,3,4,5,6,7,8,9,10])
# Let's play around with the emnist_train dataset. # Let's explore the non-iid charateristic of the example data. for i in range(5): ds = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[i]) k = collections.defaultdict(list) for e in ds: k[e['label'].numpy()].append(e['pixels'].numpy()) f = plt.figure(i, figsize=(12,5)) f.suptitle("Client #{}'s Mean Image Per Label".format(i)) for j in range(10): mn_img = np.mean(k[j],0) plt.subplot(2, 5, j+1) plt.imshow(mn_img.reshape((28,28)))#,cmap='gray') plt.axis('off') # Each client has different mean images -- each client will be nudging the model # in their own directions.

データを前処理する

データはすでに tf.data.Dataset であるため、前処理は Dataset 変換を使用して行えます。この変換についての詳細は、こちらを参照してください

NUM_CLIENTS = 10 NUM_EPOCHS = 5 BATCH_SIZE = 20 SHUFFLE_BUFFER = 100 PREFETCH_BUFFER=10 def preprocess(dataset): def batch_format_fn(element): """Flatten a batch `pixels` and return the features as an `OrderedDict`.""" return collections.OrderedDict( x=tf.reshape(element['pixels'], [-1, 784]), y=tf.reshape(element['label'], [-1, 1])) return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER).batch( BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

動作を確認しましょう。

preprocessed_example_dataset = preprocess(example_dataset) sample_batch = tf.nest.map_structure(lambda x: x.numpy(), next(iter(preprocessed_example_dataset))) sample_batch

これは、一連のトレーニングまたは評価への入力として、指定された一連のユーザーからデータセットのリストを作成する単純なヘルパー関数です。

def make_federated_data(client_data, client_ids): return [ preprocess(client_data.create_tf_dataset_for_client(x)) for x in client_ids ]

では、どのようにしてクライアントを選択すればよいのでしょうか?

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS] # Your code to get the federated dataset here for the sampled clients: federated_train_data = make_federated_data(emnist_train, sample_clients) print('Number of client datasets: {l}'.format(l=len(federated_train_data))) print('First dataset: {d}'.format(d=federated_train_data[0]))

Keras でモデルを作成する

Keras を使用している場合は、おそらく Keras モデルを構築するコードがすでにあります。以下は、ここでのニーズを満たすのに十分な単純なモデルの例です。

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ])

Keras による集中型トレーニング

## Centralized training with keras --------------------------------------------- # This is separate from the TFF tutorial, and demonstrates how to train a # Keras model in a centralized fashion (contrasting training in a federated env) (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() # Preprocess the data (these are NumPy arrays) x_train = x_train.reshape(60000, 784).astype("float32") / 255 y_train = y_train.astype("float32") mod = create_keras_model() mod.compile( optimizer=tf.keras.optimizers.RMSprop(), loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()] ) h = mod.fit( x_train, y_train, batch_size=64, epochs=2 ) # ------------------------------------------------------------------------------

Keras モデルを使用した連合トレーニング

TFF でモデルを使用するには、tff.learning.Model インターフェースのインスタンスでラップされている必要があります。

追加できるその他の Keras 指標はこちらにあります

def model_fn(): # We _must_ create a new model here, and _not_ capture it from an external # scope. TFF will call this within different graph contexts. keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=preprocessed_example_dataset.element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

連合データでモデルをトレーニングする

TFF で使用するためのモデルを tff.learning.Model としてラップしたので、次のようにヘルパー関数 tff.learning.build_federated_averaging_process を呼び出すことにより、TFF に Federated Averaging アルゴリズムを構築させることができます。

iterative_process = tff.learning.build_federated_averaging_process( model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02), # Add server optimizer here! server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

ここでは、TFF は、連合計算のペアを構築し、それらを tff.templates.IterativeProcess にパッケージ化しました。これらの計算は、initializenext のプロパティのペアとして使用できます。

反復プロセスは通常、以下のような制御ループで行われます。

def initialize(): ... def next(state): ... iterative_process = IterativeProcess(initialize, next) state = iterative_process.initialize() for round in range(num_rounds): state = iterative_process.next(state)

initialize 計算を呼び出して、サーバーの状態を構築します。

state = iterative_process.initialize()

2 つの連合計算の 2 つ目の next は、Federated Averaging の 1 つのラウンドを表します。これには、クライアントへのサーバー状態(モデルパラメータを含む)のプッシュ、ローカルデータのオンデバイストレーニング、モデル更新の収集と平均、およびサーバーでの新しい更新モデルの作成が含まれます。

トレーニングを 1 ラウンド実行して、結果を可視化します。上記ですでに生成したユーザーのサンプルの連合データを使用します。

# Run one single round of training. state, metrics = iterative_process.next(state, federated_train_data) print('round 1, metrics={}'.format(metrics['train']))

さらに数ラウンド実行します。前述のように、通常、この時点では各ラウンドでランダムに選択された新しいそれぞれのユーザーのサンプルからシミュレーションデータのサブセットを選択します。これは、ユーザーが継続的に出入りする現実的なデプロイメントをシミュレートするためです。ただし、このインタラクティブなノートブックのデモでは、システムが迅速に収束するように同じユーザーを再利用します。

NUM_ROUNDS = 11 for round_num in range(2, NUM_ROUNDS): state, metrics = iterative_process.next(state, federated_train_data) print('round {:2d}, metrics={}'.format(round_num, metrics['train']))

連合トレーニングの各ラウンドの後、トレーニングの損失は減少し、モデルが収束していることを示しています。これらのトレーニングメトリックにはいくつかの重要な注意事項があります。このチュートリアルの後半にある評価のセクションを参照してください。

##TensorBoard Next でモデルの指標を表示します。TensorBoard を使用して、これらの連合計算からの指標を可視化します。

まず、指標を書き込むためのディレクトリと対応するサマリーライターを作成します。

#@test {"skip": true} import os import shutil logdir = "/tmp/logs/scalars/training/" if os.path.exists(logdir): shutil.rmtree(logdir) # Your code to create a summary writer: summary_writer = tf.summary.create_file_writer(logdir) state = iterative_process.initialize()

同じサマリーライターを使用して、関連するスカラー指標をプロットします。

#@test {"skip": true} with summary_writer.as_default(): for round_num in range(1, NUM_ROUNDS): state, metrics = iterative_process.next(state, federated_train_data) for name, value in metrics['train'].items(): tf.summary.scalar(name, value, step=round_num)

上で指定したルートのログディレクトリで TensorBoard を起動します。データの読み込みに数秒かかる場合があります。

#@test {"skip": true} %tensorboard --logdir /tmp/logs/scalars/ --port=0

同じ方法で評価指標を表示するには、"logs/scalars/eval" のような別のフォルダを作成して、TensorBoard に書き込むことができます。

評価する

連合データで評価を実行するには、tff.learning.build_federated_evaluation 関数を使って、引数にモデルコンストラクタを渡すことで、この目的だけのために設計された別の連合計算を構築できます。

# Construct federated evaluation computation here: evaluation = tff.learning.build_federated_evaluation(model_fn)

次に、連合データのテストサンプルをコンパイルして、テストデータの評価を返しましょう。データは、ユーザーの異なるサンプルから取得されますが、別に保持されていたデータセットから取得されます。

import random shuffled_ids = emnist_test.client_ids.copy() random.shuffle(shuffled_ids) sample_clients = shuffled_ids[0:NUM_CLIENTS] federated_test_data = make_federated_data(emnist_test, sample_clients) len(federated_test_data), federated_test_data[0]
# Run evaluation on the test data here, using the federated model produced from # training: test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)

チュートリアルは以上です。異なるパラメータ(バッチサイズ、ユーザー数、エポック、学習率など)を試して、上記のコードを変更し、各ラウンドでユーザーのランダムサンプルのトレーニングをシミュレートしてみてください。また、他のチュートリアルも参照してください。

独自の FL アルゴリズムを構築する

前のチュートリアルでは、モデルとデータパイプラインをセットアップして、tff.learning API を使って連合トレーニングを実行する方法を学習しました。

もちろん、FL リサーチに関して言えば、これは氷山の一角に過ぎません。このチュートリアルでは、tff.learning API に依存せずに連合学習アルゴリズムを実装する方法について説明します。このチュートリアルでは、以下の内容を達成したいと思います。

目標:

  • 連合学習アルゴリズムの一般的な構造を理解する。

  • TFF の Federated Core を調べる。

  • Federated Core を使用して、直接 Federated Averaging を実装する。

入力データを準備する

まず、TFF に含まれる EMNIST データセットを読み込んで前処理します。基本的に、最初のチュートリアルと同じコードを使用します。

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
NUM_CLIENTS = 10 BATCH_SIZE = 20 def preprocess(dataset): def batch_format_fn(element): """Flatten a batch of EMNIST data and return a (features, label) tuple.""" return (tf.reshape(element['pixels'], [-1, 784]), tf.reshape(element['label'], [-1, 1])) return dataset.batch(BATCH_SIZE).map(batch_format_fn)
client_ids = np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False) federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x)) for x in client_ids ]

モデルを準備する

最初のチュートリアルと同じ、1 つの非表示レイヤーとソフトマックスレイヤーを含むモデルを使用します。

def create_keras_model(): return tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(784,)), tf.keras.layers.Dense(10, kernel_initializer='zeros'), tf.keras.layers.Softmax(), ])

この Keras モデルを tff.learning.Model としてラップします。

def model_fn(): keras_model = create_keras_model() return tff.learning.from_keras_model( keras_model, input_spec=federated_train_data[0].element_spec, loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

FL アルゴリズムをカスタマイズする

tff.learning API には、さまざまなバリエーションの Federated Averaging が含まれますが、このフレームワークにうまく適合しないアルゴリズムがほかにも数多くあります。たとえば、正則化、クリップ、またはより複雑な連合 GAN トレーニングなどのアルゴリズムを追加する場合があるかもしれません。また、連合分析にも興味をもつこともあるでしょう。

こういったより高度なアルゴリズムについては、独自のカスタム FL アルゴリズムを作成する必要があります。

通常、FL アルゴリズムには、4 つの主要コンポーネントがあります。

  1. サーバーからクライアントへのブロードキャストステップ。

  2. ローカルクライアントの更新ステップ。

  3. クライアントからサーバーへのアップロードステップ。

  4. サーバーの更新ステップ。

TFF では大まかに、連合アルゴリズムを IterativeProcess として表現しています。これは、initialize_fnnext_fn を含む単なるクラスです。initialize_fn はサーバーの初期化に使用され、next_fn は Federated Averaging の通信ラウンドを 1 つ実行します。ここで使用する FedAvg の反復プロセスがどのようなものか、そのスケルトンを記述してみましょう。

まず、tff.learning.Model を作成してそのトレーニング対象重みを返すだけの初期化関数があります。

def initialize_fn(): model = model_fn() return model.weights.trainable

この関数は適切なようですが、後でわかるように、TFF 計算にするために、少しの変更を行う必要があります。

また、next_fn もスケッチします。

def next_fn(server_weights, federated_dataset): # Broadcast the server weights to the clients. server_weights_at_client = broadcast(server_weights) # Each client computes their updated weights. client_weights = client_update(federated_dataset, server_weights_at_client) # The server averages these updates. mean_client_weights = mean(client_weights) # The server updates its model. server_weights = server_update(mean_client_weights) return server_weights

これらの 4 つのコンポーネントを個別に実装することに専念します。まず、純粋な TensorFlow に実装可能な部分に焦点を当てることにします。クライアントの更新ステップとサーバーの更新ステップです。

TensorFlow のブロック

クライアントの更新

tff.learning.Model を使用して、基本的に TF モデルを取れイニングするのと同じ方法で、クライアントトレーニングを実行します。具体的に言うと、tf.GradientTape を使用してデータのバッチの勾配を計算してから、client_optimizer を使用してこれらの勾配を適用します。

tff.learning.Model インスタンスには weights 属性があり、以下の 2 つのサブ属性があります。

  • trainable: トレーニング対象レイヤーに対応するテンソルのリスト。

  • non_trainable: トレーニング対象外レイヤーに対応するテンソルのリスト。

ここでの目的では、トレーニング対象重みのみを使用します(モデルにはそれらしかないため!)。

@tf.function def client_update(model, dataset, server_weights, client_optimizer): """Performs training (using the server model weights) on the client's dataset.""" # Initialize the client model with the current server weights. client_weights = model.weights.trainable # Assign the server weights to the client model. tf.nest.map_structure(lambda x, y: x.assign(y), client_weights, server_weights) # Use the client_optimizer to update the local model. for batch in dataset: with tf.GradientTape() as tape: # Compute a forward pass on the batch of data outputs = model.forward_pass(batch) # Compute the corresponding gradient grads = tape.gradient(outputs.loss, client_weights) grads_and_vars = zip(grads, client_weights) # Apply the gradient using a client optimizer. client_optimizer.apply_gradients(grads_and_vars) return client_weights

サーバーの更新

サーバーの更新には、ほとんど努力を必要としません。バニラ Federated Averaging を実装することにしますが、ここでは、クライアントモデルの重みの平均で、サーバーモデルの重みを入れ替えるだけです。繰り返しますが、トレーニング対象の重みのみに焦点を当てます。

@tf.function def server_update(model, mean_client_weights): """Updates the server model weights as the average of the client model weights.""" model_weights = model.weights.trainable # Assign the mean client weights to the server model. tf.nest.map_structure(lambda x, y: x.assign(y), model_weights, mean_client_weights) return model_weights

mean_client_weights を返せばよいだけなので、上記のコードスニペットは明らかに行き過ぎています。ただし、Federated Averaging の実装がより高度になれば、運動量や適合性などのより洗練されたテクニックで mean_client_weights を使用することができます。

これまでは、純粋な TensorFlow コードのみで記述してきました。TFF ではすでに使い慣れた TensorFlow コードのほとんどを使用できるように設計されているためです。しかし、オーケストレーションロジック、つまり、サーバーが何をクライアントにブロードキャストし、クライアントが何をサーバーにアップロードするのかを指示するロジックを指定しなければなりません。

この作業には、TFF.Keras の「Federated Core」が必要となります。

Federated Core の導入

Federated Core(FC)は、tff.learning API の基盤として機能する一連の低レベルインターフェースです。ただし、これらのインターフェースは学習に制限されていません。実際、FC は分散データの分析やその他多くの計算に使用されています。

大まかに言うと、Federated Core は、TensorFlow のコードと分散通信演算子(分散和やブロードキャストなど)を組み合わせる、コンパクトに表現されたプログラムを実現する開発環境です。研究者や医師に、システムの実装情報を要求せずに(ポイントツーポイントネットワークメッセージ交換を指定するなど)、システム内の分散通信に対する明示的な制御を提供することを目標としています。

1 つの重要なポイントは、TFF がプライバシー保護のために設計されていることです。したがって、データの所在地に対する明示的な制御を行うことができるため、サーバーの中央ロケーションで望ましくないデータの蓄積が発生しないように防止できます。

連合データ

TensorFlow の基本概念の 1 つである「テンソル」の概念と同様に、TFF の重要な概念は、分散システムのデバイスのグループにホストされるデータアイテムのコレクションを指す「連合データ」です(クライアントデータセット、サーバーモデルの重みなど)。全デバイスに渡るデータアイテムのコレクション全体を単一の連合値としてモデル化します。

たとえば、センサーの温度を示す浮動小数点を持つくらいアンドデバイスが複数あるとした場合、次のようにして、連合浮動小数点として表現することができます。

federated_float_on_clients = tff.type_at_clients(tf.float32)

連合型は、連合メンバーの型 T(例: tf.float32)とデバイスのグループ G で指定されます。Gtff.CLIENTS または tff.SERVER であるケースに焦点を当てたいと思います。そのような連合型は、以下のように {T}@G として表現されます。

str(federated_float_on_clients)

なぜ配置にこだわるのでしょうか。TFF の主要目標は、実際の分散システムにデプロイできるコードを記述できるようにすることです。つまり、デバイスの度のサブセットがどのコードを実行し、データの異なるピースがどこに存在するかを理由づけることが重要なのです。

TFF は、データ、データが配置される場所、およびデータがどのように変換されるかという 3 つのことに焦点を当てています。最初の 2 つは連合型に含まれますが、最後の項目は連合計算に含まれています。

連合計算

TFF は強力に型付けされた関数型プログラミング環境で、その基本単位は連合計算です。これらは、連合値を入力として受け入れ、連合値を出力として返すロジックです。

たとえば、クライアントセンサーの温度を平均化するとした場合、以下のように(連合浮動小数点を使用して)定義することができます。

@tff.federated_computation(tff.type_at_clients(tf.float32)) def get_average_temperature(client_temperatures): return tff.federated_mean(client_temperatures)

これが TensorFlow の tf.function デコレータとどのように異なるのか疑問に思うかもしれません。ここで重要なのは、tff.federated_computation が生成するコードは、TensorFlow コードでも Python コードでもないということです。つまり、これは内部プラットフォーム非依存型のグルー言語による分散システムの仕様です。

複雑に聞こえるかもしれませんが、TFF 計算を、十分に定義づけされた型シグネチャ付きの関数と捉えることができます。これらの型シグネチャは直接クエリすることができます。

str(get_average_temperature.type_signature)

この tff.federated_computation は、連合型 <float>@CLIENTS の引数を受け入れ、連合型 <float>@SERVER の出力を返します。連合計算もサーバーからクライアント、クライアントからクライアント、またはサーバーからサーバーに移動することができます。また、型シグネチャが一致する限り、通常の関数のように作成することができます。

開発を支援するために、TFF では tff.federated_computation を Python 関数として呼び出すことができます。たとえば、以下を呼び出すことが可能です。

get_average_temperature([68.5, 70.3, 69.8])

非 eager 計算と TensorFlow

注意しておかなければならない重要な制限事項が 2 つあります。1 つは、Python インタープリタが tff.federated_computation デコレータに遭遇すると、関数のトレースが一度行われ、以降で使用できるようにシリアル化されるという制限です。そのため、TFF 計算は基本的に非 eager で行われます。この動作は、TensorFlow の tf.function デコレータの動作にやや似ています。

2 つ目は、連合計算には連合演算子(tff.federated_mean など)しか使用できず、TensorFlow 演算子を含めることはできないという制限です。TensorFlow コードは tff.tf_computation でデコレートされたブロックに閉じ込められている必要があります。ほとんどの一般的な TensorFlow コードは、数字を取得してそれに 0.5 を追加する以下の関数のように、直接デコレートすることができます。

@tff.tf_computation(tf.float32) def add_half(x): return tf.add(x, 0.5)

これらにも型シグネチャがありますが、位置付けされていません。たとえば、以下を呼び出すことができます。

str(add_half.type_signature)

ここでは、tff.federated_computationtff.tf_computation の重要な違いがわかります。前者は明示的な位置づけがあり、後者にはありません。

連合計算では配置を指定することで、tff.tf_computation ブロックを使用できます。クライアントの連合浮動小数点のみに半分を追加する関数を作成してみましょう。これは、配置を保持しながら特定の tff.tf_computation を適用する tff.federated_map を使って行います。

@tff.federated_computation(tff.type_at_clients(tf.float32)) def add_half_on_clients(x): return tff.federated_map(add_half, x)

この関数はほぼ add_half と同じですが、tff.CLIENTS に配置されている値のみを受け入れ、同じ配置の値を返します。これは型シグネチャで確認できます。

str(add_half_on_clients.type_signature)

要約:

  • TFF は連合値で演算します。

  • 各連合値には、(例: tf.float32)と配置(例: tff.CLIENTS)を持つ連合型があります。

  • 連合値は、連合計算を使って変換できますが、tff.federated_computation と連合型シグネチャでデコレートされている必要があります。

  • TensorFlow コードは tff.tf_computation デコレータを持つブロックに格納されている必要があります。

  • その上で、これらのブロックを連合計算に組み込むことができます。

独自の FL アルゴリズムを構築する(パート 2)

Federated Core について理解できたので、独自の連合学習アルゴリズムを作成することができるようになりました。上記では、アルゴリズムに initialize_fnnext_fn を定義したことを思い出してください。next_fn は純粋な TensorFlow コードを使用して定義した client_updateserver_update を利用します。

ただし、アルゴリズムを連合計算にするには、next_fninitialize_fntff.federated_computations である必要があります。

TensorFlow Federated ブロック

初期化計算を作成する

初期化関数は非常に単純です。model_fn を使用してモデルを作成します。ただし、tff.tf_computation を使用して、TensorFlow コードを分けておく必要があったことを思い出しましょう。

@tff.tf_computation def server_init(): model = model_fn() return model.weights.trainable

次に、tff.federated_value を使用して、これを直接連合計算に渡します。

@tff.federated_computation def initialize_fn(): return tff.federated_value(server_init(), tff.SERVER)

next_fn を作成する

クライアントサーバーの更新コードを使って、実際のアルゴリズムを作成することにしましょう。まず、client_update を、クライアントデータセットとサーバーの重みを受け入れて、更新されたクライアントの重みテンソルを出力する tff.tf_computation に変換します。

関数を適切にデコレートするために、対応する型が必要です。幸いにも、サーバーの重みの型は、モデルから直接抽出することができます。

whimsy_model = model_fn() tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)

データセットの型シグネチャを確認しましょう。28 x 28 の画像(整数のラベル付き)を取得して、平坦化したことを思い出してください。

str(tf_dataset_type)

また、上記の server_init 関数を使用して、モデルの重みの型を抽出することもできます。

model_weights_type = server_init.type_signature.result

型シグネチャを調べると、モデルのアーキテクチャを確認できます!

str(model_weights_type)

次に、クライアントの更新用の tff.tf_computation を作成します。

@tff.tf_computation(tf_dataset_type, model_weights_type) def client_update_fn(tf_dataset, server_weights): model = model_fn() client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01) return client_update(model, tf_dataset, server_weights, client_optimizer)

サーバー更新バージョンの tff.tf_computation は、すでに抽出した型を使用して、同じようにして定義することができます。

@tff.tf_computation(model_weights_type) def server_update_fn(mean_client_weights): model = model_fn() return server_update(model, mean_client_weights)

最後に、このすべてをまとめる tff.federated_computation を作成する必要があります。この関数は、サーバーの重みに対応する値(配置が tff.SERVER のもの)とクライアントデータセットに対応する値(配置が tff.CLIENTS のもの)の 2 つの連合値を受け入れます。

これら両方の型が上記で定義されているところに注意してください!`tff.type_at_{server/clients}`` を使用して適切な配置を指定することだけが必要です。

federated_server_type = tff.type_at_server(model_weights_type) federated_dataset_type = tff.type_at_clients(tf_dataset_type)

FL アルゴリズムの 4 つの要素を覚えていますか?

  1. サーバーからクライアントへのブロードキャストステップ。

  2. ローカルクライアントの更新ステップ。

  3. クライアントからサーバーへのアップロードステップ。

  4. サーバーの更新ステップ。

上記の構築が完了したので、各パーツを TFF コードの単一の行としてコンパクトに表現することができます。連合型などを指定して手間をかけたのは、この単純さを実現するためです!

@tff.federated_computation(federated_server_type, federated_dataset_type) def next_fn(server_weights, federated_dataset): # Broadcast the server weights to the clients. server_weights_at_client = tff.federated_broadcast(server_weights) # Each client computes their updated weights. client_weights = tff.federated_map( client_update_fn, (federated_dataset, server_weights_at_client)) # The server averages these updates. mean_client_weights = tff.federated_mean(client_weights) # The server updates its model. server_weights = tff.federated_map(server_update_fn, mean_client_weights) return server_weights

両方のアルゴリズム初期化と、アルゴリズムの 1 つのステップの実行を行うめの tff.federated_computation を用意できました。このアルゴリズムを終了するために、これらを tff.templates.IterativeProcess に渡します。

federated_algorithm = tff.templates.IterativeProcess( initialize_fn=initialize_fn, next_fn=next_fn )

反復プロセスの initializenext 関数の型シグネチャを見てみましょう。

str(federated_algorithm.initialize.type_signature)

これは、federated_algorithm.initialize が単一レイヤーモデル(784 x10 の重み行列と 10 バイアスユニット)を返す引数なし関数であることを反映しています。

str(federated_algorithm.next.type_signature)

ここでは、federated_algorithm.next がサーバーモデルとクライアントデータを受け入れて、更新されたサーバーモデルを返すことがわかります。

アルゴリズムを評価する

数ラウンドほど実行し、損失がどのように変化するかを見てみましょう。まず、2 つ目のチュートリアルで説明した centralized アプローチを使って、評価関数を定義します。

まず、中央の評価データセットを作成してから、トレーニングデータに使用したのと同じ前処理を適用します。

ここでは、計算効率の理由で、最初の 1000 個の要素のみを take していますが、一般的にはテストデータセット全体を使用することに注意してください。

central_emnist_test = emnist_test.create_tf_dataset_from_all_clients().take(1000) central_emnist_test = preprocess(central_emnist_test)

次に、サーバーの状態を受け入れる関数を記述し、Keras を使用してテストデータセットで評価します。tf.Keras の使用に慣れているのであれば、これも見慣れているかもしれませんが、set_weights の使用に注意してください!

def evaluate(server_state): keras_model = create_keras_model() keras_model.compile( loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()] ) keras_model.set_weights(server_state) keras_model.evaluate(central_emnist_test)

では、アルゴリズムを初期化して、テストセットを評価してみましょう。

server_state = federated_algorithm.initialize() evaluate(server_state)

数ラウンド程度トレーニングし、何かが変化するかどうかを確認しましょう。

for round in range(15): server_state = federated_algorithm.next(server_state, federated_train_data)
evaluate(server_state)

損失関数がわずかに減少しているのがわかります。小さなジャンプではありますが、トレーニングは 10 ラウンドしか実行しておらず、クライアントのサブセットも小さいことに注意してください。結果をよく理解するには、数千ラウンドでないにしても、数百ラウンドは実行する必要があるかもしれません。

アルゴリズムを変更する

ここまでたどり着いたところで、手を休め、これまで達成したことを考えてみましょう。純粋な TensorFlow コード(クライアントとサーバーの更新用)と TFF の Federated Core の連合計算を組み合わせることで、Federated Averaging を直接実装しました。

単に上記の内容を変更するだけで、さらに洗練された学習を実行することができます。具体的には、上記の純粋な TF コードを編集することで、クライアントがトレーニングを実行する方法またはサーバーがモデルを更新する方法を変更することができます。

課題: client_update 関数に勾配クリップを追加してください。