Kernel: Python 3 (ipykernel)
Machine Learning with PyTorch and Scikit-Learn
-- Code Examples
Package version checks
Add the parent folder to the path so that the check_packages.py script can be imported:
In [1]:
import sys

sys.path.insert(0, '..')
Check recommended package versions:
In [2]:
from python_environment_check import check_packages

d = {
    'torch': '1.8.0',
    'numpy': '1.21.2',
    'matplotlib': '3.4.3',
    'gym': '0.20.0',
}
check_packages(d)
Out[2]:
[OK] Your Python version is 3.9.6 | packaged by conda-forge | (default, Jul 11 2021, 03:35:11)
[Clang 11.1.0 ]
[OK] torch 1.10.0
[OK] numpy 1.22.0
[OK] matplotlib 3.5.1
[OK] gym 0.21.0
Chapter 19 - Reinforcement Learning for Decision Making in Complex Environments
In [2]:
from IPython.display import Image

%matplotlib inline
Introduction: Learning from experience
Understanding reinforcement learning
Defining the agent-environment interface of a reinforcement learning system
In [4]:
Image(filename='figures/19_01.png', width=500)
Out[4]:
The mathematical formulation of Markov decision processes
In [7]:
Image(filename='figures/19_02.png', width=500)
Out[7]:
Visualization of a Markov process
In [8]:
Image(filename='figures/19_03.png', width=700)
Out[8]:
Episodic versus continuing tasks
RL terminology: Return, policy and value function
The return
InfoBox: Intuition behind discount factor
In [10]:
# Intuition behind discount factor

Image(filename='figures/19_04.png', width=500)
Out[10]:
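As a quick numerical illustration of the return and the effect of the discount factor, the short sketch below (not part of the book's scripts; the reward list and gamma are made up for the example) computes the discounted return $G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \dots$ for a finite reward sequence:

In [ ]:
# Illustrative sketch: discounted return of a finite reward sequence
def discounted_return(rewards, gamma=0.9):
    g = 0.0
    # Iterate backwards so each earlier reward picks up one more factor of gamma
    for r in reversed(rewards):
        g = r + gamma * g
    return g

print(discounted_return([0.0, 0.0, 1.0], gamma=0.9))  # 0.81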
Policy
Value function
Dynamic programming using the Bellman equation
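For reference, the state-value function and its Bellman equation can be written as follows (standard definitions, restated here as a reminder rather than quoted from the text):

$$v_\pi(s) = \mathbb{E}_\pi\left[G_t \mid S_t = s\right], \qquad
v_\pi(s) = \sum_{a} \pi(a \mid s) \sum_{s',\, r} p(s', r \mid s, a)\,\bigl[r + \gamma\, v_\pi(s')\bigr]$$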
Reinforcement learning algorithms
In [11]:
Image(filename='figures/19_05.png', width=500)
Out[11]:
Dynamic programming
Policy evaluation: predicting the value function with dynamic programming
Improving the policy using the estimated value function
Policy iteration
Value iteration
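To make the dynamic programming ideas above concrete, here is a minimal value-iteration sketch (illustrative only; the function names value_iteration and one_step_lookahead are not from the book's scripts). It assumes a transition table P in the same (probability, next_state, reward, done) format that the grid world environment implemented later in this notebook uses:

In [ ]:
import numpy as np

def one_step_lookahead(P, s, V, nA, gamma):
    # Expected return of each action in state s, bootstrapping with V
    return np.array([sum(prob * (r + gamma * V[s_] * (not done))
                         for prob, s_, r, done in P[s][a])
                     for a in range(nA)])

def value_iteration(P, nS, nA, gamma=0.9, theta=1e-8):
    V = np.zeros(nS)
    while True:
        delta = 0.0
        for s in range(nS):
            q = one_step_lookahead(P, s, V, nA, gamma)
            best = q.max()
            delta = max(delta, abs(best - V[s]))
            V[s] = best
        if delta < theta:
            break
    # Greedy policy with respect to the converged value function
    policy = np.array([one_step_lookahead(P, s, V, nA, gamma).argmax()
                       for s in range(nS)])
    return V, policy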
Reinforcement learning with Monte Carlo
State-value function estimation using MC
Action-value function estimation using MC
Finding an optimal policy using MC control
Policy improvement: computing the greedy policy from the action-value function
Temporal difference (TD) learning
TD prediction
On-policy TD control (SARSA)
Off-policy TD control (Q-learning)
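The two TD-control updates differ only in how they bootstrap from the next state: SARSA uses the action actually taken by the behavior policy, whereas Q-learning uses the greedy action value. A minimal sketch of the two update rules (illustrative only; Q is assumed to be a NumPy array of shape (number of states, number of actions), and alpha, gamma are hypothetical hyperparameters; terminal transitions, where the bootstrap term is dropped, are handled in the full agent implementation later in this notebook):

In [ ]:
import numpy as np

def sarsa_update(Q, s, a, r, s_next, a_next, alpha=0.1, gamma=0.9):
    # On-policy target: bootstrap with the action the policy actually takes next
    Q[s, a] += alpha * (r + gamma * Q[s_next, a_next] - Q[s, a])

def q_learning_update(Q, s, a, r, s_next, alpha=0.1, gamma=0.9):
    # Off-policy target: bootstrap with the greedy (maximal) action value
    Q[s, a] += alpha * (r + gamma * np.max(Q[s_next]) - Q[s, a])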
Implementing our first RL algorithm
Introducing the OpenAI Gym toolkit
Working with the existing environments in OpenAI Gym
In [12]:
Image(filename='figures/19_06.png', width=600)
Out[12]:
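Before building a custom environment, a minimal interaction loop with an existing Gym environment looks as follows (a sketch under the gym ~0.20 API used in this chapter, where reset() returns the observation and step() returns a 4-tuple; rendering is omitted):

In [ ]:
import gym

env = gym.make('CartPole-v1')
state = env.reset()
total_reward = 0.0
done = False
while not done:
    action = env.action_space.sample()  # random policy, just for illustration
    state, reward, done, info = env.step(action)
    total_reward += reward
print('Total reward:', total_reward)
env.close()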
A grid world example
In [13]:
Image(filename='figures/19_07.png', width=600)
Out[13]:
Implementing the grid world environment in OpenAI Gym
# coding: utf-8

# Python Machine Learning, PyTorch Edition by Sebastian Raschka (https://sebastianraschka.com), Yuxi (Hayden) Liu
# (https://www.mlexample.com/) & Vahid Mirjalili (http://vahidmirjalili.com), Packt Publishing Ltd. 2021
#
# Code Repository:
#
# Code License: MIT License (https://github.com/ LICENSE.txt)

#################################################################################
# Chapter 19 - Reinforcement Learning for Decision Making in Complex Environments
#################################################################################

# Script: gridworld_env.py

import numpy as np
from gym.envs.toy_text import discrete
from collections import defaultdict
import time
import pickle
import os

from gym.envs.classic_control import rendering

CELL_SIZE = 100
MARGIN = 10


def get_coords(row, col, loc='center'):
    xc = (col + 1.5) * CELL_SIZE
    yc = (row + 1.5) * CELL_SIZE
    if loc == 'center':
        return xc, yc
    elif loc == 'interior_corners':
        half_size = CELL_SIZE//2 - MARGIN
        xl, xr = xc - half_size, xc + half_size
        yt, yb = yc - half_size, yc + half_size
        return [(xl, yt), (xr, yt), (xr, yb), (xl, yb)]
    elif loc == 'interior_triangle':
        x1, y1 = xc, yc + CELL_SIZE//3
        x2, y2 = xc + CELL_SIZE//3, yc - CELL_SIZE//3
        x3, y3 = xc - CELL_SIZE//3, yc - CELL_SIZE//3
        return [(x1, y1), (x2, y2), (x3, y3)]


def draw_object(coords_list):
    if len(coords_list) == 1:  # -> circle
        obj = rendering.make_circle(int(0.45*CELL_SIZE))
        obj_transform = rendering.Transform()
        obj.add_attr(obj_transform)
        obj_transform.set_translation(*coords_list[0])
        obj.set_color(0.2, 0.2, 0.2)  # -> black
    elif len(coords_list) == 3:  # -> triangle
        obj = rendering.FilledPolygon(coords_list)
        obj.set_color(0.9, 0.6, 0.2)  # -> yellow
    elif len(coords_list) > 3:  # -> polygon
        obj = rendering.FilledPolygon(coords_list)
        obj.set_color(0.4, 0.4, 0.8)  # -> blue
    return obj


class GridWorldEnv(discrete.DiscreteEnv):
    def __init__(self, num_rows=4, num_cols=6, delay=0.05):
        self.num_rows = num_rows
        self.num_cols = num_cols
        self.delay = delay

        move_up = lambda row, col: (max(row - 1, 0), col)
        move_down = lambda row, col: (min(row + 1, num_rows - 1), col)
        move_left = lambda row, col: (row, max(col - 1, 0))
        move_right = lambda row, col: (row, min(col + 1, num_cols - 1))

        self.action_defs = {0: move_up, 1: move_right,
                            2: move_down, 3: move_left}

        # Number of states/actions
        nS = num_cols * num_rows
        nA = len(self.action_defs)
        self.grid2state_dict = {(s // num_cols, s % num_cols): s
                                for s in range(nS)}
        self.state2grid_dict = {s: (s // num_cols, s % num_cols)
                                for s in range(nS)}

        # Gold state
        gold_cell = (num_rows // 2, num_cols - 2)

        # Trap states
        trap_cells = [((gold_cell[0] + 1), gold_cell[1]),
                      (gold_cell[0], gold_cell[1] - 1),
                      ((gold_cell[0] - 1), gold_cell[1])]

        gold_state = self.grid2state_dict[gold_cell]
        trap_states = [self.grid2state_dict[(r, c)]
                       for (r, c) in trap_cells]
        self.terminal_states = [gold_state] + trap_states
        print(self.terminal_states)

        # Build the transition probability
        P = defaultdict(dict)
        for s in range(nS):
            row, col = self.state2grid_dict[s]
            P[s] = defaultdict(list)
            for a in range(nA):
                action = self.action_defs[a]
                next_s = self.grid2state_dict[action(row, col)]

                # Terminal state
                if self.is_terminal(next_s):
                    r = (1.0 if next_s == self.terminal_states[0]
                         else -1.0)
                else:
                    r = 0.0

                if self.is_terminal(s):
                    done = True
                    next_s = s
                else:
                    done = False
                P[s][a] = [(1.0, next_s, r, done)]

        # Initial state distribution
        isd = np.zeros(nS)
        isd[0] = 1.0

        super(GridWorldEnv, self).__init__(nS, nA, P, isd)

        self.viewer = None
        self._build_display(gold_cell, trap_cells)

    def is_terminal(self, state):
        return state in self.terminal_states

    def _build_display(self, gold_cell, trap_cells):
        screen_width = (self.num_cols + 2) * CELL_SIZE
        screen_height = (self.num_rows + 2) * CELL_SIZE
        self.viewer = rendering.Viewer(screen_width, screen_height)
        all_objects = []

        # List of border points' coordinates
        bp_list = [
            (CELL_SIZE - MARGIN, CELL_SIZE - MARGIN),
            (screen_width - CELL_SIZE + MARGIN, CELL_SIZE - MARGIN),
            (screen_width - CELL_SIZE + MARGIN,
             screen_height - CELL_SIZE + MARGIN),
            (CELL_SIZE - MARGIN, screen_height - CELL_SIZE + MARGIN)
        ]
        border = rendering.PolyLine(bp_list, True)
        border.set_linewidth(5)
        all_objects.append(border)

        # Vertical lines
        for col in range(self.num_cols + 1):
            x1, y1 = (col + 1) * CELL_SIZE, CELL_SIZE
            x2, y2 = (col + 1) * CELL_SIZE, \
                     (self.num_rows + 1) * CELL_SIZE
            line = rendering.PolyLine([(x1, y1), (x2, y2)], False)
            all_objects.append(line)

        # Horizontal lines
        for row in range(self.num_rows + 1):
            x1, y1 = CELL_SIZE, (row + 1) * CELL_SIZE
            x2, y2 = (self.num_cols + 1) * CELL_SIZE, \
                     (row + 1) * CELL_SIZE
            line = rendering.PolyLine([(x1, y1), (x2, y2)], False)
            all_objects.append(line)

        # Traps: --> circles
        for cell in trap_cells:
            trap_coords = get_coords(*cell, loc='center')
            all_objects.append(draw_object([trap_coords]))

        # Gold: --> triangle
        gold_coords = get_coords(*gold_cell,
                                 loc='interior_triangle')
        all_objects.append(draw_object(gold_coords))

        # Agent --> square or robot
        if (os.path.exists('robot-coordinates.pkl') and
                CELL_SIZE == 100):
            agent_coords = pickle.load(
                open('robot-coordinates.pkl', 'rb'))
            starting_coords = get_coords(0, 0, loc='center')
            agent_coords += np.array(starting_coords)
        else:
            agent_coords = get_coords(0, 0, loc='interior_corners')
        agent = draw_object(agent_coords)
        self.agent_trans = rendering.Transform()
        agent.add_attr(self.agent_trans)
        all_objects.append(agent)

        for obj in all_objects:
            self.viewer.add_geom(obj)

    def render(self, mode='human', done=False):
        if done:
            sleep_time = 1
        else:
            sleep_time = self.delay
        x_coord = self.s % self.num_cols
        y_coord = self.s // self.num_cols
        x_coord = (x_coord + 0) * CELL_SIZE
        y_coord = (y_coord + 0) * CELL_SIZE
        self.agent_trans.set_translation(x_coord, y_coord)
        rend = self.viewer.render(
            return_rgb_array=(mode == 'rgb_array'))
        time.sleep(sleep_time)
        return rend

    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None


if __name__ == '__main__':
    env = GridWorldEnv(5, 6)
    for i in range(1):
        s = env.reset()
        env.render(mode='human', done=False)

        while True:
            action = np.random.choice(env.nA)
            res = env.step(action)
            print('Action ', env.s, action, ' -> ', res)
            env.render(mode='human', done=res[2])
            if res[2]:
                break

    env.close()
In [10]:
Image(filename='figures/19_08.png', width=600)
Out[10]:
Solving the grid world problem
Implementing the Q-learning algorithm
# coding: utf-8

# Python Machine Learning, PyTorch Edition by Sebastian Raschka (https://sebastianraschka.com), Yuxi (Hayden) Liu
# (https://www.mlexample.com/) & Vahid Mirjalili (http://vahidmirjalili.com), Packt Publishing Ltd. 2021
#
# Code Repository:
#
# Code License: MIT License (https://github.com/ /LICENSE.txt)

#################################################################################
# Chapter 19 - Reinforcement Learning for Decision Making in Complex Environments
#################################################################################

# Script: agent.py

from collections import defaultdict
import numpy as np


class Agent(object):
    def __init__(
            self, env,
            learning_rate=0.01,
            discount_factor=0.9,
            epsilon_greedy=0.9,
            epsilon_min=0.1,
            epsilon_decay=0.95):
        self.env = env
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon_greedy
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        # Define the q_table
        self.q_table = defaultdict(lambda: np.zeros(self.env.nA))

    def choose_action(self, state):
        if np.random.uniform() < self.epsilon:
            action = np.random.choice(self.env.nA)
        else:
            q_vals = self.q_table[state]
            perm_actions = np.random.permutation(self.env.nA)
            q_vals = [q_vals[a] for a in perm_actions]
            perm_q_argmax = np.argmax(q_vals)
            action = perm_actions[perm_q_argmax]
        return action

    def _learn(self, transition):
        s, a, r, next_s, done = transition
        q_val = self.q_table[s][a]
        if done:
            q_target = r
        else:
            q_target = r + self.gamma*np.max(self.q_table[next_s])

        # Update the q_table
        self.q_table[s][a] += self.lr * (q_target - q_val)

        # Adjust the epsilon
        self._adjust_epsilon()

    def _adjust_epsilon(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
# coding: utf-8

# Python Machine Learning, PyTorch Edition by Sebastian Raschka (https://sebastianraschka.com), Yuxi (Hayden) Liu
# (https://www.mlexample.com/) & Vahid Mirjalili (http://vahidmirjalili.com), Packt Publishing Ltd. 2021
#
# Code Repository:
#
# Code License: MIT License (https://github.com/ /LICENSE.txt)

#################################################################################
# Chapter 19 - Reinforcement Learning for Decision Making in Complex Environments
#################################################################################

# Script: qlearning.py

from gridworld_env import GridWorldEnv
from agent import Agent
from collections import namedtuple
import matplotlib.pyplot as plt
import numpy as np

np.random.seed(1)

Transition = namedtuple(
    'Transition', ('state', 'action', 'reward', 'next_state', 'done'))


def run_qlearning(agent, env, num_episodes=50):
    history = []
    for episode in range(num_episodes):
        state = env.reset()
        env.render(mode='human')
        final_reward, n_moves = 0.0, 0
        while True:
            action = agent.choose_action(state)
            next_s, reward, done, _ = env.step(action)
            agent._learn(Transition(state, action, reward,
                                    next_s, done))
            env.render(mode='human', done=done)
            state = next_s
            n_moves += 1
            if done:
                break

        final_reward = reward
        history.append((n_moves, final_reward))
        print(f'Episode {episode}: Reward {final_reward:.2} '
              f'#Moves {n_moves}')

    return history


def plot_learning_history(history):
    fig = plt.figure(1, figsize=(14, 10))
    ax = fig.add_subplot(2, 1, 1)
    episodes = np.arange(len(history))
    moves = np.array([h[0] for h in history])
    plt.plot(episodes, moves, lw=4, marker="o", markersize=10)
    ax.tick_params(axis='both', which='major', labelsize=15)
    plt.xlabel('Episodes', size=20)
    plt.ylabel('# moves', size=20)

    ax = fig.add_subplot(2, 1, 2)
    rewards = np.array([h[1] for h in history])
    plt.step(episodes, rewards, lw=4)
    ax.tick_params(axis='both', which='major', labelsize=15)
    plt.xlabel('Episodes', size=20)
    plt.ylabel('Final rewards', size=20)
    plt.savefig('q-learning-history.png', dpi=300)
    plt.show()


if __name__ == '__main__':
    env = GridWorldEnv(num_rows=5, num_cols=6)
    agent = Agent(env)
    history = run_qlearning(agent, env)
    env.close()

    plot_learning_history(history)
In [14]:
Image(filename='figures/19_09.png', width=600)
Out[14]:
A glance at deep Q-learning
In [15]:
Image(filename='figures/19_10.png', width=600)
Out[15]:
Training a DQN model according to the Q-learning algorithm
Replay memory
In [16]:
Image(filename='figures/19_11.png', width=600)
Out[16]:
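The replay memory itself is just a bounded buffer of transitions that we sample from uniformly at random; the sketch below shows the core idea (illustrative only; the full DQNAgent below implements the same pattern with deque and random.sample):

In [ ]:
import random
from collections import deque, namedtuple

Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'next_state', 'done'))

memory = deque(maxlen=2000)  # oldest transitions are discarded automatically

def remember(state, action, reward, next_state, done):
    memory.append(Transition(state, action, reward, next_state, done))

def sample_batch(batch_size=32):
    # Uniform random sampling breaks the temporal correlation between
    # consecutive transitions before they are used for a gradient update
    return random.sample(memory, batch_size)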
Determining the target values for computing the loss
In [17]:
Image(filename='figures/19_12.png', width=600)
Out[17]:
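In code, the target for a single transition can be formed as in the sketch below (the name compute_target is illustrative, not part of the book's scripts; the logic mirrors the _learn() method of the DQNAgent defined in the next section):

In [ ]:
import torch

def compute_target(model, r, next_s, done, gamma=0.95):
    # If the episode ended at next_s, there is nothing left to bootstrap from
    if done:
        return float(r)
    with torch.no_grad():
        q_next = model(torch.tensor(next_s, dtype=torch.float32))
    return float(r + gamma * q_next.max())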
Implementing a deep Q-learning algorithm
# coding: utf-8

# Python Machine Learning, PyTorch Edition by Sebastian Raschka (https://sebastianraschka.com), Yuxi (Hayden) Liu
# (https://www.mlexample.com/) & Vahid Mirjalili (http://vahidmirjalili.com), Packt Publishing Ltd. 2021
#
# Code Repository: https://github.com
#
# Code License: MIT License (https://github.com/ /LICENSE.txt)

#################################################################################
# Chapter 19 - Reinforcement Learning for Decision Making in Complex Environments
#################################################################################

# Script: cartpole/main.py

import gym
import numpy as np
import torch
import torch.nn as nn
import random
import matplotlib.pyplot as plt
from collections import namedtuple
from collections import deque

np.random.seed(1)
torch.manual_seed(1)

Transition = namedtuple(
    'Transition', ('state', 'action', 'reward', 'next_state', 'done'))


class DQNAgent:
    def __init__(
            self, env, discount_factor=0.95,
            epsilon_greedy=1.0, epsilon_min=0.01,
            epsilon_decay=0.995, learning_rate=1e-3,
            max_memory_size=2000):
        self.env = env
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.n

        self.memory = deque(maxlen=max_memory_size)

        self.gamma = discount_factor
        self.epsilon = epsilon_greedy
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.lr = learning_rate
        self._build_nn_model()

    def _build_nn_model(self):
        self.model = nn.Sequential(nn.Linear(self.state_size, 256),
                                   nn.ReLU(),
                                   nn.Linear(256, 128),
                                   nn.ReLU(),
                                   nn.Linear(128, 64),
                                   nn.ReLU(),
                                   nn.Linear(64, self.action_size))

        self.loss_fn = nn.MSELoss()
        self.optimizer = torch.optim.Adam(
            self.model.parameters(), self.lr)

    def remember(self, transition):
        self.memory.append(transition)

    def choose_action(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)
        with torch.no_grad():
            q_values = self.model(
                torch.tensor(state, dtype=torch.float32))[0]
        return torch.argmax(q_values).item()  # returns action

    def _learn(self, batch_samples):
        batch_states, batch_targets = [], []
        for transition in batch_samples:
            s, a, r, next_s, done = transition
            with torch.no_grad():
                if done:
                    target = r
                else:
                    pred = self.model(
                        torch.tensor(next_s, dtype=torch.float32))[0]
                    target = r + self.gamma * pred.max()
                target_all = self.model(
                    torch.tensor(s, dtype=torch.float32))[0]
                target_all[a] = target
            batch_states.append(s.flatten())
            batch_targets.append(target_all)

        self._adjust_epsilon()

        self.optimizer.zero_grad()
        pred = self.model(
            torch.tensor(batch_states, dtype=torch.float32))
        loss = self.loss_fn(pred, torch.stack(batch_targets))
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def _adjust_epsilon(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def replay(self, batch_size):
        samples = random.sample(self.memory, batch_size)
        return self._learn(samples)


def plot_learning_history(history):
    fig = plt.figure(1, figsize=(14, 5))
    ax = fig.add_subplot(1, 1, 1)
    episodes = np.arange(len(history)) + 1
    plt.plot(episodes, history, lw=4, marker='o', markersize=10)
    ax.tick_params(axis='both', which='major', labelsize=15)
    plt.xlabel('Episodes', size=20)
    plt.ylabel('# Total Rewards', size=20)
    plt.show()


# General settings
EPISODES = 200
batch_size = 32
init_replay_memory_size = 500

if __name__ == '__main__':
    env = gym.make('CartPole-v1')
    agent = DQNAgent(env)
    state = env.reset()
    state = np.reshape(state, [1, agent.state_size])

    # Filling up the replay-memory
    for i in range(init_replay_memory_size):
        action = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, agent.state_size])
        agent.remember(Transition(state, action, reward,
                                  next_state, done))
        if done:
            state = env.reset()
            state = np.reshape(state, [1, agent.state_size])
        else:
            state = next_state

    total_rewards, losses = [], []
    for e in range(EPISODES):
        state = env.reset()
        if e % 10 == 0:
            env.render()
        state = np.reshape(state, [1, agent.state_size])
        for i in range(500):
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            next_state = np.reshape(next_state,
                                    [1, agent.state_size])
            agent.remember(Transition(state, action, reward,
                                      next_state, done))
            state = next_state
            if e % 10 == 0:
                env.render()
            if done:
                total_rewards.append(i)
                print(f'Episode: {e}/{EPISODES}, Total reward: {i}')
                break
            loss = agent.replay(batch_size)
            losses.append(loss)

    plot_learning_history(total_rewards)
In [19]:
Image(filename='figures/19_13.png', width=600)
Out[19]:
...
Chapter and book summary
...
In [ ]: