GitHub Repository: aimacode/aima-python
Path: blob/master/mdp.py
"""
Markov Decision Processes (Chapter 17)

First we define an MDP, and the special case of a GridMDP, in which
states are laid out in a 2-dimensional grid. We also represent a policy
as a dictionary of {state: action} pairs, and a Utility function as a
dictionary of {state: number} pairs. We then define the value_iteration
and policy_iteration algorithms.
"""

import random
from collections import defaultdict

import numpy as np

from utils import vector_add, orientations, turn_right, turn_left


class MDP:
    """A Markov Decision Process, defined by an initial state, transition model,
    and reward function. We also keep track of a gamma value, for use by
    algorithms. The transition model is represented somewhat differently from
    the text. Instead of P(s' | s, a) being a probability number for each
    state/state/action triplet, we instead have T(s, a) return a
    list of (p, s') pairs. We also keep track of the possible states,
    terminal states, and actions for each state. [Page 646]"""

    def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9):
        if not (0 < gamma <= 1):
            raise ValueError("An MDP must have 0 < gamma <= 1")

        # collect states from transitions table if not passed.
        self.states = states or self.get_states_from_transitions(transitions)

        self.init = init

        if isinstance(actlist, list):
            # if actlist is a list, all states have the same actions
            self.actlist = actlist

        elif isinstance(actlist, dict):
            # if actlist is a dict, different actions for each state
            self.actlist = actlist

        self.terminals = terminals
        self.transitions = transitions or {}
        if not self.transitions:
            print("Warning: Transition table is empty.")

        self.gamma = gamma

        self.reward = reward or {s: 0 for s in self.states}

        # self.check_consistency()

    def R(self, state):
        """Return a numeric reward for this state."""

        return self.reward[state]

    def T(self, state, action):
        """Transition model. From a state and an action, return a list
        of (probability, result-state) pairs."""

        if not self.transitions:
            raise ValueError("Transition model is missing")
        else:
            return self.transitions[state][action]

    def actions(self, state):
        """Return a list of actions that can be performed in this state. By default, a
        fixed list of actions, except for terminal states. Override this
        method if you need to specialize by state."""

        if state in self.terminals:
            return [None]
        else:
            return self.actlist

    def get_states_from_transitions(self, transitions):
        if isinstance(transitions, dict):
            s1 = set(transitions.keys())
            s2 = set(tr[1] for actions in transitions.values()
                     for effects in actions.values()
                     for tr in effects)
            return s1.union(s2)
        else:
            print('Could not retrieve states from transitions')
            return None

    def check_consistency(self):

        # check that all states in transitions are valid
        assert set(self.states) == self.get_states_from_transitions(self.transitions)

        # check that init is a valid state
        assert self.init in self.states

        # check reward for each state
        assert set(self.reward.keys()) == set(self.states)

        # check that all terminals are valid states
        assert all(t in self.states for t in self.terminals)

        # check that probability distributions for all actions sum to 1
        for s1, actions in self.transitions.items():
            for a in actions.keys():
                s = 0
                for o in actions[a]:
                    s += o[0]
                assert abs(s - 1) < 0.001
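
# Illustrative usage sketch (not part of the original module): a two-state MDP
# built from an explicit transition table. The state names, action label and
# rewards below are made up for the example.
#
#   t = {'s0': {'a': [(0.7, 's0'), (0.3, 's1')]},
#        's1': {'a': [(1.0, 's1')]}}
#   m = MDP(init='s0', actlist=['a'], terminals=['s1'], transitions=t,
#           reward={'s0': -0.1, 's1': 1.0}, gamma=0.9)
#   m.T('s0', 'a')     # -> [(0.7, 's0'), (0.3, 's1')]
#   m.R('s1')          # -> 1.0
#   m.actions('s1')    # -> [None], because 's1' is a terminal state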


class MDP2(MDP):
    """
    Inherits from MDP. Handles terminal states, and transitions to and from terminal states better.
    """

    def __init__(self, init, actlist, terminals, transitions, reward=None, gamma=0.9):
        MDP.__init__(self, init, actlist, terminals, transitions, reward, gamma=gamma)

    def T(self, state, action):
        if action is None:
            return [(0.0, state)]
        else:
            return self.transitions[state][action]


class GridMDP(MDP):
    """A two-dimensional grid MDP, as in [Figure 17.1]. All you have to do is
    specify the grid as a list of lists of rewards; use None for an obstacle
    (unreachable state). Also, you should specify the terminal states.
    An action is an (x, y) unit vector; e.g. (1, 0) means move east."""

    def __init__(self, grid, terminals, init=(0, 0), gamma=.9):
        grid.reverse()  # because we want row 0 on bottom, not on top
        reward = {}
        states = set()
        self.rows = len(grid)
        self.cols = len(grid[0])
        self.grid = grid
        for x in range(self.cols):
            for y in range(self.rows):
                if grid[y][x]:
                    states.add((x, y))
                    reward[(x, y)] = grid[y][x]
        self.states = states
        actlist = orientations
        transitions = {}
        for s in states:
            transitions[s] = {}
            for a in actlist:
                transitions[s][a] = self.calculate_T(s, a)
        MDP.__init__(self, init, actlist=actlist,
                     terminals=terminals, transitions=transitions,
                     reward=reward, states=states, gamma=gamma)

    def calculate_T(self, state, action):
        if action:
            return [(0.8, self.go(state, action)),
                    (0.1, self.go(state, turn_right(action))),
                    (0.1, self.go(state, turn_left(action)))]
        else:
            return [(0.0, state)]

    def T(self, state, action):
        return self.transitions[state][action] if action else [(0.0, state)]

    def go(self, state, direction):
        """Return the state that results from going in this direction."""

        state1 = vector_add(state, direction)
        return state1 if state1 in self.states else state

    def to_grid(self, mapping):
        """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""

        return list(reversed([[mapping.get((x, y), None)
                               for x in range(self.cols)]
                              for y in range(self.rows)]))

    def to_arrows(self, policy):
        chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}
        return self.to_grid({s: chars[a] for (s, a) in policy.items()})


# ______________________________________________________________________________


""" [Figure 17.1]
A 4x3 grid environment that presents the agent with a sequential decision problem.
"""

sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1],
                                           [-0.04, None, -0.04, -1],
                                           [-0.04, -0.04, -0.04, -0.04]],
                                          terminals=[(3, 2), (3, 1)])
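
# A small sketch of how the environment above can be inspected (illustrative,
# not part of the original module):
#
#   sequential_decision_environment.R((0, 0))        # -> -0.04 (living cost)
#   sequential_decision_environment.actions((3, 2))  # -> [None] (terminal state)
#   sequential_decision_environment.T((1, 0), (1, 0))
#   # -> three (probability, state) pairs: 0.8 for moving east as intended and
#   #    0.1 each for slipping perpendicular to the intended direction.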


# ______________________________________________________________________________


def value_iteration(mdp, epsilon=0.001):
    """Solving an MDP by value iteration. [Figure 17.4]"""

    U1 = {s: 0 for s in mdp.states}
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    while True:
        U = U1.copy()
        delta = 0
        for s in mdp.states:
            U1[s] = R(s) + gamma * max(sum(p * U[s1] for (p, s1) in T(s, a))
                                       for a in mdp.actions(s))
            delta = max(delta, abs(U1[s] - U[s]))
        if delta <= epsilon * (1 - gamma) / gamma:
            return U
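
# Usage sketch (illustrative): run value iteration on the 4x3 grid defined
# above. With the [(0.0, state)] convention for terminal states, the terminal
# utilities settle at their rewards, and every other state gets a discounted
# expected utility.
#
#   U = value_iteration(sequential_decision_environment, epsilon=0.001)
#   U[(3, 2)]    # -> 1.0
#   U[(3, 1)]    # -> -1.0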


def best_policy(mdp, U):
    """Given an MDP and a utility function U, determine the best policy,
    as a mapping from state to action. [Equation 17.4]"""

    pi = {}
    for s in mdp.states:
        pi[s] = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
    return pi


def expected_utility(a, s, U, mdp):
    """The expected utility of doing a in state s, according to the MDP and U."""

    return sum(p * U[s1] for (p, s1) in mdp.T(s, a))
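
# Sketch (illustrative): expected_utility is the Q-value of an action under a
# utility table U, and best_policy simply picks its argmax in every state.
#
#   U = value_iteration(sequential_decision_environment)
#   expected_utility((1, 0), (2, 1), U, sequential_decision_environment)
#   # -> expected utility of trying to move east from square (2, 1)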


# ______________________________________________________________________________


def policy_iteration(mdp):
    """Solve an MDP by policy iteration [Figure 17.7]"""

    U = {s: 0 for s in mdp.states}
    pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
    while True:
        U = policy_evaluation(pi, U, mdp)
        unchanged = True
        for s in mdp.states:
            a = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
            if a != pi[s]:
                pi[s] = a
                unchanged = False
        if unchanged:
            return pi


def policy_evaluation(pi, U, mdp, k=20):
    """Return an updated utility mapping U from each state in the MDP to its
    utility, using an approximation (modified policy iteration)."""

    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    for i in range(k):
        for s in mdp.states:
            U[s] = R(s) + gamma * sum(p * U[s1] for (p, s1) in T(s, pi[s]))
    return U
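
# Usage sketch (illustrative): policy iteration on the 4x3 grid reaches the
# same optimal policy that best_policy extracts from value_iteration (up to
# ties between equally good actions).
#
#   pi = policy_iteration(sequential_decision_environment)
#   sequential_decision_environment.to_arrows(pi)
#   # -> [['>', '>', '>', '.'], ['^', None, '^', '.'], ['^', '>', '^', '<']]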


class POMDP(MDP):
    """A Partially Observable Markov Decision Process, defined by
    a transition model P(s'|s,a), actions A(s), a reward function R(s),
    and a sensor model P(e|s). We also keep track of a gamma value,
    for use by algorithms. The transition and the sensor models
    are defined as matrices. We also keep track of the possible states
    and actions for each state. [Page 659]."""

    def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95):
        """Initialize variables of the pomdp"""

        if not (0 < gamma <= 1):
            raise ValueError('A POMDP must have 0 < gamma <= 1')

        self.states = states
        self.actions = actions

        # transition model cannot be undefined
        self.t_prob = transitions or {}
        if not self.t_prob:
            print('Warning: Transition model is undefined')

        # sensor model cannot be undefined
        self.e_prob = evidences or {}
        if not self.e_prob:
            print('Warning: Sensor model is undefined')

        self.gamma = gamma
        self.rewards = rewards

    def remove_dominated_plans(self, input_values):
        """
        Remove dominated plans.
        This method finds all the lines contributing to the
        upper surface and removes those which don't.
        """

        values = [val for action in input_values for val in input_values[action]]
        values.sort(key=lambda x: x[0], reverse=True)

        best = [values[0]]
        y1_max = max(val[1] for val in values)
        tgt = values[0]
        prev_b = 0
        prev_ix = 0
        while tgt[1] != y1_max:
            min_b = 1
            min_ix = 0
            for i in range(prev_ix + 1, len(values)):
                if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0:
                    trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1])
                    if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b:
                        min_b = trans_b
                        min_ix = i
            prev_b = min_b
            prev_ix = min_ix
            tgt = values[min_ix]
            best.append(tgt)

        return self.generate_mapping(best, input_values)

    def remove_dominated_plans_fast(self, input_values):
        """
        Remove dominated plans using approximations.
        Resamples the upper boundary at intervals of 100 and
        finds the maximum values at these points.
        """

        values = [val for action in input_values for val in input_values[action]]
        values.sort(key=lambda x: x[0], reverse=True)

        best = []
        sr = 100
        for i in range(sr + 1):
            x = i / float(sr)
            maximum = (values[0][1] - values[0][0]) * x + values[0][0]
            tgt = values[0]
            for value in values:
                val = (value[1] - value[0]) * x + value[0]
                if val > maximum:
                    maximum = val
                    tgt = value

            if all(any(tgt != v) for v in best):
                best.append(np.array(tgt))

        return self.generate_mapping(best, input_values)

    def generate_mapping(self, best, input_values):
        """Generate mappings after removing dominated plans"""

        mapping = defaultdict(list)
        for value in best:
            for action in input_values:
                if any(all(value == v) for v in input_values[action]):
                    mapping[action].append(value)

        return mapping

    def max_difference(self, U1, U2):
        """Find maximum difference between two utility mappings"""

        for k, v in U1.items():
            sum1 = 0
            for element in U1[k]:
                sum1 += sum(element)
            sum2 = 0
            for element in U2[k]:
                sum2 += sum(element)
        return abs(sum1 - sum2)
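
# Construction sketch (illustrative, not part of the original module): a
# two-state POMDP in the matrix form that pomdp_value_iteration below expects.
# Actions are digit strings because that function indexes the models with
# int(action); all of the numbers here are made up for the example.
#
#   t_prob = [[[0.65, 0.35], [0.65, 0.35]],   # transition matrix for action '0'
#             [[0.65, 0.35], [0.85, 0.15]]]   # transition matrix for action '1'
#   e_prob = [[[0.5, 0.5], [0.5, 0.5]],       # sensor matrix for action '0'
#             [[0.85, 0.15], [0.15, 0.85]]]   # sensor matrix for action '1'
#   rewards = [[5, -10], [-1, -1]]            # reward vector per action
#   pomdp = POMDP(actions=('0', '1'), transitions=t_prob, evidences=e_prob,
#                 rewards=rewards, states=('A', 'B'), gamma=0.95)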


class Matrix:
    """Matrix operations class"""

    @staticmethod
    def add(A, B):
        """Add two matrices A and B"""

        res = []
        for i in range(len(A)):
            row = []
            for j in range(len(A[0])):
                row.append(A[i][j] + B[i][j])
            res.append(row)
        return res

    @staticmethod
    def scalar_multiply(a, B):
        """Multiply scalar a to matrix B"""

        for i in range(len(B)):
            for j in range(len(B[0])):
                B[i][j] = a * B[i][j]
        return B

    @staticmethod
    def multiply(A, B):
        """Multiply two matrices A and B element-wise
        (result[i][j] = B[i][j] * A[j][i], i.e. B times the transpose of A)"""

        matrix = []
        for i in range(len(B)):
            row = []
            for j in range(len(B[0])):
                row.append(B[i][j] * A[j][i])
            matrix.append(row)

        return matrix

    @staticmethod
    def matmul(A, B):
        """Matrix (inner) product of two matrices"""

        return [[sum(ele_a * ele_b for ele_a, ele_b in zip(row_a, col_b)) for col_b in list(zip(*B))] for row_a in A]

    @staticmethod
    def transpose(A):
        """Transpose a matrix"""

        return [list(i) for i in zip(*A)]
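
# Quick sketch of the helpers above (plain nested lists, no numpy):
#
#   Matrix.matmul([[1, 2], [3, 4]], [[5, 6], [7, 8]])  # -> [[19, 22], [43, 50]]
#   Matrix.transpose([[1, 2, 3], [4, 5, 6]])           # -> [[1, 4], [2, 5], [3, 6]]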


def pomdp_value_iteration(pomdp, epsilon=0.1):
    """Solving a POMDP by value iteration."""

    U = {'': [[0] * len(pomdp.states)]}
    count = 0
    while True:
        count += 1
        prev_U = U
        values = [val for action in U for val in U[action]]
        value_matxs = []
        for i in values:
            for j in values:
                value_matxs.append([i, j])

        U1 = defaultdict(list)
        for action in pomdp.actions:
            for u in value_matxs:
                u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)],
                                                 Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))),
                                   [[1], [1]])
                u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]])
                U1[action].append(u1[0])

        U = pomdp.remove_dominated_plans_fast(U1)
        # replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations

        if count > 10:
            if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma:
                return U
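
# Usage sketch (illustrative): with a POMDP built in the matrix form shown
# after the POMDP class above,
#
#   utility = pomdp_value_iteration(pomdp, epsilon=0.1)
#
# returns a mapping from each action to the alpha vectors (utility lines over
# belief space) that survive the pruning of dominated plans.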


__doc__ += """
>>> pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .01))

>>> sequential_decision_environment.to_arrows(pi)
[['>', '>', '>', '.'], ['^', None, '^', '.'], ['^', '>', '^', '<']]

>>> from utils import print_table

>>> print_table(sequential_decision_environment.to_arrows(pi))
>   >      >   .
^   None   ^   .
^   >      ^   <

>>> print_table(sequential_decision_environment.to_arrows(policy_iteration(sequential_decision_environment)))
>   >      >   .
^   None   ^   .
^   >      ^   <
"""  # noqa

"""
s = { 'a' : { 'plan1' : [(0.2, 'a'), (0.3, 'b'), (0.3, 'c'), (0.2, 'd')],
              'plan2' : [(0.4, 'a'), (0.15, 'b'), (0.45, 'c')],
              'plan3' : [(0.2, 'a'), (0.5, 'b'), (0.3, 'c')],
            },
      'b' : { 'plan1' : [(0.2, 'a'), (0.6, 'b'), (0.2, 'c'), (0.1, 'd')],
              'plan2' : [(0.6, 'a'), (0.2, 'b'), (0.1, 'c'), (0.1, 'd')],
              'plan3' : [(0.3, 'a'), (0.3, 'b'), (0.4, 'c')],
            },
      'c' : { 'plan1' : [(0.3, 'a'), (0.5, 'b'), (0.1, 'c'), (0.1, 'd')],
              'plan2' : [(0.5, 'a'), (0.3, 'b'), (0.1, 'c'), (0.1, 'd')],
              'plan3' : [(0.1, 'a'), (0.3, 'b'), (0.1, 'c'), (0.5, 'd')],
            },
    }
"""