"""
Continuous Emission Hidden Markov Models

AUTHOR:

- William Stein, 2010-03
"""

#############################################################################
# Copyright (C) 2010 William Stein <[email protected]>
# Distributed under the terms of the GNU General Public License (GPL)
# The full text of the GPL is available at:
# http://www.gnu.org/licenses/
#############################################################################

include "../../ext/stdsage.pxi"
include "../../ext/cdefs.pxi"
include "../../ext/interrupt.pxi"

cdef extern from "math.h":
    double log(double)
    double sqrt(double)
    double exp(double)
    int isnormal(double)
    int isfinite(double)

import math

from sage.misc.flatten import flatten
from sage.matrix.all import is_Matrix

cdef double sqrt2pi = sqrt(2*math.pi)
cdef double NaN = 0.0/0.0  # quiet NaN; note that 1.0/0.0 would be +infinity, not NaN

from sage.finance.time_series cimport TimeSeries
from sage.stats.intlist cimport IntList

from hmm cimport HiddenMarkovModel
from util cimport HMM_Util
from distributions cimport GaussianMixtureDistribution

cdef HMM_Util util = HMM_Util()

from sage.misc.randstate cimport current_randstate, randstate


# TODO: delete this function when the Gaussian code moves to distributions.pyx (next version)
cdef double random_normal(double mean, double std, randstate rstate):
    """
    Return a number chosen randomly with given mean and standard deviation.

    INPUT:

    - ``mean`` -- double
    - ``std`` -- double, standard deviation
    - ``rstate`` -- a randstate object

    OUTPUT:

    - a double
    """
    # Ported from http://users.tkk.fi/~nbeijar/soft/terrain/source_o2/boxmuller.c
    # This is the Box-Muller algorithm.
    # Client code can get the current random state from:
    #     cdef randstate rstate = current_randstate()

    cdef double x1, x2, w, y1, y2
    while True:
        x1 = 2*rstate.c_rand_double() - 1
        x2 = 2*rstate.c_rand_double() - 1
        w = x1*x1 + x2*x2
        if w < 1: break
    w = sqrt( (-2*log(w))/w )
    y1 = x1 * w
    return mean + y1*std
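
# For reference, a minimal pure-Python sketch of the same Box-Muller
# rejection loop (illustrative only; this module uses the cdef version above):
#
#     import math, random
#
#     def box_muller(mean, std):
#         while True:
#             x1 = 2*random.random() - 1
#             x2 = 2*random.random() - 1
#             w = x1*x1 + x2*x2
#             if w < 1:
#                 break
#         w = math.sqrt(-2*math.log(w)/w)
#         return mean + x1*w*std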

cdef class GaussianHiddenMarkovModel(HiddenMarkovModel):
    """
    GaussianHiddenMarkovModel(A, B, pi)

    Gaussian emissions Hidden Markov Model.

    INPUT:

    - ``A`` -- matrix; the N x N transition matrix
    - ``B`` -- list of pairs (mu,sigma) that define the distributions
    - ``pi`` -- initial state probabilities
    - ``normalize`` -- bool (default: True)

    EXAMPLES:

    We illustrate the primary functions with an example 2-state Gaussian HMM::

        sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,1), (-1,1)], [.5,.5]); m
        Gaussian Hidden Markov Model with 2 States
        Transition matrix:
        [0.1 0.9]
        [0.5 0.5]
        Emission parameters:
        [(1.0, 1.0), (-1.0, 1.0)]
        Initial probabilities: [0.5000, 0.5000]

    We query the defining transition matrix, emission parameters, and
    initial state probabilities::

        sage: m.transition_matrix()
        [0.1 0.9]
        [0.5 0.5]
        sage: m.emission_parameters()
        [(1.0, 1.0), (-1.0, 1.0)]
        sage: m.initial_probabilities()
        [0.5000, 0.5000]

    We obtain a sample sequence with 10 entries in it, and compute the
    logarithm of the probability of obtaining this sequence, given the
    model::

        sage: obs = m.sample(10); obs
        [-1.6835, 0.0635, -2.1688, 0.3043, -0.3188, -0.7835, 1.0398, -1.3558, 1.0882, 0.4050]
        sage: m.log_likelihood(obs)
        -15.2262338077988...

    We compute the Viterbi path, and the probability that the given path
    of states produced obs::

        sage: m.viterbi(obs)
        ([1, 0, 1, 0, 1, 1, 0, 1, 0, 1], -16.67738270170788)

    We use the Baum-Welch iterative algorithm to find another model
    for which our observation sequence is more likely::

        sage: m.baum_welch(obs)
        (-10.6103334957397..., 14)
        sage: m.log_likelihood(obs)
        -10.6103334957397...

    Notice that running Baum-Welch changed our model::

        sage: m
        Gaussian Hidden Markov Model with 2 States
        Transition matrix:
        [ 0.415498136619 0.584501863381]
        [ 0.999999317425 6.82574625899e-07]
        Emission parameters:
        [(0.417888242712, 0.517310966436), (-1.50252086313, 0.508551283606)]
        Initial probabilities: [0.0000, 1.0000]
    """
    cdef TimeSeries B, prob
    cdef int n_out

    def __init__(self, A, B, pi, bint normalize=True):
        """
        Create a Gaussian emissions HMM with transition probability
        matrix A, normal emissions given by B, and initial state
        probability distribution pi.

        INPUT:

        - A -- a list of lists or a square N x N matrix, whose
          (i,j) entry gives the probability of transitioning from
          state i to state j.

        - B -- a list of N pairs (mu,std), where if B[i]=(mu,std),
          then the probability distribution associated with state i
          is normal with mean mu and standard deviation std.

        - pi -- the probabilities of starting in each initial
          state, i.e., pi[i] is the probability of starting in
          state i.

        - normalize -- bool (default: True); if True, input is
          normalized to define valid probability distributions,
          e.g., the entries of A are made nonnegative and the rows
          sum to 1.

        EXAMPLES::

            sage: hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,1), (-1,1)], [.5,.5])
            Gaussian Hidden Markov Model with 2 States
            Transition matrix:
            [0.1 0.9]
            [0.5 0.5]
            Emission parameters:
            [(1.0, 1.0), (-1.0, 1.0)]
            Initial probabilities: [0.5000, 0.5000]

        We input a model in which both A and pi have to be
        renormalized to define valid probability distributions::

            sage: hmm.GaussianHiddenMarkovModel([[-1,.7],[.3,.4]], [(1,1), (-1,1)], [-1,.3])
            Gaussian Hidden Markov Model with 2 States
            Transition matrix:
            [ 0.0 1.0]
            [0.428571428571 0.571428571429]
            Emission parameters:
            [(1.0, 1.0), (-1.0, 1.0)]
            Initial probabilities: [0.0000, 1.0000]

        Bad things can happen::

            sage: hmm.GaussianHiddenMarkovModel([[-1,.7],[.3,.4]], [(1,1), (-1,1)], [-1,.3], normalize=False)
            Gaussian Hidden Markov Model with 2 States
            Transition matrix:
            [-1.0 0.7]
            [ 0.3 0.4]
            ...
        """
        self.pi = util.initial_probs_to_TimeSeries(pi, normalize)
        self.N = len(self.pi)
        self.A = util.state_matrix_to_TimeSeries(A, self.N, normalize)

        # B should be a matrix of N rows, with column 0 the mean and
        # column 1 the standard deviation.
        if is_Matrix(B):
            B = B.list()
        else:
            B = flatten(B)
        self.B = TimeSeries(B)
        self.probability_init()

    def __cmp__(self, other):
        """
        Compare self and other, which must both be GaussianHiddenMarkovModel instances.

        EXAMPLES::

            sage: m = hmm.GaussianHiddenMarkovModel([[1]], [(0,1)], [1])
            sage: n = hmm.GaussianHiddenMarkovModel([[1]], [(1,1)], [1])
            sage: m < n
            True
            sage: m == m
            True
            sage: n > m
            True
            sage: n < m
            False
        """
        if not isinstance(other, GaussianHiddenMarkovModel):
            raise ValueError
        return cmp(self.__reduce__()[1], other.__reduce__()[1])

    def __getitem__(self, Py_ssize_t i):
        """
        Return the mean and standard deviation for the i-th state.

        INPUT:

        - i -- integer

        OUTPUT:

        - 2 floats

        EXAMPLES::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-2,.3)], [.5,.5])
            sage: m[0]
            (1.0, 0.5)
            sage: m[1]
            (-2.0, 0.3)
            sage: m[-1]
            (-2.0, 0.3)
            sage: m[3]
            Traceback (most recent call last):
            ...
            IndexError: index out of range
            sage: m[-3]
            Traceback (most recent call last):
            ...
            IndexError: index out of range
        """
        if i < 0:
            i += self.N
        if i < 0 or i >= self.N:
            raise IndexError, 'index out of range'

        # TODO: change to be a normal distribution class (next version)
        return self.B[2*i], self.B[2*i+1]

    def __reduce__(self):
        """
        Used in pickling.

        EXAMPLES::

            sage: G = hmm.GaussianHiddenMarkovModel([[1]], [(0,1)], [1])
            sage: loads(dumps(G)) == G
            True
        """
        return unpickle_gaussian_hmm_v1, \
               (self.A, self.B, self.pi, self.prob, self.n_out)

    def emission_parameters(self):
        """
        Return the parameters that define the normal distributions
        associated to all of the states.

        OUTPUT:

        - a list B of pairs B[i] = (mu, std), such that the
          distribution associated to state i is normal with mean
          mu and standard deviation std.

        EXAMPLES::

            sage: hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9]).emission_parameters()
            [(1.0, 0.5), (-1.0, 3.0)]
        """
        cdef Py_ssize_t i
        from sage.rings.all import RDF
        return [(RDF(self.B[2*i]),RDF(self.B[2*i+1])) for i in range(self.N)]

    def __repr__(self):
        """
        Return string representation.

        EXAMPLES::

            sage: hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9]).__repr__()
            'Gaussian Hidden Markov Model with 2 States\nTransition matrix:\n[0.1 0.9]\n[0.5 0.5]\nEmission parameters:\n[(1.0, 0.5), (-1.0, 3.0)]\nInitial probabilities: [0.1000, 0.9000]'
        """
        s = "Gaussian Hidden Markov Model with %s States"%self.N
        s += '\nTransition matrix:\n%s'%self.transition_matrix()
        s += '\nEmission parameters:\n%s'%self.emission_parameters()
        s += '\nInitial probabilities: %s'%self.initial_probabilities()
        return s

    def generate_sequence(self, Py_ssize_t length, starting_state=None):
        """
        Return a sample of the given length from this HMM.

        INPUT:

        - length -- nonnegative integer
        - starting_state -- int (or None); if specified then generate
          a sequence using this model starting with the given state
          instead of the initial probabilities to determine the
          starting state.

        OUTPUT:

        - a TimeSeries of emissions
        - an IntList of the hidden states that produced them

        EXAMPLES::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.generate_sequence(5)
            ([-3.0505, 0.5317, -4.5065, 0.6521, 1.0435], [1, 0, 1, 0, 1])
            sage: m.generate_sequence(0)
            ([], [])
            sage: m.generate_sequence(-1)
            Traceback (most recent call last):
            ...
            ValueError: length must be nonnegative

        Example in which the starting state is 0 (see trac 11452)::

            sage: set_random_seed(23); m.generate_sequence(2)
            ([0.6501, -2.0151], [0, 1])

        Force a starting state of 1 even though as we saw above it would be 0::

            sage: set_random_seed(23); m.generate_sequence(2, starting_state=1)
            ([-3.1491, -1.0244], [1, 1])

        Verify numerically that the starting state is 0 with probability about 0.1::

            sage: set_random_seed(0)
            sage: v = [m.generate_sequence(1)[1][0] for i in range(10^5)]
            sage: 1.0 * v.count(int(0)) / len(v)
            0.0998200000000000
        """
        if length < 0:
            raise ValueError, "length must be nonnegative"

        # Create the lists of states and observations.
        cdef IntList states = IntList(length)
        cdef TimeSeries obs = TimeSeries(length)
        if length == 0:
            return obs, states

        # Set up variables, including the random state.
        cdef Py_ssize_t i, j
        cdef randstate rstate = current_randstate()
        cdef int q = 0
        cdef double r, accum

        # Choose the starting state.
        # See the remark in hmm.pyx about how this should get
        # replaced by some general fast discrete distribution code.
        if starting_state is None:
            r = rstate.c_rand_double()
            accum = 0
            for i in range(self.N):
                if r < self.pi._values[i] + accum:
                    q = i
                    break
                else:
                    accum += self.pi._values[i]
        else:
            q = starting_state
            if q < 0 or q >= self.N:
                raise ValueError, "starting state must be between 0 and %s"%(self.N-1)

        states._values[0] = q
        obs._values[0] = self.random_sample(q, rstate)

        cdef double* row
        cdef int O
        sig_on()
        for i in range(1, length):
            accum = 0
            row = self.A._values + q*self.N
            r = rstate.c_rand_double()
            for j in range(self.N):
                if r < row[j] + accum:
                    q = j
                    break
                else:
                    accum += row[j]
            states._values[i] = q
            obs._values[i] = self.random_sample(q, rstate)
        sig_off()

        return obs, states

    cdef probability_init(self):
        """
        Used internally to compute caching information that makes
        certain computations in the Baum-Welch algorithm faster. This
        function has no input or output.
        """
        self.prob = TimeSeries(2*self.N)
        cdef int i
        for i in range(self.N):
            self.prob[2*i] = 1.0/(sqrt2pi*self.B[2*i+1])
            self.prob[2*i+1] = -1.0/(2*self.B[2*i+1]*self.B[2*i+1])
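
    # Note on the cache: for state i, prob[2*i] holds the Gaussian
    # normalization constant 1/(sqrt(2*pi)*std_i) and prob[2*i+1] holds the
    # exponent coefficient -1/(2*std_i^2), so that probability_of() below
    # costs just one multiplication and one exp() call per evaluation.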

    cdef double random_sample(self, int state, randstate rstate):
        """
        Return a random sample from the normal distribution associated
        to the given state.

        This is only used internally, and no bounds or other error
        checking is done, so calling this improperly can lead to seg
        faults.

        INPUT:

        - state -- integer
        - rstate -- randstate instance

        OUTPUT:

        - double
        """
        return random_normal(self.B._values[state*2], self.B._values[state*2+1], rstate)

    cdef double probability_of(self, int state, double observation):
        """
        Return a useful continuous analogue of "the probability b_j(o)"
        of seeing the given observation given that we're in the given
        state j (=state).

        The distribution is a normal distribution, and we're asking
        about the probability of a particular point being observed;
        the probability of a particular point is 0, which is not
        useful. Thus we instead consider the limit p = prob([o,o+d])/d
        as d goes to 0. There is a simple closed form formula for p,
        derived in the source code. Note that p can be bigger than 1;
        for example, if we set observation=mean in the closed formula
        we get p=1/(sqrt(2*pi)*std), so p>1 when std<1/sqrt(2*pi).

        INPUT:

        - state -- integer
        - observation -- double

        OUTPUT:

        - double
        """
        # The code below is an optimized version of the following code:
        #     cdef double mean = self.B._values[2*state], \
        #                 std = self.B._values[2*state+1]
        #     return 1/(sqrt2pi*std) * \
        #            exp(-(observation-mean)*(observation-mean)/(2*std*std))
        #
        # Here is how to use Sage to verify that the above formula computes
        # the limit claimed above:
        #
        #     var('x,d,obs,mean,std')
        #     n = 1/sqrt(2*pi*std^2) * exp(-(x-mean)^2/(2*std^2))
        #     assume(std>0); assume(d>0)
        #     m = n.integrate(x,obs,obs+d)/d
        #     p = SR(m.limit(d=0).simplify_full())
        #     q = 1/(sqrt(2*pi)*std) * exp(-(obs-mean)*(obs-mean)/(2*std*std))
        #     bool(p==q)  # outputs True

        cdef double x = observation - self.B._values[2*state]  # observation - mean
        return self.prob._values[2*state] * exp(x*x*self.prob._values[2*state+1])
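
    # Sanity sketch (pure Python, illustrative only): the optimized return
    # value above should agree, up to floating-point rounding, with the
    # direct density formula
    #
    #     import math
    #     def gaussian_density(o, mean, std):
    #         return (math.exp(-(o - mean)**2 / (2.0*std*std))
    #                 / (math.sqrt(2.0*math.pi) * std))
    #
    # evaluated at o = observation with this state's mean and std.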

    def log_likelihood(self, obs):
        """
        Return the logarithm of a continuous analogue of the
        probability that this model produced the given observation
        sequence.

        Note that the "continuous analogue of the probability" above can
        be bigger than 1, hence the logarithm can be positive.

        INPUT:

        - obs -- sequence of observations

        OUTPUT:

        - float

        EXAMPLES::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.log_likelihood([1,1,1])
            -4.297880766072486
            sage: set_random_seed(0); s = m.sample(20)
            sage: m.log_likelihood(s)
            -40.115714129484...
        """
        if len(obs) == 0:
            return 1.0
        if not isinstance(obs, TimeSeries):
            obs = TimeSeries(obs)
        return self._forward_scale(obs)

    def _forward_scale(self, TimeSeries obs):
        """
        Memory-efficient implementation of the forward algorithm (with scaling).

        INPUT:

        - obs -- a TimeSeries of observations.

        OUTPUT:

        - float -- the log of the probability that the model
          produced this sequence

        EXAMPLES::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m._forward_scale(stats.TimeSeries([1,-1,-1,1]))
            -7.641988207069133
        """
        cdef Py_ssize_t i, j, t, T = len(obs)

        # The running sum of the log probabilities
        cdef double log_probability = 0, sum, a

        cdef TimeSeries alpha = TimeSeries(self.N), \
                        alpha2 = TimeSeries(self.N)

        # Initialization
        sum = 0
        for i in range(self.N):
            a = self.pi[i] * self.probability_of(i, obs._values[0])
            alpha[i] = a
            sum += a

        log_probability = log(sum)
        alpha.rescale(1/sum)

        # Induction
        cdef double s
        for t in range(1, T):
            sum = 0
            for j in range(self.N):
                s = 0
                for i in range(self.N):
                    s += alpha._values[i] * self.A._values[i*self.N + j]
                a = s * self.probability_of(j, obs._values[t])
                alpha2._values[j] = a
                sum += a

            log_probability += log(sum)
            for j in range(self.N):
                alpha._values[j] = alpha2._values[j] / sum

        # Termination
        return log_probability
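
    # Scaling note: with c_t denoting the per-step normalizer ``sum`` above,
    # the identity log P(obs | model) = sum_t log(c_t) is what keeps this
    # forward pass numerically stable; the unscaled alpha_t values would
    # underflow exponentially fast for long observation sequences.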

    def viterbi(self, obs):
        """
        Determine "the" hidden sequence of states that is most likely
        to produce the given sequence obs of observations, along with
        the probability that this hidden sequence actually produced
        the observation.

        INPUT:

        - obs -- sequence of observations

        OUTPUT:

        - list -- "the" most probable sequence of hidden states, i.e.,
          the Viterbi path.

        - float -- log of probability that the observed sequence
          was produced by the Viterbi sequence of states.

        EXAMPLES:

        We find the optimal state sequence for a given model::

            sage: m = hmm.GaussianHiddenMarkovModel([[0.5,0.5],[0.5,0.5]], [(0,1),(10,1)], [0.5,0.5])
            sage: m.viterbi([0,1,10,10,1])
            ([0, 0, 1, 1, 0], -9.0604285688230...)

        Another example in which the most likely states change based
        on the last observation::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.viterbi([-2,-1,.1,0.1])
            ([1, 1, 0, 1], -9.61823698847639...)
            sage: m.viterbi([-2,-1,.1,0.3])
            ([1, 1, 1, 0], -9.566023653378513)
        """
        cdef TimeSeries _obs
        if not isinstance(obs, TimeSeries):
            _obs = TimeSeries(obs)
        else:
            _obs = obs

        # The algorithm is the standard Viterbi algorithm, except that
        # we take the logarithms of everything first, and add
        # instead of multiply.
        cdef Py_ssize_t t, T = _obs._length
        cdef IntList state_sequence = IntList(T)
        if T == 0:
            return state_sequence, 0.0

        cdef int i, j, N = self.N

        # delta[i] is the maximum of the probabilities over all
        # paths ending in state i.
        cdef TimeSeries delta = TimeSeries(N), delta_prev = TimeSeries(N)

        # We view psi as an N x T matrix of ints. The quantity
        #     psi[N*t + j]
        # is a most probable hidden state at time t-1, given
        # that j is a most probable state at time t.
        cdef IntList psi = IntList(N * T)  # initialized to 0 by default

        # Log Preprocessing
        cdef TimeSeries A = self.A.log()
        cdef TimeSeries pi = self.pi.log()

        # Initialization:
        for i in range(N):
            delta._values[i] = pi._values[i] + log(self.probability_of(i, _obs._values[0]))

        # Recursion:
        cdef double mx, tmp, minus_inf = float('-inf')
        cdef int index

        for t in range(1, T):
            delta_prev, delta = delta, delta_prev
            for j in range(N):
                # Compute delta_t[j] = max_i(delta_{t-1}(i) a_{i,j}) * b_j(_obs[t])
                mx = minus_inf
                index = -1
                for i in range(N):
                    tmp = delta_prev._values[i] + A._values[i*N+j]
                    if tmp > mx:
                        mx = tmp
                        index = i
                delta._values[j] = mx + log(self.probability_of(j, _obs._values[t]))
                psi._values[N*t + j] = index

        # Termination:
        mx, index = delta.max(index=True)

        # Path (state sequence) backtracking:
        state_sequence._values[T-1] = index
        t = T-2
        while t >= 0:
            state_sequence._values[t] = psi._values[N*(t+1) + state_sequence._values[t+1]]
            t -= 1

        return state_sequence, mx

    cdef TimeSeries _backward_scale_all(self, TimeSeries obs, TimeSeries scale):
        """
        This function returns the matrix beta_t(i), and is used
        internally as part of the Baum-Welch algorithm.

        The quantity beta_t(i) is the probability of observing the
        sequence obs[t+1:] assuming that the model is in state i at
        time t.

        INPUT:

        - obs -- TimeSeries
        - scale -- TimeSeries

        OUTPUT:

        - TimeSeries beta such that beta_t(i) = beta[t*N + i]
        - scale is also changed by this function
        """
        cdef Py_ssize_t t, T = obs._length
        cdef int N = self.N, i, j
        cdef double s
        cdef TimeSeries beta = TimeSeries(N*T, initialize=False)

        # 1. Initialization
        for i in range(N):
            beta._values[(T-1)*N + i] = 1 / scale._values[T-1]

        # 2. Induction
        t = T-2
        while t >= 0:
            for i in range(N):
                s = 0
                for j in range(N):
                    s += self.A._values[i*N+j] * \
                         self.probability_of(j, obs._values[t+1]) * beta._values[(t+1)*N+j]
                beta._values[t*N + i] = s/scale._values[t]
            t -= 1
        return beta
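
    # The induction above is the scaled backward recursion
    #     beta_t(i) = (sum_j A[i,j] * b_j(obs[t+1]) * beta_{t+1}(j)) / c_t,
    # reusing the scalings c_t computed by the forward pass, so that products
    # like alpha_t(i)*beta_t(i) stay well conditioned during Baum-Welch.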

    cdef _forward_scale_all(self, TimeSeries obs):
        """
        Return scaled values alpha_t(i), the sequence of scalings, and
        the log probability.

        The quantity alpha_t(i) is the probability of observing the
        sequence obs[:t+1] assuming that the model is in state i at
        time t.

        INPUT:

        - obs -- TimeSeries

        OUTPUT:

        - TimeSeries alpha with alpha_t(i) = alpha[t*N + i]
        - TimeSeries scale with scale[t] the scaling at step t
        - float -- log_probability of the observation sequence
          being produced by the model.
        """
        cdef Py_ssize_t i, j, t, T = len(obs)
        cdef int N = self.N

        # The running sum of the log probabilities
        cdef double log_probability = 0, sum, a

        cdef TimeSeries alpha = TimeSeries(N*T, initialize=False)
        cdef TimeSeries scale = TimeSeries(T, initialize=False)

        # Initialization
        sum = 0
        for i in range(self.N):
            a = self.pi._values[i] * self.probability_of(i, obs._values[0])
            alpha._values[0*N + i] = a
            sum += a

        scale._values[0] = sum
        log_probability = log(sum)
        for i in range(self.N):
            alpha._values[0*N + i] /= sum

        # Induction
        # The code below is just an optimized version of:
        #     alpha = (alpha * A).pairwise_product(B[O[t+1]])
        #     alpha = alpha.scale(1/alpha.sum())
        # along with keeping track of the log of the scaling factor.
        cdef double s
        for t in range(1, T):
            sum = 0
            for j in range(N):
                s = 0
                for i in range(N):
                    s += alpha._values[(t-1)*N + i] * self.A._values[i*N + j]
                a = s * self.probability_of(j, obs._values[t])
                alpha._values[t*N + j] = a
                sum += a

            log_probability += log(sum)
            scale._values[t] = sum
            for j in range(self.N):
                alpha._values[t*N + j] /= sum

        # Termination
        return alpha, scale, log_probability
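
    # Unlike _forward_scale(), which keeps only a sliding pair of state
    # vectors, this method retains the full alpha matrix and the scaling
    # sequence: Baum-Welch needs alpha_t(i) for every t, plus the same c_t
    # values to build the matching scaled beta matrix in _backward_scale_all().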

    cdef TimeSeries _baum_welch_xi(self, TimeSeries alpha, TimeSeries beta, TimeSeries obs):
        """
        Used internally to compute the scaled quantity xi_t(i,j)
        appearing in the Baum-Welch reestimation algorithm.

        INPUT:

        - alpha -- TimeSeries as output by the scaled forward algorithm
        - beta -- TimeSeries as output by the scaled backward algorithm
        - obs -- TimeSeries of observations

        OUTPUT:

        - TimeSeries xi such that xi[t*N*N + i*N + j] = xi_t(i,j).
        """
        cdef int i, j, N = self.N
        cdef double sum
        cdef Py_ssize_t t, T = alpha._length//N
        cdef TimeSeries xi = TimeSeries(T*N*N, initialize=False)
        for t in range(T-1):
            sum = 0.0
            for i in range(N):
                for j in range(N):
                    xi._values[t*N*N+i*N+j] = alpha._values[t*N+i]*beta._values[(t+1)*N+j]*\
                                              self.A._values[i*N+j] * self.probability_of(j, obs._values[t+1])
                    sum += xi._values[t*N*N+i*N+j]
            for i in range(N):
                for j in range(N):
                    xi._values[t*N*N+i*N+j] /= sum
        return xi
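
    # This is the standard Baum-Welch quantity
    #     xi_t(i,j) = alpha_t(i) * A[i,j] * b_j(obs[t+1]) * beta_{t+1}(j) / P_t,
    # with P_t the per-step normalizer accumulated in ``sum`` above, so the
    # values xi_t(i,j) sum to 1 over all pairs (i,j) for each t < T-1.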

    def baum_welch(self, obs, int max_iter=500, double log_likelihood_cutoff=1e-4,
                   double min_sd=0.01, bint fix_emissions=False, bint v=False):
        """
        Given an observation sequence obs, improve this HMM using the
        Baum-Welch algorithm to increase the probability of observing obs.

        INPUT:

        - obs -- a time series of emissions

        - max_iter -- integer (default: 500); maximum number
          of Baum-Welch steps to take

        - log_likelihood_cutoff -- positive float (default: 1e-4);
          the minimal improvement in log likelihood over the
          previous iteration required to keep iterating

        - min_sd -- positive float (default: 0.01); when
          reestimating, the standard deviation of emissions is not
          allowed to be less than min_sd.

        - fix_emissions -- bool (default: False); if True, do not
          change emissions when updating

        OUTPUT:

        - changes the model in place, and returns the log
          likelihood and number of iterations.

        EXAMPLES::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.log_likelihood([-2,-1,.1,0.1])
            -8.858282215986275
            sage: m.baum_welch([-2,-1,.1,0.1])
            (4.534646052182..., 7)
            sage: m.log_likelihood([-2,-1,.1,0.1])
            4.534646052182...
            sage: m
            Gaussian Hidden Markov Model with 2 States
            Transition matrix:
            [ 0.999999999243 7.56983939444e-10]
            [ 0.499984627912 0.500015372088]
            Emission parameters:
            [(0.1, 0.01), (-1.49995081476, 0.50007105049)]
            Initial probabilities: [0.0000, 1.0000]

        We illustrate bounding the standard deviation below. Note that above we had
        different emission parameters when the min_sd was the default of 0.01::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.baum_welch([-2,-1,.1,0.1], min_sd=1)
            (-4.07939572755..., 32)
            sage: m.emission_parameters()
            [(-0.2663018798..., 1.0), (-1.99850979..., 1.0)]

        We watch the log likelihoods of the model converge, step by step::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: v = m.sample(10)
            sage: stats.TimeSeries([m.baum_welch(v,max_iter=1)[0] for _ in range(len(v))])
            [-20.1167, -17.7611, -16.9814, -16.9364, -16.9314, -16.9309, -16.9309, -16.9309, -16.9309, -16.9309]

        We illustrate fixing emissions::

            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.9,.1]], [(1,2),(-1,.5)], [.3,.7])
            sage: set_random_seed(0); v = m.sample(100)
            sage: m.baum_welch(v,fix_emissions=True)
            (-164.72944548204..., 23)
            sage: m.emission_parameters()
            [(1.0, 2.0), (-1.0, 0.5)]
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.9,.1]], [(1,2),(-1,.5)], [.3,.7])
            sage: m.baum_welch(v)
            (-162.854370397998..., 49)
            sage: m.emission_parameters()
            [(1.27224191726, 2.37136875176), (-0.948617467518, 0.576236038512)]
        """
        if not isinstance(obs, TimeSeries):
            obs = TimeSeries(obs)
        cdef TimeSeries _obs = obs
        cdef TimeSeries alpha, beta, scale, gamma, xi
        cdef double log_probability, log_probability0, log_probability_prev, delta
        cdef int i, j, k, N, n_iterations
        cdef Py_ssize_t t, T
        cdef double denominator_A, numerator_A, denominator_B, numerator_mean, numerator_std

        # Initialization
        alpha, scale, log_probability0 = self._forward_scale_all(_obs)
        if not isfinite(log_probability0):
            return (0.0, 0)
        log_probability = log_probability0
        beta = self._backward_scale_all(_obs, scale)
        gamma = self._baum_welch_gamma(alpha, beta)
        xi = self._baum_welch_xi(alpha, beta, _obs)
        log_probability_prev = log_probability
        N = self.N
        n_iterations = 0
        T = len(_obs)

        # Re-estimation
        while True:
            # Reestimate the frequency of state i at time t=0
            for i in range(N):
                if not isfinite(gamma._values[0*N+i]):
                    # Before raising an error, leave self in a valid state.
                    util.normalize_probability_TimeSeries(self.pi, 0, self.pi._length)
                    raise RuntimeError, "impossible to compute gamma during reestimation"
                self.pi._values[i] = gamma._values[0*N+i]

            # Update the probabilities pi to define a valid discrete distribution
            util.normalize_probability_TimeSeries(self.pi, 0, self.pi._length)

            # Reestimate transition matrix and emission probabilities in
            # each state.
            for i in range(N):
                # Compute the updated transition matrix
                denominator_A = 0.0
                for t in range(T-1):
                    denominator_A += gamma._values[t*N+i]
                if not isnormal(denominator_A):
                    raise RuntimeError, "unable to re-estimate transition matrix"
                for j in range(N):
                    numerator_A = 0.0
                    for t in range(T-1):
                        numerator_A += xi._values[t*N*N+i*N+j]
                    self.A._values[i*N+j] = numerator_A / denominator_A

                # Rescale the i-th row of the transition matrix to be
                # a valid stochastic matrix:
                util.normalize_probability_TimeSeries(self.A, i*N, (i+1)*N)

                if not fix_emissions:
                    denominator_B = denominator_A + gamma._values[(T-1)*N + i]
                    if not isnormal(denominator_B):
                        raise RuntimeError, "unable to re-estimate emission probabilities"

                    numerator_mean = 0.0
                    numerator_std = 0.0
                    for t in range(T):
                        numerator_mean += gamma._values[t*N + i] * _obs._values[t]
                        numerator_std += gamma._values[t*N + i] * \
                            (_obs._values[t] - self.B._values[2*i])*(_obs._values[t] - self.B._values[2*i])
                    # re-estimated mean
                    self.B._values[2*i] = numerator_mean / denominator_B
                    # re-estimated standard deviation (the squared deviations
                    # above are taken about the previous mean)
                    self.B._values[2*i+1] = sqrt(numerator_std / denominator_B)
                    if self.B._values[2*i+1] < min_sd:
                        self.B._values[2*i+1] = min_sd
                    self.probability_init()

            n_iterations += 1
            if n_iterations >= max_iter: break

            # Initialization for next iteration
            alpha, scale, log_probability0 = self._forward_scale_all(_obs)

            if not isfinite(log_probability0): break
            log_probability = log_probability0
            beta = self._backward_scale_all(_obs, scale)
            gamma = self._baum_welch_gamma(alpha, beta)
            xi = self._baum_welch_xi(alpha, beta, _obs)

            # Compute the improvement in log probability since the
            # previous iteration.
            delta = log_probability - log_probability_prev
            log_probability_prev = log_probability

            # If the log probability improved by no more than the cutoff,
            # terminate.
            if delta >= 0 and delta <= log_likelihood_cutoff:
                break

        return log_probability, n_iterations


cdef class GaussianMixtureHiddenMarkovModel(GaussianHiddenMarkovModel):
    """
    GaussianMixtureHiddenMarkovModel(A, B, pi)

    Gaussian mixture Hidden Markov Model.

    INPUT:

    - ``A`` -- matrix; the N x N transition matrix

    - ``B`` -- list of mixture definitions for each state. Each
      state may have a varying number of Gaussians, with selection
      probabilities that sum to 1, each encoded as (p,(mu,sigma))

    - ``pi`` -- initial state probabilities

    - ``normalize`` -- bool (default: True); if True, input is
      normalized to define valid probability distributions,
      e.g., the entries of A are made nonnegative and the rows
      sum to 1, and the probabilities in pi are normalized.

    EXAMPLES::

        sage: A = [[0.5,0.5],[0.5,0.5]]
        sage: B = [[(0.9,(0.0,1.0)), (0.1,(1,10000))],[(1,(1,1)), (0,(0,0.1))]]
        sage: hmm.GaussianMixtureHiddenMarkovModel(A, B, [1,0])
        Gaussian Mixture Hidden Markov Model with 2 States
        Transition matrix:
        [0.5 0.5]
        [0.5 0.5]
        Emission parameters:
        [0.9*N(0.0,1.0) + 0.1*N(1.0,10000.0), 1.0*N(1.0,1.0) + 0.0*N(0.0,0.1)]
        Initial probabilities: [1.0000, 0.0000]

    TESTS:

    If a standard deviation is 0, it is normalized to be slightly bigger than 0::

        sage: hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(1,(0,0))]], [1])
        Gaussian Mixture Hidden Markov Model with 1 States
        Transition matrix:
        [1.0]
        Emission parameters:
        [1.0*N(0.0,1e-08)]
        Initial probabilities: [1.0000]

    We test that the number of emission distributions must be the same as the number of states::

        sage: hmm.GaussianMixtureHiddenMarkovModel([[1]], [], [1])
        Traceback (most recent call last):
        ...
        ValueError: number of GaussianMixtures must be the same as number of entries of pi

        sage: hmm.GaussianMixtureHiddenMarkovModel([[1]], [[]], [1])
        Traceback (most recent call last):
        ...
        ValueError: must specify at least one component of the mixture model

    We test that the number of initial probabilities must equal the number of states::

        sage: hmm.GaussianMixtureHiddenMarkovModel([[1]], [[]], [1,2])
        Traceback (most recent call last):
        ...
        ValueError: number of entries of transition matrix A must be the square of the number of entries of pi
    """

    cdef object mixture  # list of GaussianMixtureDistribution objects, one per state

    def __init__(self, A, B, pi=None, bint normalize=True):
        """
        Initialize a Gaussian mixture hidden Markov model.

        EXAMPLES::

            sage: hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            Gaussian Mixture Hidden Markov Model with 2 States
            Transition matrix:
            [0.9 0.1]
            [0.4 0.6]
            Emission parameters:
            [0.4*N(0.0,1.0) + 0.6*N(1.0,0.1), 1.0*N(0.0,1.0)]
            Initial probabilities: [0.7000, 0.3000]
        """
        self.pi = util.initial_probs_to_TimeSeries(pi, normalize)
        self.N = len(self.pi)
        self.A = util.state_matrix_to_TimeSeries(A, self.N, normalize)
        if self.N*self.N != len(self.A):
            raise ValueError, "number of entries of transition matrix A must be the square of the number of entries of pi"

        self.mixture = [b if isinstance(b, GaussianMixtureDistribution) else \
                        GaussianMixtureDistribution([flatten(x) for x in b]) for b in B]
        if len(self.mixture) != self.N:
            raise ValueError, "number of GaussianMixtures must be the same as number of entries of pi"

    def __repr__(self):
        """
        Return string representation.

        EXAMPLES::

            sage: hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3]).__repr__()
            'Gaussian Mixture Hidden Markov Model with 2 States\nTransition matrix:\n[0.9 0.1]\n[0.4 0.6]\nEmission parameters:\n[0.4*N(0.0,1.0) + 0.6*N(1.0,0.1), 1.0*N(0.0,1.0)]\nInitial probabilities: [0.7000, 0.3000]'
        """
        s = "Gaussian Mixture Hidden Markov Model with %s States"%self.N
        s += '\nTransition matrix:\n%s'%self.transition_matrix()
        s += '\nEmission parameters:\n%s'%self.emission_parameters()
        s += '\nInitial probabilities: %s'%self.initial_probabilities()
        return s

    def __reduce__(self):
        """
        Used in pickling.

        EXAMPLES::

            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.4,(0,1)), (.6,(1,0.1))]], [1])
            sage: loads(dumps(m)) == m
            True
        """
        return unpickle_gaussian_mixture_hmm_v1, \
               (self.A, self.B, self.pi, self.mixture)

    def __cmp__(self, other):
        """
        Compare self and other, which must both be GaussianMixtureHiddenMarkovModel instances.

        EXAMPLES::

            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.4,(0,1)), (.6,(1,0.1))]], [1])
            sage: n = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.5,(0,1)), (.5,(1,0.1))]], [1])
            sage: m < n
            True
            sage: m == m
            True
            sage: n > m
            True
            sage: n < m
            False
        """
        if not isinstance(other, GaussianMixtureHiddenMarkovModel):
            raise ValueError
        return cmp(self.__reduce__()[1], other.__reduce__()[1])

    def __getitem__(self, Py_ssize_t i):
        """
        Return the Gaussian mixture distribution associated to the
        i-th state.

        INPUT:

        - i -- integer

        OUTPUT:

        - a Gaussian mixture distribution object

        EXAMPLES::

            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: m[0]
            0.4*N(0.0,1.0) + 0.6*N(1.0,0.1)
            sage: m[1]
            1.0*N(0.0,1.0)

        Negative indexing works::

            sage: m[-1]
            1.0*N(0.0,1.0)

        Bounds are checked::

            sage: m[2]
            Traceback (most recent call last):
            ...
            IndexError: index out of range
            sage: m[-3]
            Traceback (most recent call last):
            ...
            IndexError: index out of range
        """
        if i < 0:
            i += self.N
        if i < 0 or i >= self.N:
            raise IndexError, 'index out of range'
        return self.mixture[i]

    def emission_parameters(self):
        """
        Return a list of all the emission distributions.

        OUTPUT:

        - list of Gaussian mixtures

        EXAMPLES::

            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: m.emission_parameters()
            [0.4*N(0.0,1.0) + 0.6*N(1.0,0.1), 1.0*N(0.0,1.0)]
        """
        return list(self.mixture)

    cdef double random_sample(self, int state, randstate rstate):
        """
        Return a random sample from the Gaussian mixture distribution
        associated to the given state.

        This is only used internally, and no bounds or other error
        checking is done, so calling this improperly can lead to seg
        faults.

        INPUT:

        - state -- integer
        - rstate -- randstate instance

        OUTPUT:

        - double
        """
        cdef GaussianMixtureDistribution G = self.mixture[state]
        return G._sample(rstate)

    cdef double probability_of(self, int state, double observation):
        """
        Return the probability b_j(o) of seeing the given observation o
        (=observation) given that we're in the given state j (=state).

        This is a continuous probability, so this really returns a
        number p such that the probability of a value in the interval
        [o,o+d] is p*d.

        INPUT:

        - state -- integer
        - observation -- double

        OUTPUT:

        - double
        """
        cdef GaussianMixtureDistribution G = self.mixture[state]
        return G.prob(observation)
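
    # For a mixture state j with M components, the density evaluated here is
    #     b_j(o) = sum_{m=1}^{M} c_{j,m} * N(o; mu_{j,m}, std_{j,m}),
    # which is what GaussianMixtureDistribution.prob() computes.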

    cdef TimeSeries _baum_welch_mixed_gamma(self, TimeSeries alpha, TimeSeries beta,
                                            TimeSeries obs, int j):
        """
        Let gamma_t(j,m) be the m-component (in the mixture) of the
        probability of being in state j at time t, given the
        observation sequence. This function outputs a TimeSeries v
        such that v[m*T + t] gives gamma_t(j, m) where T is the number
        of time steps.

        INPUT:

        - alpha -- TimeSeries
        - beta -- TimeSeries
        - obs -- TimeSeries
        - j -- int

        OUTPUT:

        - TimeSeries
        """
        cdef int i, k, m, N = self.N
        cdef Py_ssize_t t, T = alpha._length//N

        cdef double numer, alpha_minus, P, s, prob
        cdef GaussianMixtureDistribution G = self.mixture[j]
        cdef int M = len(G)
        cdef TimeSeries mixed_gamma = TimeSeries(T*M)

        for t in range(T):
            prob = self.probability_of(j, obs._values[t])
            if prob == 0:
                # If the probability of observing obs[t] in state j is 0, then
                # all of the m-mixture components *have* to automatically be 0,
                # since prob is the sum of those and they are all nonnegative.
                for m in range(M):
                    mixed_gamma._values[m*T + t] = 0
            else:
                # Compute the denominator we used when scaling gamma.
                # The key thing is that this is consistent between
                # gamma and mixed_gamma.
                P = 0
                for k in range(N):
                    P += alpha._values[t*N+k]*beta._values[t*N+k]

                # Divide out the total probability, so we can multiply back in
                # the m-components of the probability.
                alpha_minus = alpha._values[t*N + j] / prob
                for m in range(M):
                    numer = alpha_minus * G.prob_m(obs._values[t], m) * beta._values[t*N + j]
                    mixed_gamma._values[m*T + t] = numer / P

        return mixed_gamma
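
    # In effect this computes
    #     gamma_t(j,m) = gamma_t(j) * (component m's share of b_j(obs[t])):
    # the state occupancy gamma_t(j) is split across the M mixture components
    # in proportion to how much each component contributes to the total
    # emission density at obs[t].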

    def baum_welch(self, obs, int max_iter=1000, double log_likelihood_cutoff=1e-12,
                   double min_sd=0.01, bint fix_emissions=False):
        """
        Given an observation sequence obs, improve this HMM using the
        Baum-Welch algorithm to increase the probability of observing obs.

        INPUT:

        - obs -- a time series of emissions
        - max_iter -- integer (default: 1000); maximum number
          of Baum-Welch steps to take
        - log_likelihood_cutoff -- positive float (default: 1e-12);
          the minimal improvement in log likelihood over the
          previous iteration required to keep iterating
        - min_sd -- positive float (default: 0.01); when
          reestimating, the standard deviation of emissions is not
          allowed to be less than min_sd.
        - fix_emissions -- bool (default: False); if True, do not
          change emissions when updating

        OUTPUT:

        - changes the model in place, and returns the log
          likelihood and number of iterations.

        EXAMPLES::

            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: set_random_seed(0); v = m.sample(10); v
            [0.3576, -0.9365, 0.9449, -0.6957, 1.0217, 0.9644, 0.9987, -0.5950, -1.0219, 0.6477]
            sage: m.log_likelihood(v)
            -8.31408655939536...
            sage: m.baum_welch(v)
            (2.18905068682..., 15)
            sage: m.log_likelihood(v)
            2.18905068682...
            sage: m
            Gaussian Mixture Hidden Markov Model with 2 States
            Transition matrix:
            [ 0.874636333977 0.125363666023]
            [ 1.0 1.45168520229e-40]
            Emission parameters:
            [0.500161629343*N(-0.812298726239,0.173329026744) + 0.499838370657*N(0.982433690378,0.029719932009), 1.0*N(0.503260056832,0.145881515324)]
            Initial probabilities: [0.0000, 1.0000]

        We illustrate bounding the standard deviation below. Note that above we had
        different emission parameters when the min_sd was the default of 0.01::

            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: m.baum_welch(v, min_sd=1)
            (-12.617885761692..., 1000)
            sage: m.emission_parameters()
            [0.503545634447*N(0.200166509595,1.0) + 0.496454365553*N(0.200166509595,1.0), 1.0*N(0.0543433426535,1.0)]

        We illustrate fixing all emissions::

            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: set_random_seed(0); v = m.sample(10)
            sage: m.baum_welch(v, fix_emissions=True)
            (-7.58656858997..., 36)
            sage: m.emission_parameters()
            [0.4*N(0.0,1.0) + 0.6*N(1.0,0.1), 1.0*N(0.0,1.0)]
        """
        if not isinstance(obs, TimeSeries):
            obs = TimeSeries(obs)
        cdef TimeSeries _obs = obs
        cdef TimeSeries alpha, beta, scale, gamma, mixed_gamma, mixed_gamma_m, xi
        cdef double log_probability, log_probability0, log_probability_prev, delta
        cdef int i, j, k, m, N, n_iterations
        cdef Py_ssize_t t, T
        cdef double denominator_A, numerator_A, denominator_B, numerator_mean, numerator_std, \
            numerator_c, c, mu, std, numer, denom, new_mu, new_std, new_c, s
        cdef GaussianMixtureDistribution G

        # Initialization
        alpha, scale, log_probability0 = self._forward_scale_all(_obs)
        if not isfinite(log_probability0):
            return (0.0, 0)
        log_probability = log_probability0
        beta = self._backward_scale_all(_obs, scale)
        gamma = self._baum_welch_gamma(alpha, beta)
        xi = self._baum_welch_xi(alpha, beta, _obs)
        log_probability_prev = log_probability
        N = self.N
        n_iterations = 0
        T = len(_obs)

        # Re-estimation
        while True:

            # Reestimate the frequency of state i at time t=0
            for i in range(N):
                if not isfinite(gamma._values[0*N+i]):
                    # Before raising an error, leave self in a valid state.
                    util.normalize_probability_TimeSeries(self.pi, 0, self.pi._length)
                    raise RuntimeError, "impossible to compute gamma during reestimation"
                self.pi._values[i] = gamma._values[0*N+i]

            # Update the probabilities pi to define a valid discrete distribution
            util.normalize_probability_TimeSeries(self.pi, 0, self.pi._length)

            # Reestimate transition matrix and emission probabilities in
            # each state.
            for i in range(N):
                # Reestimate the state transition matrix
                denominator_A = 0.0
                for t in range(T-1):
                    denominator_A += gamma._values[t*N+i]
                if not isnormal(denominator_A):
                    raise RuntimeError, "unable to re-estimate transition matrix"
                for j in range(N):
                    numerator_A = 0.0
                    for t in range(T-1):
                        numerator_A += xi._values[t*N*N+i*N+j]
                    self.A._values[i*N+j] = numerator_A / denominator_A

                # Rescale the i-th row of the transition matrix to be
                # a valid stochastic matrix:
                util.normalize_probability_TimeSeries(self.A, i*N, (i+1)*N)

                ########################################################################
                # Re-estimate the emission probabilities
                ########################################################################
                G = self.mixture[i]
                if not fix_emissions and not G.is_fixed():
                    mixed_gamma = self._baum_welch_mixed_gamma(alpha, beta, _obs, i)
                    new_G = []
                    for m in range(len(G)):
                        if G.fixed._values[m]:
                            new_G.append(G[m])
                            continue

                        # Compute re-estimated mu_{j,m}
                        numer = 0
                        denom = 0
                        for t in range(T):
                            numer += mixed_gamma._values[m*T + t] * _obs._values[t]
                            denom += mixed_gamma._values[m*T + t]
                        new_mu = numer / denom

                        # Compute re-estimated standard deviation
                        numer = 0
                        mu = G[m][1]
                        for t in range(T):
                            numer += mixed_gamma._values[m*T + t] * \
                                (_obs._values[t] - mu)*(_obs._values[t] - mu)

                        new_std = sqrt(numer / denom)
                        if new_std < min_sd:
                            new_std = min_sd

                        # Compute re-estimated weighting coefficient
                        new_c = denom
                        s = 0
                        for t in range(T):
                            s += gamma._values[t*N + i]
                        new_c /= s

                        new_G.append((new_c,new_mu,new_std))

                    self.mixture[i] = GaussianMixtureDistribution(new_G)

            n_iterations += 1
            if n_iterations >= max_iter: break

            ########################################################################
            # Initialization for next iteration
            ########################################################################
            alpha, scale, log_probability0 = self._forward_scale_all(_obs)
            if not isfinite(log_probability0): break
            log_probability = log_probability0
            beta = self._backward_scale_all(_obs, scale)
            gamma = self._baum_welch_gamma(alpha, beta)
            xi = self._baum_welch_xi(alpha, beta, _obs)

            # Compute the improvement in log probability since the
            # previous iteration.
            delta = log_probability - log_probability_prev
            log_probability_prev = log_probability

            # If the log probability improved by no more than the cutoff,
            # terminate.
            if delta >= 0 and delta <= log_likelihood_cutoff:
                break

        return log_probability, n_iterations


##################################################
# For Unpickling
##################################################

# We keep the _v0 function for backward compatibility.
def unpickle_gaussian_hmm_v0(A, B, pi, name):
    """
    EXAMPLES::

        sage: m = hmm.GaussianHiddenMarkovModel([[1]], [(0,1)], [1])
        sage: sage.stats.hmm.chmm.unpickle_gaussian_hmm_v0(m.transition_matrix(), m.emission_parameters(), m.initial_probabilities(), 'test')
        Gaussian Hidden Markov Model with 1 States
        Transition matrix:
        [1.0]
        Emission parameters:
        [(0.0, 1.0)]
        Initial probabilities: [1.0000]
    """
    return GaussianHiddenMarkovModel(A,B,pi)


def unpickle_gaussian_hmm_v1(A, B, pi, prob, n_out):
    """
    EXAMPLES::

        sage: m = hmm.GaussianHiddenMarkovModel([[1]], [(0,1)], [1])
        sage: loads(dumps(m)) == m  # indirect test
        True
    """
    cdef GaussianHiddenMarkovModel m = PY_NEW(GaussianHiddenMarkovModel)
    m.A = A
    m.B = B
    m.pi = pi
    m.prob = prob
    m.n_out = n_out
    return m


def unpickle_gaussian_mixture_hmm_v1(A, B, pi, mixture):
    """
    EXAMPLES::

        sage: m = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.4,(0,1)), (.6,(1,0.1))]], [1])
        sage: loads(dumps(m)) == m  # indirect test
        True
    """
    cdef GaussianMixtureHiddenMarkovModel m = PY_NEW(GaussianMixtureHiddenMarkovModel)
    m.A = A
    m.B = B
    m.pi = pi
    m.mixture = mixture
    return m