"""
Markov Decision Processes (Chapter 16)

First we define an MDP, and the special case of a GridMDP, in which
states are laid out in a 2-dimensional grid. We also represent a policy
as a dictionary of {state: action} pairs, and a Utility function as a
dictionary of {state: number} pairs. We then define the value_iteration
and policy_iteration algorithms.
"""

import random
from collections import defaultdict

import numpy as np

from utils4e import vector_add, orientations, turn_right, turn_left


class MDP:
    """A Markov Decision Process, defined by an initial state, transition model,
    and reward function. We also keep track of a gamma value, for use by
    algorithms. The transition model is represented somewhat differently from
    the text. Instead of P(s' | s, a) being a probability number for each
    state/state/action triplet, we instead have T(s, a) return a
    list of (p, s') pairs. We also keep track of the possible states,
    terminal states, and actions for each state. [Page 646]"""

    def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9):
        if not (0 < gamma <= 1):
            raise ValueError("An MDP must have 0 < gamma <= 1")

        # collect states from transitions table if not passed.
        self.states = states or self.get_states_from_transitions(transitions)

        self.init = init

        if isinstance(actlist, list):
            # if actlist is a list, all states have the same actions
            self.actlist = actlist

        elif isinstance(actlist, dict):
            # if actlist is a dict, different actions for each state
            self.actlist = actlist

        self.terminals = terminals
        self.transitions = transitions or {}
        if not self.transitions:
            print("Warning: Transition table is empty.")

        self.gamma = gamma

        self.reward = reward or {s: 0 for s in self.states}

        # self.check_consistency()

    def R(self, state):
        """Return a numeric reward for this state."""

        return self.reward[state]

    def T(self, state, action):
        """Transition model. From a state and an action, return a list
        of (probability, result-state) pairs."""

        if not self.transitions:
            raise ValueError("Transition model is missing")
        else:
            return self.transitions[state][action]

    def actions(self, state):
        """Return a list of actions that can be performed in this state. By default, a
        fixed list of actions, except for terminal states. Override this
        method if you need to specialize by state."""

        if state in self.terminals:
            return [None]
        else:
            return self.actlist

    def get_states_from_transitions(self, transitions):
        if isinstance(transitions, dict):
            s1 = set(transitions.keys())
            s2 = set(tr[1] for actions in transitions.values()
                     for effects in actions.values()
                     for tr in effects)
            return s1.union(s2)
        else:
            print('Could not retrieve states from transitions')
            return None

    def check_consistency(self):

        # check that all states in transitions are valid
        assert set(self.states) == self.get_states_from_transitions(self.transitions)

        # check that init is a valid state
        assert self.init in self.states

        # check reward for each state
        assert set(self.reward.keys()) == set(self.states)

        # check that all terminals are valid states
        assert all(t in self.states for t in self.terminals)

        # check that probability distributions for all actions sum to 1
        for s1, actions in self.transitions.items():
            for a in actions.keys():
                s = 0
                for o in actions[a]:
                    s += o[0]
                assert abs(s - 1) < 0.001


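# A minimal usage sketch (kept as a comment so nothing runs at import time);
# the two-state problem and its numbers are made up purely for illustration.
# The transition table maps state -> action -> [(probability, next_state), ...]:
#
#   tiny_transitions = {
#       'A': {'stay': [(1.0, 'A')], 'go': [(0.9, 'B'), (0.1, 'A')]},
#       'B': {'stay': [(1.0, 'B')], 'go': [(0.9, 'A'), (0.1, 'B')]},
#   }
#   tiny = MDP(init='A', actlist=['stay', 'go'], terminals=[],
#              transitions=tiny_transitions, reward={'A': 0, 'B': 1}, gamma=0.9)
#   tiny.T('A', 'go')     # -> [(0.9, 'B'), (0.1, 'A')]
#   tiny.actions('A')     # -> ['stay', 'go']

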
class MDP2(MDP):
    """
    Inherits from MDP. Handles terminal states and transitions to and from
    terminal states better.
    """

    def __init__(self, init, actlist, terminals, transitions, reward=None, gamma=0.9):
        MDP.__init__(self, init, actlist, terminals, transitions, reward, gamma=gamma)

    def T(self, state, action):
        if action is None:
            # terminal states are absorbing: the 0.0 probability gives them
            # zero expected future utility
            return [(0.0, state)]
        else:
            return self.transitions[state][action]


class GridMDP(MDP):
    """A two-dimensional grid MDP, as in [Figure 16.1]. All you have to do is
    specify the grid as a list of lists of rewards; use None for an obstacle
    (unreachable state). Also, you should specify the terminal states.
    An action is an (x, y) unit vector; e.g. (1, 0) means move east."""

    def __init__(self, grid, terminals, init=(0, 0), gamma=.9):
        grid.reverse()  # because we want row 0 on bottom, not on top
        reward = {}
        states = set()
        self.rows = len(grid)
        self.cols = len(grid[0])
        self.grid = grid
        for x in range(self.cols):
            for y in range(self.rows):
                if grid[y][x]:
                    states.add((x, y))
                    reward[(x, y)] = grid[y][x]
        self.states = states
        actlist = orientations
        transitions = {}
        for s in states:
            transitions[s] = {}
            for a in actlist:
                transitions[s][a] = self.calculate_T(s, a)
        MDP.__init__(self, init, actlist=actlist,
                     terminals=terminals, transitions=transitions,
                     reward=reward, states=states, gamma=gamma)

    def calculate_T(self, state, action):
        if action:
            return [(0.8, self.go(state, action)),
                    (0.1, self.go(state, turn_right(action))),
                    (0.1, self.go(state, turn_left(action)))]
        else:
            return [(0.0, state)]

    def T(self, state, action):
        return self.transitions[state][action] if action else [(0.0, state)]

    def go(self, state, direction):
        """Return the state that results from going in this direction."""

        state1 = tuple(vector_add(state, direction))
        return state1 if state1 in self.states else state

    def to_grid(self, mapping):
        """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""

        return list(reversed([[mapping.get((x, y), None)
                               for x in range(self.cols)]
                              for y in range(self.rows)]))

    def to_arrows(self, policy):
        chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}
        return self.to_grid({s: chars[a] for (s, a) in policy.items()})


# ______________________________________________________________________________


""" [Figure 16.1]
A 4x3 grid environment that presents the agent with a sequential decision problem.
"""

sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1],
                                           [-0.04, None, -0.04, -1],
                                           [-0.04, -0.04, -0.04, -0.04]],
                                          terminals=[(3, 2), (3, 1)])

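# Quick orientation, kept as a comment (a sketch, assuming the standard
# utils4e orientations, where actions are unit vectors and (0, 0) is the
# bottom-left cell after the grid is reversed):
#
#   sequential_decision_environment.R((0, 0))        # -> -0.04
#   sequential_decision_environment.actions((3, 2))  # -> [None]  (terminal state)
#   sequential_decision_environment.T((0, 0), (1, 0))
#   # -> [(0.8, (1, 0)), (0.1, (0, 0)), (0.1, (0, 1))]
#   # i.e. the intended move east succeeds with probability 0.8; the two
#   # perpendicular slips (south hits the wall and stays put, north moves up)
#   # each happen with probability 0.1.
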
# ______________________________________________________________________________
# 16.1.3 The Bellman equation for utilities


def q_value(mdp, s, a, U):
    """Return the Q-value of action a in state s, given utility estimates U
    (for a terminal state, where a is None, this is just R(s))."""

    if not a:
        return mdp.R(s)
    res = 0
    for p, s_prime in mdp.T(s, a):
        res += p * (mdp.R(s) + mdp.gamma * U[s_prime])
    return res

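# In equation form, this is the Bellman backup that value_iteration and
# policy_iteration below rely on:
#
#     Q(s, a) = sum over (p, s') in T(s, a) of  p * (R(s) + gamma * U[s'])
#             = R(s) + gamma * sum_{s'} P(s' | s, a) * U[s']
#
# and the utility of a state is U(s) = max_a Q(s, a).
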
# TODO: DDN in figure 16.4 and 16.5

# ______________________________________________________________________________
# 16.2 Algorithms for MDPs
# 16.2.1 Value Iteration


def value_iteration(mdp, epsilon=0.001):
    """Solving an MDP by value iteration. [Figure 16.6]"""

    U1 = {s: 0 for s in mdp.states}
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    while True:
        U = U1.copy()
        delta = 0
        for s in mdp.states:
            # U1[s] = R(s) + gamma * max(sum(p * U[s1] for (p, s1) in T(s, a))
            #                            for a in mdp.actions(s))
            U1[s] = max(q_value(mdp, s, a, U) for a in mdp.actions(s))
            delta = max(delta, abs(U1[s] - U[s]))
        if delta <= epsilon * (1 - gamma) / gamma:
            return U

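# A minimal usage sketch (kept as a comment so nothing runs at import time):
#
#   U = value_iteration(sequential_decision_environment, epsilon=0.001)
#   U[(3, 2)]   # utility of the +1 terminal state, which is simply its reward
#
# The loop stops once the largest single-sweep change delta satisfies
# delta <= epsilon * (1 - gamma) / gamma; by the standard value-iteration
# error bound this keeps the returned utilities within epsilon of the true ones.
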
# ______________________________________________________________________________
# 16.2.2 Policy Iteration


def best_policy(mdp, U):
    """Given an MDP and a utility function U, determine the best policy,
    as a mapping from state to action."""

    pi = {}
    for s in mdp.states:
        pi[s] = max(mdp.actions(s), key=lambda a: q_value(mdp, s, a, U))
    return pi


def expected_utility(a, s, U, mdp):
    """The expected utility of doing a in state s, according to the MDP and U."""

    return sum(p * U[s1] for (p, s1) in mdp.T(s, a))


def policy_iteration(mdp):
    """Solve an MDP by policy iteration [Figure 17.7]"""

    U = {s: 0 for s in mdp.states}
    pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
    while True:
        U = policy_evaluation(pi, U, mdp)
        unchanged = True
        for s in mdp.states:
            a_star = max(mdp.actions(s), key=lambda a: q_value(mdp, s, a, U))
            # a = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
            if q_value(mdp, s, a_star, U) > q_value(mdp, s, pi[s], U):
                pi[s] = a_star
                unchanged = False
        if unchanged:
            return pi


def policy_evaluation(pi, U, mdp, k=20):
    """Return an updated utility mapping U from each state in the MDP to its
    utility, using an approximation (modified policy iteration)."""

    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    for i in range(k):
        for s in mdp.states:
            U[s] = R(s) + gamma * sum(p * U[s1] for (p, s1) in T(s, pi[s]))
    return U

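# A minimal usage sketch (kept as a comment so nothing runs at import time):
#
#   pi = policy_iteration(sequential_decision_environment)
#   from utils import print_table
#   print_table(sequential_decision_environment.to_arrows(pi))
#
# Note that policy_evaluation performs k sweeps of simplified Bellman updates
# rather than solving the linear system exactly (modified policy iteration),
# so the utilities it produces are approximate; as the doctests at the bottom
# of this file show, the resulting policy matches the one found via
# value_iteration on the 4x3 grid.
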
# ___________________________________________________________________
# 16.4 Partially Observed MDPs


class POMDP(MDP):
    """A Partially Observable Markov Decision Process, defined by
    a transition model P(s'|s,a), actions A(s), a reward function R(s),
    and a sensor model P(e|s). We also keep track of a gamma value,
    for use by algorithms. The transition and the sensor models
    are defined as matrices. We also keep track of the possible states
    and actions for each state. [Page 659]."""

    def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95):
        """Initialize variables of the pomdp"""

        if not (0 < gamma <= 1):
            raise ValueError('A POMDP must have 0 < gamma <= 1')

        self.states = states
        self.actions = actions

        # transition model cannot be undefined
        self.t_prob = transitions or {}
        if not self.t_prob:
            print('Warning: Transition model is undefined')

        # sensor model cannot be undefined
        self.e_prob = evidences or {}
        if not self.e_prob:
            print('Warning: Sensor model is undefined')

        self.gamma = gamma
        self.rewards = rewards

    def remove_dominated_plans(self, input_values):
        """
        Remove dominated plans.
        This method finds all the value-function lines contributing to the
        upper surface and discards those that do not.
        """

        values = [val for action in input_values for val in input_values[action]]
        values.sort(key=lambda x: x[0], reverse=True)

        best = [values[0]]
        y1_max = max(val[1] for val in values)
        tgt = values[0]
        prev_b = 0
        prev_ix = 0
        while tgt[1] != y1_max:
            min_b = 1
            min_ix = 0
            for i in range(prev_ix + 1, len(values)):
                if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0:
                    trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1])
                    if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b:
                        min_b = trans_b
                        min_ix = i
            prev_b = min_b
            prev_ix = min_ix
            tgt = values[min_ix]
            best.append(tgt)

        return self.generate_mapping(best, input_values)

    def remove_dominated_plans_fast(self, input_values):
        """
        Remove dominated plans using an approximation.
        Samples the upper surface at 101 equally spaced belief points in
        [0, 1] and keeps the plans that attain the maximum at these points.
        """

        values = [val for action in input_values for val in input_values[action]]
        values.sort(key=lambda x: x[0], reverse=True)

        best = []
        sr = 100
        for i in range(sr + 1):
            x = i / float(sr)
            maximum = (values[0][1] - values[0][0]) * x + values[0][0]
            tgt = values[0]
            for value in values:
                val = (value[1] - value[0]) * x + value[0]
                if val > maximum:
                    maximum = val
                    tgt = value

            if all(any(tgt != v) for v in best):
                best.append(np.array(tgt))

        return self.generate_mapping(best, input_values)

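    # Geometric picture for the two pruning methods above: each plan's value
    # vector [v0, v1] defines a line v(b) = (1 - b) * v0 + b * v1 over the
    # belief b in [0, 1] of being in the second state. A plan is worth keeping
    # only if its line touches the upper surface of all such lines;
    # remove_dominated_plans finds that surface exactly, while
    # remove_dominated_plans_fast approximates it by sampling b.
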
    def generate_mapping(self, best, input_values):
        """Generate mappings after removing dominated plans"""

        mapping = defaultdict(list)
        for value in best:
            for action in input_values:
                if any(all(value == v) for v in input_values[action]):
                    mapping[action].append(value)

        return mapping

    def max_difference(self, U1, U2):
        """Find the maximum difference between two utility mappings"""

        max_diff = 0
        for k, v in U1.items():
            sum1 = 0
            for element in U1[k]:
                sum1 += sum(element)
            sum2 = 0
            for element in U2[k]:
                sum2 += sum(element)
            max_diff = max(max_diff, abs(sum1 - sum2))
        return max_diff


class Matrix:
    """Matrix operations class"""

    @staticmethod
    def add(A, B):
        """Add two matrices A and B"""

        res = []
        for i in range(len(A)):
            row = []
            for j in range(len(A[0])):
                row.append(A[i][j] + B[i][j])
            res.append(row)
        return res

    @staticmethod
    def scalar_multiply(a, B):
        """Multiply matrix B by scalar a (in place)"""

        for i in range(len(B)):
            for j in range(len(B[0])):
                B[i][j] = a * B[i][j]
        return B

    @staticmethod
    def multiply(A, B):
        """Multiply matrix B element-wise with the transpose of A"""

        matrix = []
        for i in range(len(B)):
            row = []
            for j in range(len(B[0])):
                row.append(B[i][j] * A[j][i])
            matrix.append(row)

        return matrix

    @staticmethod
    def matmul(A, B):
        """Matrix product of A and B"""

        return [[sum(ele_a * ele_b for ele_a, ele_b in zip(row_a, col_b)) for col_b in list(zip(*B))] for row_a in A]

    @staticmethod
    def transpose(A):
        """Transpose a matrix"""

        return [list(i) for i in zip(*A)]


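# A minimal sketch of the Matrix helpers (kept as a comment; they operate on
# plain nested lists rather than numpy arrays):
#
#   A = [[1, 2],
#        [3, 4]]
#   Matrix.transpose(A)                   # -> [[1, 3], [2, 4]]
#   Matrix.matmul(A, A)                   # -> [[7, 10], [15, 22]]
#   Matrix.add(A, A)                      # -> [[2, 4], [6, 8]]
#   Matrix.scalar_multiply(10, [[1, 2]])  # -> [[10, 20]] (modifies its argument in place)

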
def pomdp_value_iteration(pomdp, epsilon=0.1):
    """Solving a POMDP by value iteration."""

    U = {'': [[0] * len(pomdp.states)]}
    count = 0
    while True:
        count += 1
        prev_U = U
        values = [val for action in U for val in U[action]]
        value_matxs = []
        for i in values:
            for j in values:
                value_matxs.append([i, j])

        U1 = defaultdict(list)
        for action in pomdp.actions:
            for u in value_matxs:
                u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)],
                                                 Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))),
                                   [[1], [1]])
                u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]])
                U1[action].append(u1[0])

        U = pomdp.remove_dominated_plans_fast(U1)
        # replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations

        if count > 10:
            if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma:
                return U


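# A hypothetical usage sketch (kept as a comment; all numbers are made up for
# illustration). For a two-state, two-action, two-observation POMDP, the code
# above indexes the models by int(action), so actions are given as strings
# that cast to list indices: t_prob[a] is the transition matrix for action a,
# e_prob[a] the sensor (observation) matrix, and rewards[a] the per-state
# rewards under that action:
#
#   t_prob = [[[0.7, 0.3], [0.4, 0.6]],
#             [[0.5, 0.5], [0.2, 0.8]]]
#   e_prob = [[[0.6, 0.4], [0.3, 0.7]],
#             [[0.5, 0.5], [0.5, 0.5]]]
#   rewards = [[0.0, 1.0], [1.0, 0.0]]
#   pomdp = POMDP(actions=('0', '1'), transitions=t_prob, evidences=e_prob,
#                 rewards=rewards, states=('A', 'B'), gamma=0.95)
#   utility = pomdp_value_iteration(pomdp, epsilon=0.1)   # {action: [value vectors, ...]}

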
__doc__ += """
>>> pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .01))

>>> sequential_decision_environment.to_arrows(pi)
[['>', '>', '>', '.'], ['^', None, '^', '.'], ['^', '>', '^', '<']]

>>> from utils import print_table

>>> print_table(sequential_decision_environment.to_arrows(pi))
> > > .
^ None ^ .
^ > ^ <

>>> print_table(sequential_decision_environment.to_arrows(policy_iteration(sequential_decision_environment)))
> > > .
^ None ^ .
^ > ^ <
"""  # noqa

"""
s = {'a': {'plan1': [(0.2, 'a'), (0.3, 'b'), (0.3, 'c'), (0.2, 'd')],
           'plan2': [(0.4, 'a'), (0.15, 'b'), (0.45, 'c')],
           'plan3': [(0.2, 'a'), (0.5, 'b'), (0.3, 'c')],
           },
     'b': {'plan1': [(0.2, 'a'), (0.6, 'b'), (0.2, 'c'), (0.1, 'd')],
           'plan2': [(0.6, 'a'), (0.2, 'b'), (0.1, 'c'), (0.1, 'd')],
           'plan3': [(0.3, 'a'), (0.3, 'b'), (0.4, 'c')],
           },
     'c': {'plan1': [(0.3, 'a'), (0.5, 'b'), (0.1, 'c'), (0.1, 'd')],
           'plan2': [(0.5, 'a'), (0.3, 'b'), (0.1, 'c'), (0.1, 'd')],
           'plan3': [(0.1, 'a'), (0.3, 'b'), (0.1, 'c'), (0.5, 'd')],
           },
     }
"""