GitHub Repository: fchollet/deep-learning-with-python-notebooks
Path: blob/master/second_edition/chapter07_working-with-keras.ipynb

This is a companion notebook for the book Deep Learning with Python, Second Edition. For readability, it contains only runnable code blocks and section titles, and omits everything else in the book: text paragraphs, figures, and pseudocode.

If you want to be able to follow what's going on, I recommend reading the notebook side by side with your copy of the book.

This notebook was generated for TensorFlow 2.6.

Working with Keras: A deep dive

A spectrum of workflows

Different ways to build Keras models

The Sequential model

The Sequential class

from tensorflow import keras
from tensorflow.keras import layers

model = keras.Sequential([
    layers.Dense(64, activation="relu"),
    layers.Dense(10, activation="softmax")
])

Incrementally building a Sequential model

model = keras.Sequential()
model.add(layers.Dense(64, activation="relu"))
model.add(layers.Dense(10, activation="softmax"))

Calling a model for the first time to build it

model.build(input_shape=(None, 3))
model.weights

The summary method

model.summary()

Naming models and layers with the name argument

model = keras.Sequential(name="my_example_model")
model.add(layers.Dense(64, activation="relu", name="my_first_layer"))
model.add(layers.Dense(10, activation="softmax", name="my_last_layer"))
model.build((None, 3))
model.summary()

Specifying the input shape of your model in advance

model = keras.Sequential()
model.add(keras.Input(shape=(3,)))
model.add(layers.Dense(64, activation="relu"))
model.summary()
model.add(layers.Dense(10, activation="softmax"))
model.summary()

The Functional API

A simple example

A simple Functional model with two Dense layers

inputs = keras.Input(shape=(3,), name="my_input") features = layers.Dense(64, activation="relu")(inputs) outputs = layers.Dense(10, activation="softmax")(features) model = keras.Model(inputs=inputs, outputs=outputs)
inputs = keras.Input(shape=(3,), name="my_input")
inputs.shape
inputs.dtype
features = layers.Dense(64, activation="relu")(inputs)
features.shape
outputs = layers.Dense(10, activation="softmax")(features)
model = keras.Model(inputs=inputs, outputs=outputs)
model.summary()

Multi-input, multi-output models

A multi-input, multi-output Functional model

vocabulary_size = 10000
num_tags = 100
num_departments = 4

title = keras.Input(shape=(vocabulary_size,), name="title")
text_body = keras.Input(shape=(vocabulary_size,), name="text_body")
tags = keras.Input(shape=(num_tags,), name="tags")

features = layers.Concatenate()([title, text_body, tags])
features = layers.Dense(64, activation="relu")(features)

priority = layers.Dense(1, activation="sigmoid", name="priority")(features)
department = layers.Dense(
    num_departments, activation="softmax", name="department")(features)

model = keras.Model(inputs=[title, text_body, tags],
                    outputs=[priority, department])

Training a multi-input, multi-output model

Training a model by providing lists of input & target arrays

import numpy as np

num_samples = 1280

title_data = np.random.randint(0, 2, size=(num_samples, vocabulary_size))
text_body_data = np.random.randint(0, 2, size=(num_samples, vocabulary_size))
tags_data = np.random.randint(0, 2, size=(num_samples, num_tags))

priority_data = np.random.random(size=(num_samples, 1))
department_data = np.random.randint(0, 2, size=(num_samples, num_departments))

model.compile(optimizer="rmsprop",
              loss=["mean_squared_error", "categorical_crossentropy"],
              metrics=[["mean_absolute_error"], ["accuracy"]])
model.fit([title_data, text_body_data, tags_data],
          [priority_data, department_data],
          epochs=1)
model.evaluate([title_data, text_body_data, tags_data],
               [priority_data, department_data])
priority_preds, department_preds = model.predict(
    [title_data, text_body_data, tags_data])

Training a model by providing dicts of input & target arrays

model.compile(optimizer="rmsprop", loss={"priority": "mean_squared_error", "department": "categorical_crossentropy"}, metrics={"priority": ["mean_absolute_error"], "department": ["accuracy"]}) model.fit({"title": title_data, "text_body": text_body_data, "tags": tags_data}, {"priority": priority_data, "department": department_data}, epochs=1) model.evaluate({"title": title_data, "text_body": text_body_data, "tags": tags_data}, {"priority": priority_data, "department": department_data}) priority_preds, department_preds = model.predict( {"title": title_data, "text_body": text_body_data, "tags": tags_data})

The power of the Functional API: Access to layer connectivity

keras.utils.plot_model(model, "ticket_classifier.png")
keras.utils.plot_model(model, "ticket_classifier_with_shape_info.png", show_shapes=True)

Retrieving the inputs or outputs of a layer in a Functional model

model.layers
model.layers[3].input
model.layers[3].output

Creating a new model by reusing intermediate layer outputs

features = model.layers[4].output
difficulty = layers.Dense(3, activation="softmax", name="difficulty")(features)

new_model = keras.Model(
    inputs=[title, text_body, tags],
    outputs=[priority, department, difficulty])
keras.utils.plot_model(new_model, "updated_ticket_classifier.png", show_shapes=True)

Subclassing the Model class

Rewriting our previous example as a subclassed model

A simple subclassed model

class CustomerTicketModel(keras.Model):

    def __init__(self, num_departments):
        super().__init__()
        self.concat_layer = layers.Concatenate()
        self.mixing_layer = layers.Dense(64, activation="relu")
        self.priority_scorer = layers.Dense(1, activation="sigmoid")
        self.department_classifier = layers.Dense(
            num_departments, activation="softmax")

    def call(self, inputs):
        title = inputs["title"]
        text_body = inputs["text_body"]
        tags = inputs["tags"]
        features = self.concat_layer([title, text_body, tags])
        features = self.mixing_layer(features)
        priority = self.priority_scorer(features)
        department = self.department_classifier(features)
        return priority, department
model = CustomerTicketModel(num_departments=4)

priority, department = model(
    {"title": title_data, "text_body": text_body_data, "tags": tags_data})
model.compile(optimizer="rmsprop", loss=["mean_squared_error", "categorical_crossentropy"], metrics=[["mean_absolute_error"], ["accuracy"]]) model.fit({"title": title_data, "text_body": text_body_data, "tags": tags_data}, [priority_data, department_data], epochs=1) model.evaluate({"title": title_data, "text_body": text_body_data, "tags": tags_data}, [priority_data, department_data]) priority_preds, department_preds = model.predict({"title": title_data, "text_body": text_body_data, "tags": tags_data})

Beware: What subclassed models don't support
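The notebook leaves this section to the book's prose. As a minimal sketch (assumed, not from the book's code), this is the kind of graph introspection a subclassed model gives up, because its topology is hidden inside the body of call():

# Assumed illustration: a subclassed model is a black box to Keras.
subclassed_model = CustomerTicketModel(num_departments=4)

# These rely on an explicit graph of layers, which only Sequential and
# Functional models expose, so they are not available here:
# keras.utils.plot_model(subclassed_model)   # no layer graph to plot
# subclassed_model.layers[0].output          # no symbolic output to retrieve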

Mixing and matching different components

Creating a Functional model that includes a subclassed model

class Classifier(keras.Model):

    def __init__(self, num_classes=2):
        super().__init__()
        if num_classes == 2:
            num_units = 1
            activation = "sigmoid"
        else:
            num_units = num_classes
            activation = "softmax"
        self.dense = layers.Dense(num_units, activation=activation)

    def call(self, inputs):
        return self.dense(inputs)

inputs = keras.Input(shape=(3,))
features = layers.Dense(64, activation="relu")(inputs)
outputs = Classifier(num_classes=10)(features)
model = keras.Model(inputs=inputs, outputs=outputs)

Creating a subclassed model that includes a Functional model

inputs = keras.Input(shape=(64,))
outputs = layers.Dense(1, activation="sigmoid")(inputs)
binary_classifier = keras.Model(inputs=inputs, outputs=outputs)

class MyModel(keras.Model):

    def __init__(self, num_classes=2):
        super().__init__()
        self.dense = layers.Dense(64, activation="relu")
        self.classifier = binary_classifier

    def call(self, inputs):
        features = self.dense(inputs)
        return self.classifier(features)

model = MyModel()

Remember: Use the right tool for the job

Using built-in training and evaluation loops

The standard workflow: compile(), fit(), evaluate(), predict()

from tensorflow.keras.datasets import mnist

def get_mnist_model():
    inputs = keras.Input(shape=(28 * 28,))
    features = layers.Dense(512, activation="relu")(inputs)
    features = layers.Dropout(0.5)(features)
    outputs = layers.Dense(10, activation="softmax")(features)
    model = keras.Model(inputs, outputs)
    return model

(images, labels), (test_images, test_labels) = mnist.load_data()
images = images.reshape((60000, 28 * 28)).astype("float32") / 255
test_images = test_images.reshape((10000, 28 * 28)).astype("float32") / 255
train_images, val_images = images[10000:], images[:10000]
train_labels, val_labels = labels[10000:], labels[:10000]

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=3,
          validation_data=(val_images, val_labels))
test_metrics = model.evaluate(test_images, test_labels)
predictions = model.predict(test_images)

Writing your own metrics

Implementing a custom metric by subclassing the Metric class

import tensorflow as tf

class RootMeanSquaredError(keras.metrics.Metric):

    def __init__(self, name="rmse", **kwargs):
        super().__init__(name=name, **kwargs)
        self.mse_sum = self.add_weight(name="mse_sum", initializer="zeros")
        self.total_samples = self.add_weight(
            name="total_samples", initializer="zeros", dtype="int32")

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.one_hot(y_true, depth=tf.shape(y_pred)[1])
        mse = tf.reduce_sum(tf.square(y_true - y_pred))
        self.mse_sum.assign_add(mse)
        num_samples = tf.shape(y_pred)[0]
        self.total_samples.assign_add(num_samples)

    def result(self):
        return tf.sqrt(self.mse_sum / tf.cast(self.total_samples, tf.float32))

    def reset_state(self):
        self.mse_sum.assign(0.)
        self.total_samples.assign(0)
model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy", RootMeanSquaredError()])
model.fit(train_images, train_labels,
          epochs=3,
          validation_data=(val_images, val_labels))
test_metrics = model.evaluate(test_images, test_labels)

Using callbacks

The EarlyStopping and ModelCheckpoint callbacks

Using the callbacks argument in the fit() method

callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        patience=2,
    ),
    keras.callbacks.ModelCheckpoint(
        filepath="checkpoint_path.keras",
        monitor="val_loss",
        save_best_only=True,
    )
]
model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=callbacks_list,
          validation_data=(val_images, val_labels))
model = keras.models.load_model("checkpoint_path.keras")
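The restored model behaves like any other Keras model; as an assumed follow-up (not in the notebook), you could verify its test performance:

# Assumed usage: evaluate the checkpointed model that was just reloaded.
test_metrics = model.evaluate(test_images, test_labels)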

Writing your own callbacks

Creating a custom callback by subclassing the Callback class

from matplotlib import pyplot as plt

class LossHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs):
        self.per_batch_losses = []

    def on_batch_end(self, batch, logs):
        self.per_batch_losses.append(logs.get("loss"))

    def on_epoch_end(self, epoch, logs):
        plt.clf()
        plt.plot(range(len(self.per_batch_losses)), self.per_batch_losses,
                 label="Training loss for each batch")
        plt.xlabel(f"Batch (epoch {epoch})")
        plt.ylabel("Loss")
        plt.legend()
        plt.savefig(f"plot_at_epoch_{epoch}")
        self.per_batch_losses = []
model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=[LossHistory()],
          validation_data=(val_images, val_labels))

Monitoring and visualization with TensorBoard

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

tensorboard = keras.callbacks.TensorBoard(
    log_dir="/full_path_to_your_log_dir",
)
model.fit(train_images, train_labels,
          epochs=10,
          validation_data=(val_images, val_labels),
          callbacks=[tensorboard])
%load_ext tensorboard
%tensorboard --logdir /full_path_to_your_log_dir

Writing your own training and evaluation loops

Training versus inference
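The book covers this distinction in prose. As a minimal sketch (not from the notebook), the training argument is what switches layers such as Dropout between their two behaviors:

# Assumed illustration: Dropout is only active when training=True.
dropout = layers.Dropout(0.5)
x = tf.ones((2, 4))
print(dropout(x, training=True))   # about half the entries zeroed, the rest
                                   # scaled up by 1 / (1 - 0.5)
print(dropout(x, training=False))  # inference: inputs pass through unchanged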

Low-level usage of metrics

metric = keras.metrics.SparseCategoricalAccuracy()
targets = [0, 1, 2]
predictions = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
metric.update_state(targets, predictions)
current_result = metric.result()
print(f"result: {current_result:.2f}")
values = [0, 1, 2, 3, 4]
mean_tracker = keras.metrics.Mean()
for value in values:
    mean_tracker.update_state(value)
print(f"Mean of values: {mean_tracker.result():.2f}")

A complete training and evaluation loop

Writing a step-by-step training loop: the training step function

model = get_mnist_model()

loss_fn = keras.losses.SparseCategoricalCrossentropy()
optimizer = keras.optimizers.RMSprop()
metrics = [keras.metrics.SparseCategoricalAccuracy()]
loss_tracking_metric = keras.metrics.Mean()

def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = loss_fn(targets, predictions)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs[metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["loss"] = loss_tracking_metric.result()
    return logs

Writing a step-by-step training loop: resetting the metrics

def reset_metrics():
    for metric in metrics:
        metric.reset_state()
    loss_tracking_metric.reset_state()

Writing a step-by-step training loop: the loop itself

training_dataset = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels))
training_dataset = training_dataset.batch(32)
epochs = 3
for epoch in range(epochs):
    reset_metrics()
    for inputs_batch, targets_batch in training_dataset:
        logs = train_step(inputs_batch, targets_batch)
    print(f"Results at the end of epoch {epoch}")
    for key, value in logs.items():
        print(f"...{key}: {value:.4f}")

Writing a step-by-step evaluation loop

def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
print("Evaluation results:")
for key, value in logs.items():
    print(f"...{key}: {value:.4f}")

Make it fast with tf.function

Adding a tf.function decorator to our evaluation-step function

@tf.function
def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
print("Evaluation results:")
for key, value in logs.items():
    print(f"...{key}: {value:.4f}")
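The same decorator can compile the training step as well. A sketch under the same setup (the wrapper name is hypothetical, not from the notebook):

# Assumed sketch: wrap the earlier train_step in a compiled graph function.
# No variables are created inside, so it is safe to trace with tf.function.
@tf.function
def compiled_train_step(inputs, targets):
    return train_step(inputs, targets)

Calling compiled_train_step inside the training loop instead of train_step traces the computation once, then reuses the compiled graph on every batch.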

Leveraging fit() with a custom training loop

Implementing a custom training step to use with fit()

loss_fn = keras.losses.SparseCategoricalCrossentropy()
loss_tracker = keras.metrics.Mean(name="loss")

class CustomModel(keras.Model):
    def train_step(self, data):
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = loss_fn(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))

        loss_tracker.update_state(loss)
        return {"loss": loss_tracker.result()}

    @property
    def metrics(self):
        return [loss_tracker]
inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

model.compile(optimizer=keras.optimizers.RMSprop())
model.fit(train_images, train_labels, epochs=3)
class CustomModel(keras.Model):
    def train_step(self, data):
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.compiled_loss(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))
        self.compiled_metrics.update_state(targets, predictions)
        return {m.name: m.result() for m in self.metrics}
inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

model.compile(optimizer=keras.optimizers.RMSprop(),
              loss=keras.losses.SparseCategoricalCrossentropy(),
              metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.fit(train_images, train_labels, epochs=3)

Summary