Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ja/tutorials/optimization/compression.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

スケーラブルなモデル圧縮

概要

このノートブックでは、TensorFlow Compression を使用してモデルを圧縮する方法を説明します。

以下の例では、分類精度を維持しながら、MNIST 分類器の重みを浮動小数点表現よりもはるかに小さいサイズに圧縮します。これは、論文 Scalable Model Compression by Entropy Penalized Reparameterization に基づく 2 段階のプロセスによって行われます。

  • トレーニング中に明示的なエントロピーペナルティを使用して「圧縮可能な」モデルをトレーニングします。これにより、モデルパラメータの圧縮性が促進されます。このペナルティの重み λ\lambda により、圧縮されたモデルのサイズとその精度の間のトレードオフを継続的に制御できます。

  • ペナルティと一致するコーディングスキームを使用して、圧縮可能なモデルを圧縮モデルにエンコードします。つまり、ペナルティはモデルサイズの予測因子となります。これにより、微調整するためにモデルのトレーニング、圧縮、再トレーニングを複数回繰り返す必要がなくなります。

この方法は、計算の複雑さではなく、圧縮されたモデルのサイズに厳密に関係しています。モデルのプルーニングなどの手法と組み合わせて、サイズと複雑さを軽減できます。

さまざまなモデルでの圧縮結果の例:

モデル(データセット)モデルサイズ圧縮比トップ-1 エラー圧縮(非圧縮)
LeNet300-100(MNIST)8.56 KB124x1.9%(1.6%)
LeNet5-Caffe(MNIST)2.84 KB606x1.0%(0.7%)
VGG-16(CIFAR-10)101 KB590x10.0%(6.6%)
ResNet-20-4(CIFAR-10)128 KB134x8.8%(5.0%)
ResNet-18(ImageNet)1.97 MB24x30.0%(30.0%)
ResNet-50(ImageNet)5.49 MB19x26.0%(25.0%)

使用例:

  • モデルを大規模にエッジデバイスにデプロイ/ブロードキャスト。転送中の帯域幅の使用量を低減できます。

  • フェデレーテッドラーニングでグローバルモデルの状態をクライアントに伝える。モデルのアーキテクチャ(隠れユニットの数など)は初期モデルから変更されておらず、クライアントは解凍されたモデルで学習を続けることができます。

  • 非常にメモリが制限されたクライアントでの推論の実行。推論中、各レイヤーの重みを順次解凍し、アクティベーションが計算された直後に破棄することができます。

セットアップ

pip で Tensorflow Compression をインストールします。

%%bash # Installs the latest version of TFC compatible with the installed TF version. read MAJOR MINOR <<< "$(pip show tensorflow | perl -p -0777 -e 's/.*Version: (\d+)\.(\d+).*/\1 \2/sg')" pip install "tensorflow-compression<$MAJOR.$(($MINOR+1))"

ライブラリ依存関係をインポートします。

import matplotlib.pyplot as plt import tensorflow as tf import tensorflow_compression as tfc import tensorflow_datasets as tfds

基本的な MNIST 分類器を定義してトレーニングする

高密度レイヤーと畳み込みレイヤーを効果的に圧縮するには、カスタムレイヤークラスを定義する必要があります。これらは tf.keras.layers の下のレイヤーに似ていますが、Entropy Penalized Reparameterization(EPR)を効果的に実装するために後でサブクラス化します。このために、コピーコンストラクタも追加します。

まず、標準の高密度レイヤーを定義します。

class CustomDense(tf.keras.layers.Layer): def __init__(self, filters, name="dense"): super().__init__(name=name) self.filters = filters @classmethod def copy(cls, other, **kwargs): """Returns an instantiated and built layer, initialized from `other`.""" self = cls(filters=other.filters, name=other.name, **kwargs) self.build(None, other=other) return self def build(self, input_shape, other=None): """Instantiates weights, optionally initializing them from `other`.""" if other is None: kernel_shape = (input_shape[-1], self.filters) kernel = tf.keras.initializers.GlorotUniform()(shape=kernel_shape) bias = tf.keras.initializers.Zeros()(shape=(self.filters,)) else: kernel, bias = other.kernel, other.bias self.kernel = tf.Variable( tf.cast(kernel, self.variable_dtype), name="kernel") self.bias = tf.Variable( tf.cast(bias, self.variable_dtype), name="bias") self.built = True def call(self, inputs): outputs = tf.linalg.matvec(self.kernel, inputs, transpose_a=True) outputs = tf.nn.bias_add(outputs, self.bias) return tf.nn.leaky_relu(outputs)

同様に、2D 畳み込みレイヤーを定義します。

class CustomConv2D(tf.keras.layers.Layer): def __init__(self, filters, kernel_size, strides=1, padding="SAME", name="conv2d"): super().__init__(name=name) self.filters = filters self.kernel_size = kernel_size self.strides = strides self.padding = padding @classmethod def copy(cls, other, **kwargs): """Returns an instantiated and built layer, initialized from `other`.""" self = cls(filters=other.filters, kernel_size=other.kernel_size, strides=other.strides, padding=other.padding, name=other.name, **kwargs) self.build(None, other=other) return self def build(self, input_shape, other=None): """Instantiates weights, optionally initializing them from `other`.""" if other is None: kernel_shape = 2 * (self.kernel_size,) + (input_shape[-1], self.filters) kernel = tf.keras.initializers.GlorotUniform()(shape=kernel_shape) bias = tf.keras.initializers.Zeros()(shape=(self.filters,)) else: kernel, bias = other.kernel, other.bias self.kernel = tf.Variable( tf.cast(kernel, self.variable_dtype), name="kernel") self.bias = tf.Variable( tf.cast(bias, self.variable_dtype), name="bias") self.built = True def call(self, inputs): outputs = tf.nn.convolution( inputs, self.kernel, strides=self.strides, padding=self.padding) outputs = tf.nn.bias_add(outputs, self.bias) return tf.nn.leaky_relu(outputs)

モデルの圧縮に進む前に、通常の分類器を正常にトレーニングできることを確認します。

モデルアーキテクチャを定義します。

classifier = tf.keras.Sequential([ CustomConv2D(20, 5, strides=2, name="conv_1"), CustomConv2D(50, 5, strides=2, name="conv_2"), tf.keras.layers.Flatten(), CustomDense(500, name="fc_1"), CustomDense(10, name="fc_2"), ], name="classifier")

トレーニングデータを読み込みます。

def normalize_img(image, label): """Normalizes images: `uint8` -> `float32`.""" return tf.cast(image, tf.float32) / 255., label training_dataset, validation_dataset = tfds.load( "mnist", split=["train", "test"], shuffle_files=True, as_supervised=True, with_info=False, ) training_dataset = training_dataset.map(normalize_img) validation_dataset = validation_dataset.map(normalize_img)

最後に、モデルをトレーニングします。

def train_model(model, training_data, validation_data, **kwargs): model.compile( optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=[tf.keras.metrics.SparseCategoricalAccuracy()], # Uncomment this to ease debugging: # run_eagerly=True, ) kwargs.setdefault("epochs", 5) kwargs.setdefault("verbose", 1) log = model.fit( training_data.batch(128).prefetch(8), validation_data=validation_data.batch(128).cache(), validation_freq=1, **kwargs, ) return log.history["val_sparse_categorical_accuracy"][-1] classifier_accuracy = train_model( classifier, training_dataset, validation_dataset) print(f"Accuracy: {classifier_accuracy:0.4f}")

成功しました! モデルは適切にトレーニングされ、5 エポック以内に検証セットで 98% を超える精度に達しました。

圧縮可能な分類器をトレーニングする

Entropy Penalized Reparameterization(EPR)には、次の 2 つの主要な要素があります。

  • トレーニング中にモデルの重みにペナルティを適用します。これは、確率モデルの下でのエントロピーに対応し、重みのエンコードスキームと一致します。以下では、このペナルティを実装する Keras Regularizer を定義します。

  • 重みを再パラメータ化します。つまり、重みをより圧縮可能な潜在表現にします(圧縮性とモデルのパフォーマンスのトレードオフを改善します)。畳み込みカーネルの場合、フーリエドメインが適切な表現であることが示されています。他のパラメータについては、以下の例では、さまざまな量子化ステップサイズで単純にスカラー量子化(丸め)を使用しています。

まず、ペナルティを定義します。

以下の例では、論文 Optimizing the Communication-Accuracy Trade-off in Federated Learning with Rate-Distortion Theory に着想を得て、tfc.PowerLawEntropyModel クラスに実装されたコード/確率モデルを使用しています。ペナルティは log(x+αα), \log \Bigl(\frac {|x| + \alpha} \alpha\Bigr), と定義されます。xx はモデルパラメータまたはその潜在表現の 1 つの要素であり、α\alpha は小さい定数で、数値は 0 付近で安定します。

_ = tf.linspace(-5., 5., 501) plt.plot(_, tfc.PowerLawEntropyModel(0).penalty(_));

ペナルティは正則化の損失です(「重み損失」と呼ばれることもあります)。ゼロの場合、カスプが凹形状であり、重みのスパース性を助長します。重みを圧縮するために適用されるコーディングスキームである Elias ガンマ コードは、要素の大きさの長さが 1+log2x 1 + \lfloor \log_2 |x| \rfloor ビットのコードを生成します。つまり、それはペナルティに一致し、ペナルティを適用すると、予想されるコード長が最小化されます。

class PowerLawRegularizer(tf.keras.regularizers.Regularizer): def __init__(self, lmbda): super().__init__() self.lmbda = lmbda def __call__(self, variable): em = tfc.PowerLawEntropyModel(coding_rank=variable.shape.rank) return self.lmbda * em.penalty(variable) # Normalizing the weight of the penalty by the number of model parameters is a # good rule of thumb to produce comparable results across models. regularizer = PowerLawRegularizer(lmbda=2./classifier.count_params())

次に、次の追加機能を持つ CustomDenseCustomConv2D のサブクラスを定義します。

  • これらは上記の正則化のインスタンスを取得し、トレーニング中にそれをカーネルとバイアスに適用します。

  • これらは、カーネルとバイアスを @property として定義し、変数にアクセスするたびに直接勾配で量子化を実行します。これは、後で圧縮モデルで実行される計算を正確に反映します。

  • これらは、量子化ステップサイズの対数を表す追加の log_step 変数を定義します。量子化が粗いほど、モデルのサイズは小さくなりますが、精度は低くなります。量子化ステップサイズは各モデルパラメータに対してトレーニング可能であるため、ペナルティ付き損失関数で最適化を実行すると、最適な量子化ステップサイズが決定されます。

量子化ステップは次のように定義されます。

def quantize(latent, log_step): step = tf.exp(log_step) return tfc.round_st(latent / step) * step

これで、高密度レイヤーを定義できます。

class CompressibleDense(CustomDense): def __init__(self, regularizer, *args, **kwargs): super().__init__(*args, **kwargs) self.regularizer = regularizer def build(self, input_shape, other=None): """Instantiates weights, optionally initializing them from `other`.""" super().build(input_shape, other=other) if other is not None and hasattr(other, "kernel_log_step"): kernel_log_step = other.kernel_log_step bias_log_step = other.bias_log_step else: kernel_log_step = bias_log_step = -4. self.kernel_log_step = tf.Variable( tf.cast(kernel_log_step, self.variable_dtype), name="kernel_log_step") self.bias_log_step = tf.Variable( tf.cast(bias_log_step, self.variable_dtype), name="bias_log_step") self.add_loss(lambda: self.regularizer( self.kernel_latent / tf.exp(self.kernel_log_step))) self.add_loss(lambda: self.regularizer( self.bias_latent / tf.exp(self.bias_log_step))) @property def kernel(self): return quantize(self.kernel_latent, self.kernel_log_step) @kernel.setter def kernel(self, kernel): self.kernel_latent = tf.Variable(kernel, name="kernel_latent") @property def bias(self): return quantize(self.bias_latent, self.bias_log_step) @bias.setter def bias(self, bias): self.bias_latent = tf.Variable(bias, name="bias_latent")

畳み込みレイヤーも同様です。さらに、畳み込みカーネルは、カーネルが設定されるたびに実数値の離散フーリエ変換(RDFT)として格納され、カーネルが使用されるたびに変換が反転されます。カーネルのさまざまな周波数成分は多かれ少なかれ圧縮可能である傾向があるため、それぞれに独自の量子化ステップサイズが割り当てられます。

フーリエ変換とその逆を次のように定義します。

def to_rdft(kernel, kernel_size): # The kernel has shape (H, W, I, O) -> transpose to take DFT over last two # dimensions. kernel = tf.transpose(kernel, (2, 3, 0, 1)) # The RDFT has type complex64 and shape (I, O, FH, FW). kernel_rdft = tf.signal.rfft2d(kernel) # Map real and imaginary parts into regular floats. The result is float32 # and has shape (I, O, FH, FW, 2). kernel_rdft = tf.stack( [tf.math.real(kernel_rdft), tf.math.imag(kernel_rdft)], axis=-1) # Divide by kernel size to make the DFT orthonormal (length-preserving). return kernel_rdft / kernel_size def from_rdft(kernel_rdft, kernel_size): # Undoes the transformations in to_rdft. kernel_rdft *= kernel_size kernel_rdft = tf.dtypes.complex(*tf.unstack(kernel_rdft, axis=-1)) kernel = tf.signal.irfft2d(kernel_rdft, fft_length=2 * (kernel_size,)) return tf.transpose(kernel, (2, 3, 0, 1))

次に畳み込みレイヤーを以下のように定義します。

class CompressibleConv2D(CustomConv2D): def __init__(self, regularizer, *args, **kwargs): super().__init__(*args, **kwargs) self.regularizer = regularizer def build(self, input_shape, other=None): """Instantiates weights, optionally initializing them from `other`.""" super().build(input_shape, other=other) if other is not None and hasattr(other, "kernel_log_step"): kernel_log_step = other.kernel_log_step bias_log_step = other.bias_log_step else: kernel_log_step = tf.fill(self.kernel_latent.shape[2:], -4.) bias_log_step = -4. self.kernel_log_step = tf.Variable( tf.cast(kernel_log_step, self.variable_dtype), name="kernel_log_step") self.bias_log_step = tf.Variable( tf.cast(bias_log_step, self.variable_dtype), name="bias_log_step") self.add_loss(lambda: self.regularizer( self.kernel_latent / tf.exp(self.kernel_log_step))) self.add_loss(lambda: self.regularizer( self.bias_latent / tf.exp(self.bias_log_step))) @property def kernel(self): kernel_rdft = quantize(self.kernel_latent, self.kernel_log_step) return from_rdft(kernel_rdft, self.kernel_size) @kernel.setter def kernel(self, kernel): kernel_rdft = to_rdft(kernel, self.kernel_size) self.kernel_latent = tf.Variable(kernel_rdft, name="kernel_latent") @property def bias(self): return quantize(self.bias_latent, self.bias_log_step) @bias.setter def bias(self, bias): self.bias_latent = tf.Variable(bias, name="bias_latent")

上記と同じアーキテクチャで分類器モデルを定義しますが、これらの変更されたレイヤーを使用します。

def make_mnist_classifier(regularizer): return tf.keras.Sequential([ CompressibleConv2D(regularizer, 20, 5, strides=2, name="conv_1"), CompressibleConv2D(regularizer, 50, 5, strides=2, name="conv_2"), tf.keras.layers.Flatten(), CompressibleDense(regularizer, 500, name="fc_1"), CompressibleDense(regularizer, 10, name="fc_2"), ], name="classifier") compressible_classifier = make_mnist_classifier(regularizer)

モデルをトレーニングします。

penalized_accuracy = train_model( compressible_classifier, training_dataset, validation_dataset) print(f"Accuracy: {penalized_accuracy:0.4f}")

圧縮可能なモデルは、単純な分類器と同様の精度に達しています。

ただし、モデルは実際にはまだ圧縮されていません。これを行うためには、カーネルとバイアスを圧縮された形式(一連のビット)で格納するサブクラスの別のセットを定義します。

分類器を圧縮する

以下に定義されている CustomDenseCustom Conv2D のサブクラスは、圧縮可能な高密度レイヤーの重みをバイナリ文字列に変換します。さらに、スペースを節約するために、量子化ステップサイズの対数を半精度で格納します。カーネルまたはバイアスが @property を介してアクセスされるたびに、それらは文字列表現から解凍され、逆量子化されます。

まず、モデルパラメータを圧縮および解凍する関数を定義します。

def compress_latent(latent, log_step, name): em = tfc.PowerLawEntropyModel(latent.shape.rank) compressed = em.compress(latent / tf.exp(log_step)) compressed = tf.Variable(compressed, name=f"{name}_compressed") log_step = tf.cast(log_step, tf.float16) log_step = tf.Variable(log_step, name=f"{name}_log_step") return compressed, log_step def decompress_latent(compressed, shape, log_step): latent = tfc.PowerLawEntropyModel(len(shape)).decompress(compressed, shape) step = tf.exp(tf.cast(log_step, latent.dtype)) return latent * step

これらを使用して、CompressedDense を定義します。

class CompressedDense(CustomDense): def build(self, input_shape, other=None): assert isinstance(other, CompressibleDense) self.input_channels = other.kernel.shape[0] self.kernel_compressed, self.kernel_log_step = compress_latent( other.kernel_latent, other.kernel_log_step, "kernel") self.bias_compressed, self.bias_log_step = compress_latent( other.bias_latent, other.bias_log_step, "bias") self.built = True @property def kernel(self): kernel_shape = (self.input_channels, self.filters) return decompress_latent( self.kernel_compressed, kernel_shape, self.kernel_log_step) @property def bias(self): bias_shape = (self.filters,) return decompress_latent( self.bias_compressed, bias_shape, self.bias_log_step)

畳み込みレイヤークラスは上記に類似しています。

class CompressedConv2D(CustomConv2D): def build(self, input_shape, other=None): assert isinstance(other, CompressibleConv2D) self.input_channels = other.kernel.shape[2] self.kernel_compressed, self.kernel_log_step = compress_latent( other.kernel_latent, other.kernel_log_step, "kernel") self.bias_compressed, self.bias_log_step = compress_latent( other.bias_latent, other.bias_log_step, "bias") self.built = True @property def kernel(self): rdft_shape = (self.input_channels, self.filters, self.kernel_size, self.kernel_size // 2 + 1, 2) kernel_rdft = decompress_latent( self.kernel_compressed, rdft_shape, self.kernel_log_step) return from_rdft(kernel_rdft, self.kernel_size) @property def bias(self): bias_shape = (self.filters,) return decompress_latent( self.bias_compressed, bias_shape, self.bias_log_step)

圧縮可能なモデルを圧縮モデルに変換するには、便利な clone_model 関数を使用できます。compress_layer は、圧縮可能なレイヤーを圧縮レイヤーに変換し、他のタイプのレイヤー(Flatten など)を単純に通過させます。

def compress_layer(layer): if isinstance(layer, CompressibleDense): return CompressedDense.copy(layer) if isinstance(layer, CompressibleConv2D): return CompressedConv2D.copy(layer) return type(layer).from_config(layer.get_config()) compressed_classifier = tf.keras.models.clone_model( compressible_classifier, clone_function=compress_layer)

ここで、圧縮されたモデルの精度が期待どおりであることを検証します。

compressed_classifier.compile(metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]) _, compressed_accuracy = compressed_classifier.evaluate(validation_dataset.batch(128)) print(f"Accuracy of the compressible classifier: {penalized_accuracy:0.4f}") print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}")

圧縮されたモデルの分類精度は、トレーニング中に達成されたものと同じです!

さらに、圧縮されたモデルの重みのサイズは、元のモデルのサイズよりもはるかに小さくなっています。

def get_weight_size_in_bytes(weight): if weight.dtype == tf.string: return tf.reduce_sum(tf.strings.length(weight, unit="BYTE")) else: return tf.size(weight) * weight.dtype.size original_size = sum(map(get_weight_size_in_bytes, classifier.weights)) compressed_size = sum(map(get_weight_size_in_bytes, compressed_classifier.weights)) print(f"Size of original model weights: {original_size} bytes") print(f"Size of compressed model weights: {compressed_size} bytes") print(f"Compression ratio: {(original_size/compressed_size):0.0f}x")

モデルをディスクに保存するには、モデルアーキテクチャ、関数グラフなどを保存するためのオーバーヘッドが必要です。

ZIP などの可逆圧縮方法は、このようなデータの圧縮には適していますが、重み自体の圧縮には適していません。そのため、ZIP 圧縮を適用した後でも、そのオーバーヘッドを含めたモデルのサイズを数えると、EPR には大きな利点があります。

import os import shutil def get_disk_size(model, path): model.save(path) zip_path = shutil.make_archive(path, "zip", path) return os.path.getsize(zip_path) original_zip_size = get_disk_size(classifier, "/tmp/classifier") compressed_zip_size = get_disk_size( compressed_classifier, "/tmp/compressed_classifier") print(f"Original on-disk size (ZIP compressed): {original_zip_size} bytes") print(f"Compressed on-disk size (ZIP compressed): {compressed_zip_size} bytes") print(f"Compression ratio: {(original_zip_size/compressed_zip_size):0.0f}x")

正則化効果とサイズと精度のトレードオフ

上記では、λ\lambda ハイパーパラメータが 2 に設定されています(モデル内のパラメータの数によって正規化されています)。 λ\lambda を大きくすると、モデルの重みは圧縮性に対してますます不利になります。

低い値の場合、ペナルティは重みの正則化のように機能します。実際には、分類器の一般化パフォーマンスに有益な効果があり、検証データセットの精度がわずかに高くなる可能性があります。

#@title print(f"Accuracy of the vanilla classifier: {classifier_accuracy:0.4f}") print(f"Accuracy of the penalized classifier: {penalized_accuracy:0.4f}")

値が高いほど、モデルのサイズは小さくなりますが、精度は徐々に低下します。このことを確認するために、いくつかのモデルをトレーニングして、サイズと精度をプロットしてみます。

def compress_and_evaluate_model(lmbda): print(f"lambda={lmbda:0.0f}: training...", flush=True) regularizer = PowerLawRegularizer(lmbda=lmbda/classifier.count_params()) compressible_classifier = make_mnist_classifier(regularizer) train_model( compressible_classifier, training_dataset, validation_dataset, verbose=0) print("compressing...", flush=True) compressed_classifier = tf.keras.models.clone_model( compressible_classifier, clone_function=compress_layer) compressed_size = sum(map( get_weight_size_in_bytes, compressed_classifier.weights)) compressed_zip_size = float(get_disk_size( compressed_classifier, "/tmp/compressed_classifier")) print("evaluating...", flush=True) compressed_classifier = tf.keras.models.load_model( "/tmp/compressed_classifier") compressed_classifier.compile( metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]) _, compressed_accuracy = compressed_classifier.evaluate( validation_dataset.batch(128), verbose=0) print() return compressed_size, compressed_zip_size, compressed_accuracy lambdas = (2., 5., 10., 20., 50.) metrics = [compress_and_evaluate_model(l) for l in lambdas] metrics = tf.convert_to_tensor(metrics, tf.float32)
#@title def plot_broken_xaxis(ax, compressed_sizes, original_size, original_accuracy): xticks = list(range( int(tf.math.floor(min(compressed_sizes) / 5) * 5), int(tf.math.ceil(max(compressed_sizes) / 5) * 5) + 1, 5)) xticks.append(xticks[-1] + 10) ax.set_xlim(xticks[0], xticks[-1] + 2) ax.set_xticks(xticks[1:]) ax.set_xticklabels(xticks[1:-1] + [f"{original_size:0.2f}"]) ax.plot(xticks[-1], original_accuracy, "o", label="float32") sizes, zip_sizes, accuracies = tf.transpose(metrics) sizes /= 1024 zip_sizes /= 1024 fig, (axl, axr) = plt.subplots(1, 2, sharey=True, figsize=(10, 4)) axl.plot(sizes, accuracies, "o-", label="EPR compressed") axr.plot(zip_sizes, accuracies, "o-", label="EPR compressed") plot_broken_xaxis(axl, sizes, original_size/1024, classifier_accuracy) plot_broken_xaxis(axr, zip_sizes, original_zip_size/1024, classifier_accuracy) axl.set_xlabel("size of model weights [kbytes]") axr.set_xlabel("ZIP compressed on-disk model size [kbytes]") axl.set_ylabel("accuracy") axl.legend(loc="lower right") axr.legend(loc="lower right") axl.grid() axr.grid() for i in range(len(lambdas)): axl.annotate(f"$\lambda = {lambdas[i]:0.0f}$", (sizes[i], accuracies[i]), xytext=(10, -5), xycoords="data", textcoords="offset points") axr.annotate(f"$\lambda = {lambdas[i]:0.0f}$", (zip_sizes[i], accuracies[i]), xytext=(10, -5), xycoords="data", textcoords="offset points") plt.tight_layout()

プロットは、理想的にはエルボー型のサイズと精度のトレードオフを示す必要がありますが、精度メトリクスに多少のノイズがあるのは正常です。初期化によっては、曲線にねじれが生じる場合があります。

正則化効果により、EPR 圧縮モデルは、λ\lambda の値が小さい場合、元のモデルよりもテストセットでより正確です。追加の ZIP 圧縮後のサイズを比較しても、EPR 圧縮モデルは何倍も小さくなっています。

分類器を解凍する

CompressedDenseCompressedConv2D は、フォワードパスごとに重みを解凍します。そのため、メモリが制限されたデバイスに最適ですが、圧縮解凍は、特に小さなバッチサイズの場合、計算コストが高くなる可能性があります。

モデルを一度解凍し、それをさらにトレーニングや推論に使用するには、通常のレイヤーまたは圧縮可能なレイヤーを使用してモデルに戻すことができます。これは、モデルのデプロイまたはフェデレーションラーニングのシナリオで役立ちます。

まず、単純なモデルに戻すと、推論を実行したり、圧縮ペナルティなしで通常のトレーニングを継続したりできます。

def decompress_layer(layer): if isinstance(layer, CompressedDense): return CustomDense.copy(layer) if isinstance(layer, CompressedConv2D): return CustomConv2D.copy(layer) return type(layer).from_config(layer.get_config()) decompressed_classifier = tf.keras.models.clone_model( compressed_classifier, clone_function=decompress_layer)
decompressed_accuracy = train_model( decompressed_classifier, training_dataset, validation_dataset, epochs=1) print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}") print(f"Accuracy of the decompressed classifier after one more epoch of training: {decompressed_accuracy:0.4f}")

トレーニングは正則化なしで行われるため、追加エポックのトレーニング後に検証精度が低下することに注意してください。

または、モデルを「圧縮可能な」モデルに変換して、圧縮ペナルティを使用して推論やさらなるトレーニングを行うことができます。

def decompress_layer_with_penalty(layer): if isinstance(layer, CompressedDense): return CompressibleDense.copy(layer, regularizer=regularizer) if isinstance(layer, CompressedConv2D): return CompressibleConv2D.copy(layer, regularizer=regularizer) return type(layer).from_config(layer.get_config()) decompressed_classifier = tf.keras.models.clone_model( compressed_classifier, clone_function=decompress_layer_with_penalty)
decompressed_accuracy = train_model( decompressed_classifier, training_dataset, validation_dataset, epochs=1) print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}") print(f"Accuracy of the decompressed classifier after one more epoch of training: {decompressed_accuracy:0.4f}")

ここでは、追加のエポックのトレーニング後に精度が向上しています。