Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
probml
GitHub Repository: probml/pyprobml
Path: blob/master/notebooks/book1/15/cnn1d_sentiment_torch.ipynb
1192 views
Kernel: Python 3

Open In Colab

1d CNNs for sentiment classification

We use 1d CNNs for IMDB movie review classification. Based on sec 15.3 of http://d2l.ai/chapter_natural-language-processing-applications/sentiment-analysis-cnn.html

import numpy as np import matplotlib.pyplot as plt import math from IPython import display try: import torch except ModuleNotFoundError: %pip install -qq torch import torch from torch import nn from torch.nn import functional as F from torch.utils import data import collections import re import random import os import requests import zipfile import tarfile import hashlib import time np.random.seed(seed=1) torch.manual_seed(1) !mkdir figures # for saving plots

Data

We use IMDB dataset. Details in this colab.

# Required functions for downloading data def download(name, cache_dir=os.path.join("..", "data")): """Download a file inserted into DATA_HUB, return the local filename.""" assert name in DATA_HUB, f"{name} does not exist in {DATA_HUB}." url, sha1_hash = DATA_HUB[name] os.makedirs(cache_dir, exist_ok=True) fname = os.path.join(cache_dir, url.split("/")[-1]) if os.path.exists(fname): sha1 = hashlib.sha1() with open(fname, "rb") as f: while True: data = f.read(1048576) if not data: break sha1.update(data) if sha1.hexdigest() == sha1_hash: return fname # Hit cache print(f"Downloading {fname} from {url}...") r = requests.get(url, stream=True, verify=True) with open(fname, "wb") as f: f.write(r.content) return fname def download_extract(name, folder=None): """Download and extract a zip/tar file.""" fname = download(name) base_dir = os.path.dirname(fname) data_dir, ext = os.path.splitext(fname) if ext == ".zip": fp = zipfile.ZipFile(fname, "r") elif ext in (".tar", ".gz"): fp = tarfile.open(fname, "r") else: assert False, "Only zip/tar files can be extracted." fp.extractall(base_dir) return os.path.join(base_dir, folder) if folder else data_dir
def read_imdb(data_dir, is_train): data, labels = [], [] for label in ("pos", "neg"): folder_name = os.path.join(data_dir, "train" if is_train else "test", label) for file in os.listdir(folder_name): with open(os.path.join(folder_name, file), "rb") as f: review = f.read().decode("utf-8").replace("\n", "") data.append(review) labels.append(1 if label == "pos" else 0) return data, labels

We tokenize using words, and drop words which occur less than 5 times in training set when creating the vocab.

def tokenize(lines, token="word"): """Split text lines into word or character tokens.""" if token == "word": return [line.split() for line in lines] elif token == "char": return [list(line) for line in lines] else: print("ERROR: unknown token type: " + token) class Vocab: """Vocabulary for text.""" def __init__(self, tokens=None, min_freq=0, reserved_tokens=None): if tokens is None: tokens = [] if reserved_tokens is None: reserved_tokens = [] # Sort according to frequencies counter = count_corpus(tokens) self.token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True) # The index for the unknown token is 0 self.unk, uniq_tokens = 0, ["<unk>"] + reserved_tokens uniq_tokens += [token for token, freq in self.token_freqs if freq >= min_freq and token not in uniq_tokens] self.idx_to_token, self.token_to_idx = [], dict() for token in uniq_tokens: self.idx_to_token.append(token) self.token_to_idx[token] = len(self.idx_to_token) - 1 def __len__(self): return len(self.idx_to_token) def __getitem__(self, tokens): if not isinstance(tokens, (list, tuple)): return self.token_to_idx.get(tokens, self.unk) return [self.__getitem__(token) for token in tokens] def to_tokens(self, indices): if not isinstance(indices, (list, tuple)): return self.idx_to_token[indices] return [self.idx_to_token[index] for index in indices] def count_corpus(tokens): """Count token frequencies.""" # Here `tokens` is a 1D list or 2D list if len(tokens) == 0 or isinstance(tokens[0], list): # Flatten a list of token lists into a list of tokens tokens = [token for line in tokens for token in line] return collections.Counter(tokens) def set_figsize(figsize=(3.5, 2.5)): """Set the figure size for matplotlib.""" display.set_matplotlib_formats("svg") plt.rcParams["figure.figsize"] = figsize

We pad all sequences to length 500, for efficient minibatching.

def truncate_pad(line, num_steps, padding_token): """Truncate or pad sequences.""" if len(line) > num_steps: return line[:num_steps] # Truncate return line + [padding_token] * (num_steps - len(line))

Data iterator.

def load_array(data_arrays, batch_size, is_train=True): """Construct a PyTorch data iterator.""" dataset = data.TensorDataset(*data_arrays) return data.DataLoader(dataset, batch_size, shuffle=is_train)

Putting it altogether.

def load_data_imdb(batch_size, num_steps=500): data_dir = download_extract("aclImdb", "aclImdb") train_data = read_imdb(data_dir, True) test_data = read_imdb(data_dir, False) train_tokens = tokenize(train_data[0], token="word") test_tokens = tokenize(test_data[0], token="word") vocab = Vocab(train_tokens, min_freq=5) train_features = torch.tensor([truncate_pad(vocab[line], num_steps, vocab["<pad>"]) for line in train_tokens]) test_features = torch.tensor([truncate_pad(vocab[line], num_steps, vocab["<pad>"]) for line in test_tokens]) train_iter = load_array((train_features, torch.tensor(train_data[1])), batch_size) test_iter = load_array((test_features, torch.tensor(test_data[1])), batch_size, is_train=False) return train_iter, test_iter, vocab
DATA_HUB = dict() DATA_HUB["aclImdb"] = ( "http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz", "01ada507287d82875905620988597833ad4e0903", ) data_dir = download_extract("aclImdb", "aclImdb")
Downloading ../data/aclImdb_v1.tar.gz from http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz...
batch_size = 64 train_iter, test_iter, vocab = load_data_imdb(batch_size)

Model

We use 2 embedding layers, one with frozen weights, and one with learnable weights. We feed their concatenation to the 1d CNN. We then do average pooling over time before passing into the final MLP to map to the 2 output logits.

class TextCNN(nn.Module): def __init__(self, vocab_size, embed_size, kernel_sizes, num_channels, **kwargs): super(TextCNN, self).__init__(**kwargs) self.embedding = nn.Embedding(vocab_size, embed_size) # The embedding layer does not participate in training self.constant_embedding = nn.Embedding(vocab_size, embed_size) self.dropout = nn.Dropout(0.5) self.decoder = nn.Linear(sum(num_channels), 2) # The max-over-time pooling layer has no weight, so it can share an # instance self.pool = nn.AdaptiveAvgPool1d(1) self.relu = nn.ReLU() # Create multiple one-dimensional convolutional layers self.convs = nn.ModuleList() for c, k in zip(num_channels, kernel_sizes): self.convs.append(nn.Conv1d(2 * embed_size, c, k)) def forward(self, inputs): # Concatenate the output of two embedding layers with shape of # (batch size, no. of words, word vector dimension) by word vector embeddings = torch.cat((self.embedding(inputs), self.constant_embedding(inputs)), dim=2) # According to the input format required by Conv1d, the word vector # dimension, that is, the channel dimension of the one-dimensional # convolutional layer, is transformed into the previous dimension embeddings = embeddings.permute(0, 2, 1) # For each one-dimensional convolutional layer, after max-over-time # pooling, a tensor with the shape of (batch size, channel size, 1) # can be obtained. Use the flatten function to remove the last # dimension and then concatenate on the channel dimension encoding = torch.cat( [torch.squeeze(self.relu(self.pool(conv(embeddings))), dim=-1) for conv in self.convs], dim=1 ) # After applying the dropout method, use a fully connected layer to # obtain the output outputs = self.decoder(self.dropout(encoding)) return outputs
def try_all_gpus(): """Return all available GPUs, or [cpu(),] if no GPU exists.""" devices = [torch.device(f"cuda:{i}") for i in range(torch.cuda.device_count())] return devices if devices else [torch.device("cpu")] def try_gpu(i=0): """Return gpu(i) if exists, otherwise return cpu().""" if torch.cuda.device_count() >= i + 1: return torch.device(f"cuda:{i}") return torch.device("cpu")
embed_size, kernel_sizes, nums_channels = 100, [3, 4, 5], [100, 100, 100] devices = try_all_gpus() net = TextCNN(len(vocab), embed_size, kernel_sizes, nums_channels) def init_weights(m): if type(m) in (nn.Linear, nn.Conv1d): nn.init.xavier_uniform_(m.weight) net.apply(init_weights);

We load pretrained Glove vectors. We use these to initialize the embedding layers, one of which is frozen.

class TokenEmbedding: """Token Embedding.""" def __init__(self, embedding_name): self.idx_to_token, self.idx_to_vec = self._load_embedding(embedding_name) self.unknown_idx = 0 self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)} def _load_embedding(self, embedding_name): idx_to_token, idx_to_vec = ["<unk>"], [] data_dir = download_extract(embedding_name) # GloVe website: https://nlp.stanford.edu/projects/glove/ # fastText website: https://fasttext.cc/ with open(os.path.join(data_dir, "vec.txt"), "r") as f: for line in f: elems = line.rstrip().split(" ") token, elems = elems[0], [float(elem) for elem in elems[1:]] # Skip header information, such as the top row in fastText if len(elems) > 1: idx_to_token.append(token) idx_to_vec.append(elems) idx_to_vec = [[0] * len(idx_to_vec[0])] + idx_to_vec return idx_to_token, torch.tensor(idx_to_vec) def __getitem__(self, tokens): indices = [self.token_to_idx.get(token, self.unknown_idx) for token in tokens] vecs = self.idx_to_vec[torch.tensor(indices)] return vecs def __len__(self): return len(self.idx_to_token)
DATA_URL = "http://d2l-data.s3-accelerate.amazonaws.com/glove.6B.100d.zip" DATA_HUB["glove.6b.100d"] = (DATA_URL, "cd43bfb07e44e6f27cbcc7bc9ae3d80284fdaf5a") glove_embedding = TokenEmbedding("glove.6b.100d") embeds = glove_embedding[vocab.idx_to_token] net.embedding.weight.data.copy_(embeds) net.constant_embedding.weight.data.copy_(embeds) net.constant_embedding.weight.requires_grad = False
Downloading ../data/glove.6B.100d.zip from http://d2l-data.s3-accelerate.amazonaws.com/glove.6B.100d.zip...

Training

class Animator: """For plotting data in animation.""" def __init__( self, xlabel=None, ylabel=None, legend=None, xlim=None, ylim=None, xscale="linear", yscale="linear", fmts=("-", "m--", "g-.", "r:"), nrows=1, ncols=1, figsize=(3.5, 2.5), ): # Incrementally plot multiple lines if legend is None: legend = [] display.set_matplotlib_formats("svg") self.fig, self.axes = plt.subplots(nrows, ncols, figsize=figsize) if nrows * ncols == 1: self.axes = [ self.axes, ] # Use a lambda function to capture arguments self.config_axes = lambda: set_axes(self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend) self.X, self.Y, self.fmts = None, None, fmts def add(self, x, y): # Add multiple data points into the figure if not hasattr(y, "__len__"): y = [y] n = len(y) if not hasattr(x, "__len__"): x = [x] * n if not self.X: self.X = [[] for _ in range(n)] if not self.Y: self.Y = [[] for _ in range(n)] for i, (a, b) in enumerate(zip(x, y)): if a is not None and b is not None: self.X[i].append(a) self.Y[i].append(b) self.axes[0].cla() for x, y, fmt in zip(self.X, self.Y, self.fmts): self.axes[0].plot(x, y, fmt) self.config_axes() display.display(self.fig) display.clear_output(wait=True) class Timer: """Record multiple running times.""" def __init__(self): self.times = [] self.start() def start(self): """Start the timer.""" self.tik = time.time() def stop(self): """Stop the timer and record the time in a list.""" self.times.append(time.time() - self.tik) return self.times[-1] def avg(self): """Return the average time.""" return sum(self.times) / len(self.times) def sum(self): """Return the sum of time.""" return sum(self.times) def cumsum(self): """Return the accumulated time.""" return np.array(self.times).cumsum().tolist() class Accumulator: """For accumulating sums over `n` variables.""" def __init__(self, n): self.data = [0.0] * n def add(self, *args): self.data = [a + float(b) for a, b in zip(self.data, args)] def reset(self): self.data = [0.0] * len(self.data) def __getitem__(self, idx): return self.data[idx]
def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend): """Set the axes for matplotlib.""" axes.set_xlabel(xlabel) axes.set_ylabel(ylabel) axes.set_xscale(xscale) axes.set_yscale(yscale) axes.set_xlim(xlim) axes.set_ylim(ylim) if legend: axes.legend(legend) axes.grid()
def accuracy(y_hat, y): """Compute the number of correct predictions.""" if len(y_hat.shape) > 1 and y_hat.shape[1] > 1: y_hat = torch.argmax(y_hat, axis=1) cmp_ = y_hat.type(y.dtype) == y return float(cmp_.type(y.dtype).sum()) def evaluate_accuracy_gpu(net, data_iter, device=None): """Compute the accuracy for a model on a dataset using a GPU.""" if isinstance(net, torch.nn.Module): net.eval() # Set the model to evaluation mode if not device: device = next(iter(net.parameters())).device # No. of correct predictions, no. of predictions metric = Accumulator(2) for X, y in data_iter: if isinstance(X, list): # Required for BERT Fine-tuning X = [x.to(device) for x in X] else: X = X.to(device) y = y.to(device) metric.add(accuracy(net(X), y), y.numel()) return metric[0] / metric[1]
def train_batch(net, X, y, loss, trainer, devices): if isinstance(X, list): # Required for BERT Fine-tuning X = [x.to(devices[0]) for x in X] else: X = X.to(devices[0]) y = y.to(devices[0]) net.train() trainer.zero_grad() pred = net(X) l = loss(pred, y) l.sum().backward() trainer.step() train_loss_sum = l.sum() train_acc_sum = accuracy(pred, y) return train_loss_sum, train_acc_sum def train(net, train_iter, test_iter, loss, trainer, num_epochs, devices=try_all_gpus()): timer, num_batches = Timer(), len(train_iter) animator = Animator( xlabel="epoch", xlim=[1, num_epochs], ylim=[0, 1], legend=["train loss", "train acc", "test acc"] ) net = nn.DataParallel(net, device_ids=devices).to(devices[0]) for epoch in range(num_epochs): # Store training_loss, training_accuracy, num_examples, num_features metric = Accumulator(4) for i, (features, labels) in enumerate(train_iter): timer.start() l, acc = train_batch(net, features, labels, loss, trainer, devices) metric.add(l, acc, labels.shape[0], labels.numel()) timer.stop() if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1: animator.add(epoch + (i + 1) / num_batches, (metric[0] / metric[2], metric[1] / metric[3], None)) test_acc = evaluate_accuracy_gpu(net, test_iter) animator.add(epoch + 1, (None, None, test_acc)) print(f"loss {metric[0] / metric[2]:.3f}, train acc " f"{metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}") print(f"{metric[2] * num_epochs / timer.sum():.1f} examples/sec on " f"{str(devices)}")

Learning curve

lr, num_epochs = 0.001, 5 trainer = torch.optim.Adam(net.parameters(), lr=lr) loss = nn.CrossEntropyLoss(reduction="none") train(net, train_iter, test_iter, loss, trainer, num_epochs, devices)
loss 0.067, train acc 0.978, test acc 0.876 1131.4 examples/sec on [device(type='cuda', index=0)]
Image in a Jupyter notebook

Testing

def predict_sentiment(net, vocab, sentence): sentence = torch.tensor(vocab[sentence.split()], device=try_gpu()) label = torch.argmax(net(sentence.reshape(1, -1)), dim=1) return "positive" if label == 1 else "negative"
predict_sentiment(net, vocab, "this movie is so great")
'positive'
predict_sentiment(net, vocab, "this movie is so bad")
'negative'