GitHub Repository: greyhatguy007/Machine-Learning-Specialization-Coursera
Path: blob/main/C3 - Unsupervised Learning, Recommenders, Reinforcement Learning/week3/C3W3A1/utils.py

import base64
import random
from itertools import zip_longest

import imageio
import IPython
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import numpy as np
import pandas as pd
import tensorflow as tf
from statsmodels.iolib.table import SimpleTable


SEED = 0              # seed for the pseudo-random number generator
MINIBATCH_SIZE = 64   # mini-batch size
TAU = 1e-3            # soft-update parameter
E_DECAY = 0.995       # ε decay rate for the ε-greedy policy
E_MIN = 0.01          # minimum ε value for the ε-greedy policy

random.seed(SEED)


def get_experiences(memory_buffer):
    """Samples a random mini-batch of experience tuples from the replay buffer
    and converts each field into a TensorFlow tensor."""
    experiences = random.sample(memory_buffer, k=MINIBATCH_SIZE)
    states = tf.convert_to_tensor(np.array([e.state for e in experiences if e is not None]), dtype=tf.float32)
    actions = tf.convert_to_tensor(np.array([e.action for e in experiences if e is not None]), dtype=tf.float32)
    rewards = tf.convert_to_tensor(np.array([e.reward for e in experiences if e is not None]), dtype=tf.float32)
    next_states = tf.convert_to_tensor(np.array([e.next_state for e in experiences if e is not None]), dtype=tf.float32)
    done_vals = tf.convert_to_tensor(np.array([e.done for e in experiences if e is not None]).astype(np.uint8),
                                     dtype=tf.float32)
    return (states, actions, rewards, next_states, done_vals)
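

# Illustrative sketch (hypothetical, not part of the original file): the
# tensors returned by get_experiences typically feed a DQN loss of this
# shape. `gamma` is an assumed discount factor; `q_network` and
# `target_q_network` are assumed Keras models with one output per action.
def compute_loss_example(experiences, gamma, q_network, target_q_network):
    """Hypothetical mean-squared error between Q-estimates and TD targets."""
    states, actions, rewards, next_states, done_vals = experiences
    # Max predicted Q-value for each next state, from the target network.
    max_qsa = tf.reduce_max(target_q_network(next_states), axis=-1)
    # TD targets; (1 - done) zeroes the bootstrap term at episode ends.
    y_targets = rewards + gamma * max_qsa * (1 - done_vals)
    # Q-values of the actions actually taken.
    q_values = q_network(states)
    q_values = tf.gather_nd(q_values, tf.stack([tf.range(tf.shape(q_values)[0]),
                                                tf.cast(actions, tf.int32)], axis=1))
    return tf.reduce_mean(tf.square(y_targets - q_values))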


def check_update_conditions(t, num_steps_upd, memory_buffer):
    """Returns True every `num_steps_upd` time steps, provided the replay
    buffer holds more than MINIBATCH_SIZE experiences."""
    return (t + 1) % num_steps_upd == 0 and len(memory_buffer) > MINIBATCH_SIZE


def get_new_eps(epsilon):
    """Decays ε geometrically, never dropping below E_MIN."""
    return max(E_MIN, E_DECAY * epsilon)
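
# Note (added for reference): starting from ε = 1.0, this decay schedule
# reaches E_MIN = 0.01 after about log(0.01) / log(0.995) ≈ 919 updates.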


def get_action(q_values, epsilon=0.0):
    """ε-greedy action selection: exploit the Q-network with probability
    1 - ε, otherwise pick one of the 4 Lunar Lander actions at random."""
    if random.random() > epsilon:
        return np.argmax(q_values.numpy()[0])
    else:
        return random.choice(np.arange(4))


def update_target_network(q_network, target_q_network):
    """Soft-updates the target network's weights:
    θ_target ← τ·θ + (1 − τ)·θ_target."""
    for target_weights, q_net_weights in zip(target_q_network.weights, q_network.weights):
        target_weights.assign(TAU * q_net_weights + (1.0 - TAU) * target_weights)
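

# Illustrative sketch (hypothetical, not part of the original file): how the
# helpers above combine inside one step of a typical DQN training loop. The
# environment `env`, the two networks, the replay `memory_buffer`, and the
# gradient-step function `agent_learn` are assumed to be defined in the
# calling notebook.
def training_step_example(t, state, epsilon, env, q_network, target_q_network,
                          memory_buffer, agent_learn, num_steps_upd=4):
    """One hypothetical environment/learning step of the DQN loop."""
    # ε-greedy action from the current Q-network.
    q_values = q_network(np.expand_dims(state, axis=0))
    action = get_action(q_values, epsilon)
    next_state, reward, done, _ = env.step(action)
    # The calling notebook is assumed to append (state, action, reward,
    # next_state, done) to memory_buffer here.
    if check_update_conditions(t, num_steps_upd, memory_buffer):
        experiences = get_experiences(memory_buffer)
        agent_learn(experiences)  # gradient step on q_network
        update_target_network(q_network, target_q_network)
    return next_state, done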


def plot_history(reward_history, rolling_window=20, lower_limit=None,
                 upper_limit=None, plot_rw=True, plot_rm=True):
    """Plots the total reward per episode (cyan) together with its rolling
    mean over `rolling_window` episodes (magenta), optionally restricted to
    the episodes in [lower_limit, upper_limit)."""

    if lower_limit is None or upper_limit is None:
        rh = reward_history
        xs = list(range(len(reward_history)))
    else:
        rh = reward_history[lower_limit:upper_limit]
        xs = list(range(lower_limit, upper_limit))

    df = pd.DataFrame(rh)
    rolling_mean = df.rolling(rolling_window).mean()

    plt.figure(figsize=(10, 7), facecolor='white')

    if plot_rw:
        plt.plot(xs, rh, linewidth=1, color='cyan')
    if plot_rm:
        plt.plot(xs, rolling_mean, linewidth=2, color='magenta')

    text_color = 'black'

    ax = plt.gca()
    ax.set_facecolor('black')
    plt.grid()
    # plt.title("Total Point History", color=text_color, fontsize=40)
    plt.xlabel('Episode', color=text_color, fontsize=30)
    plt.ylabel('Total Points', color=text_color, fontsize=30)
    y_num_fmt = mticker.StrMethodFormatter('{x:,}')
    ax.yaxis.set_major_formatter(y_num_fmt)
    ax.tick_params(axis='x', colors=text_color)
    ax.tick_params(axis='y', colors=text_color)
    plt.show()


def display_table(initial_state, action, next_state, reward, done):
    """Builds a SimpleTable summarizing one environment transition."""

    action_labels = ["Do nothing", "Fire right engine", "Fire main engine",
                     "Fire left engine"]

    # Do not use column headers.
    column_headers = None

    with np.printoptions(formatter={'float': '{:.3f}'.format}):
        table_info = [("Initial State:", [f"{initial_state}"]),
                      ("Action:", [f"{action_labels[action]}"]),
                      ("Next State:", [f"{next_state}"]),
                      ("Reward Received:", [f"{reward:.3f}"]),
                      ("Episode Terminated:", [f"{done}"])]

    # Generate the table.
    row_labels, data = zip_longest(*table_info)
    table = SimpleTable(data, column_headers, row_labels)

    return table
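
# Usage sketch (hypothetical): show one transition inside the notebook,
# assuming `env` and an integer `action` already exist there.
#
#     initial_state = env.reset()
#     next_state, reward, done, _ = env.step(action)
#     display_table(initial_state, action, next_state, reward, done)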


def embed_mp4(filename):
    """Embeds an MP4 file in the notebook."""
    with open(filename, 'rb') as f:
        video = f.read()
    b64 = base64.b64encode(video)
    tag = '''
    <video width="840" height="480" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4">
    Your browser does not support the video tag.
    </video>'''.format(b64.decode())
    return IPython.display.HTML(tag)


def create_video(filename, env, q_network, fps=30):
    """Records one episode of the greedy (ε = 0) policy as an MP4 file.
    Uses the pre-0.26 Gym API: env.step returns 4 values and env.render
    accepts a `mode` argument."""
    with imageio.get_writer(filename, fps=fps) as video:
        done = False
        state = env.reset()
        frame = env.render(mode="rgb_array")
        video.append_data(frame)
        while not done:
            state = np.expand_dims(state, axis=0)
            q_values = q_network(state)
            action = np.argmax(q_values.numpy()[0])
            state, _, done, _ = env.step(action)
            frame = env.render(mode="rgb_array")
            video.append_data(frame)
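
# Usage sketch (hypothetical, not in the original file): record one episode
# of the trained agent and embed it in the notebook. `env` and `q_network`
# are assumed to exist in the calling notebook.
#
#     filename = "./videos/lunar_lander.mp4"
#     create_video(filename, env, q_network)
#     embed_mp4(filename)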