# Shared notebook: elevator class.ipynb (Open in CoCalc)
###### Elevator Simulation
## CS166: Modeling, Simulation & Decision Making

import math
import numpy as np
import random
import matplotlib.pyplot as plt

from scipy.stats import ttest_ind
from matplotlib import ticker
from copy import deepcopy

# IPython magic (valid only inside a notebook): draw figures inline.
%matplotlib inline
# Default size applied to every figure created below.
plt.rcParams['figure.figsize'] = (18, 4)

# # Simulation parameters
# Interactive alternative to the hard-coded defaults below (kept for reference).
# passenger_count = int(input("How many passengers will use the elevator?"))
# max_capacity = int(input("How many people can the elevator carry?"))
# n_floors = int(input("How many floors are there in the building?"))
# max_time = range(1800) # Time steps

# Default parameters shared by the experiments in this file.
# n_floors and max_capacity are also used as default argument values of
# Elevator.__init__, so they must be defined before that class.
passenger_count = 100
max_capacity = 5
n_floors = 20
max_time = range(1800) # Time steps


# Definition of the elevator class

class Elevator:
    '''
    A single elevator car together with its call/request bookkeeping.

    People waiting outside are stored in ``calls`` as *signed* floors: the
    absolute value is the floor they wait on and the sign is the direction
    they want to travel (positive = up, negative = down). People inside are
    stored in ``requests`` as plain destination floors. Both dictionaries are
    keyed by passenger id.
    '''

    def __init__(self, max_floors = n_floors, default_floor = 1, capacity = max_capacity, logs = False):
        '''
        Initializes an elevator object.
        Inputs:

        * max_floors (int) The number of floors in the building.

        * default_floor (int) The floor the elevator returns to if
        there are no calls or requests.

        * capacity (int) The maximum number of people the elevator
        can carry at the same time.

        * logs (bool) Accepted for interface compatibility; it is not
        used by the constructor.
        '''

        self.max_floors = max_floors
        self.default_floor = default_floor
        self.current_floor = self.default_floor
        self.user_count = 0       # How many users are in the elevator.
        self.capacity = capacity
        self.call_pool = {}       # id -> signed floor of users who have not called yet
        self.calls = {}           # id -> signed floor of users waiting for the elevator
        self.requests = {}        # id -> destination floor of users inside the elevator
        self.direction = +1       # +1 going up, -1 going down, 0 idle

    def add_call(self):
        '''
        Moves one randomly chosen user from the pool of future callers to the
        dictionary of active calls.

        Returns the id of the user that called.
        Raises IndexError if the call pool is empty.
        '''
        calling_user = random.choice(list(self.call_pool.keys()))
        self.calls[calling_user] = self.call_pool.pop(calling_user)
        return calling_user

    def stop(self):
        '''
        Decides whether the elevator should stop at the current floor.

        Returns True when
        * a passenger inside wants to leave here, or
        * someone outside on this floor wants to travel in the elevator's
          current direction and there is still room, or
        * the elevator is empty and anyone at all is waiting on this floor.
        Returns False otherwise (the elevator should keep moving).
        '''
        if self.current_floor in self.requests.values():
            return True

        # Signed comparison: floor AND direction of the call must match.
        if (self.direction*self.current_floor in self.calls.values()) and (self.user_count < self.capacity):
            return True

        # An empty elevator picks up callers regardless of their direction.
        if self.user_count == 0:
            return any(abs(call) == self.current_floor for call in self.calls.values())

        return False

    def _board(self, passenger, users, logs):
        '''Moves one waiting passenger from the calls into the elevator.'''
        self.calls.pop(passenger)
        self.user_count += 1
        users[passenger].in_elevator = True  # mirror of the flag cleared on drop-off
        if logs: print('Passenger %d entered the elevator at floor %d.' %(passenger, self.current_floor))
        self.requests[passenger] = users[passenger].destination  # Add their destination to requests
        if logs: print(users[passenger].destination,'DESTINATION OF PASSENGER',passenger)

    def swap(self, users, logs = False):
        '''
        Lets passengers leave and enter while the elevator is stopped.
        Inputs:

        * users (list of Passenger) Indexed by passenger id.

        * logs (bool) Whether to print a message for every passenger moved.
        '''

        # Iterate over snapshots because entries are removed while looping.
        for passenger, floor in list(self.requests.items()):
            if floor == self.current_floor:
                self.requests.pop(passenger)
                self.user_count -= 1
                if logs: print('Passenger %d was dropped at floor %d.' %(passenger, self.current_floor))
                users[passenger].in_elevator = False

        for passenger, floor in list(self.calls.items()):
            if self.user_count >= self.capacity:  # Elevator is full.
                break

            if self.user_count == 0:
                # Empty elevator: the first boarder may travel either way.
                boards = abs(floor) == self.current_floor
            else:
                # Otherwise floor and direction must both match the call.
                boards = floor == self.direction*self.current_floor

            if boards:
                self._board(passenger, users, logs)

    def _head_toward(self, floor):
        '''Points the elevator down if it is above ``floor``, otherwise up.'''
        if self.current_floor - floor > 0:
            self.direction = -1
        else:
            self.direction = +1

    def path(self):
        '''
        Chooses the direction of the next move (the "smart" strategy).

        Requests from inside take priority over calls from outside. With no
        requests and no calls the elevator heads to the default floor and
        idles there. With only calls, it prefers the calls that will sweep
        the most floors: the lowest up-call or the highest down-call.
        '''

        current_calls = list(self.calls.values())

        if self.current_floor >= self.max_floors:
            # At (or above) the top floor the only way is down.
            self.direction = -1

        elif len(self.requests) == 0:

            if len(current_calls) == 0:
                # Nothing to do: return to the default floor, then idle.
                if self.current_floor-self.default_floor > 0:
                    self.direction = -1
                else:
                    self.direction = 0

            else:
                # Most negative value = highest floor with a down-call
                # (when any down-call exists).
                negmax = min(current_calls)
                # Up-calls above the ground floor.
                poscalls = [i for i in current_calls if i > 1]
                if len(poscalls) > 0:
                    posmin = min(poscalls)  # lowest floor calling to go up

                if len(current_calls) == 1:
                    # A single call: go to the floor it came from.
                    self._head_toward(abs(current_calls[0]))

                elif 1 in current_calls:
                    # NOTE(review): a call from the top floor is stored as a
                    # negative value, so this membership test (and the branch
                    # below) looks unreachable with passenger-generated calls;
                    # when only the ground floor is calling, the direction is
                    # left unchanged. Confirm the intended behaviour.
                    if self.max_floors in current_calls:
                        if self.current_floor <= (0.5*self.max_floors):
                            self.direction = -1
                        else:
                            self.direction = +1

                elif self.max_floors in current_calls:
                    self.direction = +1

                elif len(poscalls) == 0:
                    # Only down-calls: go to the highest one.
                    self._head_toward(-negmax)

                elif posmin == negmax:
                    # Only up-calls: go to the lowest one.
                    self._head_toward(negmax)

                elif (posmin-1) < (self.max_floors + negmax):
                    # The lowest up-call is closer to the ground floor than
                    # the highest down-call is to the top floor.
                    if self.current_floor < posmin:
                        self.direction = +1
                    else:
                        self.direction = -1

                elif (posmin-1) > (self.max_floors + negmax):
                    self._head_toward(-negmax)

                else:
                    # Tie: serve whichever of the two calls is closer.
                    # negmax is negative, so its floor is -negmax.
                    pos = abs(self.current_floor - posmin)
                    neg = abs(self.current_floor + negmax)
                    if pos < neg:
                        self._head_toward(posmin)
                    else:
                        self._head_toward(-negmax)

        else:
            # With passengers inside, head for the first requested floor.
            request1 = list(self.requests.values())
            if self.current_floor - request1[0] < 0:
                self.direction = +1
            else:
                self.direction = -1

    def move(self, strategy = 1, logs = False):
        '''
        Moves the elevator one floor in its current direction.
        Inputs:

        * strategy (int) With strategy 1 the elevator bounces between the
        ground floor and the top floor, reversing at either end.

        * logs (bool) Whether to print a message describing the move.
        '''

        origin_floor = self.current_floor

        if self.direction == +1:
            self.current_floor += 1
        elif self.direction == -1:
            self.current_floor -= 1

        if logs:
            if self.direction == 1:
                dir_string = 'up'
            elif self.direction == -1:
                dir_string = 'down'
            else:
                dir_string = 'No movement'
            print('Elevator moves %s from floor %d carrying %d people.' %(dir_string, origin_floor, self.user_count))

        if strategy == 1:
            if self.current_floor >= self.max_floors:
                self.direction = -1

            if self.current_floor == 1:
                self.direction = +1





# Definition of the building class

class Building:
    '''
    Drives one elevator through the simulation and records its state.
    '''

    def __init__(self, elevator, users):
        '''
        Initializes the building object.
        Inputs:

        * elevator (Elevator) The elevator used in the simulation; the
        building takes its number of floors from it.

        * users (list of Passenger) The users taking part in the simulation.
        '''

        self.max_floors = elevator.max_floors
        self.elevator = elevator
        self.users = users

        # Storage for the values plotted later, one entry per store_data() call.
        self.calls_over_time = []
        self.requests_over_time = []
        self.count_over_time = []

    def timestep(self, elevator, users, strategy = 1, logs = False):
        '''
        Advances the simulation by one step.

        If the elevator should stop, passengers leave and enter. Otherwise
        the elevator moves one floor (strategy 2 first recomputes its
        direction). Independently of that, there is a 10% chance that one
        user from the call pool calls the elevator.
        '''

        if elevator.stop():
            if logs: print('Elevator stops at %d.' %(elevator.current_floor))
            elevator.swap(users, logs = logs)  # Add and drop passengers
        else:
            if strategy == 2:
                elevator.path()                # Decide where to go
            elevator.move(logs = logs, strategy = strategy)

        # The random draw happens on every step; a call is only issued while
        # there are still users left in the pool. The call itself must not
        # depend on whether logging is enabled.
        if random.random() < 0.1 and elevator.call_pool:
            calling_user = elevator.add_call()
            if logs: print('User %d called the elevator.' %(calling_user))

    def store_data(self, elevator):
        '''
        Appends the current number of calls, requests and passengers
        on board to the recorded time series.
        '''
        self.calls_over_time.append(len(elevator.calls))
        self.requests_over_time.append(len(elevator.requests))
        self.count_over_time.append(elevator.user_count)


# Definition of the passenger class


class Passenger:
    '''
    A building user with an origin floor, a destination floor, a travel
    direction and a flag telling whether they are inside the elevator.
    '''

    def __init__(self, id, elevator, seed, in_elevator = False):
        '''
        Initializes a passenger object.
        Inputs:

        * id (int) Identifier of the passenger.

        * elevator (Elevator) Only its max_floors attribute is read, to
        bound the floors that can be drawn. Needs at least two floors.

        * seed (int) Combined with id to seed the global random generator,
        so that a given (id, seed) pair always yields the same passenger.

        * in_elevator (bool) Whether the passenger starts inside the elevator.
        '''
        # Seed first so the draws below follow the seed.
        random.seed(id + seed)
        top_floor = elevator.max_floors

        if random.random() < 0.3:  # 30% chance the user starts from the ground floor
            self.current_floor = 1
            destination = random.randint(1, top_floor)
            while destination == 1:
                destination = random.randint(2, top_floor)
            self.destination = destination

        else:  # 70% chance the user starts anywhere else
            self.current_floor = random.randint(2, top_floor)

            if random.random() < 0.3:  # 30% chance the user wants to go to the ground floor
                self.destination = 1
            else:
                destination = random.randint(1, top_floor)
                # Redraw from the full range: redrawing from 2..top_floor
                # can never differ from the origin in a two-floor building.
                while destination == self.current_floor:
                    destination = random.randint(1, top_floor)
                self.destination = destination

        self.id = id
        self.direction = np.sign(self.destination - self.current_floor)  # +1 up, -1 down
        self.in_elevator = in_elevator

    def __str__(self):
        '''
        Define how a Passenger object should be printed
        '''
        if self.in_elevator == True:
            in_string = 'inside'
        else:
            in_string = 'outside'

        dir_string = ''
        if self.direction == 1:
            dir_string = 'up'
        elif self.direction == -1:
            dir_string = 'down'
        elif self.direction == 0:
            dir_string = 'nowhere (they are at destination)'  # should not happen

        return ('Passenger %d is %s the elevator, wanting to go %s from floor %d to floor %d.'
                % (self.id, in_string, dir_string, self.current_floor, self.destination))


# Overarching loop

def simulate_elevator(passenger_count, n_floors, max_capacity, max_time, logs = False, seed = 1, strategy = 1):
    '''
    Runs one complete elevator simulation.

    Creates the Elevator, the list of Passengers and the Building, puts
    every passenger in the elevator's call pool, and then iterates over the
    time steps. At each step the current state is stored and the building's
    .timestep() method is called. After the first 50 steps the loop stops
    early once nobody is waiting, riding, or left in the call pool.
    Inputs:

    * passenger_count (int) The numbers of users in the simulation.

    * n_floors (int) The number of floors in the building.

    * max_capacity (int) The maximum number of people the elevator can hold.

    * max_time (int or iterable of int) The time steps to simulate. An int
    n is treated as range(n).

    * logs (bool) Defaults to false. Whether to print several messages during
    execution to help with debugging.

    * seed (int) Defaults to 1. A seed that changes the results of the
    stochastic parts of the model.

    * strategy (int) Defaults to 1. 1 is a basic strategy where the elevator traverses
    the entire building before changing directions. 2 is a more complex strategy that
    optimizes the elevator's path.

    Outputs:

    * (dictionary) 'timesteps' is the last time step that was executed
    (0 if none was), 'calls' is the number of waiting users recorded at
    each step, and 'requests' is the number of users inside the elevator
    recorded at each step.
    '''

    if isinstance(max_time, int):
        max_time = range(max_time)

    elevator = Elevator(max_floors = n_floors, capacity = max_capacity)
    users = [Passenger(id = i, elevator = elevator, seed = seed) for i in range(passenger_count)]
    building = Building(elevator = elevator, users = users)

    # Fill the pool of future callers; calls are encoded as floor * direction.
    for user in users:
        elevator.call_pool[user.id] = user.current_floor * user.direction

    time_step = 0  # keeps the return value defined when max_time is empty
    for time_step in max_time:
        building.store_data(elevator)
        building.timestep(elevator, users, strategy = strategy, logs = logs)

        # Consistency check: every passenger on board should have a request.
        if len(elevator.requests) != elevator.user_count:
            if logs: print(len(elevator.requests),elevator.user_count)

        nobody_left = (len(elevator.calls) == 0) and (len(elevator.requests) == 0) and (len(elevator.call_pool) == 0)
        if (time_step > 50) and nobody_left:
            if logs: print('Simulation over after %d timesteps.' %time_step)
            break

    building.store_data(elevator)
    return {'timesteps' : time_step,'calls' : building.calls_over_time, 'requests' : building.requests_over_time}


# Plot of passenger statistics

# Trolley strategy: number of people waiting for / riding the elevator over time.
# Run the simulation once and read both series from the same result.
trolley_run = simulate_elevator(passenger_count, n_floors, max_capacity, max_time, strategy = 1)
waiting = trolley_run['calls']
inside = trolley_run['requests']

plot_calls = plt.plot(waiting, label='Waiting for elevator')
plot_requests = plt.plot(inside, label='In elevator')
plt.title('Trolley elevator')
plt.xlabel('Timestep')
plt.text(s='Average in elevator: '+str(round(np.mean(inside),2)),y=5.5,x=0)
plt.text(s='Average waiting: '+str(round(np.mean(waiting),2)),y=5,x=0)
plt.legend(loc=1)
plt.show()

# Smart strategy: number of people waiting for / riding the elevator over time.
# Run the simulation once and read both series from the same result.
smart_run = simulate_elevator(passenger_count, n_floors, max_capacity, max_time, strategy = 2)
waiting = smart_run['calls']
inside = smart_run['requests']

plot_calls = plt.plot(waiting, label='Waiting for elevator')
plot_requests = plt.plot(inside, label='In elevator')
plt.title('Smart elevator')
plt.xlabel('Timestep')
plt.text(s='Average in elevator: '+str(round(np.mean(inside),2)),y=6,x=0)
plt.text(s='Average waiting: '+str(round(np.mean(waiting),2)),y=5,x=0)
plt.legend(loc=1)
plt.show()


# Experiment: timesteps needed to finish as the number of floors grows (2..99),
# with 100 passengers, for both strategies.
floor_counts = range(2, 100)

timesteps_trolley = [
    simulate_elevator(passenger_count = 100, n_floors = floors, max_capacity = max_capacity,
                      max_time = max_time, strategy = 1)['timesteps']
    for floors in floor_counts
]
timesteps_smart = [
    simulate_elevator(passenger_count = 100, n_floors = floors, max_capacity = max_capacity,
                      max_time = max_time, strategy = 2)['timesteps']
    for floors in floor_counts
]

for series, series_label in ((timesteps_trolley, 'Trolley elevator'), (timesteps_smart, 'Smart elevator')):
    plt.plot(floor_counts, series, label=series_label)
plt.xlabel('Number of floors in the building')
plt.xticks(np.arange(0,100,5))
plt.ylabel('Timesteps required to finish simulation')
plt.legend(loc=1)
plt.show()





# Experiment: timesteps needed to finish as the number of passengers grows
# (0..119), for both strategies, each with a least-squares linear fit.
passenger_counts = range(120)

timesteps_trolley = [
    simulate_elevator(passenger_count = count, n_floors = n_floors, max_capacity = max_capacity,
                      max_time = max_time, strategy = 1)['timesteps']
    for count in passenger_counts
]
timesteps_smart = [
    simulate_elevator(passenger_count = count, n_floors = n_floors, max_capacity = max_capacity,
                      max_time = max_time, strategy = 2)['timesteps']
    for count in passenger_counts
]

# Degree-1 polyfit returns (slope, intercept).
m_trolley,b_trolley = np.polyfit(passenger_counts, timesteps_trolley, 1)
m_smart,b_smart = np.polyfit(passenger_counts, timesteps_smart, 1)

plt.plot(passenger_counts,timesteps_trolley,label='Trolley elevator',linewidth=0.50)
plt.plot([m_trolley*x + b_trolley for x in passenger_counts],color=u'#1f77b4',label= str(round(m_trolley,4))+'x +'+str(round(b_trolley,4)))
plt.plot(passenger_counts,timesteps_smart,label='Smart elevator',linewidth=0.50)
# The smart fit line must use the smart intercept, matching its legend label.
plt.plot([m_smart*x + b_smart for x in passenger_counts],color=u'#ff7f0e',label= str(round(m_smart,4))+'x +'+str(round(b_smart,4)))
plt.xlabel('Number of passengers in the building')
plt.ylabel('Timesteps required to finish simulation')
plt.legend(loc=1)
plt.show()

# Experiment: timesteps needed to finish as the elevator capacity grows (0..29),
# with 100 passengers and 20 floors, for both strategies.
capacities = range(30)

timesteps_trolley = [
    simulate_elevator(passenger_count = 100, n_floors = 20, max_capacity = cap,
                      max_time = max_time, strategy = 1)['timesteps']
    for cap in capacities
]
timesteps_smart = [
    simulate_elevator(passenger_count = 100, n_floors = 20, max_capacity = cap,
                      max_time = max_time, strategy = 2)['timesteps']
    for cap in capacities
]

for series, series_label in ((timesteps_trolley, 'Trolley elevator'), (timesteps_smart, 'Smart elevator')):
    plt.plot(capacities, series, label=series_label)
plt.xlabel('Elevator capacity')
plt.ylabel('Timesteps required to finish simulation')
plt.legend(loc=1)
plt.grid(True)
plt.show()



# Experiment: distribution of the timesteps needed to finish over 200 seeds,
# compared between the two strategies with a two-sample t-test.
seeds = range(200)

timesteps_trolley = [
    simulate_elevator(passenger_count = passenger_count, n_floors = n_floors, max_capacity = max_capacity,
                      max_time = max_time, seed = run_seed, strategy = 1)['timesteps']
    for run_seed in seeds
]
timesteps_smart = [
    simulate_elevator(passenger_count = passenger_count, n_floors = n_floors, max_capacity = max_capacity,
                      max_time = max_time, seed = run_seed, strategy = 2)['timesteps']
    for run_seed in seeds
]

plt.hist(timesteps_trolley,label='Trolley elevator',bins=9,alpha=0.5)
plt.hist(timesteps_smart,label='Smart elevator',bins=9,alpha=0.5)

mean_line = 'Average timesteps taken: %s (trolley) %s (smart)' % (
    int(np.mean(timesteps_trolley)), int(np.mean(timesteps_smart)))
max_line = 'Most timesteps taken: %s (trolley) %s (smart)' % (
    max(timesteps_trolley), max(timesteps_smart))
plt.suptitle(mean_line + '\n' + max_line)

# Second element of the t-test result is the p-value.
p_value = ttest_ind(timesteps_trolley,timesteps_smart)[1]
plt.text(s='p-value:'+str(p_value),y=50,x=1200,fontsize=12)
plt.xlabel('Timesteps required to finish simulation')
plt.ylabel('Relative frequency')
plt.show()