GitHub Repository: uob-COMS30035/lab_sheets_public
Path: blob/main/lab6/Lab_6_HMM.ipynb
Kernel: Python 3 (ipykernel)

Lab 6: Hidden Markov Model

In this lab we will look into Hidden Markov Models (HMMs) to model sequential data. HMMs use Markov chains to compute probabilities for a sequence of observed events. Moreover, HMMs are based on the Markov assumption, which states that the present state $z_n$ is sufficient to predict the future $z_{n+1}$, so the past $z_{0:n-1}$ can be discarded.

Often, we are in a situation where the states we are interested in are hidden: we cannot observe them directly. As we will see in Exercise 2, the part-of-speech (POS) tags in a text are hidden and we can only observe the words. From these words we have to infer the tags. Similarly, in Exercise 3, where a robot needs to be localised, we cannot observe the robot's position but rather the measurements of its sensors. Both the tags and the robot position are called hidden variables because they are not observed. An HMM is specified by the following components:

  • A set of $N$ states.

  • A transition probability matrix $A$ where each element $a_{ij}$ represents the probability of moving from state $i$ to state $j$, s.t. $\sum^{N}_{j=1} a_{ij} = 1 \;\; \forall i$

  • An emission probability distribution, the probabilities of observations $x_n$ being generated from a state $z_n$

  • An initial probability distribution over states. $\pi_n$ is the probability that the Markov chain will start in state $n$. Also, $\sum^{N}_{n=1} \pi_n = 1$
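To make these components concrete, the short snippet below writes down one possible parameter set for a toy 2-state HMM with Gaussian emissions; the numbers are arbitrary and only chosen so that the rows of $A$ and the entries of $\pi$ sum to 1.

import numpy as np

# Illustrative parameters for an HMM with N = 2 states (values are arbitrary).
pi = np.array([0.6, 0.4])            # initial distribution, sums to 1
A = np.array([[0.9, 0.1],            # a_ij = P(z_{n+1} = j | z_n = i)
              [0.2, 0.8]])           # each row sums to 1
means = np.array([1.0, -1.0])        # Gaussian emission mean for each state
variances = np.array([0.5, 0.5])     # Gaussian emission variance for each state

assert np.isclose(pi.sum(), 1.0)
assert np.allclose(A.sum(axis=1), 1.0)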

Additional packages (if using your own machine)

For this lab we need hmmlearn, nltk and ipywidgets; if you are using your own machine you can install them with conda:

conda install -c conda-forge hmmlearn
conda install -c anaconda nltk
conda install -c conda-forge ipywidgets

Let's import the libraries:

import numpy as np
import matplotlib.pyplot as plt
from hmmlearn import hmm
from IPython import display
import nltk
from nltk.corpus import brown
from nltk.probability import ConditionalFreqDist, ConditionalProbDist, MLEProbDist
import itertools
from sklearn.model_selection import train_test_split
# interactive display
import ipywidgets as widgets
%config InlineBackend.figure_format = 'retina'

1) HMM with 2 states

We will start with a warm up exercise where you have to implement an HMM with 2 states and then analyse how the transition probability and emission noise influence the output. Your task is to create a function hmm_model that has three arguments:

  1. switch_prob: The probability of switching to the other state.

  2. noise_level: The variance of the emission distribution, i.e. the measurement/observation noise.

  3. startprob: The probabilities of starting in each state (shape=(2,1)).

Your function should create a hmm.GaussianHMM model with two states (hidden states) and covariance_type="full". Modify the following model attributes:

  • startprob_: Using the startprob argument.

  • transmat_: Implement a transition matrix with the probability of transition for both states being $p_{transition} =$ switch_prob.

  • means_: Add mean $\mu_1 = 1$ and $\mu_2 = -1$ for each state

  • covars_: Using the noise_level argument. (note: the shape should be (2, 1, 1), i.e. two $1\times1$ covariance matrices)

def plot_hmm(model, states, observations):
    """Plots HMM states and observations for 1d states and observations.

    Args:
      model (hmmlearn model): hmmlearn model used to get state means.
      states (numpy array of floats): Samples of the states.
      observations (numpy array of floats): Samples of the observations.
    """
    nsteps = states.size
    fig, ax1 = plt.subplots(figsize=(8, 4))
    states_forplot = list(map(lambda s: model.means_[s], states))
    ax1.step(np.arange(nsteps), states_forplot, "--", where="mid", alpha=1.0, c="green")
    ax1.set_xlabel("Time")
    ax1.set_ylabel("Latent State", c="green")
    ax1.set_yticks([-1, 1])
    ax1.set_yticklabels(["State 1", "State 0"])
    ax2 = ax1.twinx()
    ax2.plot(np.arange(nsteps), observations.flatten(), c="blue")
    ax2.set_ylabel("Observations", c="blue")
    ax1.set_ylim(ax2.get_ylim())
    plt.show(fig)
# write your code here
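If you want to attempt the exercise yourself, stop reading here. For reference, one possible sketch of hmm_model (not necessarily the intended solution) that follows the attribute shapes described above:

def hmm_model(switch_prob=0.1, noise_level=1e-8, startprob=[1.0, 0.0]):
    """One possible sketch: 2-state GaussianHMM with symmetric switching probability."""
    model = hmm.GaussianHMM(n_components=2, covariance_type="full")
    model.startprob_ = np.asarray(startprob).flatten()
    model.transmat_ = np.array([[1. - switch_prob, switch_prob],
                                [switch_prob, 1. - switch_prob]])
    model.means_ = np.array([[1.0], [-1.0]])          # mu_1 = 1, mu_2 = -1
    model.covars_ = noise_level * np.ones((2, 1, 1))  # two 1x1 covariance matrices
    return model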

Next, execute the cell below to run your hmm_model and visualise the output with the provided plot_hmm function.

np.random.seed(101)
nstep = 50

model = hmm_model(switch_prob=0.1, noise_level=1e-8, startprob=np.array([1.0, 0.0]))
observations, states = model.sample(nstep)
plot_hmm(model, states, observations)

Run the cell below and experiment with the interactive widget window. Try changing the values of switch_prob and noise_level; how does that change the output?

#@title
#@markdown Make sure you execute this cell to enable the widget!
np.random.seed(101)
nstep = 50

@widgets.interact
def plot(switch_prob=(0., 1, .01), log10_noise_level=(-8., 1., .01)):
    model = hmm_model(switch_prob=switch_prob, noise_level=10.**log10_noise_level, startprob=[1.0, 0.0])
    observations, states = model.sample(nstep)
    observations = observations.flatten()
    plot_hmm(model, states, observations)

2) HMM Part-of-Speech Tagging

Part-of-speech (POS) tagging enables the extraction of meaningful information about words in a sentence and their relation to neighbouring words. Knowing whether a word is a noun or a verb tells us about its likely neighbouring words (nouns are often preceded by determiners and adjectives, verbs by nouns) and about syntactic structure (nouns are generally part of noun phrases), making POS tagging a key aspect of parsing. Parts of speech are useful features for labeling named entities like people or organisations in information extraction, or for coreference resolution. A word's part of speech can even play a role in speech recognition or synthesis, e.g., the word content is pronounced CONtent when it is a noun and conTENT when it is an adjective.

Part-of-speech tagging is the process of assigning a POS tag to each word token in a given text. The input to a tagging algorithm is a sequence of tokenised words and the output is a sequence of tags, one per token. Many words are ambiguous, having more than one possible POS, and the goal is to find the correct tag for each occurrence. For example, "book" can be a verb ("book that flight") or a noun ("hand me that book"). The goal of POS tagging is to resolve these ambiguities, choosing the proper tag for the context.

In this section we introduce the use of the Hidden Markov Model for part-of-speech tagging. The HMM can be used as a sequence tagger to assign a tag or class label to each unit in an observed sequence, thus mapping a sequence of observations to a sequence of labels. An HMM is a probabilistic sequence model: given a sequence of units (words, letters, morphemes, sentences), it computes a probability distribution over possible sequences of labels and chooses the best label sequence.

2.1) Brown corpus dataset

The Brown corpus is a common dataset in Natural Language Processing (NLP); it is available from the NLTK library and comes with POS-tagged annotations. Run the cell below to download the dataset and the universal tagset.

# Brown dataset
nltk.download('brown')
nltk.download('universal_tagset')

First we need to get the dataset and split it into training and testing. Run the following cells to achieve that and to prepare the dataset in the correct format.

nltk_data = list(brown.tagged_sents(tagset='universal'))
train_set, test_set = train_test_split(nltk_data, train_size=0.80, test_size=0.20, random_state=101)
# create list of train and test tagged words
train_tagged_words = [tup for sent in train_set for tup in sent]
test_tagged_words = [tup for sent in test_set for tup in sent]

print(f'Number of tagged words in the training set: {len(train_tagged_words)}')
print(f'Number of tagged words in the test set: {len(test_tagged_words)}')

Let's explore the different types of tags by running the next cell.

tags = {tag for word, tag in train_tagged_words}
print(f'Number of possible tags: {len(tags)}')
print(f'Possible tags: {tags}')

The next cell explores the structure of the dataset in terms of words and sentences.

print('Words example: {}'.format(train_tagged_words[0:5]))
print('Sentence example: {}'.format(train_set[0]))

2.2) POS tagging model

Two of the main components of HMMs are the transition model and the emission model. You already got a taste of how the two models operate in Exercise 1. Your task now is to implement both models for the POS tagging model.

Specifically, the transition model $P(tag_{t+1} | tag_t)$ will estimate the probability of the next tag given the current tag, while the emission model $P(word | tag_t)$ will estimate the probability of observing a word given the current tag.
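Putting the two together, an HMM tagger scores a tag sequence for a sentence with the standard factorisation (stated here for reference):

$P(tag_{1:T}, word_{1:T}) = P(tag_1)\,P(word_1 | tag_1) \prod_{t=2}^{T} P(tag_t | tag_{t-1})\, P(word_t | tag_t)$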

Note: the provided hints link to the nltk.probability functions, however you are free to use any other libraries such as numpy.

2.2.1) Emission Model

Given the tagged words, the main steps are:

  1. Create a list of tuples $(tag, word)$ from the input words which will be utilised in step 2. (careful: words contains tuples of $(word, tag)$)

  2. Calculate the frequency for each word given the corresponding tag. (hint: ConditionalFreqDist)

  3. Calculate the conditional probability distribution given the frequency distribution of step 2. (hint: ConditionalProbDist with probdist_factory=MLEProbDist)

Implement these steps in the cell below in a function emission_model that has one argument, words, containing a list of (word, tag) pairs.

# write your code here
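If you would like to attempt it yourself first, stop here. One possible sketch of emission_model, using the nltk.probability hints above (not necessarily the intended solution):

def emission_model(words):
    """Possible sketch of the emission model P(word | tag).
    Assumes `words` is a list of (word, tag) pairs."""
    # 1. swap each (word, tag) pair into (tag, word) so that the tag is the condition
    tag_word_pairs = [(tag, word) for (word, tag) in words]
    # 2. frequency of each word given the corresponding tag
    emission_fd = ConditionalFreqDist(tag_word_pairs)
    # 3. maximum-likelihood conditional probability distribution
    return ConditionalProbDist(emission_fd, MLEProbDist)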

2.2.2) Transition Model

Given the tagged sentences, the main steps are:

  1. Create a chain of tuples $(tag_t, tag_{t+1})$ to be passed to ConditionalFreqDist(). To achieve this efficiently:

    • Create a generator expression (similar to a list comprehension but replacing [] with ()) which iterates over each sentence and for each sentence creates the pairs $(tag_t, tag_{t+1})$ (if you are curious about generator expressions, you can read this and this)

    • Create a chain from the output of the generator (hint: itertools.chain.from_iterable())

  2. Calculate the frequency of the next tag given the previous tag. (hint: ConditionalFreqDist)

  3. Calculate the conditional probability distribution of the next tag given the previous tag. (hint: ConditionalProbDist)

Implement these steps in the cell below in a function transition_model that has one argument, sentences, containing the dataset of sentences.

# write your code here
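Again, try it yourself before reading on. One possible sketch of transition_model (not necessarily the intended solution):

def transition_model(sentences):
    """Possible sketch of the transition model P(tag_{t+1} | tag_t).
    Assumes `sentences` is a list of sentences, each a list of (word, tag) pairs."""
    # 1. generator of (tag_t, tag_{t+1}) pairs for consecutive words within each sentence
    pairs_per_sentence = (
        [(sent[i][1], sent[i + 1][1]) for i in range(len(sent) - 1)]
        for sent in sentences
    )
    tag_pairs = itertools.chain.from_iterable(pairs_per_sentence)
    # 2. frequency of the next tag given the previous tag
    transition_fd = ConditionalFreqDist(tag_pairs)
    # 3. maximum-likelihood conditional probability distribution
    return ConditionalProbDist(transition_fd, MLEProbDist)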

Utilising the previously created models, the cell below trains the emission and transition models, the former using tagged words and the latter on tagged sentences.

emission_p = emission_model(train_tagged_words)
transition_p = transition_model(train_set)

Based on the Brown corpus, the trained HMM gives the following probabilities:

  • The probability of observing the word city given the tag NOUN.

  • The probability of the tag VERB given the tag NOUN.

p_city_NOUN = emission_p['NOUN'].prob('city')
p_VERB_NOUN = transition_p['NOUN'].prob('VERB')
print('p(city|NOUN) = ', p_city_NOUN)
print('p(VERB|NOUN) = ', p_VERB_NOUN)

2.3) Viterbi algorithm

The goal of the Viterbi algorithm is to find the most likely sequence of hidden states for a given sequence of observations. Specifically, we will utilise the Viterbi algorithm to find the tags of a sequence of untagged sentences, known as decoding. The benefit of this algorithm comes from its ability to efficiently determine the most probable path amongst the exponentially many possibilities. Doing so, it reduces the complexity from $O(T^N)$ to $O(NT^2)$, where $N$ is the total number of words and $T$ is the total number of tags.
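For reference, the quantity stored in the forward pass is the standard Viterbi recursion, where $a_{s's}$ is the transition probability from tag $s'$ to tag $s$ and $b_s(x_t)$ is the emission probability of word $x_t$ under tag $s$:

$V_1(s) = \pi_s \, b_s(x_1), \qquad V_t(s) = \max_{s'} \big[ V_{t-1}(s') \, a_{s's} \big] \, b_s(x_t)$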

Your task is to implement the Viterbi algorithm and perform HMM decoding. Complete the missing to-do lines.

def viterbi(observed_seq, states, start_p, transition_p, emit_p):
    eps = 0.00000001
    # Initialise the list, V, that will contain a dictionary for each element in the sequence. Each dictionary
    # has the possible states (tags) as keys and its values are a dictionary with the keys:
    #   'prob': the probability of the sequence until this point
    #   'prev': the previous state used for backtracking.
    V = [dict()]
    for state in states:
        # Compute the probability of each possible state for the first token in the sequence.
        V[0][state] = {"prob": start_p[state] * emit_p[state].prob(observed_seq[0]), "prev": None}

    # Run Viterbi for t > 0
    for t in range(1, len(observed_seq)):
        V.append({})
        for state in states:
            # In the forward pass, for each element of the observed_seq we want to store the maximum
            # probability for each state as well as the previous state. This probability is the product of
            # the maximum transition probability and the emission probability.
            # First we calculate the max_transition_prob by going through all the combinations between states.
            # The next line just initialises the max_transition_prob:
            max_transition_prob = -1
            for prev_state in states:
                # Next, we calculate each possible transition probability that results in tag_t = state.
                # The transition probability is given by the product between
                # the prob of the sequence until t-1 where tag_{t-1} = prev_state,
                # and the transition probability from prev_state to state.
                # to-do

                # update the maximum transition_prob accordingly
                if transition_prob > max_transition_prob:
                    max_transition_prob = transition_prob
                    prev_state_selected = prev_state
            # Calculate the max_prob given by the product of the maximum transition probability and the
            # emission probability. Remember to add eps to each probability before multiplying them together
            # to-do

            V[t][state] = {"prob": max_prob, "prev": prev_state_selected}

    most_likely_seq = []
    max_prob = 0.0
    previous = None
    # Get most probable final state before backtracking
    for state, data in V[-1].items():
        if data["prob"] > max_prob:
            max_prob = data["prob"]
            best_state = state
    most_likely_seq.append(best_state)
    previous = best_state

    # Backtrack until the first observation
    for t in range(len(V) - 2, -1, -1):
        # add the most likely tag to the most_likely_seq. Remember: we are backtracking hence we need to
        # add before the last tag (hint: most_likely_seq.insert() )
        # to-do

        previous = V[t + 1][previous]["prev"]

    # print('The steps of states are ' + ' '.join(most_likely_seq) + ' with highest probability of %s' % max_prob)
    return list(zip(observed_seq, most_likely_seq))

Run the cell below to get n=10 random sentences as test data (untagged words).

import random, time

# Let's test our Viterbi algorithm on a few sample sentences of test dataset
random.seed(1234)  # define a random seed to get same sentences when run multiple times
n = 10
# n = 11468 # whole test set

# choose random n numbers
rndom = [random.randint(1, len(test_set)) for x in range(n)]

# list of 10 sentences on which we test the model
test_run = [test_set[i] for i in rndom]

# list of tagged words
test_run_base = [tup for sent in test_run for tup in sent]

# list of untagged words
test_tagged_words = [tup[0] for sent in test_run for tup in sent]

Run the cell below to check your implementation of the Viterbi algorithm on the test set and its POS tagging accuracy.

possible_tags = list(set([pair[1] for pair in train_tagged_words]))
start_pr = {}
for tag in possible_tags:
    start_pr[tag] = 1.0/len(possible_tags)

start = time.time()
predicted_tags = viterbi(test_tagged_words, possible_tags, start_pr, transition_p, emission_p)
end = time.time()
difference = end - start
print("Time taken in seconds: ", difference)

# accuracy
predicted_correct = [i for i, j in zip(predicted_tags, test_run_base) if i == j]
accuracy = len(predicted_correct)/len(predicted_tags)
print('Viterbi Algorithm Accuracy: ', accuracy*100)

# accuracy ~= 97 for only 10 sentences (136 words)
# accuracy ~= 94.239 in 45 secs for all test data (234100 words)

Let's print the first 10 words and their predicted tags.

predicted_tags[0:10]

as well as the mispredicted tags:

for i, j in zip(predicted_tags, test_run_base):
    if i != j:
        print(i, j)

How well does the model predict? Can you make any observations regarding the wrongly-predicted tags?

3) Robot localisation -- Optional

Another application of HMMs is robot localisation, which is the focus of this exercise.

We have a robot which is deployed to Mars to collect some samples. However, during the landing phase, the connection got temporarily lost and we are not sure where exactly the robot is. Luckily, the map of the area is available and the robot is provided with a sensor that can detect rocky surface.

The aim is to use an HMM to localise the robot, where the hidden variable $z$ is the position of the robot and the observation $x$ is given by the sensor measurement. The figure at the top of the lab sheet, taken from Bishop, shows the relation between the hidden variables $z$ and the observable variables $x$.

We will achieve this through the discrete case of the Bayes filter, which consists of two main steps: a prediction step and a measurement step. Your task will be to implement these two steps, which are given by the following pseudocode.

First step: $\overline{p}_{k,t} = \sum_i p(z_k | u_t, z_i)\, p_{i, t-1}$ where:

  • $\overline{p}_{k,t}$: the predicted belief of the new state $z_k$ at the current time $t$ based on the action (control $u_t$).

  • $p_{i, t-1}$: the belief of state $z_i$ at the previous time step $t-1$.

  • $p(z_k | u_t, z_i)$: the transition probability of moving from state $z_i$ to state $z_k$ when taking action $u_t$.

In the second step, the knowledge from the measurement model $p(x_t | z_k)$ is combined with the predicted belief $\overline{p}_{k,t}$ to obtain the posterior belief $p_{k,t}$ of each state. This posterior belief becomes the current belief for the next time step.

$p_{k,t} = \eta \, p(x_t | z_k) \, \overline{p}_{k,t}$

$p(x_t | z_k)$ is also known as the likelihood, i.e. how likely it is to see an observation given the robot is in location $z_k$. We need to compute the likelihood before updating the belief. Moreover, $\eta$ is the normalising term such that the probabilities sum to 1.
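Before touching the provided classes, it can help to see the two update equations as plain code. The following is a self-contained sketch of one prediction + measurement update on an $N \times N$ grid; the function and argument names (bayes_filter_step, transition_fn, measurement_model) are illustrative and do not appear in the lab code, and the transition noise is modelled as "move with probability p_move, otherwise stay".

import numpy as np

def bayes_filter_step(belief, transition_fn, action, p_move, measurement, measurement_model):
    """One prediction + measurement update of a discrete Bayes filter (illustrative sketch).

    belief:            (N, N) array holding the belief p_{i, t-1} over grid cells
    transition_fn:     (cell, action) -> intended next cell, assuming no noise
    p_move:            probability that the move succeeds (otherwise the robot stays put)
    measurement_model: (measurement, cell) -> likelihood p(x_t | z_k)
    """
    N = belief.shape[0]
    # Prediction step: predicted_k = sum_i p(z_k | u_t, z_i) * p_{i, t-1}
    predicted = np.zeros_like(belief)
    for i in range(N):
        for j in range(N):
            k = transition_fn((i, j), action)
            predicted[k] += p_move * belief[i, j]           # move succeeds
            predicted[i, j] += (1 - p_move) * belief[i, j]  # move fails, robot stays
    # Measurement step: posterior_k = eta * p(x_t | z_k) * predicted_k
    posterior = np.zeros_like(belief)
    for i in range(N):
        for j in range(N):
            posterior[i, j] = measurement_model(measurement, (i, j)) * predicted[i, j]
    posterior = posterior / posterior.sum()                 # eta normalises to sum to 1
    return predicted, posterior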

We provide the implementation of both the World and Robot classes:

class World():
    def __init__(self, start=(0,0), N=15):
        self.world = np.zeros((N,N))
        self.world[0,:] = 1
        self.world[-1,:] = 1
        self.world[:,0] = 1
        self.world[:,-1] = 1
        self.world[5, 6:12] = 1
        self.world[6:11, 9] = 1
        self.world[7:10, 6] = 1
        self.world[11, 4:10] = 1
        self.N = N

    def get_world(self):
        return self.world

    def set_world(self, N):
        self.world = np.zeros((N,N))

    def show_world(self):
        fig, ax = plt.subplots(figsize=(5,5))
        ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=1)
        ax.set_xticks(np.arange(-.5, self.N, 1));
        ax.set_yticks(np.arange(-.5, self.N, 1));
        # ax.axis('off')
        im = ax.imshow(self.world)
        plt.colorbar(im)
        plt.show()

The robot has 4 possible actions: up, down, left and right. Each action moves the robot to the new state with probability p_move (otherwise it stays where it is). At each state the robot can measure whether the surface is rocky or normal by calling measurement(); the sensor returns the correct value with probability p_sense.

class Robot:
    def __init__(self, p_move, p_sense, world):
        self.location = np.random.randint(0, world.shape[0], 2)
        self.p_move = p_move
        self.p_sense = p_sense
        self.world = world
        self.N = self.world.shape[0]
        self.possibleActions = ['U', 'D', 'L', 'R']
        self.tracking_error = []
        self.homing_error = []

    # The robot transitions are noisy, following the action with probability p_move and staying
    # in the same location with probability (1-p_move)
    def transition(self, action):
        if (np.random.rand() < self.p_move):
            if (action == 'U'):
                self.location = (max(0, self.location[0]-1), self.location[1])
            elif (action == 'R'):
                self.location = (self.location[0], min(self.N-1, self.location[1]+1))
            elif (action == 'D'):
                self.location = (min(self.N-1, self.location[0]+1), self.location[1])
            elif (action == 'L'):
                self.location = (self.location[0], max(0, self.location[1]-1))
        return self.location

    # Measurement model which returns 1 when the surface is rocky and 0 otherwise.
    # As in the transition model, the sensor is noisy, returning the correct value with probability p_sense.
    def measurement(self):
        if (np.random.rand() < self.p_sense):
            return self.world[int(self.location[0]), int(self.location[1])]
        else:
            if self.world[int(self.location[0]), int(self.location[1])] == 0:
                return 1
            else:
                return 0

    # transition model of the robot assuming no noise
    def transition_no_noise(self, location, action):
        if (action == 'U'):
            location = (max(0, location[0]-1), location[1])
        elif (action == 'R'):
            location = (location[0], min(self.N-1, location[1]+1))
        elif (action == 'D'):
            location = (min(self.N-1, location[0]+1), location[1])
        elif (action == 'L'):
            location = (location[0], max(0, location[1]-1))
        return location

    def visualise_true_position(self):
        x = self.world.copy()
        x[int(self.location[0]), int(self.location[1])] = 0.5
        # x[int(self.home[0]), int(self.home[1])] = 0.7
        return x

The map is discrete and represented as a grid where each cell indicates a position. As displayed below, yellow corresponds to rocky surface while purple corresponds to normal surface. The sensor measurement will output a value of 1 for rocky surface and a value of 0 for normal surface.

world = World(N=15)
world.show_world()

Run the following two cells to initialise the robot class and show the starting position.

p_move = .9 # original value 0.9
p_sense = .8 # original value 0.8
N = 15

# Initialise belief in position - uniform distribution over world
state_belief = np.ones((N,N))/(N*N)

robot = Robot(p_move, p_sense, world.get_world())
plt.imshow(robot.visualise_true_position());

The next cell provides helper functions to visualise the true robot position, the predicted belief, the likelihood and the posterior belief.

# helper function to visualise
def show(robot, predicted_state_belief, likelihood, state_belief):
    plt.clf()
    plt.subplot(2,4,1)
    plt.cla()
    plt.imshow(robot.visualise_true_position())
    plt.title('True position in world')
    plt.subplot(2,4,2)
    plt.cla()
    plt.imshow(predicted_state_belief)
    plt.title('Predicted belief given action')
    plt.subplot(2,4,3)
    plt.cla()
    plt.imshow(likelihood)
    plt.title('Likelihood')
    plt.subplot(2,4,4)
    plt.cla()
    plt.imshow(state_belief)
    plt.title('Posterior belief given measurement')

The next cell contains the main algorithm; your task is to fill in the missing to-do lines (hint: check the provided pseudocode above).

# Run the discrete Bayes filter
plt.figure(figsize=(15,9))
N = 15
steps = 100

# Create list of possible states in world (x,y coordinates)
possible_states = np.array(np.meshgrid(np.linspace(0,N-1,N), np.linspace(0,N-1,N))).reshape(2,-1).T

for step in range(steps):
    # Sample a random action
    a = np.random.choice(['U','D','L','R'])

    # Move the robot according to the action and sense the measurement
    state = robot.transition(a)
    measurement = robot.measurement()

    # Make prediction about state belief given action
    predicted_state_belief = np.zeros((N,N))
    for state in possible_states.astype(int):
        new_state = robot.transition_no_noise(state, a)
        # to-do
        # to-do

    # Evaluate sensor belief for each possible state
    likelihood = np.zeros((N,N))
    for state in possible_states.astype(int):
        match = measurement == world.get_world()[state[0], state[1]]
        # to-do

    # Compute posterior belief over possible states
    # to-do

    # normalising
    state_belief = (state_belief + 1e-15)
    state_belief = state_belief/np.sum(state_belief)

    # Make a function to display
    show(robot, predicted_state_belief, likelihood, state_belief)
    display.clear_output(wait=True)
    display.display(plt.gcf())

Change the p_move and p_sense values and re-run the experiment. How do these probabilities affect the performance?

Wrap-up

Congratulations, you have reached the end of this lab. Let's summarise what we have learnt:

  • Understood the role of the HMM components.

  • Saw key applications of HMMs, such as POS tagging.

  • Performed POS tagging through the transition model and the emission model.

  • Implemented the Viterbi algorithm and the discrete Bayes filter algorithm.

References

  • COMS30035 Machine Learning

  • Bishop - Pattern Recognition and Machine Learning: Chapter 13 - Sequential Data

  • Probabilistic Robotics

  • University of Edinburgh's Foundations of Natural Language Processing (FNLP) and Decision Making in Robots and Autonomous Agents (DMR) courses

  • Hussain Mujtaba

  • Neuromatch Academy 2020

The End 😃