Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Path: blob/main/course/videos/qa_postprocessing_pt.ipynb
Views: 2542
Kernel: Unknown Kernel
This notebook gathers the code samples from the video below, which is part of the Hugging Face course.
In [ ]:
#@title
from IPython.display import HTML

# Markup of the embedded player for the course video.
video_iframe = '<iframe width="560" height="315" src="https://www.youtube.com/embed/BNy08iIWVJM?rel=0&controls=0&showinfo=0" frameborder="0" allowfullscreen></iframe>'

# Left as the final expression of the cell so the notebook renders it.
HTML(video_iframe)
Install the Transformers and Datasets libraries to run this notebook.
In [ ]:
! pip install datasets transformers[sentencepiece]
In [ ]:
def find_labels(offsets, answer_start, answer_end, sequence_ids):
    """Convert a character-level answer span into token-level positions.

    Args:
        offsets: Per-token ``(char_start, char_end)`` pairs for one feature.
        answer_start: Character index at which the answer begins.
        answer_end: Character index at which the answer ends; it is compared
            against the ``char_end`` value of the tokens.
        sequence_ids: Per-token sequence ids. The contiguous run of tokens
            whose id is ``1`` is treated as the context.

    Returns:
        A ``(start_position, end_position)`` tuple of token indices, or
        ``(0, 0)`` when the feature has no context token or the answer is not
        entirely contained in this feature's context span.
    """
    n_tokens = len(sequence_ids)

    # Find the first context token; bail out if the feature has none.
    context_start = 0
    while context_start < n_tokens and sequence_ids[context_start] != 1:
        context_start += 1
    if context_start == n_tokens:
        return (0, 0)

    # Extend to the last context token. The bound check matters when the
    # context runs to the very end of the sequence (no trailing special token).
    context_end = context_start
    while context_end + 1 < n_tokens and sequence_ids[context_end + 1] == 1:
        context_end += 1

    # The answer must lie fully inside the context. A partial overlap would
    # otherwise make the scans below stop on the token just before/after the
    # context, i.e. on a position that is not part of the context at all.
    if offsets[context_start][0] > answer_start or offsets[context_end][1] < answer_end:
        return (0, 0)

    # Last token that starts at or before the answer start.
    idx = context_start
    while idx <= context_end and offsets[idx][0] <= answer_start:
        idx += 1
    start_position = idx - 1

    # First token that ends at or after the answer end.
    idx = context_end
    while idx >= context_start and offsets[idx][1] >= answer_end:
        idx -= 1
    end_position = idx + 1

    return start_position, end_position
In [ ]:
def preprocess_validation_examples(examples):
    """Tokenize a batch of question/context pairs into labelled features.

    Args:
        examples: Batch mapping that provides the ``"question"``,
            ``"context"``, ``"id"``, ``"answer_start"`` and ``"answer_end"``
            columns.

    Returns:
        The tokenizer output with ``"overflow_to_sample_mapping"`` removed and
        three lists added, one entry per feature: ``"start_positions"``,
        ``"end_positions"`` and ``"example_id"``. ``"offset_mapping"`` is
        rewritten so that only tokens with sequence id ``1`` keep their
        offsets; every other entry becomes ``None``.
    """
    # Strip surrounding whitespace from the questions and tokenize the cleaned
    # version (the stripped list used to be computed and then ignored).
    questions = [q.strip() for q in examples["question"]]
    inputs = tokenizer(
        questions,
        examples["context"],
        truncation="only_second",
        padding="max_length",
        max_length=384,
        stride=128,
        return_overflowing_tokens=True,
        return_offsets_mapping=True,
    )

    offset_mapping = inputs["offset_mapping"]
    sample_map = inputs.pop("overflow_to_sample_mapping")
    inputs["start_positions"] = []
    inputs["end_positions"] = []
    inputs["example_id"] = []

    for i, offset in enumerate(offset_mapping):
        # Index, within this batch, of the example the feature came from.
        sample_idx = sample_map[i]
        inputs["example_id"].append(examples["id"][sample_idx])

        sequence_ids = inputs.sequence_ids(i)
        # Replace the stored offsets by a masked copy. `offset` still refers
        # to the unmasked list, which is what `find_labels` needs below.
        offset_mapping[i] = [(o if s == 1 else None) for o, s in zip(offset, sequence_ids)]

        start, end = find_labels(
            offset,
            examples["answer_start"][sample_idx],
            examples["answer_end"][sample_idx],
            sequence_ids,
        )
        inputs["start_positions"].append(start)
        inputs["end_positions"].append(end)

    return inputs
In [ ]:
from datasets import load_dataset
from transformers import AutoTokenizer

model_checkpoint = "distilbert-base-cased-distilled-squad"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

# Load SQuAD and discard the "title" column.
raw_datasets = load_dataset("squad").remove_columns(["title"])


def prepare_data(example):
    """Store the first answer's character span directly on the example.

    Adds ``"answer_start"`` (start of the first listed answer) and
    ``"answer_end"`` (that start plus the length of the answer text) to
    ``example`` and returns it.
    """
    answers = example["answers"]
    first_text = answers["text"][0]
    first_start = answers["answer_start"][0]
    example["answer_start"] = first_start
    example["answer_end"] = first_start + len(first_text)
    return example


# One row per example, with the flattened answer span replacing "answers".
validation_set = raw_datasets["validation"].map(prepare_data, remove_columns=["answers"])

# One row per tokenized feature; all original columns are dropped.
validation_features = validation_set.map(
    preprocess_validation_examples,
    batched=True,
    remove_columns=validation_set.column_names,
)

len(validation_set), len(validation_features)
In [ ]:
import torch
from torch.utils.data import DataLoader
from transformers import AutoModelForQuestionAnswering, default_data_collator

# Run on the GPU when one is available, otherwise on the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

model = AutoModelForQuestionAnswering.from_pretrained(model_checkpoint)
model = model.to(device)

# "example_id" and "offset_mapping" are left out of the batches; they stay
# available on `validation_features` for the post-processing cells.
model_inputs = validation_features.remove_columns(["example_id", "offset_mapping"])
dataloader = DataLoader(
    model_inputs,
    batch_size=64,
    collate_fn=default_data_collator,
)
In [ ]:
from tqdm.auto import tqdm

# Per-batch logits, moved back to the CPU as they are produced.
start_chunks = []
end_chunks = []

# No gradients are needed: the model is only used for prediction here.
with torch.no_grad():
    for batch in tqdm(dataloader):
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        outputs = model(**batch)
        start_chunks.append(outputs.start_logits.cpu())
        end_chunks.append(outputs.end_logits.cpu())

# Stack the batches along the first axis and convert to NumPy arrays.
start_logits = torch.cat(start_chunks, dim=0).numpy()
end_logits = torch.cat(end_chunks, dim=0).numpy()
In [ ]:
import collections

# Map each example id to the indices of the features that carry it.
example_to_feature = collections.defaultdict(list)
for feature_idx, feature in enumerate(validation_features):
    example_to_feature[feature["example_id"]].append(feature_idx)
In [ ]:
# Scoring rule for a candidate span (start_pos, end_pos). None of the names on
# the left-hand sides, nor start_pos / end_pos / start_probabilities /
# end_probabilities, are defined in the visible cells, so this reads as an
# illustration rather than a cell meant to be executed as is.

# Probability form: product of the start and end probabilities of the span.
score[start_pos, end_pos] = start_probabilities[start_pos] * end_probabilities[end_pos]
# Logit form: sum of the start and end logits of the span.
# NOTE(review): presumably the probabilities are softmaxes of these logits, in
# which case both scores rank spans identically - confirm.
logit_score[start_pos, end_pos] = start_logits[start_pos] + end_logits[end_pos]
In [ ]:
import numpy as np

# Work on the first feature and the first example only.
start_logit = start_logits[0]
end_logit = end_logits[0]
offsets = validation_features[0]["offset_mapping"]
context = validation_set[0]["context"]

# Token indices of the 20 highest start / end logits, best first.
start_indexes = np.argsort(start_logit)[::-1][:20].tolist()
end_indexes = np.argsort(end_logit)[::-1][:20].tolist()

answers = []
for start_index in start_indexes:
    for end_index in end_indexes:
        logit_score = start_logit[start_index] + end_logit[end_index]

        # Predicting (0, 0) means no answer.
        if start_index == 0 and end_index == 0:
            answers.append({"text": "", "logit_score": logit_score})
            continue

        # Skip answers that are not fully in the context.
        if offsets[start_index] is None or offsets[end_index] is None:
            continue

        # Skip answers with a length that is either < 0 or > max_answer_length.
        span_length = end_index - start_index + 1
        if span_length < 1 or span_length > 30:
            continue

        char_start = offsets[start_index][0]
        char_end = offsets[end_index][1]
        answers.append({
            "text": context[char_start: char_end],
            "logit_score": logit_score,
        })
In [ ]:
# Keep the candidate with the highest combined start + end logit.
predicted_answer = max(answers, key=lambda x: x["logit_score"])
print(f"Predicted answer: {predicted_answer}")

# Reference answer, sliced from the context with the stored character span.
# The row is fetched once instead of once per field.
reference_example = validation_set[0]
answer_start = reference_example["answer_start"]
answer_end = reference_example["answer_end"]
right_answer = context[answer_start:answer_end]
print(f"Theoretical answer: {right_answer}")
In [ ]:
# Number of top start / end logits examined per feature, and the maximum
# number of tokens an answer span may cover.
n_best = 10
max_answer_length = 30

# Maps each example id to the text of its best-scoring answer.
predicted_answers = {}
for example in tqdm(validation_set):
    example_id = example["id"]
    context = example["context"]

    # Candidates are pooled over every feature built from this example.
    answers = []
    for feature_index in example_to_feature[example_id]:
        start_logit = start_logits[feature_index]
        end_logit = end_logits[feature_index]
        offsets = validation_features[feature_index]["offset_mapping"]

        start_indexes = np.argsort(start_logit)[-1 : -n_best - 1 : -1].tolist()
        end_indexes = np.argsort(end_logit)[-1 : -n_best - 1 : -1].tolist()
        for start_index in start_indexes:
            for end_index in end_indexes:
                # Predicting (0, 0) means no answer.
                if start_index == 0 and end_index == 0:
                    answers.append({
                        "text": "",
                        "logit_score": start_logit[start_index] + end_logit[end_index],
                    })
                # Skip answers that are not fully in the context.
                elif offsets[start_index] is None or offsets[end_index] is None:
                    continue
                # Skip answers with a length that is either < 0 or > max_answer_length.
                elif end_index < start_index or end_index - start_index + 1 > max_answer_length:
                    continue
                else:
                    answers.append({
                        "text": context[offsets[start_index][0]: offsets[end_index][1]],
                        "logit_score": start_logit[start_index] + end_logit[end_index],
                    })

    # Every candidate may have been filtered out; fall back to an empty answer
    # instead of letting max() raise ValueError on an empty list.
    best_answer = max(answers, key=lambda x: x["logit_score"], default={"text": ""})
    predicted_answers[example_id] = best_answer["text"]
In [ ]: