Path: blob/master/PyTorch-Multi-Label-Image-Classification-Image-Tagging/Pipeline.ipynb
3118 views
Kernel: blog_venv
In [ ]:
# Install requirements
!pip install numpy scikit-image scipy scikit-learn matplotlib tqdm tensorflow torch torchvision
In [ ]:
import os import time import numpy as np from PIL import Image from torch.utils.data.dataset import Dataset from tqdm import tqdm from torchvision import transforms from torchvision import models import torch from torch.utils.tensorboard import SummaryWriter from sklearn.metrics import precision_score, recall_score, f1_score from torch import nn from torch.utils.data.dataloader import DataLoader from matplotlib import pyplot as plt from numpy import printoptions import requests import tarfile import random import json from shutil import copyfile
In [ ]:
# Pin every random number generator we rely on (PyTorch CPU/CUDA, NumPy,
# Python's random) and force deterministic cuDNN kernels so that repeated
# runs of this notebook produce identical results.
SEED = 2020
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
torch.backends.cudnn.deterministic = True
In [ ]:
# We use the .tar.gz archive from this
# (https://github.com/thuml/HashNet/tree/master/pytorch#datasets)
# github repository to speed up image loading (instead of loading it from Flickr).
# Let's download and extract it.
img_folder = 'images'
if not os.path.exists(img_folder):

    def download_file_from_google_drive(gdrive_id, destination):
        """Fetch a (possibly large) file from Google Drive, handling the
        'download anyway' confirmation token for big archives."""

        def get_confirm_token(response):
            # Google sets a 'download_warning' cookie when it wants an
            # explicit confirmation before serving a large file.
            for key, value in response.cookies.items():
                if key.startswith('download_warning'):
                    return value
            return None

        def save_response_content(response, destination):
            CHUNK_SIZE = 32768
            with open(destination, "wb") as f:
                for chunk in tqdm(response.iter_content(CHUNK_SIZE), desc='Downloading'):
                    if chunk:  # filter out keep-alive new chunks
                        f.write(chunk)

        URL = "https://docs.google.com/uc?export=download"
        session = requests.Session()
        response = session.get(URL, params={'id': gdrive_id}, stream=True)
        token = get_confirm_token(response)
        if token:
            # Re-issue the request with the confirmation token attached.
            response = session.get(URL, params={'id': gdrive_id, 'confirm': token},
                                   stream=True)
        save_response_content(response, destination)

    file_id = '0B7IzDz-4yH_HMFdiSE44R1lselE'
    # Timestamp-based name avoids clobbering anything in the working dir.
    path_to_tar_file = str(time.time()) + '.tar.gz'
    download_file_from_google_drive(file_id, path_to_tar_file)
    print('Extraction')
    with tarfile.open(path_to_tar_file) as tar_ref:
        tar_ref.extractall(os.path.dirname(img_folder))
    os.remove(path_to_tar_file)

# Also, copy our pre-processed annotations to the dataset folder.
# Note: you can find script for generating such annotations in attachments
copyfile('nus_wide/small_test.json', os.path.join(img_folder, 'small_test.json'))
copyfile('nus_wide/small_train.json', os.path.join(img_folder, 'small_train.json'))
In [ ]:
# Simple dataloader and label binarization, that is converting text labels into
# binary arrays of length 27 (number of classes) with 1 in places of applicable labels.
class NusDataset(Dataset):
    """NUS-WIDE subset reader.

    Loads an annotation JSON of the form
    ``{'labels': [...], 'samples': [{'image_name': ..., 'image_labels': [...]}]}``
    and exposes (image, multi-hot label vector) pairs.
    """

    def __init__(self, data_path, anno_path, transforms):
        """
        Args:
            data_path: directory containing the images.
            anno_path: path to the annotation JSON described above.
            transforms: torchvision transform applied to each PIL image,
                or None to return the raw image.
        """
        self.transforms = transforms
        with open(anno_path) as fp:
            json_data = json.load(fp)
        samples = json_data['samples']
        self.classes = json_data['labels']
        self.data_path = data_path
        print('loading', anno_path)
        self.imgs = [sample['image_name'] for sample in samples]
        # Multi-hot encode: annos[i][j] == 1.0 iff classes[j] is among
        # the i-th sample's labels.
        self.annos = [
            np.array([cls in sample['image_labels'] for cls in self.classes],
                     dtype=float)
            for sample in samples
        ]

    def __getitem__(self, item):
        """Return (image, multi-hot label vector) for sample `item`."""
        anno = self.annos[item]
        img_path = os.path.join(self.data_path, self.imgs[item])
        # Fix: force 3-channel RGB so grayscale/palette images in NUS-WIDE
        # don't break the 3-channel Normalize/model input downstream.
        img = Image.open(img_path).convert('RGB')
        if self.transforms is not None:
            img = self.transforms(img)
        return img, anno

    def __len__(self):
        return len(self.imgs)
In [ ]:
# Let's take a look at the data we have. To do it we need to load the dataset
# without augmentations.
dataset_val = NusDataset(img_folder, os.path.join(img_folder, 'small_test.json'), None)
dataset_train = NusDataset(img_folder, os.path.join(img_folder, 'small_train.json'), None)


# A simple function for visualization.
def show_sample(img, binary_img_labels):
    """Display one image titled with its labels decoded from the binary vector."""
    # Convert the binary labels back to the text representation.
    positive_ids = np.argwhere(binary_img_labels > 0)[:, 0]
    img_labels = np.array(dataset_val.classes)[positive_ids]
    plt.imshow(img)
    plt.title("{}".format(', '.join(img_labels)))
    plt.axis('off')
    plt.show()


for sample_id in range(5):
    show_sample(*dataset_val[sample_id])
In [ ]:
# Calculate label distribution for the entire dataset (train + test)
samples = np.array(dataset_val.annos + dataset_train.annos)
with printoptions(precision=3, suppress=True):
    class_counts = np.sum(samples, axis=0)
    # Sort labels according to their frequency in the dataset
    # (stable sort keeps the original tie order, like Python's sorted()).
    sorted_ids = np.argsort(class_counts, kind='stable')
    print('Label distribution (count, class name):',
          list(zip(class_counts[sorted_ids].astype(int),
                   np.array(dataset_val.classes)[sorted_ids])))
    plt.barh(range(len(dataset_val.classes)), width=class_counts[sorted_ids])
    plt.yticks(range(len(dataset_val.classes)),
               np.array(dataset_val.classes)[sorted_ids])
    plt.gca().margins(y=0)
    plt.grid()
    plt.title('Label distribution')
    plt.show()
In [ ]:
# Use the torchvision's implementation of ResNeXt, but add FC layer for a
# different number of classes (27) and a Sigmoid instead of a default Softmax.
class Resnext50(nn.Module):
    """Pretrained ResNeXt-50 (32x4d) with a replaced classification head.

    The head is Dropout + Linear(n_classes), and the output passes through a
    sigmoid so every class gets an independent probability (multi-label setup).
    """

    def __init__(self, n_classes):
        super().__init__()
        backbone = models.resnext50_32x4d(pretrained=True)
        # Capture the feature width before swapping out the original head.
        head_in_features = backbone.fc.in_features
        backbone.fc = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=head_in_features, out_features=n_classes),
        )
        self.base_model = backbone
        self.sigm = nn.Sigmoid()

    def forward(self, x):
        logits = self.base_model(x)
        return self.sigm(logits)
In [ ]:
# Use threshold to define predicted labels and invoke sklearn's metrics with
# different averaging strategies.
def calculate_metrics(pred, target, threshold=0.5):
    """Binarize per-label scores and compute precision/recall/F1.

    Args:
        pred: (n_samples, n_classes) array of scores in [0, 1].
        target: (n_samples, n_classes) binary ground-truth matrix.
        threshold: cut-off above which a score counts as a positive prediction.

    Returns:
        dict mapping '<averaging>/<metric>' (micro, macro, samples x
        precision, recall, f1) to the corresponding sklearn score.
    """
    pred = np.array(pred > threshold, dtype=float)
    scorers = (('precision', precision_score),
               ('recall', recall_score),
               ('f1', f1_score))
    results = {}
    for average in ('micro', 'macro', 'samples'):
        for name, scorer in scorers:
            # zero_division=0 returns the same 0.0 the default would, but
            # without flooding the log with UndefinedMetricWarning for
            # classes/samples that have no positive predictions.
            results[average + '/' + name] = scorer(
                y_true=target, y_pred=pred, average=average, zero_division=0)
    return results
In [ ]:
# Initialize the training parameters.
num_workers = 8  # Number of CPU processes for data preprocessing
lr = 1e-4  # Learning rate
batch_size = 32
save_freq = 1  # Save checkpoint frequency (epochs)
test_freq = 200  # Test model frequency (iterations)
max_epoch_number = 35  # Number of epochs for training
# Note: on the small subset of data overfitting happens after 30-35 epochs

# ImageNet channel statistics matching the pretrained backbone.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

device = torch.device('cuda')
# Save path for checkpoints (fixed typo: was 'chekpoints/')
save_path = 'checkpoints/'
# Save path for logs
logdir = 'logs/'

# Run tensorboard. The bare '%' magics are invalid outside IPython, so run
# them through the IPython API when it is available; this is a no-op in a
# plain Python interpreter.
try:
    from IPython import get_ipython
    ipython = get_ipython()
    if ipython is not None:
        ipython.run_line_magic('load_ext', 'tensorboard')
        ipython.run_line_magic('tensorboard', '--logdir {}'.format(logdir))
except ImportError:
    pass
In [ ]:
# Here is an auxiliary function for checkpoint saving.
def checkpoint_save(model, save_path, epoch):
    """Save model weights to `save_path`/checkpoint-<epoch:06d>.pth.

    If the model is wrapped (e.g. nn.DataParallel exposes the real network as
    `model.module`), the inner module's state_dict is saved so the checkpoint
    can later be loaded into an unwrapped model.
    """
    f = os.path.join(save_path, 'checkpoint-{:06d}.pth'.format(epoch))
    # hasattr() is the idiomatic (and cheaper) version of the original
    # `'module' in dir(model)` check.
    if hasattr(model, 'module'):
        torch.save(model.module.state_dict(), f)
    else:
        torch.save(model.state_dict(), f)
    print('saved checkpoint:', f)
In [ ]:
# Test preprocessing
val_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

# Train preprocessing: light augmentation, then the same normalization as val.
# Affine padding is filled with the dataset mean scaled to 0-255, computed once
# (the original recomputed it inline and also print-debugged it).
fill_color = tuple((np.asarray(mean) * 255).astype(int).tolist())
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(),
    # NOTE(review): `resample`/`fillcolor` were renamed to
    # `interpolation`/`fill` and removed in torchvision >= 0.13 — keep the old
    # names here to match the notebook's pinned environment, but update them
    # when upgrading torchvision.
    transforms.RandomAffine(degrees=20, translate=(0.2, 0.2), scale=(0.5, 1.5),
                            shear=None, resample=False, fillcolor=fill_color),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])
In [ ]:
# Initialize the dataloaders for training.
test_annotations = os.path.join(img_folder, 'small_test.json')
train_annotations = os.path.join(img_folder, 'small_train.json')
test_dataset = NusDataset(img_folder, test_annotations, val_transform)
train_dataset = NusDataset(img_folder, train_annotations, train_transform)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size,
                              num_workers=num_workers, shuffle=True,
                              drop_last=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size,
                             num_workers=num_workers)

num_train_batches = int(np.ceil(len(train_dataset) / batch_size))

# Initialize the model, switch it to training mode, and move it to the GPU.
model = Resnext50(len(train_dataset.classes))
model.train()
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# If more than one GPU is available we can use both to speed up the training.
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

os.makedirs(save_path, exist_ok=True)

# Loss function: binary cross-entropy — one independent binary decision per label.
criterion = nn.BCELoss()
# Tensorboard logger
logger = SummaryWriter(logdir)
In [ ]:
# Run training
epoch = 0
iteration = 0
while True:
    batch_losses = []
    for imgs, targets in train_dataloader:
        imgs, targets = imgs.to(device), targets.to(device)

        optimizer.zero_grad()
        model_result = model(imgs)
        loss = criterion(model_result, targets.type(torch.float))

        batch_loss_value = loss.item()
        loss.backward()
        optimizer.step()

        logger.add_scalar('train_loss', batch_loss_value, iteration)
        batch_losses.append(batch_loss_value)
        with torch.no_grad():
            result = calculate_metrics(model_result.cpu().numpy(),
                                       targets.cpu().numpy())
            for metric in result:
                logger.add_scalar('train/' + metric, result[metric], iteration)

        # Periodically evaluate on the full test set.
        if iteration % test_freq == 0:
            model.eval()
            with torch.no_grad():
                eval_preds = []
                eval_targets = []
                for eval_imgs, batch_targets in test_dataloader:
                    eval_imgs = eval_imgs.to(device)
                    batch_preds = model(eval_imgs)
                    eval_preds.extend(batch_preds.cpu().numpy())
                    eval_targets.extend(batch_targets.cpu().numpy())
            result = calculate_metrics(np.array(eval_preds), np.array(eval_targets))
            for metric in result:
                logger.add_scalar('test/' + metric, result[metric], iteration)
            print("epoch:{:2d} iter:{:3d} test: "
                  "micro f1: {:.3f} "
                  "macro f1: {:.3f} "
                  "samples f1: {:.3f}".format(epoch, iteration,
                                              result['micro/f1'],
                                              result['macro/f1'],
                                              result['samples/f1']))
            model.train()
        iteration += 1

    loss_value = np.mean(batch_losses)
    print("epoch:{:2d} iter:{:3d} train: loss:{:.3f}".format(epoch, iteration, loss_value))
    if epoch % save_freq == 0:
        checkpoint_save(model, save_path, epoch)
    epoch += 1
    if max_epoch_number < epoch:
        break
In [ ]:
# Run inference on the test data
model.eval()
for sample_id in [1, 2, 3, 4, 6]:
    test_img, test_labels = test_dataset[sample_id]
    test_img_path = os.path.join(img_folder, test_dataset.imgs[sample_id])
    with torch.no_grad():
        # Fix: move the input batch to the model's device — the original
        # passed a CPU tensor to a model that was moved to `device` (CUDA),
        # which raises a device-mismatch error.
        raw_pred = model(test_img.unsqueeze(0).to(device)).cpu().numpy()[0]
        raw_pred = np.array(raw_pred > 0.5, dtype=float)

    predicted_labels = np.array(dataset_val.classes)[np.argwhere(raw_pred > 0)[:, 0]]
    if not len(predicted_labels):
        predicted_labels = ['no predictions']
    img_labels = np.array(dataset_val.classes)[np.argwhere(test_labels > 0)[:, 0]]

    plt.imshow(Image.open(test_img_path))
    plt.title("Predicted labels: {} \nGT labels: {}".format(', '.join(predicted_labels),
                                                            ', '.join(img_labels)))
    plt.axis('off')
    plt.show()