GitHub Repository: fchollet/deep-learning-with-python-notebooks
Path: blob/master/chapter14_text-classification.ipynb
Kernel: Python 3

This is a companion notebook for the book Deep Learning with Python, Third Edition. For readability, it only contains runnable code blocks and section titles, and omits everything else in the book: text paragraphs, figures, and pseudocode.

If you want to be able to follow what's going on, I recommend reading the notebook side by side with your copy of the book.

The book's contents are available online at deeplearningwithpython.io.

!pip install keras keras-hub --upgrade -q
import os
os.environ["KERAS_BACKEND"] = "jax"
# @title
import os
from IPython.core.magic import register_cell_magic

@register_cell_magic
def backend(line, cell):
    current, required = os.environ.get("KERAS_BACKEND", ""), line.split()[-1]
    if current == required:
        get_ipython().run_cell(cell)
    else:
        print(
            f"This cell requires the {required} backend. To run it, change KERAS_BACKEND to "
            f"\"{required}\" at the top of the notebook, restart the runtime, and rerun the notebook."
        )

Text classification

A brief history of natural language processing

Preparing text data

import regex as re

def split_chars(text):
    return re.findall(r".", text)
chars = split_chars("The quick brown fox jumped over the lazy dog.")
chars[:12]
def split_words(text):
    return re.findall(r"[\w]+|[.,!?;]", text)
split_words("The quick brown fox jumped over the dog.")
vocabulary = {
    "[UNK]": 0,
    "the": 1,
    "quick": 2,
    "brown": 3,
    "fox": 4,
    "jumped": 5,
    "over": 6,
    "dog": 7,
    ".": 8,
}
words = split_words("The quick brown fox jumped over the lazy dog.")
indices = [vocabulary.get(word, 0) for word in words]
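Not part of the book's notebook: a quick look at the resulting indices. Note that both "The" (there is no lowercasing at this stage) and "lazy" (absent from the toy vocabulary) fall back to index 0, the [UNK] token.

print(words)
print(indices)
# ['The', 'quick', 'brown', 'fox', 'jumped', 'over', 'the', 'lazy', 'dog', '.']
# [0, 2, 3, 4, 5, 6, 1, 0, 7, 8]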

Character and word tokenization

class CharTokenizer:
    def __init__(self, vocabulary):
        self.vocabulary = vocabulary
        self.unk_id = vocabulary["[UNK]"]

    def standardize(self, inputs):
        return inputs.lower()

    def split(self, inputs):
        return re.findall(r".", inputs)

    def index(self, tokens):
        return [self.vocabulary.get(t, self.unk_id) for t in tokens]

    def __call__(self, inputs):
        inputs = self.standardize(inputs)
        tokens = self.split(inputs)
        indices = self.index(tokens)
        return indices
import collections

def compute_char_vocabulary(inputs, max_size):
    char_counts = collections.Counter()
    for x in inputs:
        x = x.lower()
        tokens = re.findall(r".", x)
        char_counts.update(tokens)
    vocabulary = ["[UNK]"]
    most_common = char_counts.most_common(max_size - len(vocabulary))
    for token, count in most_common:
        vocabulary.append(token)
    return dict((token, i) for i, token in enumerate(vocabulary))
class WordTokenizer:
    def __init__(self, vocabulary):
        self.vocabulary = vocabulary
        self.unk_id = vocabulary["[UNK]"]

    def standardize(self, inputs):
        return inputs.lower()

    def split(self, inputs):
        return re.findall(r"[\w]+|[.,!?;]", inputs)

    def index(self, tokens):
        return [self.vocabulary.get(t, self.unk_id) for t in tokens]

    def __call__(self, inputs):
        inputs = self.standardize(inputs)
        tokens = self.split(inputs)
        indices = self.index(tokens)
        return indices
def compute_word_vocabulary(inputs, max_size):
    word_counts = collections.Counter()
    for x in inputs:
        x = x.lower()
        tokens = re.findall(r"[\w]+|[.,!?;]", x)
        word_counts.update(tokens)
    vocabulary = ["[UNK]"]
    most_common = word_counts.most_common(max_size - len(vocabulary))
    for token, count in most_common:
        vocabulary.append(token)
    return dict((token, i) for i, token in enumerate(vocabulary))
import keras

filename = keras.utils.get_file(
    origin="https://www.gutenberg.org/files/2701/old/moby10b.txt",
)
moby_dick = list(open(filename, "r"))

vocabulary = compute_char_vocabulary(moby_dick, max_size=100)
char_tokenizer = CharTokenizer(vocabulary)
print("Vocabulary length:", len(vocabulary))
print("Vocabulary start:", list(vocabulary.keys())[:10])
print("Vocabulary end:", list(vocabulary.keys())[-10:])
print("Line length:", len(char_tokenizer( "Call me Ishmael. Some years ago--never mind how long precisely." )))
vocabulary = compute_word_vocabulary(moby_dick, max_size=2_000)
word_tokenizer = WordTokenizer(vocabulary)
print("Vocabulary length:", len(vocabulary))
print("Vocabulary start:", list(vocabulary.keys())[:5])
print("Vocabulary end:", list(vocabulary.keys())[-5:])
print("Line length:", len(word_tokenizer( "Call me Ishmael. Some years ago--never mind how long precisely." )))

Subword tokenization

data = [ "the quick brown fox", "the slow brown fox", "the quick brown foxhound", ]
def count_and_split_words(data):
    counts = collections.Counter()
    for line in data:
        line = line.lower()
        for word in re.findall(r"[\w]+|[.,!?;]", line):
            chars = re.findall(r".", word)
            split_word = " ".join(chars)
            counts[split_word] += 1
    return dict(counts)

counts = count_and_split_words(data)
counts
def count_pairs(counts):
    pairs = collections.Counter()
    for word, freq in counts.items():
        symbols = word.split()
        for pair in zip(symbols[:-1], symbols[1:]):
            pairs[pair] += freq
    return pairs

def merge_pair(counts, first, second):
    split = re.compile(rf"(?<!\S){first} {second}(?!\S)")
    merged = f"{first}{second}"
    return {split.sub(merged, word): count for word, count in counts.items()}

for i in range(10):
    pairs = count_pairs(counts)
    first, second = max(pairs, key=pairs.get)
    counts = merge_pair(counts, first, second)
    print(list(counts.keys()))
def compute_sub_word_vocabulary(dataset, vocab_size):
    counts = count_and_split_words(dataset)
    char_counts = collections.Counter()
    for word in counts:
        for char in word.split():
            char_counts[char] += counts[word]
    most_common = char_counts.most_common()
    vocab = ["[UNK]"] + [char for char, freq in most_common]
    merges = []
    while len(vocab) < vocab_size:
        pairs = count_pairs(counts)
        if not pairs:
            break
        first, second = max(pairs, key=pairs.get)
        counts = merge_pair(counts, first, second)
        vocab.append(f"{first}{second}")
        merges.append(f"{first} {second}")
    vocab = dict((token, index) for index, token in enumerate(vocab))
    merges = dict((token, rank) for rank, token in enumerate(merges))
    return vocab, merges
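Not in the book: as a quick sanity check, the function can be run on the three-line toy dataset from above with a small target size, to inspect the learned vocabulary and merge rules before applying it to Moby Dick. The names toy_vocab and toy_merges are introduced here for illustration only.

toy_vocab, toy_merges = compute_sub_word_vocabulary(data, vocab_size=30)
print(list(toy_vocab.keys()))
print(list(toy_merges.keys()))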
class SubWordTokenizer:
    def __init__(self, vocabulary, merges):
        self.vocabulary = vocabulary
        self.merges = merges
        self.unk_id = vocabulary["[UNK]"]

    def standardize(self, inputs):
        return inputs.lower()

    def bpe_merge(self, word):
        while True:
            pairs = re.findall(r"(?<!\S)\S+ \S+(?!\S)", word, overlapped=True)
            if not pairs:
                break
            best = min(pairs, key=lambda pair: self.merges.get(pair, 1e9))
            if best not in self.merges:
                break
            first, second = best.split()
            split = re.compile(rf"(?<!\S){first} {second}(?!\S)")
            merged = f"{first}{second}"
            word = split.sub(merged, word)
        return word

    def split(self, inputs):
        tokens = []
        for word in re.findall(r"[\w]+|[.,!?;]", inputs):
            word = " ".join(re.findall(r".", word))
            word = self.bpe_merge(word)
            tokens.extend(word.split())
        return tokens

    def index(self, tokens):
        return [self.vocabulary.get(t, self.unk_id) for t in tokens]

    def __call__(self, inputs):
        inputs = self.standardize(inputs)
        tokens = self.split(inputs)
        indices = self.index(tokens)
        return indices
vocabulary, merges = compute_sub_word_vocabulary(moby_dick, 2_000)
sub_word_tokenizer = SubWordTokenizer(vocabulary, merges)
print("Vocabulary length:", len(vocabulary))
print("Vocabulary start:", list(vocabulary.keys())[:10])
print("Vocabulary end:", list(vocabulary.keys())[-7:])
print("Line length:", len(sub_word_tokenizer( "Call me Ishmael. Some years ago--never mind how long precisely." )))

Sets vs. sequences

Loading the IMDb classification dataset

import os, pathlib, shutil, random

zip_path = keras.utils.get_file(
    origin="https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz",
    fname="imdb",
    extract=True,
)
imdb_extract_dir = pathlib.Path(zip_path) / "aclImdb"
for path in imdb_extract_dir.glob("*/*"):
    if path.is_dir():
        print(path)
print(open(imdb_extract_dir / "train" / "pos" / "4077_10.txt", "r").read())
train_dir = pathlib.Path("imdb_train")
test_dir = pathlib.Path("imdb_test")
val_dir = pathlib.Path("imdb_val")

shutil.copytree(imdb_extract_dir / "test", test_dir)

val_percentage = 0.2
for category in ("neg", "pos"):
    src_dir = imdb_extract_dir / "train" / category
    src_files = os.listdir(src_dir)
    random.Random(1337).shuffle(src_files)
    num_val_samples = int(len(src_files) * val_percentage)
    os.makedirs(val_dir / category)
    for file in src_files[:num_val_samples]:
        shutil.copy(src_dir / file, val_dir / category / file)
    os.makedirs(train_dir / category)
    for file in src_files[num_val_samples:]:
        shutil.copy(src_dir / file, train_dir / category / file)
from keras.utils import text_dataset_from_directory

batch_size = 32
train_ds = text_dataset_from_directory(train_dir, batch_size=batch_size)
val_ds = text_dataset_from_directory(val_dir, batch_size=batch_size)
test_ds = text_dataset_from_directory(test_dir, batch_size=batch_size)
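Not part of the book's notebook: a quick peek at one batch to confirm the directory structure was picked up correctly, with raw review strings paired with 0/1 labels.

for texts, labels in train_ds.take(1):
    print(texts[0])
    print(labels[0])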

Set models

Training a bag-of-words model

from keras import layers

max_tokens = 20_000
text_vectorization = layers.TextVectorization(
    max_tokens=max_tokens,
    split="whitespace",
    output_mode="multi_hot",
)
train_ds_no_labels = train_ds.map(lambda x, y: x)
text_vectorization.adapt(train_ds_no_labels)

bag_of_words_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
bag_of_words_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
bag_of_words_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
x, y = next(bag_of_words_train_ds.as_numpy_iterator())
x.shape
y.shape
def build_linear_classifier(max_tokens, name):
    inputs = keras.Input(shape=(max_tokens,))
    outputs = layers.Dense(1, activation="sigmoid")(inputs)
    model = keras.Model(inputs, outputs, name=name)
    model.compile(
        optimizer="adam",
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    return model

model = build_linear_classifier(max_tokens, "bag_of_words_classifier")
model.summary(line_length=80)
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    restore_best_weights=True,
    patience=2,
)
history = model.fit(
    bag_of_words_train_ds,
    validation_data=bag_of_words_val_ds,
    epochs=10,
    callbacks=[early_stopping],
)
import matplotlib.pyplot as plt

accuracy = history.history["accuracy"]
val_accuracy = history.history["val_accuracy"]
epochs = range(1, len(accuracy) + 1)
plt.plot(epochs, accuracy, "r--", label="Training accuracy")
plt.plot(epochs, val_accuracy, "b", label="Validation accuracy")
plt.title("Training and validation accuracy")
plt.legend()
plt.show()
test_loss, test_acc = model.evaluate(bag_of_words_test_ds)
test_acc

Training a bigram model

max_tokens = 30_000
text_vectorization = layers.TextVectorization(
    max_tokens=max_tokens,
    split="whitespace",
    output_mode="multi_hot",
    ngrams=2,
)
text_vectorization.adapt(train_ds_no_labels)

bigram_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
bigram_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
bigram_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
x, y = next(bigram_train_ds.as_numpy_iterator())
x.shape
text_vectorization.get_vocabulary()[100:108]
model = build_linear_classifier(max_tokens, "bigram_classifier") model.fit( bigram_train_ds, validation_data=bigram_val_ds, epochs=10, callbacks=[early_stopping], )
test_loss, test_acc = model.evaluate(bigram_test_ds)
test_acc

Sequence models

max_length = 600
max_tokens = 30_000
text_vectorization = layers.TextVectorization(
    max_tokens=max_tokens,
    split="whitespace",
    output_mode="int",
    output_sequence_length=max_length,
)
text_vectorization.adapt(train_ds_no_labels)

sequence_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
sequence_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
sequence_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y), num_parallel_calls=8
)
x, y = next(sequence_test_ds.as_numpy_iterator())
x.shape
x

Training a recurrent model

from keras import ops

class OneHotEncoding(keras.Layer):
    def __init__(self, depth, **kwargs):
        super().__init__(**kwargs)
        self.depth = depth

    def call(self, inputs):
        flat_inputs = ops.reshape(ops.cast(inputs, "int32"), [-1])
        one_hot_vectors = ops.eye(self.depth)
        outputs = ops.take(one_hot_vectors, flat_inputs, axis=0)
        return ops.reshape(outputs, ops.shape(inputs) + (self.depth,))

one_hot_encoding = OneHotEncoding(max_tokens)
x, y = next(sequence_train_ds.as_numpy_iterator())
one_hot_encoding(x).shape
hidden_dim = 64

inputs = keras.Input(shape=(max_length,), dtype="int32")
x = one_hot_encoding(inputs)
x = layers.Bidirectional(layers.LSTM(hidden_dim))(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs, name="lstm_with_one_hot")
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.summary(line_length=80)
# ⚠️NOTE⚠️: The following fit call will error on a T4 GPU on the TensorFlow
# backend due to a bug in TensorFlow. If the following cell errors out,
# do one of the following:
# - Skip the following two cells.
# - Switch to the JAX or PyTorch backend and rerun this notebook.
# - Change the GPU type in your runtime (requires Colab Pro as of this writing).
model.fit(
    sequence_train_ds,
    validation_data=sequence_val_ds,
    epochs=10,
    callbacks=[early_stopping],
)
test_loss, test_acc = model.evaluate(sequence_test_ds)
test_acc

Understanding word embeddings

Using a word embedding

hidden_dim = 64

inputs = keras.Input(shape=(max_length,), dtype="int32")
x = keras.layers.Embedding(
    input_dim=max_tokens,
    output_dim=hidden_dim,
    mask_zero=True,
)(inputs)
x = keras.layers.Bidirectional(keras.layers.LSTM(hidden_dim))(x)
x = keras.layers.Dropout(0.5)(x)
outputs = keras.layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs, name="lstm_with_embedding")
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.summary(line_length=80)
model.fit(
    sequence_train_ds,
    validation_data=sequence_val_ds,
    epochs=10,
    callbacks=[early_stopping],
)
test_loss, test_acc = model.evaluate(sequence_test_ds)
test_acc

Pretraining a word embedding

imdb_vocabulary = text_vectorization.get_vocabulary()
tokenize_no_padding = keras.layers.TextVectorization(
    vocabulary=imdb_vocabulary,
    split="whitespace",
    output_mode="int",
)
import tensorflow as tf

context_size = 4
window_size = 9

def window_data(token_ids):
    num_windows = tf.maximum(tf.size(token_ids) - context_size * 2, 0)
    windows = tf.range(window_size)[None, :]
    windows = windows + tf.range(num_windows)[:, None]
    windowed_tokens = tf.gather(token_ids, windows)
    return tf.data.Dataset.from_tensor_slices(windowed_tokens)

def split_label(window):
    left = window[:context_size]
    right = window[context_size + 1 :]
    bag = tf.concat((left, right), axis=0)
    label = window[context_size]
    return bag, label

dataset = keras.utils.text_dataset_from_directory(
    imdb_extract_dir / "train", batch_size=None
)
dataset = dataset.map(lambda x, y: x, num_parallel_calls=8)
dataset = dataset.map(tokenize_no_padding, num_parallel_calls=8)
dataset = dataset.interleave(window_data, cycle_length=8, num_parallel_calls=8)
dataset = dataset.map(split_label, num_parallel_calls=8)
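Not in the book: a quick sanity check on the windowing pipeline, assuming the dataset built above. Each element should pair a context of 2 * context_size = 8 token IDs with the ID of the window's center token.

context, target = next(iter(dataset))
print(context.shape, context.dtype)
print(target)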
hidden_dim = 64

inputs = keras.Input(shape=(2 * context_size,))
cbow_embedding = layers.Embedding(
    max_tokens,
    hidden_dim,
)
x = cbow_embedding(inputs)
x = layers.GlobalAveragePooling1D()(x)
outputs = layers.Dense(max_tokens, activation="softmax")(x)
cbow_model = keras.Model(inputs, outputs)
cbow_model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)
cbow_model.summary(line_length=80)
dataset = dataset.batch(1024).cache()
cbow_model.fit(dataset, epochs=4)

Using the pretrained embedding for classification

inputs = keras.Input(shape=(max_length,))
lstm_embedding = layers.Embedding(
    input_dim=max_tokens,
    output_dim=hidden_dim,
    mask_zero=True,
)
x = lstm_embedding(inputs)
x = layers.Bidirectional(layers.LSTM(hidden_dim))(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs, name="lstm_with_cbow")
lstm_embedding.embeddings.assign(cbow_embedding.embeddings)
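Not in the book: a quick check that the pretrained CBOW embedding matrix was actually copied into the classifier's embedding layer, using Layer.get_weights(), which returns NumPy arrays.

import numpy as np

print(np.allclose(
    cbow_embedding.get_weights()[0],
    lstm_embedding.get_weights()[0],
))  # should print True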
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.fit(
    sequence_train_ds,
    validation_data=sequence_val_ds,
    epochs=10,
    callbacks=[early_stopping],
)
test_loss, test_acc = model.evaluate(sequence_test_ds)
test_acc